Pārlūkot izejas kodu

Split reader interface from logger interface

Implement new reader interface on jsonfile.
Moves jsonlog decoding from daemon to jsonfile logger.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 10 gadi atpakaļ
vecāks
revīzija
c0391bf55

+ 1 - 1
api/client/logs.go

@@ -19,7 +19,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
 	follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
 	since := cmd.String([]string{"-since"}, "", "Show logs since timestamp")
 	times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps")
-	tail := cmd.String([]string{"-tail"}, "latest", "Number of lines to show from the end of the logs")
+	tail := cmd.String([]string{"-tail"}, "all", "Number of lines to show from the end of the logs")
 	cmd.Require(flag.Exact, 1)
 
 	cmd.ParseFlags(args, true)

+ 13 - 2
api/server/server.go

@@ -629,6 +629,17 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
 		closeNotifier = notifier.CloseNotify()
 	}
 
+	c, err := s.daemon.Get(vars["name"])
+	if err != nil {
+		return err
+	}
+
+	outStream := ioutils.NewWriteFlusher(w)
+	// write an empty chunk of data (this is to ensure that the
+	// HTTP Response is sent immediatly, even if the container has
+	// not yet produced any data)
+	outStream.Write(nil)
+
 	logsConfig := &daemon.ContainerLogsConfig{
 		Follow:     boolValue(r, "follow"),
 		Timestamps: boolValue(r, "timestamps"),
@@ -636,11 +647,11 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
 		Tail:       r.Form.Get("tail"),
 		UseStdout:  stdout,
 		UseStderr:  stderr,
-		OutStream:  ioutils.NewWriteFlusher(w),
+		OutStream:  outStream,
 		Stop:       closeNotifier,
 	}
 
