Split reader interface from logger interface

Implement new reader interface on jsonfile.
Moves jsonlog decoding from daemon to jsonfile logger.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2015-07-03 09:50:06 -04:00
parent d241d2f36c
commit c0391bf554
13 changed files with 726 additions and 266 deletions

View file

@ -19,7 +19,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
since := cmd.String([]string{"-since"}, "", "Show logs since timestamp")
times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps")
tail := cmd.String([]string{"-tail"}, "latest", "Number of lines to show from the end of the logs")
tail := cmd.String([]string{"-tail"}, "all", "Number of lines to show from the end of the logs")
cmd.Require(flag.Exact, 1)
cmd.ParseFlags(args, true)

View file

@ -629,6 +629,17 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
closeNotifier = notifier.CloseNotify()
}
c, err := s.daemon.Get(vars["name"])
if err != nil {
return err
}
outStream := ioutils.NewWriteFlusher(w)
// write an empty chunk of data (this is to ensure that the
// HTTP Response is sent immediatly, even if the container has
// not yet produced any data)
outStream.Write(nil)
logsConfig := &daemon.ContainerLogsConfig{
Follow: boolValue(r, "follow"),
Timestamps: boolValue(r, "timestamps"),
@ -636,11 +647,11 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
Tail: r.Form.Get("tail"),
UseStdout: stdout,
UseStderr: stderr,
OutStream: ioutils.NewWriteFlusher(w),
OutStream: outStream,
Stop: closeNotifier,
}
if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil {
if err := s.daemon.ContainerLogs(c, logsConfig); err != nil {
fmt.Fprintf(w, "Error running logs job: %s\n", err)
}

View file

@ -25,7 +25,6 @@ import (
"github.com/docker/docker/pkg/broadcastwriter"
"github.com/docker/docker/pkg/fileutils"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/mount"
"github.com/docker/docker/pkg/nat"
"github.com/docker/docker/pkg/promise"
@ -721,6 +720,9 @@ func (container *Container) getLogConfig() runconfig.LogConfig {
}
func (container *Container) getLogger() (logger.Logger, error) {
if container.logDriver != nil && container.IsRunning() {
return container.logDriver, nil
}
cfg := container.getLogConfig()
if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil {
return nil, err
@ -894,36 +896,33 @@ func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writ
}
func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
if logs {
logDriver, err := c.getLogger()
if err != nil {
logrus.Errorf("Error obtaining the logger %v", err)
return err
}
if _, ok := logDriver.(logger.Reader); !ok {
logrus.Errorf("cannot read logs for [%s] driver", logDriver.Name())
} else {
if cLog, err := logDriver.(logger.Reader).ReadLog(); err != nil {
logrus.Errorf("Error reading logs %v", err)
} else {
dec := json.NewDecoder(cLog)
for {
l := &jsonlog.JSONLog{}
cLog, ok := logDriver.(logger.LogReader)
if !ok {
return logger.ErrReadLogsNotSupported
}
logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
if err := dec.Decode(l); err == io.EOF {
break
} else if err != nil {
logrus.Errorf("Error streaming logs: %s", err)
break
}
if l.Stream == "stdout" && stdout != nil {
io.WriteString(stdout, l.Log)
}
if l.Stream == "stderr" && stderr != nil {
io.WriteString(stderr, l.Log)
}
LogLoop:
for {
select {
case msg, ok := <-logs.Msg:
if !ok {
break LogLoop
}
if msg.Source == "stdout" && stdout != nil {
stdout.Write(msg.Line)
}
if msg.Source == "stderr" && stderr != nil {
stderr.Write(msg.Line)
}
case err := <-logs.Err:
logrus.Errorf("Error streaming logs: %v", err)
break LogLoop
}
}
}

View file

@ -27,6 +27,7 @@ type Context struct {
LogPath string
}
// Hostname returns the hostname from the underlying OS
func (ctx *Context) Hostname() (string, error) {
hostname, err := os.Hostname()
if err != nil {
@ -35,6 +36,7 @@ func (ctx *Context) Hostname() (string, error) {
return hostname, nil
}
// Command returns the command that the container being logged was started with
func (ctx *Context) Command() string {
terms := []string{ctx.ContainerEntrypoint}
for _, arg := range ctx.ContainerArgs {

View file

@ -2,32 +2,42 @@ package jsonfilelog
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"sync"
"time"
"gopkg.in/fsnotify.v1"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/pubsub"
"github.com/docker/docker/pkg/tailfile"
"github.com/docker/docker/pkg/timeutils"
"github.com/docker/docker/pkg/units"
)
const (
Name = "json-file"
Name = "json-file"
maxJSONDecodeRetry = 10
)
// JSONFileLogger is Logger implementation for default docker logging:
// JSON objects to file
type JSONFileLogger struct {
buf *bytes.Buffer
f *os.File // store for closing
mu sync.Mutex // protects buffer
capacity int64 //maximum size of each file
n int //maximum number of files
ctx logger.Context
buf *bytes.Buffer
f *os.File // store for closing
mu sync.Mutex // protects buffer
capacity int64 //maximum size of each file
n int //maximum number of files
ctx logger.Context
readers map[*logger.LogWatcher]struct{} // stores the active log followers
notifyRotate *pubsub.Publisher
}
func init() {
@ -64,11 +74,13 @@ func New(ctx logger.Context) (logger.Logger, error) {
}
}
return &JSONFileLogger{
f: log,
buf: bytes.NewBuffer(nil),
ctx: ctx,
capacity: capval,
n: maxFiles,
f: log,
buf: bytes.NewBuffer(nil),
ctx: ctx,
capacity: capval,
n: maxFiles,
readers: make(map[*logger.LogWatcher]struct{}),
notifyRotate: pubsub.NewPublisher(0, 1),
}, nil
}
@ -111,6 +123,7 @@ func writeLog(l *JSONFileLogger) (int64, error) {
return -1, err
}
l.f = file
l.notifyRotate.Publish(struct{}{})
}
return writeToBuf(l)
}
@ -148,11 +161,11 @@ func backup(old, curr string) error {
}
}
if _, err := os.Stat(curr); os.IsNotExist(err) {
if f, err := os.Create(curr); err != nil {
f, err := os.Create(curr)
if err != nil {
return err
} else {
f.Close()
}
f.Close()
}
return os.Rename(curr, old)
}
@ -169,31 +182,200 @@ func ValidateLogOpt(cfg map[string]string) error {
return nil
}
func (l *JSONFileLogger) ReadLog(args ...string) (io.Reader, error) {
pth := l.ctx.LogPath
if len(args) > 0 {
//check if args[0] is an integer index
index, err := strconv.ParseInt(args[0], 0, 0)
if err != nil {
return nil, err
}
if index > 0 {
pth = pth + "." + args[0]
}
}
return os.Open(pth)
}
func (l *JSONFileLogger) LogPath() string {
return l.ctx.LogPath
}
// Close closes underlying file
// Close closes underlying file and signals all readers to stop
func (l *JSONFileLogger) Close() error {
return l.f.Close()
l.mu.Lock()
err := l.f.Close()
for r := range l.readers {
r.Close()
delete(l.readers, r)
}
l.mu.Unlock()
return err
}
// Name returns name of this logger
func (l *JSONFileLogger) Name() string {
return Name
}
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
l.Reset()
if err := dec.Decode(l); err != nil {
return nil, err
}
msg := &logger.Message{
Source: l.Stream,
Timestamp: l.Created,
Line: []byte(l.Log),
}
return msg, nil
}
// Reads from the log file
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
logWatcher := logger.NewLogWatcher()
go l.readLogs(logWatcher, config)
return logWatcher
}
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(logWatcher.Msg)
pth := l.ctx.LogPath
var files []io.ReadSeeker
for i := l.n; i > 1; i-- {
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
if err != nil {
if !os.IsNotExist(err) {
logWatcher.Err <- err
break
}
continue
}
defer f.Close()
files = append(files, f)
}
latestFile, err := os.Open(pth)
if err != nil {
logWatcher.Err <- err
return
}
defer latestFile.Close()
files = append(files, latestFile)
tailer := ioutils.MultiReadSeeker(files...)
if config.Tail != 0 {
tailFile(tailer, logWatcher, config.Tail, config.Since)
}
if !config.Follow {
return
}
if config.Tail == 0 {
latestFile.Seek(0, os.SEEK_END)
}
l.mu.Lock()
l.readers[logWatcher] = struct{}{}
l.mu.Unlock()
notifyRotate := l.notifyRotate.Subscribe()
followLogs(latestFile, logWatcher, notifyRotate, config.Since)
l.mu.Lock()
delete(l.readers, logWatcher)
l.mu.Unlock()
l.notifyRotate.Evict(notifyRotate)
}
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
var rdr io.Reader = f
if tail > 0 {
ls, err := tailfile.TailFile(f, tail)
if err != nil {
logWatcher.Err <- err
return
}
rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
}
dec := json.NewDecoder(rdr)
l := &jsonlog.JSONLog{}
for {
msg, err := decodeLogLine(dec, l)
if err != nil {
if err != io.EOF {
logWatcher.Err <- err
}
return
}
if !since.IsZero() && msg.Timestamp.Before(since) {
continue
}
logWatcher.Msg <- msg
}
}
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
dec := json.NewDecoder(f)
l := &jsonlog.JSONLog{}
fileWatcher, err := fsnotify.NewWatcher()
if err != nil {
logWatcher.Err <- err
return
}
defer fileWatcher.Close()
if err := fileWatcher.Add(f.Name()); err != nil {
logWatcher.Err <- err
return
}
var retries int
for {
msg, err := decodeLogLine(dec, l)
if err != nil {
if err != io.EOF {
// try again because this shouldn't happen
if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
dec = json.NewDecoder(f)
retries += 1
continue
}
logWatcher.Err <- err
return
}
select {
case <-fileWatcher.Events:
dec = json.NewDecoder(f)
continue
case <-fileWatcher.Errors:
logWatcher.Err <- err
return
case <-logWatcher.WatchClose():
return
case <-notifyRotate:
fileWatcher.Remove(f.Name())
f, err = os.Open(f.Name())
if err != nil {
logWatcher.Err <- err
return
}
if err := fileWatcher.Add(f.Name()); err != nil {
logWatcher.Err <- err
}
dec = json.NewDecoder(f)
continue
}
}
retries = 0 // reset retries since we've succeeded
if !since.IsZero() && msg.Timestamp.Before(since) {
continue
}
select {
case logWatcher.Msg <- msg:
case <-logWatcher.WatchClose():
logWatcher.Msg <- msg
for {
msg, err := decodeLogLine(dec, l)
if err != nil {
return
}
if !since.IsZero() && msg.Timestamp.Before(since) {
continue
}
logWatcher.Msg <- msg
}
}
}
}

