Beego Logs 源码分析 中篇

柯里沙漏
• 阅读 3065

文件输出引擎使用到的读写锁 sync.RWMutex

读写锁是一种同步机制,允许多个读操作同时读取数据,但是只允许一个写操作写数据。锁的状态有三种:读模式加锁、写模式加锁、无锁。
  • 无锁。读/写进程都可以进入。
  • 读模式锁。读进程可以进入。写进程不可以进入。
  • 写模式锁。读/写进程都不可以进入。

就拿文件行数这个变量来看,如果开启了日志文件按小时按行数切割的功能,要先读取当前文件行数变量值。当并发情况下,多个 goroutine 在打日志,读取文件行数和修改文件行数便成为一对“读写”操作,所以需要用读写锁,读写锁对于读操作不会导致锁竞争和 goroutine 阻塞。


// WriteMsg write logger message into file.
func (w *fileLogWriter) WriteMsg(when time.Time, msg string, level int) error {
    ···
    if w.Rotate {
        w.RLock()
        if w.needRotateHourly(len(msg), h) {
            w.RUnlock()
            w.Lock()
            if w.needRotateHourly(len(msg), h) {
                if err := w.doRotate(when); err != nil {
                    fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
                }
            }
            w.Unlock()
        } else if w.needRotateDaily(len(msg), d) {
            w.RUnlock()
            w.Lock()
            if w.needRotateDaily(len(msg), d) {
                if err := w.doRotate(when); err != nil {
                    fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
                }
            }
            w.Unlock()
        } else {
            w.RUnlock()
        }
    }

    w.Lock()
    _, err := w.fileWriter.Write([]byte(msg))
    if err == nil {
        w.maxLinesCurLines++
        w.maxSizeCurSize += len(msg)
    }
    w.Unlock()
    ···
}

总结下 Goroutine 的使用

监听 msgChan

第一处是开启异步选项时,启动一个 goroutine 监听 msgChan 是否为空,发现不为空便取走日志信息进行输出。

// Async set the log to asynchronous and start the goroutine
func (bl *BeeLogger) Async(msgLen ...int64) *BeeLogger {
    ···
    go bl.startLogger()
    ···
}

// start logger chan reading.
// when chan is not empty, write logs.
func (bl *BeeLogger) startLogger() {
    gameOver := false
    for {
        select {
        case bm := <-bl.msgChan:
            bl.writeToLoggers(bm.when, bm.msg, bm.level)
            logMsgPool.Put(bm)
        ···
        }
        ···
    }
}

监听计时器实现日志文件按日期分割

文件输出引擎 file.go 文件中,初始化 fileWriter *os.File 时启动一个 goroutine 执行 dailyRotate() :

func (w *fileLogWriter) initFd() error {
    fd := w.fileWriter
    fInfo, err := fd.Stat()
    if err != nil {
        return fmt.Errorf("get stat err: %s", err)
    }
    w.maxSizeCurSize = int(fInfo.Size())
    w.dailyOpenTime = time.Now()
    w.dailyOpenDate = w.dailyOpenTime.Day()
    w.maxLinesCurLines = 0
    if w.Daily {
        go w.dailyRotate(w.dailyOpenTime) //   <------
    }
    if fInfo.Size() > 0 && w.MaxLines > 0 {
        count, err := w.lines()
        if err != nil {
            return err
        }
        w.maxLinesCurLines = count
    }
    return nil
}

dailyRotate() 方法中,tm 定时器时间一到,便会往 tm.C 通道发送当前时间,此时 a 语句便停止阻塞,可以继续往下执行。

func (w *fileLogWriter) dailyRotate(openTime time.Time) {
    y, m, d := openTime.Add(24 * time.Hour).Date()
    nextDay := time.Date(y, m, d, 0, 0, 0, 0, openTime.Location())
    tm := time.NewTimer(time.Duration(nextDay.UnixNano() - openTime.UnixNano() + 100))
    <-tm.C // <--- a 语句
    w.Lock()
    if w.needRotate(0, time.Now().Day()) {
        if err := w.doRotate(time.Now()); err != nil {
            fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
        }
    }
    w.Unlock()
}

开启新的 goroutine 删除失效的日志文件

因为删除文件涉及文件 IO 处理,为了避免阻塞主线程,便交由另外 goroutine 去做。,go w.deleteOldLog(),超过 MaxDays 的日志文件便是失效的。

// DoRotate means it need to write file in new file.
// new file name like xx.2013-01-01.log (daily) or xx.001.log (by line or size)
func (w *fileLogWriter) doRotate(logTime time.Time) error {
    ···
    err = os.Rename(w.Filename, fName)
    ···
    startLoggerErr := w.startLogger()
    go w.deleteOldLog()
    ···
}

func (w *fileLogWriter) deleteOldLog() {
    dir := filepath.Dir(w.Filename)
    filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) {
        defer func() {
            if r := recover(); r != nil {
                fmt.Fprintf(os.Stderr, "Unable to delete old log '%s', error: %v\n", path, r)
            }
        }()

        if info == nil {
            return
        }

        if !info.IsDir() && info.ModTime().Add(24*time.Hour*time.Duration(w.MaxDays)).Before(time.Now()) {
            if strings.HasPrefix(filepath.Base(path), filepath.Base(w.fileNameOnly)) &&
                strings.HasSuffix(filepath.Base(path), w.suffix) {
                os.Remove(path)
            }
        }
        return
    })
}

使用 goto 语句保证即使发生错误也要重启 Logger

doRotate() 方法大体逻辑:

  • 重命名之前写入的日志文件,err = os.Rename(w.Filename, fName)

    • 首先找到 一个可用的 filename ,循环遍历1-999,如果找不到报错;
    • _,err:=os.Lstat(fName) :若以 fName 为名的文件不存在则返回 err 不为空。
    • os.Chmod(fName, os.FileMode(rotatePerm)) 修改文件权限。
  • 重新启动 Logger :

    • 一是启动 Logger ,w.startLogger()
    • 二是开启一个 goroutine 删除失效的日志文件。

注意到下面代码段中的 a 语句和 b 语句,它们并不是返回错误阻止代码继续执行,而是即使发生错误也会保证重启一个新的 Logger。如果是执行到 a 语句这种情况,有可能是该日志文件已经被别的程序删除或者其他原因导致文件不存在,但大可不必因为一个日志文件的丢失而阻止了新 Logger 的启动,简而言之,这个错误是可以忽略的。


// DoRotate means it need to write file in new file.
// new file name like xx.2013-01-01.log (daily) or xx.001.log (by line or size)
func (w *fileLogWriter) doRotate(logTime time.Time) error {
    // file exists
    // Find the next available number
    num := 1
    fName := ""
    rotatePerm, err := strconv.ParseInt(w.RotatePerm, 8, 64)
    if err != nil {
        return err
    }

    _, err = os.Lstat(w.Filename)
    if err != nil {
        //even if the file is not exist or other ,we should RESTART the logger
        goto RESTART_LOGGER // <------- a 语句
    }

    if w.MaxLines > 0 || w.MaxSize > 0 {
        for ; err == nil && num <= 999; num++ {
            fName = w.fileNameOnly + fmt.Sprintf(".%s.%03d%s", 
                    logTime.Format("2006-01-02"), num, w.suffix)
            _, err = os.Lstat(fName)
        }
    } else {
        fName = fmt.Sprintf("%s.%s%s", w.fileNameOnly, 
                w.dailyOpenTime.Format("2006-01-02"), w.suffix)
        _, err = os.Lstat(fName)
        for ; err == nil && num <= 999; num++ {
            fName = w.fileNameOnly + fmt.Sprintf(".%s.%03d%s", 
                    w.dailyOpenTime.Format("2006-01-02"), num, w.suffix)
            _, err = os.Lstat(fName)
        }
    }
    // return error if the last file checked still existed
    if err == nil { 
        return fmt.Errorf(
        "Rotate: Cannot find free log number to rename %s", w.Filename)
    }

    // close fileWriter before rename
    w.fileWriter.Close()

    // Rename the file to its new found name
    // even if occurs error,we MUST guarantee to  restart new logger
    err = os.Rename(w.Filename, fName)
    if err != nil {
        goto RESTART_LOGGER  // <------- b 语句 
    }

    err = os.Chmod(fName, os.FileMode(rotatePerm))

RESTART_LOGGER:   // <-------

    startLoggerErr := w.startLogger()
    go w.deleteOldLog()

    if startLoggerErr != nil {
        return fmt.Errorf("Rotate StartLogger: %s", startLoggerErr)
    }
    if err != nil {
        return fmt.Errorf("Rotate: %s", err)
    }
    return nil
}

涉及到 sync.WaitGroup 的使用

a 语句处,开启 goroutine 前计数器加一,执行完该 goroutine 后计数器减一,即 b 语句。

// Async set the log to asynchronous and start the goroutine
func (bl *BeeLogger) Async(msgLen ...int64) *BeeLogger {
    ···
    bl.wg.Add(1) // <----- a 语句
    go bl.startLogger()
    return bl
}

// start logger chan reading.
// when chan is not empty, write logs.
func (bl *BeeLogger) startLogger() {
    gameOver := false
    for {
        select {
        case bm := <-bl.msgChan:
            bl.writeToLoggers(bm.when, bm.msg, bm.level)
            logMsgPool.Put(bm)
        case sg := <-bl.signalChan:
            // Now should only send "flush" or "close" to bl.signalChan
            bl.flush()
            if sg == "close" {
                for _, l := range bl.outputs {
                    l.Destroy()
                }
                bl.outputs = nil
                gameOver = true
            }
            bl.wg.Done()  // <------ b 语句
        }
        if gameOver {
            break
        }
    }
}

分析并发执行下面 Flush() 方法的情况。假设有 A , B , C 三个 goroutine,并且假设 A 先执行到 e 语句,从
a 语句知道初始计数器为 1 ,所以 e 语句必须等到上述 startLogger-goroutine 执行 b 语句完毕后才停止阻塞。而后 A 再让计数器加一。因为 bl.signalChan 的缓存大小为1,所以 B,C 阻塞在 d 语句,等到 B,C 其中之一能执行 e 语句的时候计数器必然大于0,才不会导致永久阻塞。所以 f 语句要放在 e 语句之后。


// Flush flush all chan data.
func (bl *BeeLogger) Flush() {
    if bl.asynchronous {
        bl.signalChan <- "flush"  // <------ d 语句
        bl.wg.Wait()  // <------ e 语句
        bl.wg.Add(1)  // <------ f 语句
        return
    }
    bl.flush()
}

因此再看下面的 Close() 方法,它是不能并发执行的,会导致 "panic: close of closed channel"错误。不过笔者暂时没懂为什么 beego logs 不把这里做一下改进,让 Close() 也支持并发调用不好吗?