-	if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil {
+	if err := s.daemon.ContainerLogs(c, logsConfig); err != nil {
 		fmt.Fprintf(w, "Error running logs job: %s\n", err)
 	}
 

+ 24 - 25
daemon/container.go

@@ -25,7 +25,6 @@ import (
 	"github.com/docker/docker/pkg/broadcastwriter"
 	"github.com/docker/docker/pkg/fileutils"
 	"github.com/docker/docker/pkg/ioutils"
-	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/mount"
 	"github.com/docker/docker/pkg/nat"
 	"github.com/docker/docker/pkg/promise"
@@ -721,6 +720,9 @@ func (container *Container) getLogConfig() runconfig.LogConfig {
 }
 
 func (container *Container) getLogger() (logger.Logger, error) {
+	if container.logDriver != nil && container.IsRunning() {
+		return container.logDriver, nil
+	}
 	cfg := container.getLogConfig()
 	if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil {
 		return nil, err
@@ -894,36 +896,33 @@ func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writ
 }
 
 func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
-
 	if logs {
 		logDriver, err := c.getLogger()
 		if err != nil {
-			logrus.Errorf("Error obtaining the logger %v", err)
 			return err
 		}
-		if _, ok := logDriver.(logger.Reader); !ok {
-			logrus.Errorf("cannot read logs for [%s] driver", logDriver.Name())
-		} else {
-			if cLog, err := logDriver.(logger.Reader).ReadLog(); err != nil {
-				logrus.Errorf("Error reading logs %v", err)
-			} else {
-				dec := json.NewDecoder(cLog)
-				for {
-					l := &jsonlog.JSONLog{}
-
-					if err := dec.Decode(l); err == io.EOF {
-						break
-					} else if err != nil {
-						logrus.Errorf("Error streaming logs: %s", err)
-						break
-					}
-					if l.Stream == "stdout" && stdout != nil {
-						io.WriteString(stdout, l.Log)
-					}
-					if l.Stream == "stderr" && stderr != nil {
-						io.WriteString(stderr, l.Log)
-					}
+		cLog, ok := logDriver.(logger.LogReader)
+		if !ok {
+			return logger.ErrReadLogsNotSupported
+		}
+		logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
+
+	LogLoop:
+		for {
+			select {
+			case msg, ok := <-logs.Msg:
+				if !ok {
+					break LogLoop
+				}
+				if msg.Source == "stdout" && stdout != nil {
+					stdout.Write(msg.Line)
+				}
+				if msg.Source == "stderr" && stderr != nil {
+					stderr.Write(msg.Line)
 				}
+			case err := <-logs.Err:
+				logrus.Errorf("Error streaming logs: %v", err)
+				break LogLoop
 			}
 		}
 	}

+ 2 - 0
daemon/logger/factory.go

@@ -27,6 +27,7 @@ type Context struct {
 	LogPath             string
 }
 
+// Hostname returns the hostname from the underlying OS
 func (ctx *Context) Hostname() (string, error) {
 	hostname, err := os.Hostname()
 	if err != nil {
@@ -35,6 +36,7 @@ func (ctx *Context) Hostname() (string, error) {
 	return hostname, nil
 }
 
+// Command returns the command that the container being logged was started with
 func (ctx *Context) Command() string {
 	terms := []string{ctx.ContainerEntrypoint}
 	for _, arg := range ctx.ContainerArgs {

+ 214 - 32
daemon/logger/jsonfilelog/jsonfilelog.go

@@ -2,32 +2,42 @@ package jsonfilelog
 
 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
 	"strconv"
 	"sync"
+	"time"
+
+	"gopkg.in/fsnotify.v1"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/jsonlog"
+	"github.com/docker/docker/pkg/pubsub"
+	"github.com/docker/docker/pkg/tailfile"
 	"github.com/docker/docker/pkg/timeutils"
 	"github.com/docker/docker/pkg/units"
 )
 
 const (
-	Name = "json-file"
+	Name               = "json-file"
+	maxJSONDecodeRetry = 10
 )
 
 // JSONFileLogger is Logger implementation for default docker logging:
 // JSON objects to file
 type JSONFileLogger struct {
-	buf      *bytes.Buffer
-	f        *os.File   // store for closing
-	mu       sync.Mutex // protects buffer
-	capacity int64      //maximum size of each file
-	n        int        //maximum number of files
-	ctx      logger.Context
+	buf          *bytes.Buffer
+	f            *os.File   // store for closing
+	mu           sync.Mutex // protects buffer
+	capacity     int64      //maximum size of each file
+	n            int        //maximum number of files
+	ctx          logger.Context
+	readers      map[*logger.LogWatcher]struct{} // stores the active log followers
+	notifyRotate *pubsub.Publisher
 }
 
 func init() {
@@ -64,11 +74,13 @@ func New(ctx logger.Context) (logger.Logger, error) {
 		}
 	}
 	return &JSONFileLogger{
-		f:        log,
-		buf:      bytes.NewBuffer(nil),
-		ctx:      ctx,
-		capacity: capval,
-		n:        maxFiles,
+		f:            log,
+		buf:          bytes.NewBuffer(nil),
+		ctx:          ctx,
+		capacity:     capval,
+		n:            maxFiles,
+		readers:      make(map[*logger.LogWatcher]struct{}),
+		notifyRotate: pubsub.NewPublisher(0, 1),
 	}, nil
 }
 
@@ -111,6 +123,7 @@ func writeLog(l *JSONFileLogger) (int64, error) {
 			return -1, err
 		}
 		l.f = file
+		l.notifyRotate.Publish(struct{}{})
 	}
 	return writeToBuf(l)
 }
@@ -148,11 +161,11 @@ func backup(old, curr string) error {
 		}
 	}
 	if _, err := os.Stat(curr); os.IsNotExist(err) {
-		if f, err := os.Create(curr); err != nil {
+		f, err := os.Create(curr)
+		if err != nil {
 			return err
-		} else {
-			f.Close()
 		}
+		f.Close()
 	}
 	return os.Rename(curr, old)
 }
@@ -169,31 +182,200 @@ func ValidateLogOpt(cfg map[string]string) error {
 	return nil
 }
 
-func (l *JSONFileLogger) ReadLog(args ...string) (io.Reader, error) {
-	pth := l.ctx.LogPath
-	if len(args) > 0 {
-		//check if args[0] is an integer index
-		index, err := strconv.ParseInt(args[0], 0, 0)
-		if err != nil {
-			return nil, err
-		}
-		if index > 0 {
-			pth = pth + "." + args[0]
-		}
-	}
-	return os.Open(pth)
-}
-
 func (l *JSONFileLogger) LogPath() string {
 	return l.ctx.LogPath
 }
 
-// Close closes underlying file
+// Close closes underlying file and signals all readers to stop
 func (l *JSONFileLogger) Close() error {
-	return l.f.Close()
+	l.mu.Lock()
+	err := l.f.Close()
+	for r := range l.readers {
+		r.Close()
+		delete(l.readers, r)
+	}
+	l.mu.Unlock()
+	return err
 }
 
 // Name returns name of this logger
 func (l *JSONFileLogger) Name() string {
 	return Name
 }
+
+func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
+	l.Reset()
+	if err := dec.Decode(l); err != nil {
+		return nil, err
+	}
+	msg := &logger.Message{
+		Source:    l.Stream,
+		Timestamp: l.Created,
+		Line:      []byte(l.Log),
+	}
+	return msg, nil
+}
+
+// Reads from the log file
+func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
+	logWatcher := logger.NewLogWatcher()
+
+	go l.readLogs(logWatcher, config)
+	return logWatcher
+}
+
+func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
+	defer close(logWatcher.Msg)
+
+	pth := l.ctx.LogPath
+	var files []io.ReadSeeker
+	for i := l.n; i > 1; i-- {
+		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
+		if err != nil {
+			if !os.IsNotExist(err) {
+				logWatcher.Err <- err
+				break
+			}
+			continue
+		}
+		defer f.Close()
+		files = append(files, f)
+	}
+
+	latestFile, err := os.Open(pth)
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+	defer latestFile.Close()
+
+	files = append(files, latestFile)
+	tailer := ioutils.MultiReadSeeker(files...)
+
+	if config.Tail != 0 {
+		tailFile(tailer, logWatcher, config.Tail, config.Since)
+	}
+
+	if !config.Follow {
+		return
+	}
+	if config.Tail == 0 {
+		latestFile.Seek(0, os.SEEK_END)
+	}
+
+	l.mu.Lock()
+	l.readers[logWatcher] = struct{}{}
+	l.mu.Unlock()
+
+	notifyRotate := l.notifyRotate.Subscribe()
+	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
+
+	l.mu.Lock()
+	delete(l.readers, logWatcher)
+	l.mu.Unlock()
+
+	l.notifyRotate.Evict(notifyRotate)
+}
+
+func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
+	var rdr io.Reader = f
+	if tail > 0 {
+		ls, err := tailfile.TailFile(f, tail)
+		if err != nil {
+			logWatcher.Err <- err
+			return
+		}
+		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
+	}
+	dec := json.NewDecoder(rdr)
+	l := &jsonlog.JSONLog{}
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err != io.EOF {
+				logWatcher.Err <- err
+			}
+			return
+		}
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+		logWatcher.Msg <- msg
+	}
+}
+
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
+	dec := json.NewDecoder(f)
+	l := &jsonlog.JSONLog{}
+	fileWatcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+	defer fileWatcher.Close()
+	if err := fileWatcher.Add(f.Name()); err != nil {
+		logWatcher.Err <- err
+		return
+	}
+
+	var retries int
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err != io.EOF {
+				// try again because this shouldn't happen
+				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+					dec = json.NewDecoder(f)
+					retries += 1
+					continue
+				}
+				logWatcher.Err <- err
+				return
+			}
+
+			select {
+			case <-fileWatcher.Events:
+				dec = json.NewDecoder(f)
+				continue
+			case <-fileWatcher.Errors:
+				logWatcher.Err <- err
+				return
+			case <-logWatcher.WatchClose():
+				return
+			case <-notifyRotate:
+				fileWatcher.Remove(f.Name())
+
+				f, err = os.Open(f.Name())
+				if err != nil {
+					logWatcher.Err <- err
+					return
+				}
+				if err := fileWatcher.Add(f.Name()); err != nil {
+					logWatcher.Err <- err
+				}
+				dec = json.NewDecoder(f)
+				continue
+			}
+		}
+
+		retries = 0 // reset retries since we've succeeded
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+		select {
+		case logWatcher.Msg <- msg:
+		case <-logWatcher.WatchClose():
+			logWatcher.Msg <- msg
+			for {
+				msg, err := decodeLogLine(dec, l)
+				if err != nil {
+					return
+				}
+				if !since.IsZero() && msg.Timestamp.Before(since) {
+					continue
+				}
+				logWatcher.Msg <- msg
+			}
+		}
+	}
+}

