bsc/log/async_file_writer.go

261 lines
5.1 KiB
Go
Raw Permalink Normal View History

2020-05-20 11:47:42 +08:00
package log
import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
)
const backupTimeFormat = "2006-01-02_15"
type TimeTicker struct {
2020-05-20 11:47:42 +08:00
stop chan struct{}
C <-chan time.Time
}
// NewTimeTicker creates a TimeTicker that notifies based on rotateHours parameter.
// if rotateHours is 1 and current time is 11:32 it means that the ticker will tick at 12:00
// if rotateHours is 2 and current time is 09:12 means that the ticker will tick at 11:00
// specially, if rotateHours is 0, then no rotation
func NewTimeTicker(rotateHours uint) *TimeTicker {
ch := make(chan time.Time)
tt := TimeTicker{
2020-05-20 11:47:42 +08:00
stop: make(chan struct{}),
C: ch,
2020-05-20 11:47:42 +08:00
}
if rotateHours > 0 {
tt.startTicker(ch, rotateHours)
}
return &tt
2020-05-20 11:47:42 +08:00
}
func (tt *TimeTicker) Stop() {
tt.stop <- struct{}{}
2020-05-20 11:47:42 +08:00
}
func (tt *TimeTicker) startTicker(ch chan time.Time, rotateHours uint) {
2020-05-20 11:47:42 +08:00
go func() {
nextRotationHour := getNextRotationHour(time.Now(), rotateHours)
2020-05-20 11:47:42 +08:00
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case t := <-ticker.C:
if t.Hour() == nextRotationHour {
2020-05-20 11:47:42 +08:00
ch <- t
nextRotationHour = getNextRotationHour(time.Now(), rotateHours)
2020-05-20 11:47:42 +08:00
}
case <-tt.stop:
2020-05-20 11:47:42 +08:00
return
}
}
}()
}
func getNextRotationHour(now time.Time, delta uint) int {
return now.Add(time.Hour * time.Duration(delta)).Hour()
2020-05-20 11:47:42 +08:00
}
type AsyncFileWriter struct {
filePath string
fd *os.File
wg sync.WaitGroup
started int32
buf chan []byte
stop chan struct{}
timeTicker *TimeTicker
rotateHours uint
maxBackups int
2020-05-20 11:47:42 +08:00
}
func NewAsyncFileWriter(filePath string, maxBytesSize int64, maxBackups int, rotateHours uint) *AsyncFileWriter {
2020-05-20 11:47:42 +08:00
absFilePath, err := filepath.Abs(filePath)
if err != nil {
panic(fmt.Sprintf("get file path of logger error. filePath=%s, err=%s", filePath, err))
}
return &AsyncFileWriter{
filePath: absFilePath,
buf: make(chan []byte, maxBytesSize),
stop: make(chan struct{}),
rotateHours: rotateHours,
maxBackups: maxBackups,
timeTicker: NewTimeTicker(rotateHours),
2020-05-20 11:47:42 +08:00
}
}
func (w *AsyncFileWriter) initLogFile() error {
var (
fd *os.File
err error
)
realFilePath := w.timeFilePath(w.filePath)
fd, err = os.OpenFile(realFilePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
if err != nil {
return err
}
w.fd = fd
_, err = os.Lstat(w.filePath)
if err == nil || os.IsExist(err) {
err = os.Remove(w.filePath)
if err != nil {
return err
}
}
err = os.Symlink(realFilePath, w.filePath)
if err != nil {
return err
}
return nil
}
func (w *AsyncFileWriter) Start() error {
if !atomic.CompareAndSwapInt32(&w.started, 0, 1) {
return errors.New("logger has already been started")
}
err := w.initLogFile()
if err != nil {
return err
}
w.wg.Add(1)
go func() {
defer func() {
atomic.StoreInt32(&w.started, 0)
w.flushBuffer()
w.flushAndClose()
w.wg.Done()
}()
for {
select {
case msg, ok := <-w.buf:
if !ok {
fmt.Fprintln(os.Stderr, "buf channel has been closed.")
return
}
w.SyncWrite(msg)
case <-w.stop:
return
}
}
}()
return nil
}
func (w *AsyncFileWriter) flushBuffer() {
for {
select {
case msg := <-w.buf:
w.SyncWrite(msg)
default:
return
}
}
}
func (w *AsyncFileWriter) SyncWrite(msg []byte) {
w.rotateFile()
if w.fd != nil {
w.fd.Write(msg)
}
}
func (w *AsyncFileWriter) rotateFile() {
select {
case <-w.timeTicker.C:
2020-05-20 11:47:42 +08:00
if err := w.flushAndClose(); err != nil {
fmt.Fprintf(os.Stderr, "flush and close file error. err=%s", err)
}
if err := w.initLogFile(); err != nil {
fmt.Fprintf(os.Stderr, "init log file error. err=%s", err)
}
if err := w.removeExpiredFile(); err != nil {
fmt.Fprintf(os.Stderr, "remove expired file error. err=%s", err)
}
2020-05-20 11:47:42 +08:00
default:
}
}
func (w *AsyncFileWriter) Stop() {
w.stop <- struct{}{}
w.wg.Wait()
w.timeTicker.Stop()
2020-05-20 11:47:42 +08:00
}
func (w *AsyncFileWriter) Write(msg []byte) (n int, err error) {
// TODO(wuzhenxing): for the underlying array may change, is there a better way to avoid copying slice?
buf := make([]byte, len(msg))
copy(buf, msg)
select {
case w.buf <- buf:
default:
}
return 0, nil
}
func (w *AsyncFileWriter) Flush() error {
if w.fd == nil {
return nil
}
return w.fd.Sync()
}
func (w *AsyncFileWriter) flushAndClose() error {
if w.fd == nil {
return nil
}
err := w.fd.Sync()
if err != nil {
return err
}
return w.fd.Close()
}
func (w *AsyncFileWriter) timeFilePath(filePath string) string {
return filePath + "." + time.Now().Format(backupTimeFormat)
}
func (w *AsyncFileWriter) getExpiredFile(filePath string, maxBackups int, rotateHours uint) string {
if rotateHours > 0 {
maxBackups = int(rotateHours) * maxBackups
}
return filePath + "." + time.Now().Add(-time.Hour*time.Duration(maxBackups)).Format(backupTimeFormat)
}
func (w *AsyncFileWriter) removeExpiredFile() error {
if w.maxBackups == 0 {
return nil
}
oldFilepath := w.getExpiredFile(w.filePath, w.maxBackups, w.rotateHours)
_, err := os.Stat(oldFilepath)
if os.IsNotExist(err) {
return nil
}
errRemove := os.Remove(oldFilepath)
if err != nil {
return errRemove
}
return err
2020-05-20 11:47:42 +08:00
}