
Merge pull request #37576 from kolyshkin/logs-f-leak

daemon.ContainerLogs(): fix resource leak on follow
Sebastiaan van Stijn committed 6 years ago · commit 77faf158f5
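
This change splits the single LogWatcher "close" notification in two:
ProducerGone(), called from a log driver's Close() when the container
producing the logs goes away, and ConsumerGone(), called by the log
reader (ContainerLogs(), containerAttach()) when it stops reading.
followLogs() now selects on both channels, so a follower whose client
has disconnected returns right away instead of blocking forever on a
send to the Msg channel; that stuck goroutine, and the log file it kept
open, were the resource leak in the title.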

+ 1 - 1
daemon/attach.go

@@ -123,7 +123,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach
 			return logger.ErrReadLogsNotSupported{}
 		}
 		logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
-		defer logs.Close()
+		defer logs.ConsumerGone()
 
 	LogLoop:
 		for {
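
containerAttach() is purely a consumer here: it replays the existing
log history (Tail: -1) into the attach stream, so when that stream ends
it now signals ConsumerGone() rather than closing the watcher, leaving
the producer side to the log driver.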

+ 3 - 13
daemon/logger/adapter.go

@@ -93,21 +93,12 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
 
 		dec := logdriver.NewLogEntryDecoder(stream)
 		for {
-			select {
-			case <-watcher.WatchClose():
-				return
-			default:
-			}
-
 			var buf logdriver.LogEntry
 			if err := dec.Decode(&buf); err != nil {
 				if err == io.EOF {
 					return
 				}
-				select {
-				case watcher.Err <- errors.Wrap(err, "error decoding log message"):
-				case <-watcher.WatchClose():
-				}
+				watcher.Err <- errors.Wrap(err, "error decoding log message")
 				return
 			}
 
@@ -125,11 +116,10 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
 				return
 			}
 
+			// send the message unless the consumer is gone
 			select {
 			case watcher.Msg <- msg:
-			case <-watcher.WatchClose():
-				// make sure the message we consumed is sent
-				watcher.Msg <- msg
+			case <-watcher.WatchConsumerGone():
 				return
 			}
 		}
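
Two simplifications in the plugin adapter: the per-iteration
WatchClose() poll at the top of the loop is gone, because the send on
watcher.Msg now selects against WatchConsumerGone() directly; and the
send on watcher.Err no longer needs a select at all, since Err is
created with a buffer of one (see the logger.go hunk below), so the
error can be handed off even if the consumer never reads it.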

+ 1 - 1
daemon/logger/adapter_test.go

@@ -174,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) {
 		t.Fatal("timeout waiting for message channel to close")
 
 	}
-	lw.Close()
+	lw.ProducerGone()
 
 	lw = lr.ReadLogs(ReadConfig{Follow: true})
 	for _, x := range testMsg {

+ 2 - 2
daemon/logger/journald/read.go

@@ -165,7 +165,7 @@ func (s *journald) Close() error {
 	s.mu.Lock()
 	s.closed = true
 	for reader := range s.readers.readers {
-		reader.Close()
+		reader.ProducerGone()
 	}
 	s.mu.Unlock()
 	return nil
@@ -299,7 +299,7 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal,
 	// Wait until we're told to stop.
 	select {
 	case cursor = <-newCursor:
-	case <-logWatcher.WatchClose():
+	case <-logWatcher.WatchConsumerGone():
 		// Notify the other goroutine that its work is done.
 		C.close(pfd[1])
 		cursor = <-newCursor

+ 3 - 2
daemon/logger/jsonfilelog/jsonfilelog.go

@@ -166,13 +166,14 @@ func ValidateLogOpt(cfg map[string]string) error {
 	return nil
 }
 
-// Close closes underlying file and signals all readers to stop.
+// Close closes underlying file and signals all the readers
+// that the logs producer is gone.
 func (l *JSONFileLogger) Close() error {
 	l.mu.Lock()
 	l.closed = true
 	err := l.writer.Close()
 	for r := range l.readers {
-		r.Close()
+		r.ProducerGone()
 		delete(l.readers, r)
 	}
 	l.mu.Unlock()

+ 1 - 2
daemon/logger/jsonfilelog/read_test.go

@@ -50,11 +50,10 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
 	}()
 
 	lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true})
-	watchClose := lw.WatchClose()
 	for {
 		select {
 		case <-lw.Msg:
-		case <-watchClose:
+		case <-lw.WatchProducerGone():
 			return
 		case err := <-chError:
 			if err != nil {

+ 1 - 1
daemon/logger/local/local.go

@@ -166,7 +166,7 @@ func (d *driver) Close() error {
 	d.closed = true
 	err := d.logfile.Close()
 	for r := range d.readers {
-		r.Close()
+		r.ProducerGone()
 		delete(d.readers, r)
 	}
 	d.mu.Unlock()

+ 32 - 15
daemon/logger/logger.go

@@ -104,33 +104,50 @@ type LogWatcher struct {
 	// For sending log messages to a reader.
 	Msg chan *Message
	// For sending error messages that occur while reading logs.
-	Err           chan error
-	closeOnce     sync.Once
-	closeNotifier chan struct{}
+	Err          chan error
+	producerOnce sync.Once
+	producerGone chan struct{}
+	consumerOnce sync.Once
+	consumerGone chan struct{}
 }
 
 // NewLogWatcher returns a new LogWatcher.
 func NewLogWatcher() *LogWatcher {
 	return &LogWatcher{
-		Msg:           make(chan *Message, logWatcherBufferSize),
-		Err:           make(chan error, 1),
-		closeNotifier: make(chan struct{}),
+		Msg:          make(chan *Message, logWatcherBufferSize),
+		Err:          make(chan error, 1),
+		producerGone: make(chan struct{}),
+		consumerGone: make(chan struct{}),
 	}
 }
 
-// Close notifies the underlying log reader to stop.
-func (w *LogWatcher) Close() {
+// ProducerGone notifies the underlying log reader that
+// the logs producer (a container) is gone.
+func (w *LogWatcher) ProducerGone() {
 	// only close if not already closed
-	w.closeOnce.Do(func() {
-		close(w.closeNotifier)
+	w.producerOnce.Do(func() {
+		close(w.producerGone)
 	})
 }
 
-// WatchClose returns a channel receiver that receives notification
-// when the watcher has been closed. This should only be called from
-// one goroutine.
-func (w *LogWatcher) WatchClose() <-chan struct{} {
-	return w.closeNotifier
+// WatchProducerGone returns a channel receiver that receives notification
+// once the logs producer (a container) is gone.
+func (w *LogWatcher) WatchProducerGone() <-chan struct{} {
+	return w.producerGone
+}
+
+// ConsumerGone notifies that the logs consumer is gone.
+func (w *LogWatcher) ConsumerGone() {
+	// only close if not already closed
+	w.consumerOnce.Do(func() {
+		close(w.consumerGone)
+	})
+}
+
+// WatchConsumerGone returns a channel receiver that receives notification
+// when the log watcher consumer is gone.
+func (w *LogWatcher) WatchConsumerGone() <-chan struct{} {
+	return w.consumerGone
 }
 
 // Capability defines the list of capabilities that a driver can implement
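
To make the new contract concrete, here is a minimal, self-contained
sketch of how the two channels pair up. It is not part of the diff: the
cut-down Message/LogWatcher stand-ins and the follow() loop are
illustrative assumptions, mirroring the API above. The reader calls
ConsumerGone() when it hangs up, the driver calls ProducerGone() when
the container stops, and the follow loop selects on both so it can
never block forever on Msg:

```go
package main

import (
	"fmt"
	"sync"
)

// Cut-down stand-ins for the logger package's types.
type Message struct{ Line string }

type LogWatcher struct {
	Msg          chan *Message
	Err          chan error
	producerOnce sync.Once
	producerGone chan struct{}
	consumerOnce sync.Once
	consumerGone chan struct{}
}

func NewLogWatcher() *LogWatcher {
	return &LogWatcher{
		Msg:          make(chan *Message, 4),
		Err:          make(chan error, 1),
		producerGone: make(chan struct{}),
		consumerGone: make(chan struct{}),
	}
}

func (w *LogWatcher) ProducerGone() { w.producerOnce.Do(func() { close(w.producerGone) }) }
func (w *LogWatcher) ConsumerGone() { w.consumerOnce.Do(func() { close(w.consumerGone) }) }

func (w *LogWatcher) WatchProducerGone() <-chan struct{} { return w.producerGone }
func (w *LogWatcher) WatchConsumerGone() <-chan struct{} { return w.consumerGone }

// follow is a toy producer loop. The key property is the same as in
// followLogs(): every send on w.Msg races against "consumer gone", so
// an abandoned reader can no longer strand this goroutine.
func follow(w *LogWatcher) {
	for i := 0; ; i++ {
		select {
		case w.Msg <- &Message{Line: fmt.Sprintf("line %d", i)}:
		case <-w.WatchConsumerGone():
			return // reader went away; stop instead of blocking on Msg
		case <-w.WatchProducerGone():
			return // container stopped (the real code drains first)
		}
	}
}

func main() {
	w := NewLogWatcher()
	done := make(chan struct{})
	go func() { follow(w); close(done) }()

	fmt.Println((<-w.Msg).Line) // consume one message...
	w.ConsumerGone()            // ...then hang up, like a canceled `docker logs -f`
	<-done                      // the producer goroutine exits rather than leak
}
```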

+ 10 - 31
daemon/logger/loggerutils/logfile.go

@@ -488,7 +488,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
 	go func() {
 		select {
 		case <-ctx.Done():
-		case <-watcher.WatchClose():
+		case <-watcher.WatchConsumerGone():
 			cancel()
 		}
 	}()
@@ -546,22 +546,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 	}
 	defer func() {
 		f.Close()
-		fileWatcher.Remove(name)
 		fileWatcher.Close()
 	}()
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go func() {
-		select {
-		case <-logWatcher.WatchClose():
-			fileWatcher.Remove(name)
-			cancel()
-		case <-ctx.Done():
-			return
-		}
-	}()
-
 	var retries int
 	handleRotate := func() error {
 		f.Close()
@@ -596,7 +583,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 			case fsnotify.Rename, fsnotify.Remove:
 				select {
 				case <-notifyRotate:
-				case <-ctx.Done():
+				case <-logWatcher.WatchProducerGone():
+					return errDone
+				case <-logWatcher.WatchConsumerGone():
 					return errDone
 				}
 				if err := handleRotate(); err != nil {
@@ -618,7 +607,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 				return errRetry
 			}
 			return err
-		case <-ctx.Done():
+		case <-logWatcher.WatchProducerGone():
+			return errDone
+		case <-logWatcher.WatchConsumerGone():
 			return errDone
 		}
 	}
@@ -664,23 +655,11 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
 		if !until.IsZero() && msg.Timestamp.After(until) {
 			return
 		}
+		// send the message, unless the consumer is gone
 		select {
 		case logWatcher.Msg <- msg:
-		case <-ctx.Done():
-			logWatcher.Msg <- msg
-			for {
-				msg, err := decodeLogLine()
-				if err != nil {
-					return
-				}
-				if !since.IsZero() && msg.Timestamp.Before(since) {
-					continue
-				}
-				if !until.IsZero() && msg.Timestamp.After(until) {
-					return
-				}
-				logWatcher.Msg <- msg
-			}
+		case <-logWatcher.WatchConsumerGone():
+			return
 		}
 	}
 }
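
The deleted tail of followLogs() was the heart of the leak: once
WatchClose() fired it kept decoding and doing blocking sends on
logWatcher.Msg, with no guarantee anyone was still receiving, so the
goroutine (and the log file whose Close() it deferred) could hang
around forever. Now a gone consumer ends the loop immediately, while a
gone producer surfaces as errDone via the select paths above once the
decoder has drained what was already written.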

+ 127 - 0
daemon/logger/loggerutils/logfile_test.go

@@ -4,6 +4,8 @@ import (
 	"bufio"
 	"context"
 	"io"
+	"io/ioutil"
+	"os"
 	"strings"
 	"testing"
 	"time"
@@ -74,3 +76,128 @@ func TestTailFiles(t *testing.T) {
 		assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
 	}
 }
+
+func TestFollowLogsConsumerGone(t *testing.T) {
+	lw := logger.NewLogWatcher()
+
+	f, err := ioutil.TempFile("", t.Name())
+	assert.NilError(t, err)
+	defer func() {
+		f.Close()
+		os.Remove(f.Name())
+	}()
+
+	makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
+		return func() (*logger.Message, error) {
+			return &logger.Message{}, nil
+		}
+	}
+
+	followLogsDone := make(chan struct{})
+	var since, until time.Time
+	go func() {
+		followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
+		close(followLogsDone)
+	}()
+
+	select {
+	case <-lw.Msg:
+	case err := <-lw.Err:
+		assert.NilError(t, err)
+	case <-followLogsDone:
+		t.Fatal("follow logs finished unexpectedly")
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for log message")
+	}
+
+	lw.ConsumerGone()
+	select {
+	case <-followLogsDone:
+	case <-time.After(20 * time.Second):
+		t.Fatal("timeout waiting for followLogs() to finish")
+	}
+}
+
+func TestFollowLogsProducerGone(t *testing.T) {
+	lw := logger.NewLogWatcher()
+
+	f, err := ioutil.TempFile("", t.Name())
+	assert.NilError(t, err)
+	defer os.Remove(f.Name())
+
+	var sent, received, closed int
+	makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
+		return func() (*logger.Message, error) {
+			if closed == 1 {
+				closed++
+				t.Logf("logDecode() closed after sending %d messages\n", sent)
+				return nil, io.EOF
+			} else if closed > 1 {
+				t.Fatal("logDecode() called after closing!")
+				return nil, io.EOF
+			}
+			sent++
+			return &logger.Message{}, nil
+		}
+	}
+	var since, until time.Time
+
+	followLogsDone := make(chan struct{})
+	go func() {
+		followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
+		close(followLogsDone)
+	}()
+
+	// read 1 message
+	select {
+	case <-lw.Msg:
+		received++
+	case err := <-lw.Err:
+		assert.NilError(t, err)
+	case <-followLogsDone:
+		t.Fatal("followLogs() finished unexpectedly")
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for log message")
+	}
+
+	// "stop" the "container"
+	closed = 1
+	lw.ProducerGone()
+
+	// should receive all the messages sent
+	readDone := make(chan struct{})
+	go func() {
+		defer close(readDone)
+		for {
+			select {
+			case <-lw.Msg:
+				received++
+				if received == sent {
+					return
+				}
+			case err := <-lw.Err:
+				assert.NilError(t, err)
+			}
+		}
+	}()
+	select {
+	case <-readDone:
+	case <-time.After(30 * time.Second):
+		t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d)", sent, received)
+	}
+
+	t.Logf("messages sent: %d, received: %d", sent, received)
+
+	// followLogs() should be done by now
+	select {
+	case <-followLogsDone:
+	case <-time.After(30 * time.Second):
+		t.Fatal("timeout waiting for followLogs() to finish")
+	}
+
+	select {
+	case <-lw.WatchConsumerGone():
+		t.Fatal("consumer should not have exited")
+	default:
+	}
+}
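
The two new tests pin down both sides of the contract:
TestFollowLogsConsumerGone checks that followLogs() returns promptly
once ConsumerGone() is called, and TestFollowLogsProducerGone checks
that after ProducerGone() every message the decoder already produced is
still delivered before followLogs() returns, without the consumer side
ever being signalled.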

+ 4 - 4
daemon/logs.go

@@ -110,14 +110,16 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
 				}
 			}()
 		}
-		// set up some defers
-		defer logs.Close()
+		// signal that the log reader is gone
+		defer logs.ConsumerGone()
 
 		// close the messages channel. closing is the only way to signal above
		// that we're done with logs (other than context cancel i guess).
 		defer close(messageChan)
 
 		lg.Debug("begin logs")
+		defer lg.Debugf("end logs (%v)", ctx.Err())
+
 		for {
 			select {
 			// i do not believe as the system is currently designed any error
@@ -132,14 +134,12 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
 				}
 				return
 			case <-ctx.Done():
-				lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
 				return
 			case msg, ok := <-logs.Msg:
 				// there is some kind of pool or ring buffer in the logger that
 				// produces these messages, and a possible future optimization
 				// might be to use that pool and reuse message objects
 				if !ok {
-					lg.Debug("end logs")
 					return
 				}
 				m := msg.AsLogMessage() // just a pointer conversion, does not copy data