View file

@ -2,11 +2,19 @@ package logger
import (
"errors"
"io"
"time"
"github.com/docker/docker/pkg/timeutils"
)
var ReadLogsNotSupported = errors.New("configured logging reader does not support reading")
// ErrReadLogsNotSupported is returned when the logger does not support reading logs
var ErrReadLogsNotSupported = errors.New("configured logging reader does not support reading")
const (
// TimeFormat is the time format used for timestamps sent to log readers
TimeFormat = timeutils.RFC3339NanoFixed
logWatcherBufferSize = 4096
)
// Message is datastructure that represents record from some container
type Message struct {
@ -16,14 +24,51 @@ type Message struct {
Timestamp time.Time
}
// Logger is interface for docker logging drivers
// Logger is the interface for docker logging drivers
type Logger interface {
Log(*Message) error
Name() string
Close() error
}
//Reader is an interface for docker logging drivers that support reading
type Reader interface {
ReadLog(args ...string) (io.Reader, error)
// ReadConfig is the configuration passed into ReadLogs
type ReadConfig struct {
Since time.Time
Tail int
Follow bool
}
// LogReader is the interface for reading log messages for loggers that support reading
type LogReader interface {
// Read logs from underlying logging backend
ReadLogs(ReadConfig) *LogWatcher
}
// LogWatcher is used when consuming logs read from the LogReader interface
type LogWatcher struct {
// For sending log messages to a reader
Msg chan *Message
// For sending error messages that occur while while reading logs
Err chan error
closeNotifier chan struct{}
}
// NewLogWatcher returns a new LogWatcher.
func NewLogWatcher() *LogWatcher {
return &LogWatcher{
Msg: make(chan *Message, logWatcherBufferSize),
Err: make(chan error, 1),
closeNotifier: make(chan struct{}),
}
}
// Close notifies the underlying log reader to stop
func (w *LogWatcher) Close() {
close(w.closeNotifier)
}
// WatchClose returns a channel receiver that receives notification when the watcher has been closed
// This should only be called from one goroutine
func (w *LogWatcher) WatchClose() <-chan struct{} {
return w.closeNotifier
}

View file

@ -1,23 +1,14 @@
package daemon
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net"
"os"
"strconv"
"syscall"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/docker/pkg/tailfile"
"github.com/docker/docker/pkg/timeutils"
)
type ContainerLogsConfig struct {
@ -29,209 +20,64 @@ type ContainerLogsConfig struct {
Stop <-chan bool
}
func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) error {
var (
lines = -1
format string
)
func (daemon *Daemon) ContainerLogs(container *Container, config *ContainerLogsConfig) error {
if !(config.UseStdout || config.UseStderr) {
return fmt.Errorf("You must choose at least one stream")
}
if config.Timestamps {
format = timeutils.RFC3339NanoFixed
}
if config.Tail == "" {
config.Tail = "latest"
}
container, err := daemon.Get(name)
if err != nil {
return err
}
var (
outStream = config.OutStream
errStream io.Writer
)
outStream := config.OutStream
errStream := outStream
if !container.Config.Tty {
errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
} else {
errStream = outStream
}
if container.LogDriverType() != jsonfilelog.Name {
return fmt.Errorf("\"logs\" endpoint is supported only for \"json-file\" logging driver")
}
maxFile := 1
container.readHostConfig()
cfg := container.getLogConfig()
conf := cfg.Config
if val, ok := conf["max-file"]; ok {
var err error
maxFile, err = strconv.Atoi(val)
if err != nil {
return fmt.Errorf("Error reading max-file value: %s", err)
}
}
logDriver, err := container.getLogger()
cLog, err := container.getLogger()
if err != nil {
return err
}
_, ok := logDriver.(logger.Reader)
logReader, ok := cLog.(logger.LogReader)
if !ok {
logrus.Errorf("Cannot read logs of the [%s] driver", logDriver.Name())
} else {
// json-file driver
if config.Tail != "all" && config.Tail != "latest" {
var err error
lines, err = strconv.Atoi(config.Tail)
if err != nil {
logrus.Errorf("Failed to parse tail %s, error: %v, show all logs", config.Tail, err)
lines = -1
}
}
if lines != 0 {
n := maxFile
if config.Tail == "latest" && config.Since.IsZero() {
n = 1
}
before := false
for i := n; i > 0; i-- {
if before {
break
}
cLog, err := getReader(logDriver, i, n, lines)
if err != nil {
logrus.Debugf("Error reading %d log file: %v", i-1, err)
continue
}
//if lines are specified, then iterate only once
if lines > 0 {
i = 1
} else { // if lines are not specified, cLog is a file, It needs to be closed
defer cLog.(*os.File).Close()
}
dec := json.NewDecoder(cLog)
l := &jsonlog.JSONLog{}
for {
l.Reset()
if err := dec.Decode(l); err == io.EOF {
break
} else if err != nil {
logrus.Errorf("Error streaming logs: %s", err)
break
}
logLine := l.Log
if !config.Since.IsZero() && l.Created.Before(config.Since) {
continue
}
if config.Timestamps {
// format can be "" or time format, so here can't be error
logLine, _ = l.Format(format)
}
if l.Stream == "stdout" && config.UseStdout {
io.WriteString(outStream, logLine)
}
if l.Stream == "stderr" && config.UseStderr {
io.WriteString(errStream, logLine)
}
}
}
}
return logger.ErrReadLogsNotSupported
}
if config.Follow && container.IsRunning() {
chErrStderr := make(chan error)
chErrStdout := make(chan error)
var stdoutPipe, stderrPipe io.ReadCloser
follow := config.Follow && container.IsRunning()
tailLines, err := strconv.Atoi(config.Tail)
if err != nil {
tailLines = -1
}
// write an empty chunk of data (this is to ensure that the
// HTTP Response is sent immediatly, even if the container has
// not yet produced any data)
outStream.Write(nil)
if config.UseStdout {
stdoutPipe = container.StdoutLogPipe()
go func() {
logrus.Debug("logs: stdout stream begin")
chErrStdout <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since)
logrus.Debug("logs: stdout stream end")
}()
}
if config.UseStderr {
stderrPipe = container.StderrLogPipe()
go func() {
logrus.Debug("logs: stderr stream begin")
chErrStderr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since)
logrus.Debug("logs: stderr stream end")
}()
}
logrus.Debug("logs: begin stream")
readConfig := logger.ReadConfig{
Since: config.Since,
Tail: tailLines,
Follow: follow,
}
logs := logReader.ReadLogs(readConfig)
for {
select {
case err = <-chErrStderr:
if stdoutPipe != nil {
stdoutPipe.Close()
<-chErrStdout
}
case err = <-chErrStdout:
if stderrPipe != nil {
stderrPipe.Close()
<-chErrStderr
}
case <-config.Stop:
if stdoutPipe != nil {
stdoutPipe.Close()
<-chErrStdout
}
if stderrPipe != nil {
stderrPipe.Close()
<-chErrStderr
}
case err := <-logs.Err:
logrus.Errorf("Error streaming logs: %v", err)
return nil
}
if err != nil && err != io.EOF && err != io.ErrClosedPipe {
if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE {
logrus.Errorf("error streaming logs: %v", err)
case <-config.Stop:
logs.Close()
return nil
case msg, ok := <-logs.Msg:
if !ok {
logrus.Debugf("logs: end stream")
return nil
}
logLine := msg.Line
if config.Timestamps {
logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...)
}
if msg.Source == "stdout" && config.UseStdout {
outStream.Write(logLine)
}
if msg.Source == "stderr" && config.UseStderr {
errStream.Write(logLine)
}
}
}
return nil
}
func getReader(logDriver logger.Logger, fileIndex, maxFiles, lines int) (io.Reader, error) {
if lines <= 0 {
index := strconv.Itoa(fileIndex - 1)
cLog, err := logDriver.(logger.Reader).ReadLog(index)
return cLog, err
}
buf := bytes.NewBuffer([]byte{})
remaining := lines
for i := 0; i < maxFiles; i++ {
index := strconv.Itoa(i)
cLog, err := logDriver.(logger.Reader).ReadLog(index)
if err != nil {
return buf, err
}
f := cLog.(*os.File)
ls, err := tailfile.TailFile(f, remaining)
if err != nil {
return buf, err
}
tmp := bytes.NewBuffer([]byte{})
for _, l := range ls {
fmt.Fprintf(tmp, "%s\n", l)
}
tmp.ReadFrom(buf)
buf = tmp
if len(ls) == remaining {
return buf, nil
}
remaining = remaining - len(ls)
}
return buf, nil
}

View file

@ -12,7 +12,10 @@ import (
"github.com/docker/docker/runconfig"
)
const defaultTimeIncrement = 100
const (
defaultTimeIncrement = 100
loggerCloseTimeout = 10 * time.Second
)
// containerMonitor monitors the execution of a container's main process.
// If a restart policy is specified for the container the monitor will ensure that the
@ -310,7 +313,7 @@ func (m *containerMonitor) resetContainer(lock bool) {
close(exit)
}()
select {
case <-time.After(1 * time.Second):
case <-time.After(loggerCloseTimeout):
logrus.Warnf("Logger didn't exit in time: logs may be truncated")
case <-exit:
}

View file

@ -29,7 +29,7 @@ The `docker logs --follow` command will continue streaming the new output from
the container's `STDOUT` and `STDERR`.
Passing a negative number or a non-integer to `--tail` is invalid and the
value is set to `latest` in that case.
value is set to `all` in that case.
The `docker logs --timestamp` commands will add an RFC3339Nano
timestamp, for example `2014-09-16T06:17:46.000000000Z`, to each

View file

@ -250,13 +250,9 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) {
}()
logCmd := exec.Command(dockerBinary, "logs", "-f", cleanedContainerID)
stdout, err := logCmd.StdoutPipe()
c.Assert(err, check.IsNil)
if err := logCmd.Start(); err != nil {
c.Fatal(err)
}
c.Assert(logCmd.Start(), check.IsNil)
// First read slowly
bytes1, err := consumeWithSpeed(stdout, 10, 50*time.Millisecond, stopSlowRead)