// Close close logger, flush all chan data and destroy all adapters in BeeLogger.
func (bl *BeeLogger) Close() {
    if bl.asynchronous {
        bl.signalChan <- "close"
        bl.wg.Wait()   // <------ g 语句
        close(bl.msgChan)
    } else {
        bl.flush()
        for _, l := range bl.outputs {
            l.Destroy()
        }
        bl.outputs = nil
    }
    close(bl.signalChan)
}
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
4年前
Java并发系列4
今天讲另一个并发工具,叫读写锁。读写锁是一种分离锁,是锁应用中的一种优化手段。考虑读多写少的情况,这时如果我们用synchronized或ReentrantLock直接修饰读/写方法未尝不可,如:publicstaticclassRw{privateintval;publicsynchr
Wesley13 Wesley13
4年前
mysql 锁
第一章概述锁的分类:从对数据操作的粒度分表锁、行锁。从对数据的操作类型(读\\写)分读锁(共享锁)、写锁(排它锁)读锁(共享锁):针对同一份数据,多个读操作可以同时进行而不会互相影响。写锁(排它锁):当前写操作没完成前,它会阻断其他写锁和读锁。第二章 表锁(偏读)偏向MyISAM存储引擎,开销小,加
Wesley13 Wesley13
4年前
JAVA中 ReentrantReadWriteLock读写锁详系教程,包会
一、读写锁简介现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。 针对这种场景,JAVA的并发包提供了读写锁ReentrantReadW
Wesley13 Wesley13
4年前
Java中的读写锁ReadWriteLock
ReadWriteLock是JDK中的读写锁接口ReentrantReadWriteLock是ReadWriteLock的一种实现读写锁非常适合读多写少的场景。读写锁与互斥锁的一个重要区别是读写锁允许多个线程同时读共享变量,这是读写锁在读多写少的情况下性能较高的原因。读写锁的原则:多个线程可同时读共享变量只允许一
Wesley13 Wesley13
4年前
MySQL锁
<br1\.表锁表锁分为写锁,读锁,二者读读不阻塞,读写阻塞,写写阻塞<br<br2\.行锁行锁分为共享锁,排他锁,即读锁和写锁多粒度锁机制自动实现表、行锁共存,InnoDB内部有意向表锁意向共享锁(IS):事务在给一个数据行加共享锁前必须先取得该表的IS锁。
Stella981 Stella981
4年前
ReentrantReadWriteLock(读写锁)
ReentrantReadWriteLock是JDK5中提供的读写分离锁。读写分离锁可以有效的帮助减少锁的竞争,以此来提升系统的性能。用锁分离的机制来提升性能也非常好理解,比如线程A,B,C进行写操作,D,E,F进行读操作,如果使用ReentrantLock或者synchronized关键字,这些线程都是串行执行的,即每次都只有一个线程做操作。但是当D进行读
Stella981 Stella981
4年前
ReentrantReadWriteLock实现原理
  在java并发包java.util.concurrent中,除了重入锁ReentrantLock外,读写锁ReentrantReadWriteLock也很常用。在实际开发场景中,在使用共享资源时,可能读操作远远多于写操作。这种情况下,如果对这部分共享资源能够让多个线程读的时候不受阻塞,仅仅在写的时候保证安全性,这样效率会得到显著提升。读写锁Reentra
Wesley13 Wesley13
4年前
Java并发编程:Java中的锁和线程同步机制
锁的基础知识锁的类型锁从宏观上分类,只分为两种:悲观锁与乐观锁。乐观锁乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采取在写时先读出当前版本号,然后加锁操作(比较跟上一次的版本号,如果一
Stella981 Stella981
4年前
ReentrantReadWriteLock读写锁详解
一、读写锁简介现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。 针对这种场景,JAVA的并发包提供了读写锁ReentrantReadW
Wesley13 Wesley13
4年前
Java并发编程原理与实战十八:读写锁
ReadWriteLock也是一个接口,提供了readLock和writeLock两种锁的操作机制,一个资源可以被多个线程同时读,或者被一个线程写,但是不能同时存在读和写线程。基本规则:读读不互斥读写互斥写写互斥问题:既然读读不互斥,为何还要加读锁答:如果只是读,是不需要加锁的,加锁本身就有性能上的损耗如果读可以不是最新数据
Stella981 Stella981
4年前
PostgreSQL 使用advisory lock实现行级读写堵塞
背景PostgreSQL的读写是不冲突的,这听起来是件好事对吧,读和写相互不干扰,可以数据库提高读写并发能力。但是有些时候,用户也许想让读写冲突(需求:数据正在被更新或者删除时,不允许被读取)。那么有方法能实现读写冲突吗?PostgreSQL提供了一种锁advisorylock,可以实现读写堵塞的功能。使用advisoryloc
柯里沙漏
柯里沙漏
Lv1
门有车马客,驾言发故乡。
文章
4
粉丝
0
获赞
0