ソースを参照

Merge pull request #19498 from mountkin/refactor-jsonfilelog-reader

use pubsub instead of filenotify to follow json logs
Alexander Morozov 9 年 前
コミット
8545fdc6dd

+ 15 - 10
daemon/logger/jsonfilelog/jsonfilelog.go

@@ -14,6 +14,7 @@ import (
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/loggerutils"
 	"github.com/docker/docker/pkg/jsonlog"
+	"github.com/docker/docker/pkg/pubsub"
 	"github.com/docker/go-units"
 )
 
@@ -22,12 +23,13 @@ const Name = "json-file"
 
 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
-	buf     *bytes.Buffer
-	writer  *loggerutils.RotateFileWriter
-	mu      sync.Mutex
-	ctx     logger.Context
-	readers map[*logger.LogWatcher]struct{} // stores the active log followers
-	extra   []byte                          // json-encoded extra attributes
+	buf           *bytes.Buffer
+	writer        *loggerutils.RotateFileWriter
+	mu            sync.Mutex
+	ctx           logger.Context
+	readers       map[*logger.LogWatcher]struct{} // stores the active log followers
+	extra         []byte                          // json-encoded extra attributes
+	writeNotifier *pubsub.Publisher
 }
 
 func init() {
@@ -77,10 +79,11 @@ func New(ctx logger.Context) (logger.Logger, error) {
 	}
 
 	return &JSONFileLogger{
-		buf:     bytes.NewBuffer(nil),
-		writer:  writer,
-		readers: make(map[*logger.LogWatcher]struct{}),
-		extra:   extra,
+		buf:           bytes.NewBuffer(nil),
+		writer:        writer,
+		readers:       make(map[*logger.LogWatcher]struct{}),
+		extra:         extra,
+		writeNotifier: pubsub.NewPublisher(0, 10),
 	}, nil
 }
 
@@ -104,6 +107,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
 
 	l.buf.WriteByte('\n')
 	_, err = l.writer.Write(l.buf.Bytes())
+	l.writeNotifier.Publish(struct{}{})
 	l.buf.Reset()
 
 	return err
@@ -137,6 +141,7 @@ func (l *JSONFileLogger) Close() error {
 		r.Close()
 		delete(l.readers, r)
 	}
+	l.writeNotifier.Close()
 	l.mu.Unlock()
 	return err
 }

+ 62 - 80
daemon/logger/jsonfilelog/read.go

@@ -10,14 +10,11 @@ import (
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/pkg/filenotify"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/tailfile"
 )
 
-const maxJSONDecodeRetry = 20000
-
 func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
 	l.Reset()
 	if err := dec.Decode(l); err != nil {
@@ -35,7 +32,6 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro
 // created by this driver.
 func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 	logWatcher := logger.NewLogWatcher()
-
 	go l.readLogs(logWatcher, config)
 	return logWatcher
 }
@@ -85,7 +81,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
 	l.mu.Unlock()
 
 	notifyRotate := l.writer.NotifyRotate()
-	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
+	l.followLogs(latestFile, logWatcher, notifyRotate, config.Since)
 
 	l.mu.Lock()
 	delete(l.readers, logWatcher)
@@ -121,95 +117,81 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
 	}
 }
 
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
-	dec := json.NewDecoder(f)
-	l := &jsonlog.JSONLog{}
+func (l *JSONFileLogger) followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
+	var (
+		rotated bool
 
-	fileWatcher, err := filenotify.New()
-	if err != nil {
-		logWatcher.Err <- err
-	}
-	defer fileWatcher.Close()
+		dec         = json.NewDecoder(f)
+		log         = &jsonlog.JSONLog{}
+		writeNotify = l.writeNotifier.Subscribe()
+		watchClose  = logWatcher.WatchClose()
+	)
 
-	var retries int
-	for {
-		msg, err := decodeLogLine(dec, l)
+	reopenLogFile := func() error {
+		f.Close()
+		f, err := os.Open(f.Name())
 		if err != nil {
-			if err != io.EOF {
-				// try again because this shouldn't happen
-				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-					dec = json.NewDecoder(f)
-					retries++
-					continue
-				}
-
-				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-				// remaining data in the parser's buffer while an io.EOF occurs.
-				// If the json logger writes a partial json log entry to the disk
-				// while at the same time the decoder tries to decode it, the race condition happens.
-				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-					reader := io.MultiReader(dec.Buffered(), f)
-					dec = json.NewDecoder(reader)
-					retries++
-					continue
-				}
-				logWatcher.Err <- err
-				return
-			}
+			return err
+		}
+		dec = json.NewDecoder(f)
+		rotated = true
+		return nil
+	}
 
-			logrus.WithField("logger", "json-file").Debugf("waiting for events")
-			if err := fileWatcher.Add(f.Name()); err != nil {
-				logrus.WithField("logger", "json-file").Warn("falling back to file poller")
-				fileWatcher.Close()
-				fileWatcher = filenotify.NewPollingWatcher()
-				if err := fileWatcher.Add(f.Name()); err != nil {
-					logrus.Errorf("error watching log file for modifications: %v", err)
-					logWatcher.Err <- err
-				}
+	readToEnd := func() error {
+		for {
+			msg, err := decodeLogLine(dec, log)
+			if err != nil {
+				return err
 			}
-			select {
-			case <-fileWatcher.Events():
-				dec = json.NewDecoder(f)
-				fileWatcher.Remove(f.Name())
-				continue
-			case <-fileWatcher.Errors():
-				fileWatcher.Remove(f.Name())
-				logWatcher.Err <- err
-				return
-			case <-logWatcher.WatchClose():
-				fileWatcher.Remove(f.Name())
-				return
-			case <-notifyRotate:
-				f, err = os.Open(f.Name())
-				if err != nil {
-					logWatcher.Err <- err
-					return
-				}
-
-				dec = json.NewDecoder(f)
-				fileWatcher.Remove(f.Name())
-				fileWatcher.Add(f.Name())
+			if !since.IsZero() && msg.Timestamp.Before(since) {
 				continue
 			}
+			logWatcher.Msg <- msg
 		}
+	}
 
-		retries = 0 // reset retries since we've succeeded
-		if !since.IsZero() && msg.Timestamp.Before(since) {
-			continue
+	defer func() {
+		l.writeNotifier.Evict(writeNotify)
+		if rotated {
+			f.Close()
 		}
+	}()
+
+	for {
 		select {
-		case logWatcher.Msg <- msg:
-		case <-logWatcher.WatchClose():
-			logWatcher.Msg <- msg
-			for {
-				msg, err := decodeLogLine(dec, l)
-				if err != nil {
+		case <-watchClose:
+			readToEnd()
+			return
+		case <-notifyRotate:
+			readToEnd()
+			if err := reopenLogFile(); err != nil {
+				logWatcher.Err <- err
+				return
+			}
+		case _, ok := <-writeNotify:
+			if err := readToEnd(); err == io.EOF {
+				if !ok {
+					// The writer is closed, no new logs will be generated.
 					return
 				}
-				if !since.IsZero() && msg.Timestamp.Before(since) {
-					continue
+
+				select {
+				case <-notifyRotate:
+					if err := reopenLogFile(); err != nil {
+						logWatcher.Err <- err
+						return
+					}
+				default:
+					dec = json.NewDecoder(f)
 				}
-				logWatcher.Msg <- msg
+
+			} else if err == io.ErrUnexpectedEOF {
+				dec = json.NewDecoder(io.MultiReader(dec.Buffered(), f))
+			} else {
+				logrus.Errorf("Failed to decode json log %s: %v", f.Name(), err)
+				logWatcher.Err <- err
+				return
 			}
 		}
 	}

+ 9 - 3
pkg/pubsub/publisher.go

@@ -54,18 +54,24 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
 // Evict removes the specified subscriber from receiving any more messages.
 func (p *Publisher) Evict(sub chan interface{}) {
 	p.m.Lock()
-	delete(p.subscribers, sub)
-	close(sub)
+	if _, ok := p.subscribers[sub]; ok {
+		delete(p.subscribers, sub)
+		close(sub)
+	}
 	p.m.Unlock()
 }
 
 // Publish sends the data in v to all subscribers currently registered with the publisher.
 func (p *Publisher) Publish(v interface{}) {
 	p.m.RLock()
+	if len(p.subscribers) == 0 {
+		p.m.RUnlock()
+		return
+	}
+
 	wg := new(sync.WaitGroup)
 	for sub, topic := range p.subscribers {
 		wg.Add(1)
-
 		go p.sendTopic(sub, topic, v, wg)
 	}
 	wg.Wait()