226
pkg/ioutils/multireader.go Normal file
View file

@ -0,0 +1,226 @@
package ioutils
import (
"bytes"
"fmt"
"io"
"os"
)
type pos struct {
idx int
offset int64
}
type multiReadSeeker struct {
readers []io.ReadSeeker
pos *pos
posIdx map[io.ReadSeeker]int
}
func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) {
var tmpOffset int64
switch whence {
case os.SEEK_SET:
for i, rdr := range r.readers {
// get size of the current reader
s, err := rdr.Seek(0, os.SEEK_END)
if err != nil {
return -1, err
}
if offset > tmpOffset+s {
if i == len(r.readers)-1 {
rdrOffset := s + (offset - tmpOffset)
if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil {
return -1, err
}
r.pos = &pos{i, rdrOffset}
return offset, nil
}
tmpOffset += s
continue
}
rdrOffset := offset - tmpOffset
idx := i
rdr.Seek(rdrOffset, os.SEEK_SET)
// make sure all following readers are at 0
for _, rdr := range r.readers[i+1:] {
rdr.Seek(0, os.SEEK_SET)
}
if rdrOffset == s && i != len(r.readers)-1 {
idx += 1
rdrOffset = 0
}
r.pos = &pos{idx, rdrOffset}
return offset, nil
}
case os.SEEK_END:
for _, rdr := range r.readers {
s, err := rdr.Seek(0, os.SEEK_END)
if err != nil {
return -1, err
}
tmpOffset += s
}
r.Seek(tmpOffset+offset, os.SEEK_SET)
return tmpOffset + offset, nil
case os.SEEK_CUR:
if r.pos == nil {
return r.Seek(offset, os.SEEK_SET)
}
// Just return the current offset
if offset == 0 {
return r.getCurOffset()
}
curOffset, err := r.getCurOffset()
if err != nil {
return -1, err
}
rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset)
if err != nil {
return -1, err
}
r.pos = &pos{r.posIdx[rdr], rdrOffset}
return curOffset + offset, nil
default:
return -1, fmt.Errorf("Invalid whence: %d", whence)
}
return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset)
}
func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) {
var rdr io.ReadSeeker
var rdrOffset int64
for i, rdr := range r.readers {
offsetTo, err := r.getOffsetToReader(rdr)
if err != nil {
return nil, -1, err
}
if offsetTo > offset {
rdr = r.readers[i-1]
rdrOffset = offsetTo - offset
break
}
if rdr == r.readers[len(r.readers)-1] {
rdrOffset = offsetTo + offset
break
}
}
return rdr, rdrOffset, nil
}
func (r *multiReadSeeker) getCurOffset() (int64, error) {
var totalSize int64
for _, rdr := range r.readers[:r.pos.idx+1] {
if r.posIdx[rdr] == r.pos.idx {
totalSize += r.pos.offset
break
}
size, err := getReadSeekerSize(rdr)
if err != nil {
return -1, fmt.Errorf("error getting seeker size: %v", err)
}
totalSize += size
}
return totalSize, nil
}
func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) {
var offset int64
for _, r := range r.readers {
if r == rdr {
break
}
size, err := getReadSeekerSize(rdr)
if err != nil {
return -1, err
}
offset += size
}
return offset, nil
}
func (r *multiReadSeeker) Read(b []byte) (int, error) {
if r.pos == nil {
r.pos = &pos{0, 0}
}
bCap := int64(cap(b))
buf := bytes.NewBuffer(nil)
var rdr io.ReadSeeker
for _, rdr = range r.readers[r.pos.idx:] {
readBytes, err := io.CopyN(buf, rdr, bCap)
if err != nil && err != io.EOF {
return -1, err
}
bCap -= readBytes
if bCap == 0 {
break
}
}
rdrPos, err := rdr.Seek(0, os.SEEK_CUR)
if err != nil {
return -1, err
}
r.pos = &pos{r.posIdx[rdr], rdrPos}
return buf.Read(b)
}
func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) {
// save the current position
pos, err := rdr.Seek(0, os.SEEK_CUR)
if err != nil {
return -1, err
}
// get the size
size, err := rdr.Seek(0, os.SEEK_END)
if err != nil {
return -1, err
}
// reset the position
if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil {
return -1, err
}
return size, nil
}
// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
// input readseekers. After calling this method the initial position is set to the
// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
// Seek can be used over the sum of lengths of all readseekers.
//
// When a MultiReadSeeker is used, no Read and Seek operations should be made on
// its ReadSeeker components. Also, users should make no assumption on the state
// of individual readseekers while the MultiReadSeeker is used.
func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker {
if len(readers) == 1 {
return readers[0]
}
idx := make(map[io.ReadSeeker]int)
for i, rdr := range readers {
idx[rdr] = i
}
return &multiReadSeeker{
readers: readers,
posIdx: idx,
}
}