+ 51 - 6
daemon/logger/logger.go

@@ -2,11 +2,19 @@ package logger
 
 import (
 	"errors"
-	"io"
 	"time"
+
+	"github.com/docker/docker/pkg/timeutils"
 )
 
-var ReadLogsNotSupported = errors.New("configured logging reader does not support reading")
+// ErrReadLogsNotSupported is returned when the logger does not support reading logs
+var ErrReadLogsNotSupported = errors.New("configured logging reader does not support reading")
+
+const (
+	// TimeFormat is the time format used for timestamps sent to log readers
+	TimeFormat           = timeutils.RFC3339NanoFixed
+	logWatcherBufferSize = 4096
+)
 
 // Message is datastructure that represents record from some container
 type Message struct {
@@ -16,14 +24,51 @@ type Message struct {
 	Timestamp   time.Time
 }
 
-// Logger is interface for docker logging drivers
+// Logger is the interface for docker logging drivers
 type Logger interface {
 	Log(*Message) error
 	Name() string
 	Close() error
 }
 
-//Reader is an interface for docker logging drivers that support reading
-type Reader interface {
-	ReadLog(args ...string) (io.Reader, error)
+// ReadConfig is the configuration passed into ReadLogs
+type ReadConfig struct {
+	Since  time.Time
+	Tail   int
+	Follow bool
+}
+
+// LogReader is the interface for reading log messages for loggers that support reading
+type LogReader interface {
+	// Read logs from underlying logging backend
+	ReadLogs(ReadConfig) *LogWatcher
+}
+
+// LogWatcher is used when consuming logs read from the LogReader interface
+type LogWatcher struct {
+	// For sending log messages to a reader
+	Msg chan *Message
+	// For sending error messages that occur while while reading logs
+	Err           chan error
+	closeNotifier chan struct{}
+}
+
+// NewLogWatcher returns a new LogWatcher.
+func NewLogWatcher() *LogWatcher {
+	return &LogWatcher{
+		Msg:           make(chan *Message, logWatcherBufferSize),
+		Err:           make(chan error, 1),
+		closeNotifier: make(chan struct{}),
+	}
+}
+
+// Close notifies the underlying log reader to stop
+func (w *LogWatcher) Close() {
+	close(w.closeNotifier)
+}
+
+// WatchClose returns a channel receiver that receives notification when the watcher has been closed
+// This should only be called from one goroutine
+func (w *LogWatcher) WatchClose() <-chan struct{} {
+	return w.closeNotifier
 }

+ 36 - 190
daemon/logs.go

@@ -1,23 +1,14 @@
 package daemon
 
 import (
-	"bytes"
-	"encoding/json"
 	"fmt"
 	"io"
-	"net"
-	"os"
 	"strconv"
-	"syscall"
 	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/daemon/logger/jsonfilelog"
-	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/stdcopy"
-	"github.com/docker/docker/pkg/tailfile"
-	"github.com/docker/docker/pkg/timeutils"
 )
 
 type ContainerLogsConfig struct {
@@ -29,209 +20,64 @@ type ContainerLogsConfig struct {
 	Stop                 <-chan bool
 }
 
-func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) error {
-	var (
-		lines  = -1
-		format string
-	)
+func (daemon *Daemon) ContainerLogs(container *Container, config *ContainerLogsConfig) error {
 	if !(config.UseStdout || config.UseStderr) {
 		return fmt.Errorf("You must choose at least one stream")
 	}
-	if config.Timestamps {
-		format = timeutils.RFC3339NanoFixed
-	}
-	if config.Tail == "" {
-		config.Tail = "latest"
-	}
-
-	container, err := daemon.Get(name)
-	if err != nil {
-		return err
-	}
 
-	var (
-		outStream = config.OutStream
-		errStream io.Writer
-	)
+	outStream := config.OutStream
+	errStream := outStream
 	if !container.Config.Tty {
 		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
 		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
-	} else {
-		errStream = outStream
-	}
-
-	if container.LogDriverType() != jsonfilelog.Name {
-		return fmt.Errorf("\"logs\" endpoint is supported only for \"json-file\" logging driver")
-	}
-
-	maxFile := 1
-	container.readHostConfig()
-	cfg := container.getLogConfig()
-	conf := cfg.Config
-	if val, ok := conf["max-file"]; ok {
-		var err error
-		maxFile, err = strconv.Atoi(val)
-		if err != nil {
-			return fmt.Errorf("Error reading max-file value: %s", err)
-		}
 	}
 
-	logDriver, err := container.getLogger()
+	cLog, err := container.getLogger()
 	if err != nil {
 		return err
 	}
-	_, ok := logDriver.(logger.Reader)
+	logReader, ok := cLog.(logger.LogReader)
 	if !ok {
-		logrus.Errorf("Cannot read logs of the [%s] driver", logDriver.Name())
-	} else {
-		// json-file driver
-		if config.Tail != "all" && config.Tail != "latest" {
-			var err error
-			lines, err = strconv.Atoi(config.Tail)
-			if err != nil {
-				logrus.Errorf("Failed to parse tail %s, error: %v, show all logs", config.Tail, err)
-				lines = -1
-			}
-		}
-
-		if lines != 0 {
-			n := maxFile
-			if config.Tail == "latest" && config.Since.IsZero() {
-				n = 1
-			}
-			before := false
-			for i := n; i > 0; i-- {
-				if before {
-					break
-				}
-				cLog, err := getReader(logDriver, i, n, lines)
-				if err != nil {
-					logrus.Debugf("Error reading %d log file: %v", i-1, err)
-					continue
-				}
-				//if lines are specified, then iterate only once
-				if lines > 0 {
-					i = 1
-				} else { // if lines are not specified, cLog is a file, It needs to be closed
-					defer cLog.(*os.File).Close()
-				}
-				dec := json.NewDecoder(cLog)
-				l := &jsonlog.JSONLog{}
-				for {
-					l.Reset()
-					if err := dec.Decode(l); err == io.EOF {
-						break
-					} else if err != nil {
-						logrus.Errorf("Error streaming logs: %s", err)
-						break
-					}
-					logLine := l.Log
-					if !config.Since.IsZero() && l.Created.Before(config.Since) {
-						continue
-					}
-					if config.Timestamps {
-						// format can be "" or time format, so here can't be error
-						logLine, _ = l.Format(format)
-					}
-					if l.Stream == "stdout" && config.UseStdout {
-						io.WriteString(outStream, logLine)
-					}
-					if l.Stream == "stderr" && config.UseStderr {
-						io.WriteString(errStream, logLine)
-					}
-				}
-			}
-		}
+		return logger.ErrReadLogsNotSupported
 	}
 
-	if config.Follow && container.IsRunning() {
-		chErrStderr := make(chan error)
-		chErrStdout := make(chan error)
-		var stdoutPipe, stderrPipe io.ReadCloser
-
-		// write an empty chunk of data (this is to ensure that the
-		// HTTP Response is sent immediatly, even if the container has
-		// not yet produced any data)
-		outStream.Write(nil)
+	follow := config.Follow && container.IsRunning()
+	tailLines, err := strconv.Atoi(config.Tail)
+	if err != nil {
+		tailLines = -1
+	}
 
-		if config.UseStdout {
-			stdoutPipe = container.StdoutLogPipe()
-			go func() {
-				logrus.Debug("logs: stdout stream begin")
-				chErrStdout <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since)
-				logrus.Debug("logs: stdout stream end")
-			}()
-		}
-		if config.UseStderr {
-			stderrPipe = container.StderrLogPipe()
-			go func() {
-				logrus.Debug("logs: stderr stream begin")
-				chErrStderr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since)
-				logrus.Debug("logs: stderr stream end")
-			}()
-		}
+	logrus.Debug("logs: begin stream")
+	readConfig := logger.ReadConfig{
+		Since:  config.Since,
+		Tail:   tailLines,
+		Follow: follow,
+	}
+	logs := logReader.ReadLogs(readConfig)
 
+	for {
 		select {
-		case err = <-chErrStderr:
-			if stdoutPipe != nil {
-				stdoutPipe.Close()
-				<-chErrStdout
-			}
-		case err = <-chErrStdout:
-			if stderrPipe != nil {
-				stderrPipe.Close()
-				<-chErrStderr
-			}
+		case err := <-logs.Err:
+			logrus.Errorf("Error streaming logs: %v", err)
+			return nil
 		case <-config.Stop:
-			if stdoutPipe != nil {
-				stdoutPipe.Close()
-				<-chErrStdout
+			logs.Close()
+			return nil
+		case msg, ok := <-logs.Msg:
+			if !ok {
+				logrus.Debugf("logs: end stream")
+				return nil
 			}
-			if stderrPipe != nil {
-				stderrPipe.Close()
-				<-chErrStderr
+			logLine := msg.Line
+			if config.Timestamps {
+				logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...)
 			}
-			return nil
-		}
-
-		if err != nil && err != io.EOF && err != io.ErrClosedPipe {
-			if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE {
-				logrus.Errorf("error streaming logs: %v", err)
+			if msg.Source == "stdout" && config.UseStdout {
+				outStream.Write(logLine)
+			}
+			if msg.Source == "stderr" && config.UseStderr {
+				errStream.Write(logLine)
 			}
 		}
 	}
-	return nil
-}
-
-func getReader(logDriver logger.Logger, fileIndex, maxFiles, lines int) (io.Reader, error) {
-	if lines <= 0 {
-		index := strconv.Itoa(fileIndex - 1)
-		cLog, err := logDriver.(logger.Reader).ReadLog(index)
-		return cLog, err
-	}
-	buf := bytes.NewBuffer([]byte{})
-	remaining := lines
-	for i := 0; i < maxFiles; i++ {
-		index := strconv.Itoa(i)
-		cLog, err := logDriver.(logger.Reader).ReadLog(index)
-		if err != nil {
-			return buf, err
-		}
-		f := cLog.(*os.File)
-		ls, err := tailfile.TailFile(f, remaining)
-		if err != nil {
-			return buf, err
-		}
-		tmp := bytes.NewBuffer([]byte{})
-		for _, l := range ls {
-			fmt.Fprintf(tmp, "%s\n", l)
-		}
-		tmp.ReadFrom(buf)
-		buf = tmp
-		if len(ls) == remaining {
-			return buf, nil
-		}
-		remaining = remaining - len(ls)
-	}
-	return buf, nil
 }

+ 5 - 2
daemon/monitor.go

@@ -12,7 +12,10 @@ import (
 	"github.com/docker/docker/runconfig"
 )
 
-const defaultTimeIncrement = 100
+const (
+	defaultTimeIncrement = 100
+	loggerCloseTimeout   = 10 * time.Second
+)
 
 // containerMonitor monitors the execution of a container's main process.
 // If a restart policy is specified for the container the monitor will ensure that the
@@ -310,7 +313,7 @@ func (m *containerMonitor) resetContainer(lock bool) {
 				close(exit)
 			}()
 			select {
-			case <-time.After(1 * time.Second):
+			case <-time.After(loggerCloseTimeout):
 				logrus.Warnf("Logger didn't exit in time: logs may be truncated")
 			case <-exit:
 			}

+ 1 - 1
docs/reference/commandline/logs.md

@@ -29,7 +29,7 @@ The `docker logs --follow` command will continue streaming the new output from
 the container's `STDOUT` and `STDERR`.
 
 Passing a negative number or a non-integer to `--tail` is invalid and the
-value is set to `latest` in that case.
+value is set to `all` in that case.
 
 The `docker logs --timestamp` commands will add an RFC3339Nano
 timestamp, for example `2014-09-16T06:17:46.000000000Z`, to each

+ 1 - 5
integration-cli/docker_cli_logs_test.go

@@ -250,13 +250,9 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) {
 	}()
 
 	logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID)
-
 	stdout, err := logCmd.StdoutPipe()
 	c.Assert(err, check.IsNil)
-
-	if err := logCmd.Start(); err != nil {
-		c.Fatal(err)
-	}
+	c.Assert(logCmd.Start(), check.IsNil)
 
 	// First read slowly
 	bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead)

+ 226 - 0
pkg/ioutils/multireader.go

@@ -0,0 +1,226 @@
+package ioutils
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"os"
+)
+
+type pos struct {
+	idx    int
+	offset int64
+}
+
+type multiReadSeeker struct {
+	readers []io.ReadSeeker
+	pos     *pos
+	posIdx  map[io.ReadSeeker]int
+}
+
+func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) {
+	var tmpOffset int64
+	switch whence {
+	case os.SEEK_SET:
+		for i, rdr := range r.readers {
+			// get size of the current reader
+			s, err := rdr.Seek(0, os.SEEK_END)
+			if err != nil {
+				return -1, err
+			}
+
+			if offset > tmpOffset+s {
+				if i == len(r.readers)-1 {
+					rdrOffset := s + (offset - tmpOffset)
+					if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
+						return -1, err
+					}
+					r.pos = &pos{i, rdrOffset}
+					return offset, nil
+				}
+
+				tmpOffset += s
+				continue
+			}
+
+			rdrOffset := offset - tmpOffset
+			idx := i
+
+			rdr.Seek(rdrOffset, os.SEEK_SET)
+			// make sure all following readers are at 0
+			for _, rdr := range r.readers[i+1:] {
+				rdr.Seek(0, os.SEEK_SET)
+			}
+
+			if rdrOffset == s && i != len(r.readers)-1 {
+				idx += 1
+				rdrOffset = 0
+			}
+			r.pos = &pos{idx, rdrOffset}
+			return offset, nil
+		}
+	case os.SEEK_END:
+		for _, rdr := range r.readers {
+			s, err := rdr.Seek(0, os.SEEK_END)
+			if err != nil {
+				return -1, err
+			}
+			tmpOffset += s
+		}
+		r.Seek(tmpOffset+offset, os.SEEK_SET)
+		return tmpOffset + offset, nil
+	case os.SEEK_CUR:
+		if r.pos == nil {
+			return r.Seek(offset, os.SEEK_SET)
+		}
+		// Just return the current offset
+		if offset == 0 {
+			return r.getCurOffset()
+		}
+
+		curOffset, err := r.getCurOffset()
+		if err != nil {
+			return -1, err
+		}
+		rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset)
+		if err != nil {
+			return -1, err
+		}
+
+		r.pos = &pos{r.posIdx[rdr], rdrOffset}
+		return curOffset + offset, nil
+	default:
+		return -1, fmt.Errorf("Invalid whence: %d", whence)
+	}
+
+	return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset)
+}
+
+func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) {
+	var rdr io.ReadSeeker
+	var rdrOffset int64
+
+	for i, rdr := range r.readers {
+		offsetTo, err := r.getOffsetToReader(rdr)
+		if err != nil {
+			return nil, -1, err
+		}
+		if offsetTo > offset {
+			rdr = r.readers[i-1]
+			rdrOffset = offsetTo - offset
+			break
+		}
+
+		if rdr == r.readers[len(r.readers)-1] {
+			rdrOffset = offsetTo + offset
+			break
+		}
+	}
+
+	return rdr, rdrOffset, nil
+}
+
+func (r *multiReadSeeker) getCurOffset() (int64, error) {
+	var totalSize int64
+	for _, rdr := range r.readers[:r.pos.idx+1] {
+		if r.posIdx[rdr] == r.pos.idx {
+			totalSize += r.pos.offset
+			break
+		}
+
+		size, err := getReadSeekerSize(rdr)
+		if err != nil {
+			return -1, fmt.Errorf("error getting seeker size: %v", err)
+		}
+		totalSize += size
+	}
+	return totalSize, nil
+}
+
+func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) {
+	var offset int64
+	for _, r := range r.readers {
+		if r == rdr {
+			break
+		}
+
+		size, err := getReadSeekerSize(rdr)
+		if err != nil {
+			return -1, err
+		}
+		offset += size
+	}
+	return offset, nil
+}
+
+func (r *multiReadSeeker) Read(b []byte) (int, error) {
+	if r.pos == nil {
+		r.pos = &pos{0, 0}
+	}
+
+	bCap := int64(cap(b))
+	buf := bytes.NewBuffer(nil)
+	var rdr io.ReadSeeker
+
+	for _, rdr = range r.readers[r.pos.idx:] {
+		readBytes, err := io.CopyN(buf, rdr, bCap)
+		if err != nil && err != io.EOF {
+			return -1, err
+		}
+		bCap -= readBytes
+
+		if bCap == 0 {
+			break
+		}
+	}
+
+	rdrPos, err := rdr.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return -1, err
+	}
+	r.pos = &pos{r.posIdx[rdr], rdrPos}
+	return buf.Read(b)
+}
+
+func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) {
+	// save the current position
+	pos, err := rdr.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return -1, err
+	}
+
+	// get the size
+	size, err := rdr.Seek(0, os.SEEK_END)
+	if err != nil {
+		return -1, err
+	}
+
+	// reset the position
+	if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil {
+		return -1, err
+	}
+	return size, nil
+}
+
+// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
+// input readseekers. After calling this method the initial position is set to the
+// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
+// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
+// Seek can be used over the sum of lengths of all readseekers.
+//
+// When a MultiReadSeeker is used, no Read and Seek operations should be made on
+// its ReadSeeker components. Also, users should make no assumption on the state
+// of individual readseekers while the MultiReadSeeker is used.
+func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
+	if len(readers) == 1 {
+		return readers[0]
+	}
+	idx := make(map[io.ReadSeeker]int)
+	for i, rdr := range readers {
+		idx[rdr] = i
+	}
+	return &multiReadSeeker{
+		readers: readers,
+		posIdx:  idx,
+	}
+}

+ 149 - 0
pkg/ioutils/multireader_test.go

@@ -0,0 +1,149 @@
+package ioutils
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"strings"
+	"testing"
+)
+
+func TestMultiReadSeekerReadAll(t *testing.T) {
+	str := "hello world"
+	s1 := strings.NewReader(str + " 1")
+	s2 := strings.NewReader(str + " 2")
+	s3 := strings.NewReader(str + " 3")
+	mr := MultiReadSeeker(s1, s2, s3)
+
+	expectedSize := int64(s1.Len() + s2.Len() + s3.Len())
+
+	b, err := ioutil.ReadAll(mr)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	expected := "hello world 1hello world 2hello world 3"
+	if string(b) != expected {
+		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
+	}
+
+	size, err := mr.Seek(0, os.SEEK_END)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if size != expectedSize {
+		t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize)
+	}
+
+	// Reset the position and read again
+	pos, err := mr.Seek(0, os.SEEK_SET)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if pos != 0 {
+		t.Fatalf("expected position to be set to 0, got %d", pos)
+	}
+
+	b, err = ioutil.ReadAll(mr)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if string(b) != expected {
+		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
+	}
+}
+
+func TestMultiReadSeekerReadEach(t *testing.T) {
+	str := "hello world"
+	s1 := strings.NewReader(str + " 1")
+	s2 := strings.NewReader(str + " 2")
+	s3 := strings.NewReader(str + " 3")
+	mr := MultiReadSeeker(s1, s2, s3)
+
+	var totalBytes int64
+	for i, s := range []*strings.Reader{s1, s2, s3} {
+		sLen := int64(s.Len())
+		buf := make([]byte, s.Len())
+		expected := []byte(fmt.Sprintf("%s %d", str, i+1))
+
+		if _, err := mr.Read(buf); err != nil && err != io.EOF {
+			t.Fatal(err)
+		}
+
+		if !bytes.Equal(buf, expected) {
+			t.Fatalf("expected %q to be %q", string(buf), string(expected))
+		}
+
+		pos, err := mr.Seek(0, os.SEEK_CUR)
+		if err != nil {
+			t.Fatalf("iteration: %d, error: %v", i+1, err)
+		}
+
+		// check that the total bytes read is the current position of the seeker
+		totalBytes += sLen
+		if pos != totalBytes {
+			t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1)
+		}
+
+		// This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well
+		newPos, err := mr.Seek(pos, os.SEEK_SET)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if newPos != pos {
+			t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos)
+		}
+	}
+}
+
+func TestMultiReadSeekerReadSpanningChunks(t *testing.T) {
+	str := "hello world"
+	s1 := strings.NewReader(str + " 1")
+	s2 := strings.NewReader(str + " 2")
+	s3 := strings.NewReader(str + " 3")
+	mr := MultiReadSeeker(s1, s2, s3)
+
+	buf := make([]byte, s1.Len()+3)
+	_, err := mr.Read(buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string
+	expected := "hello world 1hel"
+	if string(buf) != expected {
+		t.Fatalf("expected %s to be %s", string(buf), expected)
+	}
+}
+
+func TestMultiReadSeekerNegativeSeek(t *testing.T) {
+	str := "hello world"
+	s1 := strings.NewReader(str + " 1")
+	s2 := strings.NewReader(str + " 2")
+	s3 := strings.NewReader(str + " 3")
+	mr := MultiReadSeeker(s1, s2, s3)
+
+	s1Len := s1.Len()
+	s2Len := s2.Len()
+	s3Len := s3.Len()
+
+	s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if s != int64(s1Len+s2Len) {
+		t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len())
+	}
+
+	buf := make([]byte, s3Len)
+	if _, err := mr.Read(buf); err != nil && err != io.EOF {
+		t.Fatal(err)
+	}
+	expected := fmt.Sprintf("%s %d", str, 3)
+	if string(buf) != fmt.Sprintf("%s %d", str, 3) {
+		t.Fatalf("expected %q to be %q", string(buf), expected)
+	}
+}

+ 2 - 1
pkg/tailfile/tailfile.go

@@ -3,6 +3,7 @@ package tailfile
 import (
 	"bytes"
 	"errors"
+	"io"
 	"os"
 )
 
@@ -12,7 +13,7 @@ var eol = []byte("\n")
 var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive")
 
 //TailFile returns last n lines of file f
-func TailFile(f *os.File, n int) ([][]byte, error) {
+func TailFile(f io.ReadSeeker, n int) ([][]byte, error) {
 	if n <= 0 {
 		return nil, ErrNonPositiveLinesNumber
 	}