Split reader interface from logger interface
Implement new reader interface on jsonfile. Moves jsonlog decoding from daemon to jsonfile logger. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
parent
d241d2f36c
commit
c0391bf554
13 changed files with 726 additions and 266 deletions
|
@ -19,7 +19,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
|
||||||
follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
|
follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
|
||||||
since := cmd.String([]string{"-since"}, "", "Show logs since timestamp")
|
since := cmd.String([]string{"-since"}, "", "Show logs since timestamp")
|
||||||
times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps")
|
times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps")
|
||||||
tail := cmd.String([]string{"-tail"}, "latest", "Number of lines to show from the end of the logs")
|
tail := cmd.String([]string{"-tail"}, "all", "Number of lines to show from the end of the logs")
|
||||||
cmd.Require(flag.Exact, 1)
|
cmd.Require(flag.Exact, 1)
|
||||||
|
|
||||||
cmd.ParseFlags(args, true)
|
cmd.ParseFlags(args, true)
|
||||||
|
|
|
@ -629,6 +629,17 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
|
||||||
closeNotifier = notifier.CloseNotify()
|
closeNotifier = notifier.CloseNotify()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c, err := s.daemon.Get(vars["name"])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
outStream := ioutils.NewWriteFlusher(w)
|
||||||
|
// write an empty chunk of data (this is to ensure that the
|
||||||
|
// HTTP Response is sent immediatly, even if the container has
|
||||||
|
// not yet produced any data)
|
||||||
|
outStream.Write(nil)
|
||||||
|
|
||||||
logsConfig := &daemon.ContainerLogsConfig{
|
logsConfig := &daemon.ContainerLogsConfig{
|
||||||
Follow: boolValue(r, "follow"),
|
Follow: boolValue(r, "follow"),
|
||||||
Timestamps: boolValue(r, "timestamps"),
|
Timestamps: boolValue(r, "timestamps"),
|
||||||
|
@ -636,11 +647,11 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
|
||||||
Tail: r.Form.Get("tail"),
|
Tail: r.Form.Get("tail"),
|
||||||
UseStdout: stdout,
|
UseStdout: stdout,
|
||||||
UseStderr: stderr,
|
UseStderr: stderr,
|
||||||
OutStream: ioutils.NewWriteFlusher(w),
|
OutStream: outStream,
|
||||||
Stop: closeNotifier,
|
Stop: closeNotifier,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil {
|
if err := s.daemon.ContainerLogs(c, logsConfig); err != nil {
|
||||||
fmt.Fprintf(w, "Error running logs job: %s\n", err)
|
fmt.Fprintf(w, "Error running logs job: %s\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ import (
|
||||||
"github.com/docker/docker/pkg/broadcastwriter"
|
"github.com/docker/docker/pkg/broadcastwriter"
|
||||||
"github.com/docker/docker/pkg/fileutils"
|
"github.com/docker/docker/pkg/fileutils"
|
||||||
"github.com/docker/docker/pkg/ioutils"
|
"github.com/docker/docker/pkg/ioutils"
|
||||||
"github.com/docker/docker/pkg/jsonlog"
|
|
||||||
"github.com/docker/docker/pkg/mount"
|
"github.com/docker/docker/pkg/mount"
|
||||||
"github.com/docker/docker/pkg/nat"
|
"github.com/docker/docker/pkg/nat"
|
||||||
"github.com/docker/docker/pkg/promise"
|
"github.com/docker/docker/pkg/promise"
|
||||||
|
@ -721,6 +720,9 @@ func (container *Container) getLogConfig() runconfig.LogConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (container *Container) getLogger() (logger.Logger, error) {
|
func (container *Container) getLogger() (logger.Logger, error) {
|
||||||
|
if container.logDriver != nil && container.IsRunning() {
|
||||||
|
return container.logDriver, nil
|
||||||
|
}
|
||||||
cfg := container.getLogConfig()
|
cfg := container.getLogConfig()
|
||||||
if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil {
|
if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -894,36 +896,33 @@ func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writ
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
|
func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
|
||||||
|
|
||||||
if logs {
|
if logs {
|
||||||
logDriver, err := c.getLogger()
|
logDriver, err := c.getLogger()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("Error obtaining the logger %v", err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, ok := logDriver.(logger.Reader); !ok {
|
cLog, ok := logDriver.(logger.LogReader)
|
||||||
logrus.Errorf("cannot read logs for [%s] driver", logDriver.Name())
|
if !ok {
|
||||||
} else {
|
return logger.ErrReadLogsNotSupported
|
||||||
if cLog, err := logDriver.(logger.Reader).ReadLog(); err != nil {
|
}
|
||||||
logrus.Errorf("Error reading logs %v", err)
|
logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
|
||||||
} else {
|
|
||||||
dec := json.NewDecoder(cLog)
|
|
||||||
for {
|
|
||||||
l := &jsonlog.JSONLog{}
|
|
||||||
|
|
||||||
if err := dec.Decode(l); err == io.EOF {
|
LogLoop:
|
||||||
break
|
for {
|
||||||
} else if err != nil {
|
select {
|
||||||
logrus.Errorf("Error streaming logs: %s", err)
|
case msg, ok := <-logs.Msg:
|
||||||
break
|
if !ok {
|
||||||
}
|
break LogLoop
|
||||||
if l.Stream == "stdout" && stdout != nil {
|
|
||||||
io.WriteString(stdout, l.Log)
|
|
||||||
}
|
|
||||||
if l.Stream == "stderr" && stderr != nil {
|
|
||||||
io.WriteString(stderr, l.Log)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if msg.Source == "stdout" && stdout != nil {
|
||||||
|
stdout.Write(msg.Line)
|
||||||
|
}
|
||||||
|
if msg.Source == "stderr" && stderr != nil {
|
||||||
|
stderr.Write(msg.Line)
|
||||||
|
}
|
||||||
|
case err := <-logs.Err:
|
||||||
|
logrus.Errorf("Error streaming logs: %v", err)
|
||||||
|
break LogLoop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ type Context struct {
|
||||||
LogPath string
|
LogPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Hostname returns the hostname from the underlying OS
|
||||||
func (ctx *Context) Hostname() (string, error) {
|
func (ctx *Context) Hostname() (string, error) {
|
||||||
hostname, err := os.Hostname()
|
hostname, err := os.Hostname()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -35,6 +36,7 @@ func (ctx *Context) Hostname() (string, error) {
|
||||||
return hostname, nil
|
return hostname, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Command returns the command that the container being logged was started with
|
||||||
func (ctx *Context) Command() string {
|
func (ctx *Context) Command() string {
|
||||||
terms := []string{ctx.ContainerEntrypoint}
|
terms := []string{ctx.ContainerEntrypoint}
|
||||||
for _, arg := range ctx.ContainerArgs {
|
for _, arg := range ctx.ContainerArgs {
|
||||||
|
|
|
@ -2,32 +2,42 @@ package jsonfilelog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/fsnotify.v1"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/docker/daemon/logger"
|
"github.com/docker/docker/daemon/logger"
|
||||||
|
"github.com/docker/docker/pkg/ioutils"
|
||||||
"github.com/docker/docker/pkg/jsonlog"
|
"github.com/docker/docker/pkg/jsonlog"
|
||||||
|
"github.com/docker/docker/pkg/pubsub"
|
||||||
|
"github.com/docker/docker/pkg/tailfile"
|
||||||
"github.com/docker/docker/pkg/timeutils"
|
"github.com/docker/docker/pkg/timeutils"
|
||||||
"github.com/docker/docker/pkg/units"
|
"github.com/docker/docker/pkg/units"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Name = "json-file"
|
Name = "json-file"
|
||||||
|
maxJSONDecodeRetry = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
// JSONFileLogger is Logger implementation for default docker logging:
|
// JSONFileLogger is Logger implementation for default docker logging:
|
||||||
// JSON objects to file
|
// JSON objects to file
|
||||||
type JSONFileLogger struct {
|
type JSONFileLogger struct {
|
||||||
buf *bytes.Buffer
|
buf *bytes.Buffer
|
||||||
f *os.File // store for closing
|
f *os.File // store for closing
|
||||||
mu sync.Mutex // protects buffer
|
mu sync.Mutex // protects buffer
|
||||||
capacity int64 //maximum size of each file
|
capacity int64 //maximum size of each file
|
||||||
n int //maximum number of files
|
n int //maximum number of files
|
||||||
ctx logger.Context
|
ctx logger.Context
|
||||||
|
readers map[*logger.LogWatcher]struct{} // stores the active log followers
|
||||||
|
notifyRotate *pubsub.Publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -64,11 +74,13 @@ func New(ctx logger.Context) (logger.Logger, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &JSONFileLogger{
|
return &JSONFileLogger{
|
||||||
f: log,
|
f: log,
|
||||||
buf: bytes.NewBuffer(nil),
|
buf: bytes.NewBuffer(nil),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
capacity: capval,
|
capacity: capval,
|
||||||
n: maxFiles,
|
n: maxFiles,
|
||||||
|
readers: make(map[*logger.LogWatcher]struct{}),
|
||||||
|
notifyRotate: pubsub.NewPublisher(0, 1),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,6 +123,7 @@ func writeLog(l *JSONFileLogger) (int64, error) {
|
||||||
return -1, err
|
return -1, err
|
||||||
}
|
}
|
||||||
l.f = file
|
l.f = file
|
||||||
|
l.notifyRotate.Publish(struct{}{})
|
||||||
}
|
}
|
||||||
return writeToBuf(l)
|
return writeToBuf(l)
|
||||||
}
|
}
|
||||||
|
@ -148,11 +161,11 @@ func backup(old, curr string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if _, err := os.Stat(curr); os.IsNotExist(err) {
|
if _, err := os.Stat(curr); os.IsNotExist(err) {
|
||||||
if f, err := os.Create(curr); err != nil {
|
f, err := os.Create(curr)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else {
|
|
||||||
f.Close()
|
|
||||||
}
|
}
|
||||||
|
f.Close()
|
||||||
}
|
}
|
||||||
return os.Rename(curr, old)
|
return os.Rename(curr, old)
|
||||||
}
|
}
|
||||||
|
@ -169,31 +182,200 @@ func ValidateLogOpt(cfg map[string]string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *JSONFileLogger) ReadLog(args ...string) (io.Reader, error) {
|
|
||||||
pth := l.ctx.LogPath
|
|
||||||
if len(args) > 0 {
|
|
||||||
//check if args[0] is an integer index
|
|
||||||
index, err := strconv.ParseInt(args[0], 0, 0)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if index > 0 {
|
|
||||||
pth = pth + "." + args[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return os.Open(pth)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *JSONFileLogger) LogPath() string {
|
func (l *JSONFileLogger) LogPath() string {
|
||||||
return l.ctx.LogPath
|
return l.ctx.LogPath
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes underlying file
|
// Close closes underlying file and signals all readers to stop
|
||||||
func (l *JSONFileLogger) Close() error {
|
func (l *JSONFileLogger) Close() error {
|
||||||
return l.f.Close()
|
l.mu.Lock()
|
||||||
|
err := l.f.Close()
|
||||||
|
for r := range l.readers {
|
||||||
|
r.Close()
|
||||||
|
delete(l.readers, r)
|
||||||
|
}
|
||||||
|
l.mu.Unlock()
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name returns name of this logger
|
// Name returns name of this logger
|
||||||
func (l *JSONFileLogger) Name() string {
|
func (l *JSONFileLogger) Name() string {
|
||||||
return Name
|
return Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
|
||||||
|
l.Reset()
|
||||||
|
if err := dec.Decode(l); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
msg := &logger.Message{
|
||||||
|
Source: l.Stream,
|
||||||
|
Timestamp: l.Created,
|
||||||
|
Line: []byte(l.Log),
|
||||||
|
}
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reads from the log file
|
||||||
|
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
||||||
|
logWatcher := logger.NewLogWatcher()
|
||||||
|
|
||||||
|
go l.readLogs(logWatcher, config)
|
||||||
|
return logWatcher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
||||||
|
defer close(logWatcher.Msg)
|
||||||
|
|
||||||
|
pth := l.ctx.LogPath
|
||||||
|
var files []io.ReadSeeker
|
||||||
|
for i := l.n; i > 1; i-- {
|
||||||
|
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
|
||||||
|
if err != nil {
|
||||||
|
if !os.IsNotExist(err) {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
files = append(files, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
latestFile, err := os.Open(pth)
|
||||||
|
if err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer latestFile.Close()
|
||||||
|
|
||||||
|
files = append(files, latestFile)
|
||||||
|
tailer := ioutils.MultiReadSeeker(files...)
|
||||||
|
|
||||||
|
if config.Tail != 0 {
|
||||||
|
tailFile(tailer, logWatcher, config.Tail, config.Since)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !config.Follow {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if config.Tail == 0 {
|
||||||
|
latestFile.Seek(0, os.SEEK_END)
|
||||||
|
}
|
||||||
|
|
||||||
|
l.mu.Lock()
|
||||||
|
l.readers[logWatcher] = struct{}{}
|
||||||
|
l.mu.Unlock()
|
||||||
|
|
||||||
|
notifyRotate := l.notifyRotate.Subscribe()
|
||||||
|
followLogs(latestFile, logWatcher, notifyRotate, config.Since)
|
||||||
|
|
||||||
|
l.mu.Lock()
|
||||||
|
delete(l.readers, logWatcher)
|
||||||
|
l.mu.Unlock()
|
||||||
|
|
||||||
|
l.notifyRotate.Evict(notifyRotate)
|
||||||
|
}
|
||||||
|
|
||||||
|
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
|
||||||
|
var rdr io.Reader = f
|
||||||
|
if tail > 0 {
|
||||||
|
ls, err := tailfile.TailFile(f, tail)
|
||||||
|
if err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
|
||||||
|
}
|
||||||
|
dec := json.NewDecoder(rdr)
|
||||||
|
l := &jsonlog.JSONLog{}
|
||||||
|
for {
|
||||||
|
msg, err := decodeLogLine(dec, l)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logWatcher.Msg <- msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
|
||||||
|
dec := json.NewDecoder(f)
|
||||||
|
l := &jsonlog.JSONLog{}
|
||||||
|
fileWatcher, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer fileWatcher.Close()
|
||||||
|
if err := fileWatcher.Add(f.Name()); err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var retries int
|
||||||
|
for {
|
||||||
|
msg, err := decodeLogLine(dec, l)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
// try again because this shouldn't happen
|
||||||
|
if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
|
||||||
|
dec = json.NewDecoder(f)
|
||||||
|
retries += 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-fileWatcher.Events:
|
||||||
|
dec = json.NewDecoder(f)
|
||||||
|
continue
|
||||||
|
case <-fileWatcher.Errors:
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
case <-logWatcher.WatchClose():
|
||||||
|
return
|
||||||
|
case <-notifyRotate:
|
||||||
|
fileWatcher.Remove(f.Name())
|
||||||
|
|
||||||
|
f, err = os.Open(f.Name())
|
||||||
|
if err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := fileWatcher.Add(f.Name()); err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
}
|
||||||
|
dec = json.NewDecoder(f)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
retries = 0 // reset retries since we've succeeded
|
||||||
|
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case logWatcher.Msg <- msg:
|
||||||
|
case <-logWatcher.WatchClose():
|
||||||
|
logWatcher.Msg <- msg
|
||||||
|
for {
|
||||||
|
msg, err := decodeLogLine(dec, l)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logWatcher.Msg <- msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2,11 +2,19 @@ package logger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/docker/pkg/timeutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ReadLogsNotSupported = errors.New("configured logging reader does not support reading")
|
// ErrReadLogsNotSupported is returned when the logger does not support reading logs
|
||||||
|
var ErrReadLogsNotSupported = errors.New("configured logging reader does not support reading")
|
||||||
|
|
||||||
|
const (
|
||||||
|
// TimeFormat is the time format used for timestamps sent to log readers
|
||||||
|
TimeFormat = timeutils.RFC3339NanoFixed
|
||||||
|
logWatcherBufferSize = 4096
|
||||||
|
)
|
||||||
|
|
||||||
// Message is datastructure that represents record from some container
|
// Message is datastructure that represents record from some container
|
||||||
type Message struct {
|
type Message struct {
|
||||||
|
@ -16,14 +24,51 @@ type Message struct {
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Logger is interface for docker logging drivers
|
// Logger is the interface for docker logging drivers
|
||||||
type Logger interface {
|
type Logger interface {
|
||||||
Log(*Message) error
|
Log(*Message) error
|
||||||
Name() string
|
Name() string
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
//Reader is an interface for docker logging drivers that support reading
|
// ReadConfig is the configuration passed into ReadLogs
|
||||||
type Reader interface {
|
type ReadConfig struct {
|
||||||
ReadLog(args ...string) (io.Reader, error)
|
Since time.Time
|
||||||
|
Tail int
|
||||||
|
Follow bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogReader is the interface for reading log messages for loggers that support reading
|
||||||
|
type LogReader interface {
|
||||||
|
// Read logs from underlying logging backend
|
||||||
|
ReadLogs(ReadConfig) *LogWatcher
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogWatcher is used when consuming logs read from the LogReader interface
|
||||||
|
type LogWatcher struct {
|
||||||
|
// For sending log messages to a reader
|
||||||
|
Msg chan *Message
|
||||||
|
// For sending error messages that occur while while reading logs
|
||||||
|
Err chan error
|
||||||
|
closeNotifier chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLogWatcher returns a new LogWatcher.
|
||||||
|
func NewLogWatcher() *LogWatcher {
|
||||||
|
return &LogWatcher{
|
||||||
|
Msg: make(chan *Message, logWatcherBufferSize),
|
||||||
|
Err: make(chan error, 1),
|
||||||
|
closeNotifier: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close notifies the underlying log reader to stop
|
||||||
|
func (w *LogWatcher) Close() {
|
||||||
|
close(w.closeNotifier)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WatchClose returns a channel receiver that receives notification when the watcher has been closed
|
||||||
|
// This should only be called from one goroutine
|
||||||
|
func (w *LogWatcher) WatchClose() <-chan struct{} {
|
||||||
|
return w.closeNotifier
|
||||||
}
|
}
|
||||||
|
|
230
daemon/logs.go
230
daemon/logs.go
|
@ -1,23 +1,14 @@
|
||||||
package daemon
|
package daemon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"syscall"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/docker/daemon/logger"
|
"github.com/docker/docker/daemon/logger"
|
||||||
"github.com/docker/docker/daemon/logger/jsonfilelog"
|
|
||||||
"github.com/docker/docker/pkg/jsonlog"
|
|
||||||
"github.com/docker/docker/pkg/stdcopy"
|
"github.com/docker/docker/pkg/stdcopy"
|
||||||
"github.com/docker/docker/pkg/tailfile"
|
|
||||||
"github.com/docker/docker/pkg/timeutils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ContainerLogsConfig struct {
|
type ContainerLogsConfig struct {
|
||||||
|
@ -29,209 +20,64 @@ type ContainerLogsConfig struct {
|
||||||
Stop <-chan bool
|
Stop <-chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) error {
|
func (daemon *Daemon) ContainerLogs(container *Container, config *ContainerLogsConfig) error {
|
||||||
var (
|
|
||||||
lines = -1
|
|
||||||
format string
|
|
||||||
)
|
|
||||||
if !(config.UseStdout || config.UseStderr) {
|
if !(config.UseStdout || config.UseStderr) {
|
||||||
return fmt.Errorf("You must choose at least one stream")
|
return fmt.Errorf("You must choose at least one stream")
|
||||||
}
|
}
|
||||||
if config.Timestamps {
|
|
||||||
format = timeutils.RFC3339NanoFixed
|
|
||||||
}
|
|
||||||
if config.Tail == "" {
|
|
||||||
config.Tail = "latest"
|
|
||||||
}
|
|
||||||
|
|
||||||
container, err := daemon.Get(name)
|
outStream := config.OutStream
|
||||||
if err != nil {
|
errStream := outStream
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
outStream = config.OutStream
|
|
||||||
errStream io.Writer
|
|
||||||
)
|
|
||||||
if !container.Config.Tty {
|
if !container.Config.Tty {
|
||||||
errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
|
errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
|
||||||
outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
|
outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
|
||||||
} else {
|
|
||||||
errStream = outStream
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if container.LogDriverType() != jsonfilelog.Name {
|
cLog, err := container.getLogger()
|
||||||
return fmt.Errorf("\"logs\" endpoint is supported only for \"json-file\" logging driver")
|
|
||||||
}
|
|
||||||
|
|
||||||
maxFile := 1
|
|
||||||
container.readHostConfig()
|
|
||||||
cfg := container.getLogConfig()
|
|
||||||
conf := cfg.Config
|
|
||||||
if val, ok := conf["max-file"]; ok {
|
|
||||||
var err error
|
|
||||||
maxFile, err = strconv.Atoi(val)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Error reading max-file value: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logDriver, err := container.getLogger()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, ok := logDriver.(logger.Reader)
|
logReader, ok := cLog.(logger.LogReader)
|
||||||
if !ok {
|
if !ok {
|
||||||
logrus.Errorf("Cannot read logs of the [%s] driver", logDriver.Name())
|
return logger.ErrReadLogsNotSupported
|
||||||
} else {
|
|
||||||
// json-file driver
|
|
||||||
if config.Tail != "all" && config.Tail != "latest" {
|
|
||||||
var err error
|
|
||||||
lines, err = strconv.Atoi(config.Tail)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("Failed to parse tail %s, error: %v, show all logs", config.Tail, err)
|
|
||||||
lines = -1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if lines != 0 {
|
|
||||||
n := maxFile
|
|
||||||
if config.Tail == "latest" && config.Since.IsZero() {
|
|
||||||
n = 1
|
|
||||||
}
|
|
||||||
before := false
|
|
||||||
for i := n; i > 0; i-- {
|
|
||||||
if before {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cLog, err := getReader(logDriver, i, n, lines)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Debugf("Error reading %d log file: %v", i-1, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
//if lines are specified, then iterate only once
|
|
||||||
if lines > 0 {
|
|
||||||
i = 1
|
|
||||||
} else { // if lines are not specified, cLog is a file, It needs to be closed
|
|
||||||
defer cLog.(*os.File).Close()
|
|
||||||
}
|
|
||||||
dec := json.NewDecoder(cLog)
|
|
||||||
l := &jsonlog.JSONLog{}
|
|
||||||
for {
|
|
||||||
l.Reset()
|
|
||||||
if err := dec.Decode(l); err == io.EOF {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
logrus.Errorf("Error streaming logs: %s", err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
logLine := l.Log
|
|
||||||
if !config.Since.IsZero() && l.Created.Before(config.Since) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if config.Timestamps {
|
|
||||||
// format can be "" or time format, so here can't be error
|
|
||||||
logLine, _ = l.Format(format)
|
|
||||||
}
|
|
||||||
if l.Stream == "stdout" && config.UseStdout {
|
|
||||||
io.WriteString(outStream, logLine)
|
|
||||||
}
|
|
||||||
if l.Stream == "stderr" && config.UseStderr {
|
|
||||||
io.WriteString(errStream, logLine)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Follow && container.IsRunning() {
|
follow := config.Follow && container.IsRunning()
|
||||||
chErrStderr := make(chan error)
|
tailLines, err := strconv.Atoi(config.Tail)
|
||||||
chErrStdout := make(chan error)
|
if err != nil {
|
||||||
var stdoutPipe, stderrPipe io.ReadCloser
|
tailLines = -1
|
||||||
|
}
|
||||||
|
|
||||||
// write an empty chunk of data (this is to ensure that the
|
logrus.Debug("logs: begin stream")
|
||||||
// HTTP Response is sent immediatly, even if the container has
|
readConfig := logger.ReadConfig{
|
||||||
// not yet produced any data)
|
Since: config.Since,
|
||||||
outStream.Write(nil)
|
Tail: tailLines,
|
||||||
|
Follow: follow,
|
||||||
if config.UseStdout {
|
}
|
||||||
stdoutPipe = container.StdoutLogPipe()
|
logs := logReader.ReadLogs(readConfig)
|
||||||
go func() {
|
|
||||||
logrus.Debug("logs: stdout stream begin")
|
|
||||||
chErrStdout <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since)
|
|
||||||
logrus.Debug("logs: stdout stream end")
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
if config.UseStderr {
|
|
||||||
stderrPipe = container.StderrLogPipe()
|
|
||||||
go func() {
|
|
||||||
logrus.Debug("logs: stderr stream begin")
|
|
||||||
chErrStderr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since)
|
|
||||||
logrus.Debug("logs: stderr stream end")
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
|
for {
|
||||||
select {
|
select {
|
||||||
case err = <-chErrStderr:
|
case err := <-logs.Err:
|
||||||
if stdoutPipe != nil {
|
logrus.Errorf("Error streaming logs: %v", err)
|
||||||
stdoutPipe.Close()
|
|
||||||
<-chErrStdout
|
|
||||||
}
|
|
||||||
case err = <-chErrStdout:
|
|
||||||
if stderrPipe != nil {
|
|
||||||
stderrPipe.Close()
|
|
||||||
<-chErrStderr
|
|
||||||
}
|
|
||||||
case <-config.Stop:
|
|
||||||
if stdoutPipe != nil {
|
|
||||||
stdoutPipe.Close()
|
|
||||||
<-chErrStdout
|
|
||||||
}
|
|
||||||
if stderrPipe != nil {
|
|
||||||
stderrPipe.Close()
|
|
||||||
<-chErrStderr
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
case <-config.Stop:
|
||||||
|
logs.Close()
|
||||||
if err != nil && err != io.EOF && err != io.ErrClosedPipe {
|
return nil
|
||||||
if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE {
|
case msg, ok := <-logs.Msg:
|
||||||
logrus.Errorf("error streaming logs: %v", err)
|
if !ok {
|
||||||
|
logrus.Debugf("logs: end stream")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
logLine := msg.Line
|
||||||
|
if config.Timestamps {
|
||||||
|
logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...)
|
||||||
|
}
|
||||||
|
if msg.Source == "stdout" && config.UseStdout {
|
||||||
|
outStream.Write(logLine)
|
||||||
|
}
|
||||||
|
if msg.Source == "stderr" && config.UseStderr {
|
||||||
|
errStream.Write(logLine)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getReader(logDriver logger.Logger, fileIndex, maxFiles, lines int) (io.Reader, error) {
|
|
||||||
if lines <= 0 {
|
|
||||||
index := strconv.Itoa(fileIndex - 1)
|
|
||||||
cLog, err := logDriver.(logger.Reader).ReadLog(index)
|
|
||||||
return cLog, err
|
|
||||||
}
|
|
||||||
buf := bytes.NewBuffer([]byte{})
|
|
||||||
remaining := lines
|
|
||||||
for i := 0; i < maxFiles; i++ {
|
|
||||||
index := strconv.Itoa(i)
|
|
||||||
cLog, err := logDriver.(logger.Reader).ReadLog(index)
|
|
||||||
if err != nil {
|
|
||||||
return buf, err
|
|
||||||
}
|
|
||||||
f := cLog.(*os.File)
|
|
||||||
ls, err := tailfile.TailFile(f, remaining)
|
|
||||||
if err != nil {
|
|
||||||
return buf, err
|
|
||||||
}
|
|
||||||
tmp := bytes.NewBuffer([]byte{})
|
|
||||||
for _, l := range ls {
|
|
||||||
fmt.Fprintf(tmp, "%s\n", l)
|
|
||||||
}
|
|
||||||
tmp.ReadFrom(buf)
|
|
||||||
buf = tmp
|
|
||||||
if len(ls) == remaining {
|
|
||||||
return buf, nil
|
|
||||||
}
|
|
||||||
remaining = remaining - len(ls)
|
|
||||||
}
|
|
||||||
return buf, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,10 @@ import (
|
||||||
"github.com/docker/docker/runconfig"
|
"github.com/docker/docker/runconfig"
|
||||||
)
|
)
|
||||||
|
|
||||||
const defaultTimeIncrement = 100
|
const (
|
||||||
|
defaultTimeIncrement = 100
|
||||||
|
loggerCloseTimeout = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// containerMonitor monitors the execution of a container's main process.
|
// containerMonitor monitors the execution of a container's main process.
|
||||||
// If a restart policy is specified for the container the monitor will ensure that the
|
// If a restart policy is specified for the container the monitor will ensure that the
|
||||||
|
@ -310,7 +313,7 @@ func (m *containerMonitor) resetContainer(lock bool) {
|
||||||
close(exit)
|
close(exit)
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-time.After(1 * time.Second):
|
case <-time.After(loggerCloseTimeout):
|
||||||
logrus.Warnf("Logger didn't exit in time: logs may be truncated")
|
logrus.Warnf("Logger didn't exit in time: logs may be truncated")
|
||||||
case <-exit:
|
case <-exit:
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ The `docker logs --follow` command will continue streaming the new output from
|
||||||
the container's `STDOUT` and `STDERR`.
|
the container's `STDOUT` and `STDERR`.
|
||||||
|
|
||||||
Passing a negative number or a non-integer to `--tail` is invalid and the
|
Passing a negative number or a non-integer to `--tail` is invalid and the
|
||||||
value is set to `latest` in that case.
|
value is set to `all` in that case.
|
||||||
|
|
||||||
The `docker logs --timestamp` commands will add an RFC3339Nano
|
The `docker logs --timestamp` commands will add an RFC3339Nano
|
||||||
timestamp, for example `2014-09-16T06:17:46.000000000Z`, to each
|
timestamp, for example `2014-09-16T06:17:46.000000000Z`, to each
|
||||||
|
|
|
@ -250,13 +250,9 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID)
|
logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID)
|
||||||
|
|
||||||
stdout, err := logCmd.StdoutPipe()
|
stdout, err := logCmd.StdoutPipe()
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
|
c.Assert(logCmd.Start(), check.IsNil)
|
||||||
if err := logCmd.Start(); err != nil {
|
|
||||||
c.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// First read slowly
|
// First read slowly
|
||||||
bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead)
|
bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead)
|
||||||
|
|
226
pkg/ioutils/multireader.go
Normal file
226
pkg/ioutils/multireader.go
Normal file
|
@ -0,0 +1,226 @@
|
||||||
|
package ioutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
type pos struct {
|
||||||
|
idx int
|
||||||
|
offset int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type multiReadSeeker struct {
|
||||||
|
readers []io.ReadSeeker
|
||||||
|
pos *pos
|
||||||
|
posIdx map[io.ReadSeeker]int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
var tmpOffset int64
|
||||||
|
switch whence {
|
||||||
|
case os.SEEK_SET:
|
||||||
|
for i, rdr := range r.readers {
|
||||||
|
// get size of the current reader
|
||||||
|
s, err := rdr.Seek(0, os.SEEK_END)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if offset > tmpOffset+s {
|
||||||
|
if i == len(r.readers)-1 {
|
||||||
|
rdrOffset := s + (offset - tmpOffset)
|
||||||
|
if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
r.pos = &pos{i, rdrOffset}
|
||||||
|
return offset, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpOffset += s
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
rdrOffset := offset - tmpOffset
|
||||||
|
idx := i
|
||||||
|
|
||||||
|
rdr.Seek(rdrOffset, os.SEEK_SET)
|
||||||
|
// make sure all following readers are at 0
|
||||||
|
for _, rdr := range r.readers[i+1:] {
|
||||||
|
rdr.Seek(0, os.SEEK_SET)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rdrOffset == s && i != len(r.readers)-1 {
|
||||||
|
idx += 1
|
||||||
|
rdrOffset = 0
|
||||||
|
}
|
||||||
|
r.pos = &pos{idx, rdrOffset}
|
||||||
|
return offset, nil
|
||||||
|
}
|
||||||
|
case os.SEEK_END:
|
||||||
|
for _, rdr := range r.readers {
|
||||||
|
s, err := rdr.Seek(0, os.SEEK_END)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
tmpOffset += s
|
||||||
|
}
|
||||||
|
r.Seek(tmpOffset+offset, os.SEEK_SET)
|
||||||
|
return tmpOffset + offset, nil
|
||||||
|
case os.SEEK_CUR:
|
||||||
|
if r.pos == nil {
|
||||||
|
return r.Seek(offset, os.SEEK_SET)
|
||||||
|
}
|
||||||
|
// Just return the current offset
|
||||||
|
if offset == 0 {
|
||||||
|
return r.getCurOffset()
|
||||||
|
}
|
||||||
|
|
||||||
|
curOffset, err := r.getCurOffset()
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
r.pos = &pos{r.posIdx[rdr], rdrOffset}
|
||||||
|
return curOffset + offset, nil
|
||||||
|
default:
|
||||||
|
return -1, fmt.Errorf("Invalid whence: %d", whence)
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) {
|
||||||
|
var rdr io.ReadSeeker
|
||||||
|
var rdrOffset int64
|
||||||
|
|
||||||
|
for i, rdr := range r.readers {
|
||||||
|
offsetTo, err := r.getOffsetToReader(rdr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, -1, err
|
||||||
|
}
|
||||||
|
if offsetTo > offset {
|
||||||
|
rdr = r.readers[i-1]
|
||||||
|
rdrOffset = offsetTo - offset
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if rdr == r.readers[len(r.readers)-1] {
|
||||||
|
rdrOffset = offsetTo + offset
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rdr, rdrOffset, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *multiReadSeeker) getCurOffset() (int64, error) {
|
||||||
|
var totalSize int64
|
||||||
|
for _, rdr := range r.readers[:r.pos.idx+1] {
|
||||||
|
if r.posIdx[rdr] == r.pos.idx {
|
||||||
|
totalSize += r.pos.offset
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
size, err := getReadSeekerSize(rdr)
|
||||||
|
if err != nil {
|
||||||
|
return -1, fmt.Errorf("error getting seeker size: %v", err)
|
||||||
|
}
|
||||||
|
totalSize += size
|
||||||
|
}
|
||||||
|
return totalSize, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) {
|
||||||
|
var offset int64
|
||||||
|
for _, r := range r.readers {
|
||||||
|
if r == rdr {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
size, err := getReadSeekerSize(rdr)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
offset += size
|
||||||
|
}
|
||||||
|
return offset, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *multiReadSeeker) Read(b []byte) (int, error) {
|
||||||
|
if r.pos == nil {
|
||||||
|
r.pos = &pos{0, 0}
|
||||||
|
}
|
||||||
|
|
||||||
|
bCap := int64(cap(b))
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
var rdr io.ReadSeeker
|
||||||
|
|
||||||
|
for _, rdr = range r.readers[r.pos.idx:] {
|
||||||
|
readBytes, err := io.CopyN(buf, rdr, bCap)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
bCap -= readBytes
|
||||||
|
|
||||||
|
if bCap == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rdrPos, err := rdr.Seek(0, os.SEEK_CUR)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
r.pos = &pos{r.posIdx[rdr], rdrPos}
|
||||||
|
return buf.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) {
|
||||||
|
// save the current position
|
||||||
|
pos, err := rdr.Seek(0, os.SEEK_CUR)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the size
|
||||||
|
size, err := rdr.Seek(0, os.SEEK_END)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset the position
|
||||||
|
if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
return size, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
|
||||||
|
// input readseekers. After calling this method the initial position is set to the
|
||||||
|
// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
|
||||||
|
// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
|
||||||
|
// Seek can be used over the sum of lengths of all readseekers.
|
||||||
|
//
|
||||||
|
// When a MultiReadSeeker is used, no Read and Seek operations should be made on
|
||||||
|
// its ReadSeeker components. Also, users should make no assumption on the state
|
||||||
|
// of individual readseekers while the MultiReadSeeker is used.
|
||||||
|
func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
|
||||||
|
if len(readers) == 1 {
|
||||||
|
return readers[0]
|
||||||
|
}
|
||||||
|
idx := make(map[io.ReadSeeker]int)
|
||||||
|
for i, rdr := range readers {
|
||||||
|
idx[rdr] = i
|
||||||
|
}
|
||||||
|
return &multiReadSeeker{
|
||||||
|
readers: readers,
|
||||||
|
posIdx: idx,
|
||||||
|
}
|
||||||
|
}
|
149
pkg/ioutils/multireader_test.go
Normal file
149
pkg/ioutils/multireader_test.go
Normal file
|
@ -0,0 +1,149 @@
|
||||||
|
package ioutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMultiReadSeekerReadAll(t *testing.T) {
|
||||||
|
str := "hello world"
|
||||||
|
s1 := strings.NewReader(str + " 1")
|
||||||
|
s2 := strings.NewReader(str + " 2")
|
||||||
|
s3 := strings.NewReader(str + " 3")
|
||||||
|
mr := MultiReadSeeker(s1, s2, s3)
|
||||||
|
|
||||||
|
expectedSize := int64(s1.Len() + s2.Len() + s3.Len())
|
||||||
|
|
||||||
|
b, err := ioutil.ReadAll(mr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := "hello world 1hello world 2hello world 3"
|
||||||
|
if string(b) != expected {
|
||||||
|
t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
size, err := mr.Seek(0, os.SEEK_END)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if size != expectedSize {
|
||||||
|
t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset the position and read again
|
||||||
|
pos, err := mr.Seek(0, os.SEEK_SET)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if pos != 0 {
|
||||||
|
t.Fatalf("expected position to be set to 0, got %d", pos)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err = ioutil.ReadAll(mr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(b) != expected {
|
||||||
|
t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiReadSeekerReadEach(t *testing.T) {
|
||||||
|
str := "hello world"
|
||||||
|
s1 := strings.NewReader(str + " 1")
|
||||||
|
s2 := strings.NewReader(str + " 2")
|
||||||
|
s3 := strings.NewReader(str + " 3")
|
||||||
|
mr := MultiReadSeeker(s1, s2, s3)
|
||||||
|
|
||||||
|
var totalBytes int64
|
||||||
|
for i, s := range []*strings.Reader{s1, s2, s3} {
|
||||||
|
sLen := int64(s.Len())
|
||||||
|
buf := make([]byte, s.Len())
|
||||||
|
expected := []byte(fmt.Sprintf("%s %d", str, i+1))
|
||||||
|
|
||||||
|
if _, err := mr.Read(buf); err != nil && err != io.EOF {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(buf, expected) {
|
||||||
|
t.Fatalf("expected %q to be %q", string(buf), string(expected))
|
||||||
|
}
|
||||||
|
|
||||||
|
pos, err := mr.Seek(0, os.SEEK_CUR)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("iteration: %d, error: %v", i+1, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that the total bytes read is the current position of the seeker
|
||||||
|
totalBytes += sLen
|
||||||
|
if pos != totalBytes {
|
||||||
|
t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well
|
||||||
|
newPos, err := mr.Seek(pos, os.SEEK_SET)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if newPos != pos {
|
||||||
|
t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiReadSeekerReadSpanningChunks(t *testing.T) {
|
||||||
|
str := "hello world"
|
||||||
|
s1 := strings.NewReader(str + " 1")
|
||||||
|
s2 := strings.NewReader(str + " 2")
|
||||||
|
s3 := strings.NewReader(str + " 3")
|
||||||
|
mr := MultiReadSeeker(s1, s2, s3)
|
||||||
|
|
||||||
|
buf := make([]byte, s1.Len()+3)
|
||||||
|
_, err := mr.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string
|
||||||
|
expected := "hello world 1hel"
|
||||||
|
if string(buf) != expected {
|
||||||
|
t.Fatalf("expected %s to be %s", string(buf), expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiReadSeekerNegativeSeek(t *testing.T) {
|
||||||
|
str := "hello world"
|
||||||
|
s1 := strings.NewReader(str + " 1")
|
||||||
|
s2 := strings.NewReader(str + " 2")
|
||||||
|
s3 := strings.NewReader(str + " 3")
|
||||||
|
mr := MultiReadSeeker(s1, s2, s3)
|
||||||
|
|
||||||
|
s1Len := s1.Len()
|
||||||
|
s2Len := s2.Len()
|
||||||
|
s3Len := s3.Len()
|
||||||
|
|
||||||
|
s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if s != int64(s1Len+s2Len) {
|
||||||
|
t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, s3Len)
|
||||||
|
if _, err := mr.Read(buf); err != nil && err != io.EOF {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
expected := fmt.Sprintf("%s %d", str, 3)
|
||||||
|
if string(buf) != fmt.Sprintf("%s %d", str, 3) {
|
||||||
|
t.Fatalf("expected %q to be %q", string(buf), expected)
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,6 +3,7 @@ package tailfile
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
|
"io"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,7 +13,7 @@ var eol = []byte("\n")
|
||||||
var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive")
|
var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive")
|
||||||
|
|
||||||
//TailFile returns last n lines of file f
|
//TailFile returns last n lines of file f
|
||||||
func TailFile(f *os.File, n int) ([][]byte, error) {
|
func TailFile(f io.ReadSeeker, n int) ([][]byte, error) {
|
||||||
if n <= 0 {
|
if n <= 0 {
|
||||||
return nil, ErrNonPositiveLinesNumber
|
return nil, ErrNonPositiveLinesNumber
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue