Split reader interface from logger interface

Implement new reader interface on jsonfile. Moves jsonlog decoding from
daemon to jsonfile logger.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

parent d241d2f36c
commit c0391bf554
13 changed files with 726 additions and 266 deletions
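The heart of the change is that reading logs becomes an optional capability instead of part of every logging driver. Below is a condensed sketch of the resulting shape; it is not part of the commit itself: the types are abbreviated from the daemon/logger hunks further down, and the consume helper is illustrative.

package logsketch

import (
    "fmt"
    "time"
)

// Message, ReadConfig and LogWatcher are abbreviated from daemon/logger.
type Message struct {
    Line      []byte
    Source    string // "stdout" or "stderr"
    Timestamp time.Time
}

type ReadConfig struct {
    Since  time.Time
    Tail   int // -1 reads the whole log
    Follow bool
}

type LogWatcher struct {
    Msg chan *Message
    Err chan error
}

// Logger stays write-only: the three methods every driver must implement.
type Logger interface {
    Log(*Message) error
    Name() string
    Close() error
}

// LogReader is the split-out read side; only drivers that can serve
// `docker logs` (json-file in this commit) implement it.
type LogReader interface {
    ReadLogs(ReadConfig) *LogWatcher
}

// consume mirrors the type assertion the daemon and AttachWithLogs now make:
// a driver that cannot read reports an error instead of the daemon poking at
// the driver's files directly.
func consume(l Logger, out func(*Message)) error {
    lr, ok := l.(LogReader)
    if !ok {
        return fmt.Errorf("driver %q does not support reading logs", l.Name())
    }
    w := lr.ReadLogs(ReadConfig{Tail: -1})
    for msg := range w.Msg {
        out(msg)
    }
    return nil
}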
@@ -19,7 +19,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
     follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
     since := cmd.String([]string{"-since"}, "", "Show logs since timestamp")
     times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps")
-    tail := cmd.String([]string{"-tail"}, "latest", "Number of lines to show from the end of the logs")
+    tail := cmd.String([]string{"-tail"}, "all", "Number of lines to show from the end of the logs")
     cmd.Require(flag.Exact, 1)

     cmd.ParseFlags(args, true)
@@ -629,6 +629,17 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter
         closeNotifier = notifier.CloseNotify()
     }

+    c, err := s.daemon.Get(vars["name"])
+    if err != nil {
+        return err
+    }
+
+    outStream := ioutils.NewWriteFlusher(w)
+    // write an empty chunk of data (this is to ensure that the
+    // HTTP Response is sent immediately, even if the container has
+    // not yet produced any data)
+    outStream.Write(nil)
+
     logsConfig := &daemon.ContainerLogsConfig{
         Follow:     boolValue(r, "follow"),
         Timestamps: boolValue(r, "timestamps"),
@@ -636,11 +647,11 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter
         Tail:       r.Form.Get("tail"),
         UseStdout:  stdout,
         UseStderr:  stderr,
-        OutStream:  ioutils.NewWriteFlusher(w),
+        OutStream:  outStream,
         Stop:       closeNotifier,
     }

-    if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil {
+    if err := s.daemon.ContainerLogs(c, logsConfig); err != nil {
         fmt.Fprintf(w, "Error running logs job: %s\n", err)
     }

@@ -25,7 +25,6 @@ import (
     "github.com/docker/docker/pkg/broadcastwriter"
     "github.com/docker/docker/pkg/fileutils"
     "github.com/docker/docker/pkg/ioutils"
-    "github.com/docker/docker/pkg/jsonlog"
    "github.com/docker/docker/pkg/mount"
     "github.com/docker/docker/pkg/nat"
     "github.com/docker/docker/pkg/promise"
@@ -721,6 +720,9 @@ func (container *Container) getLogConfig() runconfig.LogConfig {
 }

 func (container *Container) getLogger() (logger.Logger, error) {
+    if container.logDriver != nil && container.IsRunning() {
+        return container.logDriver, nil
+    }
     cfg := container.getLogConfig()
     if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil {
         return nil, err
@@ -894,36 +896,33 @@ func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer
 }

 func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {

     if logs {
         logDriver, err := c.getLogger()
         if err != nil {
             logrus.Errorf("Error obtaining the logger %v", err)
             return err
         }
-        if _, ok := logDriver.(logger.Reader); !ok {
-            logrus.Errorf("cannot read logs for [%s] driver", logDriver.Name())
-        } else {
-            if cLog, err := logDriver.(logger.Reader).ReadLog(); err != nil {
-                logrus.Errorf("Error reading logs %v", err)
-            } else {
-                dec := json.NewDecoder(cLog)
-                for {
-                    l := &jsonlog.JSONLog{}
-                    if err := dec.Decode(l); err == io.EOF {
-                        break
-                    } else if err != nil {
-                        logrus.Errorf("Error streaming logs: %s", err)
-                        break
-                    }
-                    if l.Stream == "stdout" && stdout != nil {
-                        io.WriteString(stdout, l.Log)
-                    }
-                    if l.Stream == "stderr" && stderr != nil {
-                        io.WriteString(stderr, l.Log)
-                    }
-                }
-            }
-        }
+        cLog, ok := logDriver.(logger.LogReader)
+        if !ok {
+            return logger.ErrReadLogsNotSupported
+        }
+        logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
+
+    LogLoop:
+        for {
+            select {
+            case msg, ok := <-logs.Msg:
+                if !ok {
+                    break LogLoop
+                }
+                if msg.Source == "stdout" && stdout != nil {
+                    stdout.Write(msg.Line)
+                }
+                if msg.Source == "stderr" && stderr != nil {
+                    stderr.Write(msg.Line)
+                }
+            case err := <-logs.Err:
+                logrus.Errorf("Error streaming logs: %v", err)
+                break LogLoop
+            }
+        }
     }

@@ -27,6 +27,7 @@ type Context struct {
     LogPath string
 }

+// Hostname returns the hostname from the underlying OS
 func (ctx *Context) Hostname() (string, error) {
     hostname, err := os.Hostname()
     if err != nil {
@@ -35,6 +36,7 @@ func (ctx *Context) Hostname() (string, error) {
     return hostname, nil
 }

+// Command returns the command that the container being logged was started with
 func (ctx *Context) Command() string {
     terms := []string{ctx.ContainerEntrypoint}
     for _, arg := range ctx.ContainerArgs {
@@ -2,32 +2,42 @@ package jsonfilelog

 import (
     "bytes"
     "encoding/json"
     "fmt"
     "io"
     "os"
     "strconv"
     "sync"
     "time"

+    "gopkg.in/fsnotify.v1"
+
     "github.com/Sirupsen/logrus"
     "github.com/docker/docker/daemon/logger"
+    "github.com/docker/docker/pkg/ioutils"
     "github.com/docker/docker/pkg/jsonlog"
+    "github.com/docker/docker/pkg/pubsub"
+    "github.com/docker/docker/pkg/tailfile"
     "github.com/docker/docker/pkg/timeutils"
     "github.com/docker/docker/pkg/units"
 )

 const (
-    Name = "json-file"
+    Name               = "json-file"
+    maxJSONDecodeRetry = 10
 )

 // JSONFileLogger is Logger implementation for default docker logging:
 // JSON objects to file
 type JSONFileLogger struct {
-    buf      *bytes.Buffer
-    f        *os.File   // store for closing
-    mu       sync.Mutex // protects buffer
-    capacity int64      //maximum size of each file
-    n        int        //maximum number of files
-    ctx      logger.Context
+    buf          *bytes.Buffer
+    f            *os.File   // store for closing
+    mu           sync.Mutex // protects buffer
+    capacity     int64      //maximum size of each file
+    n            int        //maximum number of files
+    ctx          logger.Context
+    readers      map[*logger.LogWatcher]struct{} // stores the active log followers
+    notifyRotate *pubsub.Publisher
 }

 func init() {
@@ -64,11 +74,13 @@ func New(ctx logger.Context) (logger.Logger, error) {
         }
     }
     return &JSONFileLogger{
-        f:        log,
-        buf:      bytes.NewBuffer(nil),
-        ctx:      ctx,
-        capacity: capval,
-        n:        maxFiles,
+        f:            log,
+        buf:          bytes.NewBuffer(nil),
+        ctx:          ctx,
+        capacity:     capval,
+        n:            maxFiles,
+        readers:      make(map[*logger.LogWatcher]struct{}),
+        notifyRotate: pubsub.NewPublisher(0, 1),
     }, nil
 }

@@ -111,6 +123,7 @@ func writeLog(l *JSONFileLogger) (int64, error) {
             return -1, err
         }
         l.f = file
+        l.notifyRotate.Publish(struct{}{})
     }
     return writeToBuf(l)
 }
@@ -148,11 +161,11 @@ func backup(old, curr string) error {
         }
     }
     if _, err := os.Stat(curr); os.IsNotExist(err) {
-        if f, err := os.Create(curr); err != nil {
+        f, err := os.Create(curr)
+        if err != nil {
             return err
-        } else {
-            f.Close()
         }
+        f.Close()
     }
     return os.Rename(curr, old)
 }
@@ -169,31 +182,200 @@ func ValidateLogOpt(cfg map[string]string) error {
     return nil
 }

-func (l *JSONFileLogger) ReadLog(args ...string) (io.Reader, error) {
-    pth := l.ctx.LogPath
-    if len(args) > 0 {
-        //check if args[0] is an integer index
-        index, err := strconv.ParseInt(args[0], 0, 0)
-        if err != nil {
-            return nil, err
-        }
-        if index > 0 {
-            pth = pth + "." + args[0]
-        }
-    }
-    return os.Open(pth)
-}
-
-func (l *JSONFileLogger) LogPath() string {
-    return l.ctx.LogPath
-}
-
-// Close closes underlying file
+// Close closes underlying file and signals all readers to stop
 func (l *JSONFileLogger) Close() error {
-    return l.f.Close()
+    l.mu.Lock()
+    err := l.f.Close()
+    for r := range l.readers {
+        r.Close()
+        delete(l.readers, r)
+    }
+    l.mu.Unlock()
+    return err
 }

 // Name returns name of this logger
 func (l *JSONFileLogger) Name() string {
     return Name
 }
+
+func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
+    l.Reset()
+    if err := dec.Decode(l); err != nil {
+        return nil, err
+    }
+    msg := &logger.Message{
+        Source:    l.Stream,
+        Timestamp: l.Created,
+        Line:      []byte(l.Log),
+    }
+    return msg, nil
+}
+
+// Reads from the log file
+func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
+    logWatcher := logger.NewLogWatcher()
+
+    go l.readLogs(logWatcher, config)
+    return logWatcher
+}
+
+func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
+    defer close(logWatcher.Msg)
+
+    pth := l.ctx.LogPath
+    var files []io.ReadSeeker
+    for i := l.n; i > 1; i-- {
+        f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
+        if err != nil {
+            if !os.IsNotExist(err) {
+                logWatcher.Err <- err
+                break
+            }
+            continue
+        }
+        defer f.Close()
+        files = append(files, f)
+    }
+
+    latestFile, err := os.Open(pth)
+    if err != nil {
+        logWatcher.Err <- err
+        return
+    }
+    defer latestFile.Close()
+
+    files = append(files, latestFile)
+    tailer := ioutils.MultiReadSeeker(files...)
+
+    if config.Tail != 0 {
+        tailFile(tailer, logWatcher, config.Tail, config.Since)
+    }
+
+    if !config.Follow {
+        return
+    }
+    if config.Tail == 0 {
+        latestFile.Seek(0, os.SEEK_END)
+    }
+
+    l.mu.Lock()
+    l.readers[logWatcher] = struct{}{}
+    l.mu.Unlock()
+
+    notifyRotate := l.notifyRotate.Subscribe()
+    followLogs(latestFile, logWatcher, notifyRotate, config.Since)
+
+    l.mu.Lock()
+    delete(l.readers, logWatcher)
+    l.mu.Unlock()
+
+    l.notifyRotate.Evict(notifyRotate)
+}
+
+func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
+    var rdr io.Reader = f
+    if tail > 0 {
+        ls, err := tailfile.TailFile(f, tail)
+        if err != nil {
+            logWatcher.Err <- err
+            return
+        }
+        rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
+    }
+    dec := json.NewDecoder(rdr)
+    l := &jsonlog.JSONLog{}
+    for {
+        msg, err := decodeLogLine(dec, l)
+        if err != nil {
+            if err != io.EOF {
+                logWatcher.Err <- err
+            }
+            return
+        }
+        if !since.IsZero() && msg.Timestamp.Before(since) {
+            continue
+        }
+        logWatcher.Msg <- msg
+    }
+}
+
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
+    dec := json.NewDecoder(f)
+    l := &jsonlog.JSONLog{}
+    fileWatcher, err := fsnotify.NewWatcher()
+    if err != nil {
+        logWatcher.Err <- err
+        return
+    }
+    defer fileWatcher.Close()
+    if err := fileWatcher.Add(f.Name()); err != nil {
+        logWatcher.Err <- err
+        return
+    }
+
+    var retries int
+    for {
+        msg, err := decodeLogLine(dec, l)
+        if err != nil {
+            if err != io.EOF {
+                // try again because this shouldn't happen
+                if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+                    dec = json.NewDecoder(f)
+                    retries += 1
+                    continue
+                }
+                logWatcher.Err <- err
+                return
+            }
+
+            select {
+            case <-fileWatcher.Events:
+                dec = json.NewDecoder(f)
+                continue
+            case <-fileWatcher.Errors:
+                logWatcher.Err <- err
+                return
+            case <-logWatcher.WatchClose():
+                return
+            case <-notifyRotate:
+                fileWatcher.Remove(f.Name())
+
+                f, err = os.Open(f.Name())
+                if err != nil {
+                    logWatcher.Err <- err
+                    return
+                }
+                if err := fileWatcher.Add(f.Name()); err != nil {
+                    logWatcher.Err <- err
+                }
+                dec = json.NewDecoder(f)
+                continue
+            }
+        }
+
+        retries = 0 // reset retries since we've succeeded
+        if !since.IsZero() && msg.Timestamp.Before(since) {
+            continue
+        }
+        select {
+        case logWatcher.Msg <- msg:
+        case <-logWatcher.WatchClose():
+            logWatcher.Msg <- msg
+            for {
+                msg, err := decodeLogLine(dec, l)
+                if err != nil {
+                    return
+                }
+                if !since.IsZero() && msg.Timestamp.Before(since) {
+                    continue
+                }
+                logWatcher.Msg <- msg
+            }
+        }
+    }
+}
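Rotation is the subtle part of the follower above: fsnotify watches the open file, so once writeLog renames it the reader has to re-open by name and rebuild its json.Decoder. The writer announces that moment through pkg/pubsub. A minimal self-contained sketch of that handshake, using only the pubsub calls that appear in this diff (NewPublisher, Subscribe, Publish, Evict); the print statements stand in for the real re-open logic:

package main

import (
    "fmt"

    "github.com/docker/docker/pkg/pubsub"
)

func main() {
    // The writer owns the publisher (JSONFileLogger.notifyRotate above);
    // every follower takes its own subscription.
    notifyRotate := pubsub.NewPublisher(0, 1)

    sub := notifyRotate.Subscribe()
    done := make(chan struct{})
    go func() {
        defer close(done)
        <-sub // blocks until the writer rotates the file
        fmt.Println("rotation observed: re-open the log file and recreate the decoder")
    }()

    // In writeLog above, Publish fires right after the old file is renamed
    // and a fresh one is opened in its place.
    notifyRotate.Publish(struct{}{})
    <-done
    notifyRotate.Evict(sub)
}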
@@ -2,11 +2,19 @@ package logger

 import (
     "errors"
-    "io"
     "time"
+
+    "github.com/docker/docker/pkg/timeutils"
 )

-var ReadLogsNotSupported = errors.New("configured logging reader does not support reading")
+// ErrReadLogsNotSupported is returned when the logger does not support reading logs
+var ErrReadLogsNotSupported = errors.New("configured logging reader does not support reading")
+
+const (
+    // TimeFormat is the time format used for timestamps sent to log readers
+    TimeFormat           = timeutils.RFC3339NanoFixed
+    logWatcherBufferSize = 4096
+)

 // Message is datastructure that represents record from some container
 type Message struct {
@@ -16,14 +24,51 @@ type Message struct {
     Timestamp time.Time
 }

-// Logger is interface for docker logging drivers
+// Logger is the interface for docker logging drivers
 type Logger interface {
     Log(*Message) error
     Name() string
     Close() error
 }

-//Reader is an interface for docker logging drivers that support reading
-type Reader interface {
-    ReadLog(args ...string) (io.Reader, error)
+// ReadConfig is the configuration passed into ReadLogs
+type ReadConfig struct {
+    Since  time.Time
+    Tail   int
+    Follow bool
 }
+
+// LogReader is the interface for reading log messages for loggers that support reading
+type LogReader interface {
+    // Read logs from underlying logging backend
+    ReadLogs(ReadConfig) *LogWatcher
+}
+
+// LogWatcher is used when consuming logs read from the LogReader interface
+type LogWatcher struct {
+    // For sending log messages to a reader
+    Msg chan *Message
+    // For sending error messages that occur while reading logs
+    Err           chan error
+    closeNotifier chan struct{}
+}
+
+// NewLogWatcher returns a new LogWatcher.
+func NewLogWatcher() *LogWatcher {
+    return &LogWatcher{
+        Msg:           make(chan *Message, logWatcherBufferSize),
+        Err:           make(chan error, 1),
+        closeNotifier: make(chan struct{}),
+    }
+}
+
+// Close notifies the underlying log reader to stop
+func (w *LogWatcher) Close() {
+    close(w.closeNotifier)
+}
+
+// WatchClose returns a channel receiver that receives notification when the watcher has been closed
+// This should only be called from one goroutine
+func (w *LogWatcher) WatchClose() <-chan struct{} {
+    return w.closeNotifier
+}
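Close and WatchClose together form a one-shot stop signal: the consumer closes a private channel once, and the producing goroutine observes it from any select. A minimal sketch of the same pattern outside the logger types (names mirror LogWatcher, but this is illustrative code, not part of the commit):

package main

import (
    "fmt"
    "time"
)

type watcher struct {
    closeNotifier chan struct{}
}

// Close is called once by the consumer; a closed channel is readable by
// any number of selects in the producer.
func (w *watcher) Close() { close(w.closeNotifier) }

func (w *watcher) WatchClose() <-chan struct{} { return w.closeNotifier }

func main() {
    w := &watcher{closeNotifier: make(chan struct{})}
    done := make(chan struct{})

    go func() {
        defer close(done)
        for {
            select {
            case <-w.WatchClose():
                fmt.Println("stop requested; follower exits")
                return
            case <-time.After(10 * time.Millisecond):
                fmt.Println("tick: still following")
            }
        }
    }()

    time.Sleep(25 * time.Millisecond)
    w.Close()
    <-done
}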
daemon/logs.go
@@ -1,23 +1,14 @@
 package daemon

 import (
-    "bytes"
-    "encoding/json"
     "fmt"
     "io"
-    "net"
     "os"
     "strconv"
-    "syscall"
     "time"

     "github.com/Sirupsen/logrus"
     "github.com/docker/docker/daemon/logger"
     "github.com/docker/docker/daemon/logger/jsonfilelog"
-    "github.com/docker/docker/pkg/jsonlog"
     "github.com/docker/docker/pkg/stdcopy"
-    "github.com/docker/docker/pkg/tailfile"
-    "github.com/docker/docker/pkg/timeutils"
 )

 type ContainerLogsConfig struct {
@@ -29,209 +20,64 @@ type ContainerLogsConfig struct {
     Stop <-chan bool
 }

-func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) error {
-    var (
-        lines  = -1
-        format string
-    )
+func (daemon *Daemon) ContainerLogs(container *Container, config *ContainerLogsConfig) error {
     if !(config.UseStdout || config.UseStderr) {
         return fmt.Errorf("You must choose at least one stream")
     }
-    if config.Timestamps {
-        format = timeutils.RFC3339NanoFixed
-    }
-    if config.Tail == "" {
-        config.Tail = "latest"
-    }
-
-    container, err := daemon.Get(name)
-    if err != nil {
-        return err
-    }

-    var (
-        outStream = config.OutStream
-        errStream io.Writer
-    )
+    outStream := config.OutStream
+    errStream := outStream
     if !container.Config.Tty {
         errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
         outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
-    } else {
-        errStream = outStream
     }

-    if container.LogDriverType() != jsonfilelog.Name {
-        return fmt.Errorf("\"logs\" endpoint is supported only for \"json-file\" logging driver")
-    }
-
-    maxFile := 1
-    container.readHostConfig()
-    cfg := container.getLogConfig()
-    conf := cfg.Config
-    if val, ok := conf["max-file"]; ok {
-        var err error
-        maxFile, err = strconv.Atoi(val)
-        if err != nil {
-            return fmt.Errorf("Error reading max-file value: %s", err)
-        }
-    }
-
-    logDriver, err := container.getLogger()
+    cLog, err := container.getLogger()
     if err != nil {
         return err
     }
-    _, ok := logDriver.(logger.Reader)
+    logReader, ok := cLog.(logger.LogReader)
     if !ok {
-        logrus.Errorf("Cannot read logs of the [%s] driver", logDriver.Name())
-    } else {
-        // json-file driver
-        if config.Tail != "all" && config.Tail != "latest" {
-            var err error
-            lines, err = strconv.Atoi(config.Tail)
-            if err != nil {
-                logrus.Errorf("Failed to parse tail %s, error: %v, show all logs", config.Tail, err)
-                lines = -1
-            }
-        }
-
-        if lines != 0 {
-            n := maxFile
-            if config.Tail == "latest" && config.Since.IsZero() {
-                n = 1
-            }
-            before := false
-            for i := n; i > 0; i-- {
-                if before {
-                    break
-                }
-                cLog, err := getReader(logDriver, i, n, lines)
-                if err != nil {
-                    logrus.Debugf("Error reading %d log file: %v", i-1, err)
-                    continue
-                }
-                //if lines are specified, then iterate only once
-                if lines > 0 {
-                    i = 1
-                } else { // if lines are not specified, cLog is a file, It needs to be closed
-                    defer cLog.(*os.File).Close()
-                }
-                dec := json.NewDecoder(cLog)
-                l := &jsonlog.JSONLog{}
-                for {
-                    l.Reset()
-                    if err := dec.Decode(l); err == io.EOF {
-                        break
-                    } else if err != nil {
-                        logrus.Errorf("Error streaming logs: %s", err)
-                        break
-                    }
-                    logLine := l.Log
-                    if !config.Since.IsZero() && l.Created.Before(config.Since) {
-                        continue
-                    }
-                    if config.Timestamps {
-                        // format can be "" or time format, so here can't be error
-                        logLine, _ = l.Format(format)
-                    }
-                    if l.Stream == "stdout" && config.UseStdout {
-                        io.WriteString(outStream, logLine)
-                    }
-                    if l.Stream == "stderr" && config.UseStderr {
-                        io.WriteString(errStream, logLine)
-                    }
-                }
-            }
-        }
+        return logger.ErrReadLogsNotSupported
     }

-    if config.Follow && container.IsRunning() {
-        chErrStderr := make(chan error)
-        chErrStdout := make(chan error)
-        var stdoutPipe, stderrPipe io.ReadCloser
+    follow := config.Follow && container.IsRunning()
+    tailLines, err := strconv.Atoi(config.Tail)
+    if err != nil {
+        tailLines = -1
+    }
+
-        // write an empty chunk of data (this is to ensure that the
-        // HTTP Response is sent immediately, even if the container has
-        // not yet produced any data)
-        outStream.Write(nil)
-
-        if config.UseStdout {
-            stdoutPipe = container.StdoutLogPipe()
-            go func() {
-                logrus.Debug("logs: stdout stream begin")
-                chErrStdout <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since)
-                logrus.Debug("logs: stdout stream end")
-            }()
-        }
-        if config.UseStderr {
-            stderrPipe = container.StderrLogPipe()
-            go func() {
-                logrus.Debug("logs: stderr stream begin")
-                chErrStderr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since)
-                logrus.Debug("logs: stderr stream end")
-            }()
-        }
+    logrus.Debug("logs: begin stream")
+    readConfig := logger.ReadConfig{
+        Since:  config.Since,
+        Tail:   tailLines,
+        Follow: follow,
+    }
+    logs := logReader.ReadLogs(readConfig)

     for {
         select {
-        case err = <-chErrStderr:
-            if stdoutPipe != nil {
-                stdoutPipe.Close()
-                <-chErrStdout
-            }
-        case err = <-chErrStdout:
-            if stderrPipe != nil {
-                stderrPipe.Close()
-                <-chErrStderr
-            }
-        case <-config.Stop:
-            if stdoutPipe != nil {
-                stdoutPipe.Close()
-                <-chErrStdout
-            }
-            if stderrPipe != nil {
-                stderrPipe.Close()
-                <-chErrStderr
-            }
+        case err := <-logs.Err:
+            logrus.Errorf("Error streaming logs: %v", err)
             return nil
-        }
-
-        if err != nil && err != io.EOF && err != io.ErrClosedPipe {
-            if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE {
-                logrus.Errorf("error streaming logs: %v", err)
-            }
-            break
-        }
+        case <-config.Stop:
+            logs.Close()
+            return nil
+        case msg, ok := <-logs.Msg:
+            if !ok {
+                logrus.Debugf("logs: end stream")
+                return nil
+            }
+            logLine := msg.Line
+            if config.Timestamps {
+                logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...)
+            }
+            if msg.Source == "stdout" && config.UseStdout {
+                outStream.Write(logLine)
+            }
+            if msg.Source == "stderr" && config.UseStderr {
+                errStream.Write(logLine)
+            }
+        }
     }
-    return nil
 }
-
-func getReader(logDriver logger.Logger, fileIndex, maxFiles, lines int) (io.Reader, error) {
-    if lines <= 0 {
-        index := strconv.Itoa(fileIndex - 1)
-        cLog, err := logDriver.(logger.Reader).ReadLog(index)
-        return cLog, err
-    }
-    buf := bytes.NewBuffer([]byte{})
-    remaining := lines
-    for i := 0; i < maxFiles; i++ {
-        index := strconv.Itoa(i)
-        cLog, err := logDriver.(logger.Reader).ReadLog(index)
-        if err != nil {
-            return buf, err
-        }
-        f := cLog.(*os.File)
-        ls, err := tailfile.TailFile(f, remaining)
-        if err != nil {
-            return buf, err
-        }
-        tmp := bytes.NewBuffer([]byte{})
-        for _, l := range ls {
-            fmt.Fprintf(tmp, "%s\n", l)
-        }
-        tmp.ReadFrom(buf)
-        buf = tmp
-        if len(ls) == remaining {
-            return buf, nil
-        }
-        remaining = remaining - len(ls)
-    }
-    return buf, nil
-}
@@ -12,7 +12,10 @@ import (
     "github.com/docker/docker/runconfig"
 )

-const defaultTimeIncrement = 100
+const (
+    defaultTimeIncrement = 100
+    loggerCloseTimeout   = 10 * time.Second
+)

 // containerMonitor monitors the execution of a container's main process.
 // If a restart policy is specified for the container the monitor will ensure that the

@@ -310,7 +313,7 @@ func (m *containerMonitor) resetContainer(lock bool) {
         close(exit)
     }()
     select {
-    case <-time.After(1 * time.Second):
+    case <-time.After(loggerCloseTimeout):
         logrus.Warnf("Logger didn't exit in time: logs may be truncated")
     case <-exit:
     }
@@ -29,7 +29,7 @@ The `docker logs --follow` command will continue streaming the new output from
 the container's `STDOUT` and `STDERR`.

 Passing a negative number or a non-integer to `--tail` is invalid and the
-value is set to `latest` in that case.
+value is set to `all` in that case.

 The `docker logs --timestamp` commands will add an RFC3339Nano
 timestamp, for example `2014-09-16T06:17:46.000000000Z`, to each
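Tracing the hunks above, the string from `--tail` reaches the json-file reader as a single integer: strconv.Atoi in daemon/logs.go turns any non-integer (including "all") into -1, and only a positive count limits output in jsonfilelog. A sketch of the effective mapping; the helper name is hypothetical, not part of the commit:

package main

import (
    "fmt"
    "strconv"
)

// effectiveTail condenses the plumbing above: parse failures become -1,
// only a positive count limits output, and 0 skips the backlog entirely
// when following. (Illustrative helper, not part of the commit.)
func effectiveTail(tail string) string {
    n, err := strconv.Atoi(tail)
    if err != nil {
        n = -1 // "all" and other non-integers read the whole log
    }
    switch {
    case n > 0:
        return fmt.Sprintf("last %d lines", n)
    case n == 0:
        return "no backlog, only new lines while following"
    default:
        return "entire log"
    }
}

func main() {
    for _, tail := range []string{"all", "10", "0", "-5"} {
        fmt.Printf("--tail=%s => %s\n", tail, effectiveTail(tail))
    }
}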
@@ -250,13 +250,9 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) {
     }()

     logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID)
-
     stdout, err := logCmd.StdoutPipe()
     c.Assert(err, check.IsNil)
-
-    if err := logCmd.Start(); err != nil {
-        c.Fatal(err)
-    }
+    c.Assert(logCmd.Start(), check.IsNil)

     // First read slowly
     bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead)
pkg/ioutils/multireader.go (new file)
@@ -0,0 +1,226 @@
+package ioutils
+
+import (
+    "bytes"
+    "fmt"
+    "io"
+    "os"
+)
+
+type pos struct {
+    idx    int
+    offset int64
+}
+
+type multiReadSeeker struct {
+    readers []io.ReadSeeker
+    pos     *pos
+    posIdx  map[io.ReadSeeker]int
+}
+
+func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) {
+    var tmpOffset int64
+    switch whence {
+    case os.SEEK_SET:
+        for i, rdr := range r.readers {
+            // get size of the current reader
+            s, err := rdr.Seek(0, os.SEEK_END)
+            if err != nil {
+                return -1, err
+            }
+
+            if offset > tmpOffset+s {
+                if i == len(r.readers)-1 {
+                    rdrOffset := s + (offset - tmpOffset)
+                    if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
+                        return -1, err
+                    }
+                    r.pos = &pos{i, rdrOffset}
+                    return offset, nil
+                }
+
+                tmpOffset += s
+                continue
+            }
+
+            rdrOffset := offset - tmpOffset
+            idx := i
+
+            rdr.Seek(rdrOffset, os.SEEK_SET)
+            // make sure all following readers are at 0
+            for _, rdr := range r.readers[i+1:] {
+                rdr.Seek(0, os.SEEK_SET)
+            }
+
+            if rdrOffset == s && i != len(r.readers)-1 {
+                idx += 1
+                rdrOffset = 0
+            }
+            r.pos = &pos{idx, rdrOffset}
+            return offset, nil
+        }
+    case os.SEEK_END:
+        for _, rdr := range r.readers {
+            s, err := rdr.Seek(0, os.SEEK_END)
+            if err != nil {
+                return -1, err
+            }
+            tmpOffset += s
+        }
+        r.Seek(tmpOffset+offset, os.SEEK_SET)
+        return tmpOffset + offset, nil
+    case os.SEEK_CUR:
+        if r.pos == nil {
+            return r.Seek(offset, os.SEEK_SET)
+        }
+        // Just return the current offset
+        if offset == 0 {
+            return r.getCurOffset()
+        }
+
+        curOffset, err := r.getCurOffset()
+        if err != nil {
+            return -1, err
+        }
+        rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset)
+        if err != nil {
+            return -1, err
+        }
+
+        r.pos = &pos{r.posIdx[rdr], rdrOffset}
+        return curOffset + offset, nil
+    default:
+        return -1, fmt.Errorf("Invalid whence: %d", whence)
+    }
+
+    return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset)
+}
+
+func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) {
+    var rdr io.ReadSeeker
+    var rdrOffset int64
+
+    for i, rdr := range r.readers {
+        offsetTo, err := r.getOffsetToReader(rdr)
+        if err != nil {
+            return nil, -1, err
+        }
+        if offsetTo > offset {
+            rdr = r.readers[i-1]
+            rdrOffset = offsetTo - offset
+            break
+        }
+
+        if rdr == r.readers[len(r.readers)-1] {
+            rdrOffset = offsetTo + offset
+            break
+        }
+    }
+
+    return rdr, rdrOffset, nil
+}
+
+func (r *multiReadSeeker) getCurOffset() (int64, error) {
+    var totalSize int64
+    for _, rdr := range r.readers[:r.pos.idx+1] {
+        if r.posIdx[rdr] == r.pos.idx {
+            totalSize += r.pos.offset
+            break
+        }
+
+        size, err := getReadSeekerSize(rdr)
+        if err != nil {
+            return -1, fmt.Errorf("error getting seeker size: %v", err)
+        }
+        totalSize += size
+    }
+    return totalSize, nil
+}
+
+func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) {
+    var offset int64
+    for _, r := range r.readers {
+        if r == rdr {
+            break
+        }
+
+        size, err := getReadSeekerSize(rdr)
+        if err != nil {
+            return -1, err
+        }
+        offset += size
+    }
+    return offset, nil
+}
+
+func (r *multiReadSeeker) Read(b []byte) (int, error) {
+    if r.pos == nil {
+        r.pos = &pos{0, 0}
+    }
+
+    bCap := int64(cap(b))
+    buf := bytes.NewBuffer(nil)
+    var rdr io.ReadSeeker
+
+    for _, rdr = range r.readers[r.pos.idx:] {
+        readBytes, err := io.CopyN(buf, rdr, bCap)
+        if err != nil && err != io.EOF {
+            return -1, err
+        }
+        bCap -= readBytes
+
+        if bCap == 0 {
+            break
+        }
+    }
+
+    rdrPos, err := rdr.Seek(0, os.SEEK_CUR)
+    if err != nil {
+        return -1, err
+    }
+    r.pos = &pos{r.posIdx[rdr], rdrPos}
+    return buf.Read(b)
+}
+
+func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) {
+    // save the current position
+    pos, err := rdr.Seek(0, os.SEEK_CUR)
+    if err != nil {
+        return -1, err
+    }
+
+    // get the size
+    size, err := rdr.Seek(0, os.SEEK_END)
+    if err != nil {
+        return -1, err
+    }
+
+    // reset the position
+    if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil {
+        return -1, err
+    }
+    return size, nil
+}
+
+// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
+// input readseekers. After calling this method the initial position is set to the
+// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
+// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
+// Seek can be used over the sum of lengths of all readseekers.
+//
+// When a MultiReadSeeker is used, no Read and Seek operations should be made on
+// its ReadSeeker components. Also, users should make no assumption on the state
+// of individual readseekers while the MultiReadSeeker is used.
+func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
+    if len(readers) == 1 {
+        return readers[0]
+    }
+    idx := make(map[io.ReadSeeker]int)
+    for i, rdr := range readers {
+        idx[rdr] = i
+    }
+    return &multiReadSeeker{
+        readers: readers,
+        posIdx:  idx,
+    }
+}
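As a quick illustration of the contract in the doc comment — the concatenated stream is seekable over the sum of the input lengths — here is a small usage sketch (assuming only the MultiReadSeeker function above; the new tests below exercise the same properties more thoroughly):

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "strings"

    "github.com/docker/docker/pkg/ioutils"
)

func main() {
    // Three "files" stitched into one logical stream, as readLogs does
    // with the rotated json log files.
    mr := ioutils.MultiReadSeeker(
        strings.NewReader("alpha\n"),
        strings.NewReader("beta\n"),
        strings.NewReader("gamma\n"),
    )

    // Seek over the combined length: skip the first 6-byte chunk entirely.
    if _, err := mr.Seek(6, os.SEEK_SET); err != nil {
        panic(err)
    }
    rest, err := ioutil.ReadAll(mr)
    if err != nil {
        panic(err)
    }
    fmt.Printf("%q\n", rest) // "beta\ngamma\n"
}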
pkg/ioutils/multireader_test.go (new file)
@@ -0,0 +1,149 @@
+package ioutils
+
+import (
+    "bytes"
+    "fmt"
+    "io"
+    "io/ioutil"
+    "os"
+    "strings"
+    "testing"
+)
+
+func TestMultiReadSeekerReadAll(t *testing.T) {
+    str := "hello world"
+    s1 := strings.NewReader(str + " 1")
+    s2 := strings.NewReader(str + " 2")
+    s3 := strings.NewReader(str + " 3")
+    mr := MultiReadSeeker(s1, s2, s3)
+
+    expectedSize := int64(s1.Len() + s2.Len() + s3.Len())
+
+    b, err := ioutil.ReadAll(mr)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    expected := "hello world 1hello world 2hello world 3"
+    if string(b) != expected {
+        t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
+    }
+
+    size, err := mr.Seek(0, os.SEEK_END)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if size != expectedSize {
+        t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize)
+    }
+
+    // Reset the position and read again
+    pos, err := mr.Seek(0, os.SEEK_SET)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if pos != 0 {
+        t.Fatalf("expected position to be set to 0, got %d", pos)
+    }
+
+    b, err = ioutil.ReadAll(mr)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if string(b) != expected {
+        t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
+    }
+}
+
+func TestMultiReadSeekerReadEach(t *testing.T) {
+    str := "hello world"
+    s1 := strings.NewReader(str + " 1")
+    s2 := strings.NewReader(str + " 2")
+    s3 := strings.NewReader(str + " 3")
+    mr := MultiReadSeeker(s1, s2, s3)
+
+    var totalBytes int64
+    for i, s := range []*strings.Reader{s1, s2, s3} {
+        sLen := int64(s.Len())
+        buf := make([]byte, s.Len())
+        expected := []byte(fmt.Sprintf("%s %d", str, i+1))
+
+        if _, err := mr.Read(buf); err != nil && err != io.EOF {
+            t.Fatal(err)
+        }
+
+        if !bytes.Equal(buf, expected) {
+            t.Fatalf("expected %q to be %q", string(buf), string(expected))
+        }
+
+        pos, err := mr.Seek(0, os.SEEK_CUR)
+        if err != nil {
+            t.Fatalf("iteration: %d, error: %v", i+1, err)
+        }
+
+        // check that the total bytes read is the current position of the seeker
+        totalBytes += sLen
+        if pos != totalBytes {
+            t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1)
+        }
+
+        // This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well
+        newPos, err := mr.Seek(pos, os.SEEK_SET)
+        if err != nil {
+            t.Fatal(err)
+        }
+        if newPos != pos {
+            t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos)
+        }
+    }
+}
+
+func TestMultiReadSeekerReadSpanningChunks(t *testing.T) {
+    str := "hello world"
+    s1 := strings.NewReader(str + " 1")
+    s2 := strings.NewReader(str + " 2")
+    s3 := strings.NewReader(str + " 3")
+    mr := MultiReadSeeker(s1, s2, s3)
+
+    buf := make([]byte, s1.Len()+3)
+    _, err := mr.Read(buf)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string
+    expected := "hello world 1hel"
+    if string(buf) != expected {
+        t.Fatalf("expected %s to be %s", string(buf), expected)
+    }
+}
+
+func TestMultiReadSeekerNegativeSeek(t *testing.T) {
+    str := "hello world"
+    s1 := strings.NewReader(str + " 1")
+    s2 := strings.NewReader(str + " 2")
+    s3 := strings.NewReader(str + " 3")
+    mr := MultiReadSeeker(s1, s2, s3)
+
+    s1Len := s1.Len()
+    s2Len := s2.Len()
+    s3Len := s3.Len()
+
+    s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if s != int64(s1Len+s2Len) {
+        t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len())
+    }
+
+    buf := make([]byte, s3Len)
+    if _, err := mr.Read(buf); err != nil && err != io.EOF {
+        t.Fatal(err)
+    }
+    expected := fmt.Sprintf("%s %d", str, 3)
+    if string(buf) != fmt.Sprintf("%s %d", str, 3) {
+        t.Fatalf("expected %q to be %q", string(buf), expected)
+    }
+}
@@ -3,6 +3,7 @@ package tailfile
 import (
     "bytes"
     "errors"
+    "io"
     "os"
 )

@@ -12,7 +13,7 @@ var eol = []byte("\n")
 var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive")

 //TailFile returns last n lines of file f
-func TailFile(f *os.File, n int) ([][]byte, error) {
+func TailFile(f io.ReadSeeker, n int) ([][]byte, error) {
     if n <= 0 {
         return nil, ErrNonPositiveLinesNumber
     }