View file

@ -0,0 +1,149 @@
package ioutils
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"testing"
)
func TestMultiReadSeekerReadAll(t *testing.T) {
str := "hello world"
s1 := strings.NewReader(str + " 1")
s2 := strings.NewReader(str + " 2")
s3 := strings.NewReader(str + " 3")
mr := MultiReadSeeker(s1, s2, s3)
expectedSize := int64(s1.Len() + s2.Len() + s3.Len())
b, err := ioutil.ReadAll(mr)
if err != nil {
t.Fatal(err)
}
expected := "hello world 1hello world 2hello world 3"
if string(b) != expected {
t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
}
size, err := mr.Seek(0, os.SEEK_END)
if err != nil {
t.Fatal(err)
}
if size != expectedSize {
t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize)
}
// Reset the position and read again
pos, err := mr.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
if pos != 0 {
t.Fatalf("expected position to be set to 0, got %d", pos)
}
b, err = ioutil.ReadAll(mr)
if err != nil {
t.Fatal(err)
}
if string(b) != expected {
t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected)
}
}
func TestMultiReadSeekerReadEach(t *testing.T) {
str := "hello world"
s1 := strings.NewReader(str + " 1")
s2 := strings.NewReader(str + " 2")
s3 := strings.NewReader(str + " 3")
mr := MultiReadSeeker(s1, s2, s3)
var totalBytes int64
for i, s := range []*strings.Reader{s1, s2, s3} {
sLen := int64(s.Len())
buf := make([]byte, s.Len())
expected := []byte(fmt.Sprintf("%s %d", str, i+1))
if _, err := mr.Read(buf); err != nil && err != io.EOF {
t.Fatal(err)
}
if !bytes.Equal(buf, expected) {
t.Fatalf("expected %q to be %q", string(buf), string(expected))
}
pos, err := mr.Seek(0, os.SEEK_CUR)
if err != nil {
t.Fatalf("iteration: %d, error: %v", i+1, err)
}
// check that the total bytes read is the current position of the seeker
totalBytes += sLen
if pos != totalBytes {
t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1)
}
// This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well
newPos, err := mr.Seek(pos, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
if newPos != pos {
t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos)
}
}
}
func TestMultiReadSeekerReadSpanningChunks(t *testing.T) {
str := "hello world"
s1 := strings.NewReader(str + " 1")
s2 := strings.NewReader(str + " 2")
s3 := strings.NewReader(str + " 3")
mr := MultiReadSeeker(s1, s2, s3)
buf := make([]byte, s1.Len()+3)
_, err := mr.Read(buf)
if err != nil {
t.Fatal(err)
}
// expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string
expected := "hello world 1hel"
if string(buf) != expected {
t.Fatalf("expected %s to be %s", string(buf), expected)
}
}
func TestMultiReadSeekerNegativeSeek(t *testing.T) {
str := "hello world"
s1 := strings.NewReader(str + " 1")
s2 := strings.NewReader(str + " 2")
s3 := strings.NewReader(str + " 3")
mr := MultiReadSeeker(s1, s2, s3)
s1Len := s1.Len()
s2Len := s2.Len()
s3Len := s3.Len()
s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END)
if err != nil {
t.Fatal(err)
}
if s != int64(s1Len+s2Len) {
t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len())
}
buf := make([]byte, s3Len)
if _, err := mr.Read(buf); err != nil && err != io.EOF {
t.Fatal(err)
}
expected := fmt.Sprintf("%s %d", str, 3)
if string(buf) != fmt.Sprintf("%s %d", str, 3) {
t.Fatalf("expected %q to be %q", string(buf), expected)
}
}

View file

@ -3,6 +3,7 @@ package tailfile
import (
"bytes"
"errors"
"io"
"os"
)
@ -12,7 +13,7 @@ var eol = []byte("\n")
var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive")
//TailFile returns last n lines of file f
func TailFile(f *os.File, n int) ([][]byte, error) {
func TailFile(f io.ReadSeeker, n int) ([][]byte, error) {
if n <= 0 {
return nil, ErrNonPositiveLinesNumber
}