Browse Source

Extract rotate file writer from json log driver

Signed-off-by: Daehyeok Mun <daehyeok@gmail.com>
daehyeok mun 9 years ago
parent
commit
086c0b4a66

+ 23 - 97
daemon/logger/jsonfilelog/jsonfilelog.go

@@ -7,14 +7,13 @@ import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"os"
 	"strconv"
 	"strconv"
 	"sync"
 	"sync"
 
 
 	"github.com/Sirupsen/logrus"
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/daemon/logger/loggerutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/jsonlog"
-	"github.com/docker/docker/pkg/pubsub"
 	"github.com/docker/docker/pkg/timeutils"
 	"github.com/docker/docker/pkg/timeutils"
 	"github.com/docker/docker/pkg/units"
 	"github.com/docker/docker/pkg/units"
 )
 )
@@ -24,15 +23,12 @@ const Name = "json-file"
 
 
 // JSONFileLogger is Logger implementation for default Docker logging.
 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
 type JSONFileLogger struct {
-	buf          *bytes.Buffer
-	f            *os.File   // store for closing
-	mu           sync.Mutex // protects buffer
-	capacity     int64      //maximum size of each file
-	n            int        //maximum number of files
-	ctx          logger.Context
-	readers      map[*logger.LogWatcher]struct{} // stores the active log followers
-	notifyRotate *pubsub.Publisher
-	extra        []byte // json-encoded extra attributes
+	buf     *bytes.Buffer
+	writer  *loggerutils.RotateFileWriter
+	mu      sync.Mutex
+	ctx     logger.Context
+	readers map[*logger.LogWatcher]struct{} // stores the active log followers
+	extra   []byte                          // json-encoded extra attributes
 }
 }
 
 
 func init() {
 func init() {
@@ -47,10 +43,6 @@ func init() {
 // New creates new JSONFileLogger which writes to filename passed in
 // New creates new JSONFileLogger which writes to filename passed in
 // on given context.
 // on given context.
 func New(ctx logger.Context) (logger.Logger, error) {
 func New(ctx logger.Context) (logger.Logger, error) {
-	log, err := os.OpenFile(ctx.LogPath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
-	if err != nil {
-		return nil, err
-	}
 	var capval int64 = -1
 	var capval int64 = -1
 	if capacity, ok := ctx.Config["max-size"]; ok {
 	if capacity, ok := ctx.Config["max-size"]; ok {
 		var err error
 		var err error
@@ -61,6 +53,7 @@ func New(ctx logger.Context) (logger.Logger, error) {
 	}
 	}
 	var maxFiles = 1
 	var maxFiles = 1
 	if maxFileString, ok := ctx.Config["max-file"]; ok {
 	if maxFileString, ok := ctx.Config["max-file"]; ok {
+		var err error
 		maxFiles, err = strconv.Atoi(maxFileString)
 		maxFiles, err = strconv.Atoi(maxFileString)
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
@@ -70,6 +63,11 @@ func New(ctx logger.Context) (logger.Logger, error) {
 		}
 		}
 	}
 	}
 
 
+	writer, err := loggerutils.NewRotateFileWriter(ctx.LogPath, capval, maxFiles)
+	if err != nil {
+		return nil, err
+	}
+
 	var extra []byte
 	var extra []byte
 	if attrs := ctx.ExtraAttributes(nil); len(attrs) > 0 {
 	if attrs := ctx.ExtraAttributes(nil); len(attrs) > 0 {
 		var err error
 		var err error
@@ -80,21 +78,15 @@ func New(ctx logger.Context) (logger.Logger, error) {
 	}
 	}
 
 
 	return &JSONFileLogger{
 	return &JSONFileLogger{
-		f:            log,
-		buf:          bytes.NewBuffer(nil),
-		ctx:          ctx,
-		capacity:     capval,
-		n:            maxFiles,
-		readers:      make(map[*logger.LogWatcher]struct{}),
-		notifyRotate: pubsub.NewPublisher(0, 1),
-		extra:        extra,
+		buf:     bytes.NewBuffer(nil),
+		writer:  writer,
+		readers: make(map[*logger.LogWatcher]struct{}),
+		extra:   extra,
 	}, nil
 	}, nil
 }
 }
 
 
 // Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
 // Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
 func (l *JSONFileLogger) Log(msg *logger.Message) error {
 func (l *JSONFileLogger) Log(msg *logger.Message) error {
-	l.mu.Lock()
-	defer l.mu.Unlock()
 
 
 	timestamp, err := timeutils.FastMarshalJSON(msg.Timestamp)
 	timestamp, err := timeutils.FastMarshalJSON(msg.Timestamp)
 	if err != nil {
 	if err != nil {
@@ -109,78 +101,12 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	l.buf.WriteByte('\n')
-	_, err = writeLog(l)
-	return err
-}
 
 
-func writeLog(l *JSONFileLogger) (int64, error) {
-	if l.capacity == -1 {
-		return writeToBuf(l)
-	}
-	meta, err := l.f.Stat()
-	if err != nil {
-		return -1, err
-	}
-	if meta.Size() >= l.capacity {
-		name := l.f.Name()
-		if err := l.f.Close(); err != nil {
-			return -1, err
-		}
-		if err := rotate(name, l.n); err != nil {
-			return -1, err
-		}
-		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666)
-		if err != nil {
-			return -1, err
-		}
-		l.f = file
-		l.notifyRotate.Publish(struct{}{})
-	}
-	return writeToBuf(l)
-}
-
-func writeToBuf(l *JSONFileLogger) (int64, error) {
-	i, err := l.buf.WriteTo(l.f)
-	if err != nil {
-		l.buf = bytes.NewBuffer(nil)
-	}
-	return i, err
-}
-
-func rotate(name string, n int) error {
-	if n < 2 {
-		return nil
-	}
-	for i := n - 1; i > 1; i-- {
-		oldFile := name + "." + strconv.Itoa(i)
-		replacingFile := name + "." + strconv.Itoa(i-1)
-		if err := backup(oldFile, replacingFile); err != nil {
-			return err
-		}
-	}
-	if err := backup(name+".1", name); err != nil {
-		return err
-	}
-	return nil
-}
+	l.buf.WriteByte('\n')
+	_, err = l.writer.Write(l.buf.Bytes())
+	l.buf.Reset()
 
 
-// backup renames a file from curr to old, creating an empty file curr if it does not exist.
-func backup(old, curr string) error {
-	if _, err := os.Stat(old); !os.IsNotExist(err) {
-		err := os.Remove(old)
-		if err != nil {
-			return err
-		}
-	}
-	if _, err := os.Stat(curr); os.IsNotExist(err) {
-		f, err := os.Create(curr)
-		if err != nil {
-			return err
-		}
-		f.Close()
-	}
-	return os.Rename(curr, old)
+	return err
 }
 }
 
 
 // ValidateLogOpt looks for json specific log options max-file & max-size.
 // ValidateLogOpt looks for json specific log options max-file & max-size.
@@ -200,13 +126,13 @@ func ValidateLogOpt(cfg map[string]string) error {
 
 
 // LogPath returns the location the given json logger logs to.
 // LogPath returns the location the given json logger logs to.
 func (l *JSONFileLogger) LogPath() string {
 func (l *JSONFileLogger) LogPath() string {
-	return l.ctx.LogPath
+	return l.writer.LogPath()
 }
 }
 
 
 // Close closes underlying file and signals all readers to stop.
 // Close closes underlying file and signals all readers to stop.
 func (l *JSONFileLogger) Close() error {
 func (l *JSONFileLogger) Close() error {
 	l.mu.Lock()
 	l.mu.Lock()
-	err := l.f.Close()
+	err := l.writer.Close()
 	for r := range l.readers {
 	for r := range l.readers {
 		r.Close()
 		r.Close()
 		delete(l.readers, r)
 		delete(l.readers, r)

+ 4 - 4
daemon/logger/jsonfilelog/read.go

@@ -43,9 +43,9 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	defer close(logWatcher.Msg)
 	defer close(logWatcher.Msg)
 
 
-	pth := l.ctx.LogPath
+	pth := l.writer.LogPath()
 	var files []io.ReadSeeker
 	var files []io.ReadSeeker
-	for i := l.n; i > 1; i-- {
+	for i := l.writer.MaxFiles(); i > 1; i-- {
 		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
 		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
 		if err != nil {
 		if err != nil {
 			if !os.IsNotExist(err) {
 			if !os.IsNotExist(err) {
@@ -84,14 +84,14 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 	l.readers[logWatcher] = struct{}{}
 	l.readers[logWatcher] = struct{}{}
 	l.mu.Unlock()
 	l.mu.Unlock()
 
 
-	notifyRotate := l.notifyRotate.Subscribe()
+	notifyRotate := l.writer.NotifyRotate()
 	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
 	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
 
 
 	l.mu.Lock()
 	l.mu.Lock()
 	delete(l.readers, logWatcher)
 	delete(l.readers, logWatcher)
 	l.mu.Unlock()
 	l.mu.Unlock()
 
 
-	l.notifyRotate.Evict(notifyRotate)
+	l.writer.NotifyRotateEvict(notifyRotate)
 }
 }
 
 
 func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
 func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {

+ 132 - 0
daemon/logger/loggerutils/rotatefilewriter.go

@@ -0,0 +1,132 @@
+package loggerutils
+
+import (
+	"os"
+	"strconv"
+	"sync"
+
+	"github.com/docker/docker/pkg/pubsub"
+)
+
+// RotateFileWriter is Logger implementation for default Docker logging.
+type RotateFileWriter struct {
+	f            *os.File // store for closing
+	mu           sync.Mutex
+	capacity     int64 //maximum size of each file
+	maxFiles     int   //maximum number of files
+	notifyRotate *pubsub.Publisher
+}
+
+//NewRotateFileWriter creates new RotateFileWriter
+func NewRotateFileWriter(logPath string, capacity int64, maxFiles int) (*RotateFileWriter, error) {
+	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
+	if err != nil {
+		return &RotateFileWriter{}, err
+	}
+
+	return &RotateFileWriter{
+		f:            log,
+		capacity:     capacity,
+		maxFiles:     maxFiles,
+		notifyRotate: pubsub.NewPublisher(0, 1),
+	}, nil
+}
+
+//WriteLog write log messge to File
+func (w *RotateFileWriter) Write(message []byte) (int, error) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if err := w.checkCapacityAndRotate(); err != nil {
+		return -1, err
+	}
+
+	return w.f.Write(message)
+}
+
+func (w *RotateFileWriter) checkCapacityAndRotate() error {
+	if w.capacity == -1 {
+		return nil
+	}
+
+	meta, err := w.f.Stat()
+	if err != nil {
+		return err
+	}
+
+	if meta.Size() >= w.capacity {
+		name := w.f.Name()
+		if err := w.f.Close(); err != nil {
+			return err
+		}
+		if err := rotate(name, w.maxFiles); err != nil {
+			return err
+		}
+		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 06400)
+		if err != nil {
+			return err
+		}
+		w.f = file
+		w.notifyRotate.Publish(struct{}{})
+	}
+
+	return nil
+}
+
+func rotate(name string, maxFiles int) error {
+	if maxFiles < 2 {
+		return nil
+	}
+	for i := maxFiles - 1; i > 1; i-- {
+		toPath := name + "." + strconv.Itoa(i)
+		fromPath := name + "." + strconv.Itoa(i-1)
+		if err := backup(fromPath, toPath); err != nil && !os.IsNotExist(err) {
+			return err
+		}
+	}
+
+	if err := backup(name, name+".1"); err != nil {
+		return err
+	}
+	return nil
+}
+
+// backup renames a file from fromPath to toPath
+func backup(fromPath, toPath string) error {
+	if _, err := os.Stat(fromPath); os.IsNotExist(err) {
+		return err
+	}
+
+	if _, err := os.Stat(toPath); !os.IsNotExist(err) {
+		err := os.Remove(toPath)
+		if err != nil {
+			return err
+		}
+	}
+
+	return os.Rename(fromPath, toPath)
+}
+
+// LogPath returns the location the given wirter logs to.
+func (w *RotateFileWriter) LogPath() string {
+	return w.f.Name()
+}
+
+// MaxFiles return maximum number of files
+func (w *RotateFileWriter) MaxFiles() int {
+	return w.maxFiles
+}
+
+//NotifyRotate returns the new subscriber
+func (w *RotateFileWriter) NotifyRotate() chan interface{} {
+	return w.notifyRotate.Subscribe()
+}
+
+//NotifyRotateEvict removes the specified subscriber from receiving any more messages.
+func (w *RotateFileWriter) NotifyRotateEvict(sub chan interface{}) {
+	w.notifyRotate.Evict(sub)
+}
+
+// Close closes underlying file and signals all readers to stop.
+func (w *RotateFileWriter) Close() error {
+	return w.f.Close()
+}