Merge pull request #32914 from jamiehannaford/until-logging
Add --until flag for docker logs; closes #32807
This commit is contained in:
commit
68a4552529
12 changed files with 223 additions and 20 deletions
|
@ -96,6 +96,7 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
|
|||
Follow: httputils.BoolValue(r, "follow"),
|
||||
Timestamps: httputils.BoolValue(r, "timestamps"),
|
||||
Since: r.Form.Get("since"),
|
||||
Until: r.Form.Get("until"),
|
||||
Tail: r.Form.Get("tail"),
|
||||
ShowStdout: stdout,
|
||||
ShowStderr: stderr,
|
||||
|
|
|
@ -4957,6 +4957,11 @@ paths:
|
|||
description: "Only return logs since this time, as a UNIX timestamp"
|
||||
type: "integer"
|
||||
default: 0
|
||||
- name: "until"
|
||||
in: "query"
|
||||
description: "Only return logs before this time, as a UNIX timestamp"
|
||||
type: "integer"
|
||||
default: 0
|
||||
- name: "timestamps"
|
||||
in: "query"
|
||||
description: "Add timestamps to every log line"
|
||||
|
|
|
@ -74,6 +74,7 @@ type ContainerLogsOptions struct {
|
|||
ShowStdout bool
|
||||
ShowStderr bool
|
||||
Since string
|
||||
Until string
|
||||
Timestamps bool
|
||||
Follow bool
|
||||
Tail string
|
||||
|
|
|
@ -51,6 +51,14 @@ func (cli *Client) ContainerLogs(ctx context.Context, container string, options
|
|||
query.Set("since", ts)
|
||||
}
|
||||
|
||||
if options.Until != "" {
|
||||
ts, err := timetypes.GetTimestamp(options.Until, time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
query.Set("until", ts)
|
||||
}
|
||||
|
||||
if options.Timestamps {
|
||||
query.Set("timestamps", "1")
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/internal/testutil"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -28,9 +29,11 @@ func TestContainerLogsError(t *testing.T) {
|
|||
_, err = client.ContainerLogs(context.Background(), "container_id", types.ContainerLogsOptions{
|
||||
Since: "2006-01-02TZ",
|
||||
})
|
||||
if err == nil || !strings.Contains(err.Error(), `parsing time "2006-01-02TZ"`) {
|
||||
t.Fatalf("expected a 'parsing time' error, got %v", err)
|
||||
}
|
||||
testutil.ErrorContains(t, err, `parsing time "2006-01-02TZ"`)
|
||||
_, err = client.ContainerLogs(context.Background(), "container_id", types.ContainerLogsOptions{
|
||||
Until: "2006-01-02TZ",
|
||||
})
|
||||
testutil.ErrorContains(t, err, `parsing time "2006-01-02TZ"`)
|
||||
}
|
||||
|
||||
func TestContainerLogs(t *testing.T) {
|
||||
|
@ -80,6 +83,17 @@ func TestContainerLogs(t *testing.T) {
|
|||
"since": "invalid but valid",
|
||||
},
|
||||
},
|
||||
{
|
||||
options: types.ContainerLogsOptions{
|
||||
// An complete invalid date, timestamp or go duration will be
|
||||
// passed as is
|
||||
Until: "invalid but valid",
|
||||
},
|
||||
expectedQueryParams: map[string]string{
|
||||
"tail": "",
|
||||
"until": "invalid but valid",
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, logCase := range cases {
|
||||
client := &Client{
|
||||
|
|
|
@ -122,6 +122,9 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
|
|||
if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
|
||||
continue
|
||||
}
|
||||
if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case watcher.Msg <- msg:
|
||||
|
|
|
@ -171,13 +171,15 @@ func (s *journald) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char {
|
||||
func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) {
|
||||
var msg, data, cursor *C.char
|
||||
var length C.size_t
|
||||
var stamp C.uint64_t
|
||||
var priority, partial C.int
|
||||
var done bool
|
||||
|
||||
// Walk the journal from here forward until we run out of new entries.
|
||||
// Walk the journal from here forward until we run out of new entries
|
||||
// or we reach the until value (if provided).
|
||||
drain:
|
||||
for {
|
||||
// Try not to send a given entry twice.
|
||||
|
@ -195,6 +197,12 @@ drain:
|
|||
if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
|
||||
break
|
||||
}
|
||||
// Break if the timestamp exceeds any provided until flag.
|
||||
if untilUnixMicro != 0 && untilUnixMicro < uint64(stamp) {
|
||||
done = true
|
||||
break
|
||||
}
|
||||
|
||||
// Set up the time and text of the entry.
|
||||
timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
|
||||
line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
|
||||
|
@ -240,10 +248,10 @@ drain:
|
|||
// ensure that we won't be freeing an address that's invalid
|
||||
cursor = nil
|
||||
}
|
||||
return cursor
|
||||
return cursor, done
|
||||
}
|
||||
|
||||
func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
|
||||
func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char {
|
||||
s.mu.Lock()
|
||||
s.readers.readers[logWatcher] = logWatcher
|
||||
if s.closed {
|
||||
|
@ -270,9 +278,10 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
|
|||
break
|
||||
}
|
||||
|
||||
cursor = s.drainJournal(logWatcher, config, j, cursor)
|
||||
var done bool
|
||||
cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro)
|
||||
|
||||
if status != 1 {
|
||||
if status != 1 || done {
|
||||
// We were notified to stop
|
||||
break
|
||||
}
|
||||
|
@ -304,6 +313,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|||
var cmatch, cursor *C.char
|
||||
var stamp C.uint64_t
|
||||
var sinceUnixMicro uint64
|
||||
var untilUnixMicro uint64
|
||||
var pipes [2]C.int
|
||||
|
||||
// Get a handle to the journal.
|
||||
|
@ -343,10 +353,19 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|||
nano := config.Since.UnixNano()
|
||||
sinceUnixMicro = uint64(nano / 1000)
|
||||
}
|
||||
// If we have an until value, convert it too
|
||||
if !config.Until.IsZero() {
|
||||
nano := config.Until.UnixNano()
|
||||
untilUnixMicro = uint64(nano / 1000)
|
||||
}
|
||||
if config.Tail > 0 {
|
||||
lines := config.Tail
|
||||
// Start at the end of the journal.
|
||||
if C.sd_journal_seek_tail(j) < 0 {
|
||||
// If until time provided, start from there.
|
||||
// Otherwise start at the end of the journal.
|
||||
if untilUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)) < 0 {
|
||||
logWatcher.Err <- fmt.Errorf("error seeking provided until value")
|
||||
return
|
||||
} else if C.sd_journal_seek_tail(j) < 0 {
|
||||
logWatcher.Err <- fmt.Errorf("error seeking to end of journal")
|
||||
return
|
||||
}
|
||||
|
@ -362,8 +381,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|||
if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
|
||||
break
|
||||
} else {
|
||||
// Compare the timestamp on the entry
|
||||
// to our threshold value.
|
||||
// Compare the timestamp on the entry to our threshold value.
|
||||
if sinceUnixMicro != 0 && sinceUnixMicro > uint64(stamp) {
|
||||
break
|
||||
}
|
||||
|
@ -392,7 +410,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|||
return
|
||||
}
|
||||
}
|
||||
cursor = s.drainJournal(logWatcher, config, j, nil)
|
||||
cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro)
|
||||
if config.Follow {
|
||||
// Allocate a descriptor for following the journal, if we'll
|
||||
// need one. Do it here so that we can report if it fails.
|
||||
|
@ -404,7 +422,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
|||
if C.pipe(&pipes[0]) == C.int(-1) {
|
||||
logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
|
||||
} else {
|
||||
cursor = s.followJournal(logWatcher, config, j, pipes, cursor)
|
||||
cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro)
|
||||
// Let followJournal handle freeing the journal context
|
||||
// object and closing the channel.
|
||||
following = true
|
||||
|
|
|
@ -98,7 +98,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
|
|||
|
||||
if config.Tail != 0 {
|
||||
tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
|
||||
tailFile(tailer, logWatcher, config.Tail, config.Since)
|
||||
tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until)
|
||||
}
|
||||
|
||||
// close all the rotated files
|
||||
|
@ -119,7 +119,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
|
|||
l.readers[logWatcher] = struct{}{}
|
||||
l.mu.Unlock()
|
||||
|
||||
followLogs(latestFile, logWatcher, notifyRotate, config.Since)
|
||||
followLogs(latestFile, logWatcher, notifyRotate, config)
|
||||
|
||||
l.mu.Lock()
|
||||
delete(l.readers, logWatcher)
|
||||
|
@ -136,7 +136,7 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
|
|||
return io.NewSectionReader(f, 0, size), nil
|
||||
}
|
||||
|
||||
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
|
||||
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) {
|
||||
rdr := io.Reader(f)
|
||||
if tail > 0 {
|
||||
ls, err := tailfile.TailFile(f, tail)
|
||||
|
@ -158,6 +158,9 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
|
|||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-logWatcher.WatchClose():
|
||||
return
|
||||
|
@ -186,7 +189,7 @@ func watchFile(name string) (filenotify.FileWatcher, error) {
|
|||
return fileWatcher, nil
|
||||
}
|
||||
|
||||
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
|
||||
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) {
|
||||
dec := json.NewDecoder(f)
|
||||
l := &jsonlog.JSONLog{}
|
||||
|
||||
|
@ -324,14 +327,22 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
|||
continue
|
||||
}
|
||||
|
||||
since := config.Since
|
||||
until := config.Until
|
||||
|
||||
retries = 0 // reset retries since we've succeeded
|
||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case logWatcher.Msg <- msg:
|
||||
case <-ctx.Done():
|
||||
logWatcher.Msg <- msg
|
||||
// This for loop is used when the logger is closed (ie, container
|
||||
// stopped) but the consumer is still waiting for logs.
|
||||
for {
|
||||
msg, err := decodeLogLine(dec, l)
|
||||
if err != nil {
|
||||
|
@ -340,6 +351,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
|||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
logWatcher.Msg <- msg
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,6 +88,7 @@ type SizedLogger interface {
|
|||
// ReadConfig is the configuration passed into ReadLogs.
|
||||
type ReadConfig struct {
|
||||
Since time.Time
|
||||
Until time.Time
|
||||
Tail int
|
||||
Follow bool
|
||||
}
|
||||
|
|
|
@ -77,8 +77,18 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
|
|||
since = time.Unix(s, n)
|
||||
}
|
||||
|
||||
var until time.Time
|
||||
if config.Until != "" && config.Until != "0" {
|
||||
s, n, err := timetypes.ParseTimestamps(config.Until, 0)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
until = time.Unix(s, n)
|
||||
}
|
||||
|
||||
readConfig := logger.ReadConfig{
|
||||
Since: since,
|
||||
Until: until,
|
||||
Tail: tailLines,
|
||||
Follow: follow,
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ keywords: "API, Docker, rcli, REST, documentation"
|
|||
If `Error` is `null`, container removal has succeeded, otherwise
|
||||
the test of an error message indicating why container removal has failed
|
||||
is available from `Error.Message` field.
|
||||
* `GET /containers/(name)/logs` now supports an additional query parameter: `until`, which returns log lines that occurred before the specified timestamp.
|
||||
|
||||
## v1.33 API changes
|
||||
|
||||
|
|
|
@ -2,8 +2,12 @@ package main
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -11,6 +15,7 @@ import (
|
|||
"github.com/docker/docker/client"
|
||||
"github.com/docker/docker/integration-cli/checker"
|
||||
"github.com/docker/docker/integration-cli/request"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"github.com/go-check/check"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -85,3 +90,125 @@ func (s *DockerSuite) TestLogsAPIContainerNotFound(c *check.C) {
|
|||
c.Assert(err, checker.IsNil)
|
||||
c.Assert(resp.StatusCode, checker.Equals, http.StatusNotFound)
|
||||
}
|
||||
|
||||
func (s *DockerSuite) TestLogsAPIUntilFutureFollow(c *check.C) {
|
||||
testRequires(c, DaemonIsLinux)
|
||||
|
||||
name := "logsuntilfuturefollow"
|
||||
dockerCmd(c, "run", "-d", "--name", name, "busybox", "/bin/sh", "-c", "while true; do date +%s; sleep 1; done")
|
||||
c.Assert(waitRun(name), checker.IsNil)
|
||||
|
||||
untilSecs := 5
|
||||
untilDur, err := time.ParseDuration(fmt.Sprintf("%ds", untilSecs))
|
||||
c.Assert(err, checker.IsNil)
|
||||
until := daemonTime(c).Add(untilDur)
|
||||
|
||||
client, err := request.NewClient()
|
||||
if err != nil {
|
||||
c.Fatal(err)
|
||||
}
|
||||
|
||||
cfg := types.ContainerLogsOptions{Until: until.Format(time.RFC3339Nano), Follow: true, ShowStdout: true, Timestamps: true}
|
||||
reader, err := client.ContainerLogs(context.Background(), name, cfg)
|
||||
c.Assert(err, checker.IsNil)
|
||||
|
||||
type logOut struct {
|
||||
out string
|
||||
err error
|
||||
}
|
||||
|
||||
chLog := make(chan logOut)
|
||||
|
||||
go func() {
|
||||
bufReader := bufio.NewReader(reader)
|
||||
defer reader.Close()
|
||||
for i := 0; i < untilSecs; i++ {
|
||||
out, _, err := bufReader.ReadLine()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
chLog <- logOut{"", err}
|
||||
return
|
||||
}
|
||||
|
||||
chLog <- logOut{strings.TrimSpace(string(out)), err}
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < untilSecs; i++ {
|
||||
select {
|
||||
case l := <-chLog:
|
||||
c.Assert(l.err, checker.IsNil)
|
||||
i, err := strconv.ParseInt(strings.Split(l.out, " ")[1], 10, 64)
|
||||
c.Assert(err, checker.IsNil)
|
||||
c.Assert(time.Unix(i, 0).UnixNano(), checker.LessOrEqualThan, until.UnixNano())
|
||||
case <-time.After(20 * time.Second):
|
||||
c.Fatal("timeout waiting for logs to exit")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DockerSuite) TestLogsAPIUntil(c *check.C) {
|
||||
name := "logsuntil"
|
||||
dockerCmd(c, "run", "--name", name, "busybox", "/bin/sh", "-c", "for i in $(seq 1 3); do echo log$i; sleep 0.5; done")
|
||||
|
||||
client, err := request.NewClient()
|
||||
if err != nil {
|
||||
c.Fatal(err)
|
||||
}
|
||||
|
||||
extractBody := func(c *check.C, cfg types.ContainerLogsOptions) []string {
|
||||
reader, err := client.ContainerLogs(context.Background(), name, cfg)
|
||||
c.Assert(err, checker.IsNil)
|
||||
|
||||
actualStdout := new(bytes.Buffer)
|
||||
actualStderr := ioutil.Discard
|
||||
_, err = stdcopy.StdCopy(actualStdout, actualStderr, reader)
|
||||
c.Assert(err, checker.IsNil)
|
||||
|
||||
return strings.Split(actualStdout.String(), "\n")
|
||||
}
|
||||
|
||||
// Get timestamp of second log line
|
||||
allLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true})
|
||||
t, err := time.Parse(time.RFC3339Nano, strings.Split(allLogs[1], " ")[0])
|
||||
c.Assert(err, checker.IsNil)
|
||||
until := t.Format(time.RFC3339Nano)
|
||||
|
||||
// Get logs until the timestamp of second line, i.e. first two lines
|
||||
logs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true, Until: until})
|
||||
|
||||
// Ensure log lines after cut-off are excluded
|
||||
logsString := strings.Join(logs, "\n")
|
||||
c.Assert(logsString, checker.Not(checker.Contains), "log3", check.Commentf("unexpected log message returned, until=%v", until))
|
||||
}
|
||||
|
||||
func (s *DockerSuite) TestLogsAPIUntilDefaultValue(c *check.C) {
|
||||
name := "logsuntildefaultval"
|
||||
dockerCmd(c, "run", "--name", name, "busybox", "/bin/sh", "-c", "for i in $(seq 1 3); do echo log$i; done")
|
||||
|
||||
client, err := request.NewClient()
|
||||
if err != nil {
|
||||
c.Fatal(err)
|
||||
}
|
||||
|
||||
extractBody := func(c *check.C, cfg types.ContainerLogsOptions) []string {
|
||||
reader, err := client.ContainerLogs(context.Background(), name, cfg)
|
||||
c.Assert(err, checker.IsNil)
|
||||
|
||||
actualStdout := new(bytes.Buffer)
|
||||
actualStderr := ioutil.Discard
|
||||
_, err = stdcopy.StdCopy(actualStdout, actualStderr, reader)
|
||||
c.Assert(err, checker.IsNil)
|
||||
|
||||
return strings.Split(actualStdout.String(), "\n")
|
||||
}
|
||||
|
||||
// Get timestamp of second log line
|
||||
allLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true})
|
||||
|
||||
// Test with default value specified and parameter omitted
|
||||
defaultLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true, Until: "0"})
|
||||
c.Assert(defaultLogs, checker.DeepEquals, allLogs)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue