Add support for task and arbitrary combo logs
Refactored the API to more easily accept new endpoints. Added REST, client, and CLI endpoints for getting logs from a specific task. All that is needed after this commit to enable arbitrary service log selectors is a REST endpoint and handler. Task logs can be retrieved by putting in a task ID at the CLI instead of a service ID. Signed-off-by: Drew Erny <drew.erny@docker.com>
This commit is contained in:
parent
9c0473fa65
commit
d330dc3223
11 changed files with 402 additions and 113 deletions
|
@ -21,7 +21,7 @@ type Backend interface {
|
|||
CreateService(types.ServiceSpec, string) (*basictypes.ServiceCreateResponse, error)
|
||||
UpdateService(string, uint64, types.ServiceSpec, basictypes.ServiceUpdateOptions) (*basictypes.ServiceUpdateResponse, error)
|
||||
RemoveService(string) error
|
||||
ServiceLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error
|
||||
ServiceLogs(context.Context, *backend.LogSelector, *backend.ContainerLogsConfig, chan struct{}) error
|
||||
GetNodes(basictypes.NodeListOptions) ([]types.Node, error)
|
||||
GetNode(string) (types.Node, error)
|
||||
UpdateNode(string, uint64, types.NodeSpec) error
|
||||
|
|
|
@ -43,6 +43,7 @@ func (sr *swarmRouter) initRoutes() {
|
|||
router.NewPostRoute("/nodes/{id}/update", sr.updateNode),
|
||||
router.NewGetRoute("/tasks", sr.getTasks),
|
||||
router.NewGetRoute("/tasks/{id}", sr.getTask),
|
||||
router.Experimental(router.Cancellable(router.NewGetRoute("/tasks/{id}/logs", sr.getTaskLogs))),
|
||||
router.NewGetRoute("/secrets", sr.getSecrets),
|
||||
router.NewPostRoute("/secrets/create", sr.createSecret),
|
||||
router.NewDeleteRoute("/secrets/{id}", sr.removeSecret),
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
"github.com/docker/docker/api/types/backend"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
types "github.com/docker/docker/api/types/swarm"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -215,54 +214,28 @@ func (sr *swarmRouter) removeService(ctx context.Context, w http.ResponseWriter,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (sr *swarmRouter) getTaskLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
||||
if err := httputils.ParseForm(r); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// make a selector to pass to the helper function
|
||||
selector := &backend.LogSelector{
|
||||
Tasks: []string{vars["id"]},
|
||||
}
|
||||
return sr.swarmLogs(ctx, w, r, selector)
|
||||
}
|
||||
|
||||
func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
||||
if err := httputils.ParseForm(r); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Args are validated before the stream starts because when it starts we're
|
||||
// sending HTTP 200 by writing an empty chunk of data to tell the client that
|
||||
// daemon is going to stream. By sending this initial HTTP 200 we can't report
|
||||
// any error after the stream starts (i.e. container not found, wrong parameters)
|
||||
// with the appropriate status code.
|
||||
stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr")
|
||||
if !(stdout || stderr) {
|
||||
return fmt.Errorf("Bad parameters: you must choose at least one stream")
|
||||
// make a selector to pass to the helper function
|
||||
selector := &backend.LogSelector{
|
||||
Services: []string{vars["id"]},
|
||||
}
|
||||
|
||||
serviceName := vars["id"]
|
||||
logsConfig := &backend.ContainerLogsConfig{
|
||||
ContainerLogsOptions: basictypes.ContainerLogsOptions{
|
||||
Follow: httputils.BoolValue(r, "follow"),
|
||||
Timestamps: httputils.BoolValue(r, "timestamps"),
|
||||
Since: r.Form.Get("since"),
|
||||
Tail: r.Form.Get("tail"),
|
||||
ShowStdout: stdout,
|
||||
ShowStderr: stderr,
|
||||
Details: httputils.BoolValue(r, "details"),
|
||||
},
|
||||
OutStream: w,
|
||||
}
|
||||
|
||||
if logsConfig.Details {
|
||||
return fmt.Errorf("Bad parameters: details is not currently supported")
|
||||
}
|
||||
|
||||
chStarted := make(chan struct{})
|
||||
if err := sr.backend.ServiceLogs(ctx, serviceName, logsConfig, chStarted); err != nil {
|
||||
select {
|
||||
case <-chStarted:
|
||||
// The client may be expecting all of the data we're sending to
|
||||
// be multiplexed, so send it through OutStream, which will
|
||||
// have been set up to handle that if needed.
|
||||
stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr)
|
||||
fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return sr.swarmLogs(ctx, w, r, selector)
|
||||
}
|
||||
|
||||
func (sr *swarmRouter) getNodes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
||||
|
|
55
api/server/router/swarm/helpers.go
Normal file
55
api/server/router/swarm/helpers.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package swarm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/docker/docker/api/server/httputils"
|
||||
basictypes "github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/backend"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// swarmLogs takes an http response, request, and selector, and writes the logs
|
||||
// specified by the selector to the response
|
||||
func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, selector *backend.LogSelector) error {
|
||||
// Args are validated before the stream starts because when it starts we're
|
||||
// sending HTTP 200 by writing an empty chunk of data to tell the client that
|
||||
// daemon is going to stream. By sending this initial HTTP 200 we can't report
|
||||
// any error after the stream starts (i.e. container not found, wrong parameters)
|
||||
// with the appropriate status code.
|
||||
stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr")
|
||||
if !(stdout || stderr) {
|
||||
return fmt.Errorf("Bad parameters: you must choose at least one stream")
|
||||
}
|
||||
|
||||
logsConfig := &backend.ContainerLogsConfig{
|
||||
ContainerLogsOptions: basictypes.ContainerLogsOptions{
|
||||
Follow: httputils.BoolValue(r, "follow"),
|
||||
Timestamps: httputils.BoolValue(r, "timestamps"),
|
||||
Since: r.Form.Get("since"),
|
||||
Tail: r.Form.Get("tail"),
|
||||
ShowStdout: stdout,
|
||||
ShowStderr: stderr,
|
||||
Details: httputils.BoolValue(r, "details"),
|
||||
},
|
||||
OutStream: w,
|
||||
}
|
||||
|
||||
chStarted := make(chan struct{})
|
||||
if err := sr.backend.ServiceLogs(ctx, selector, logsConfig, chStarted); err != nil {
|
||||
select {
|
||||
case <-chStarted:
|
||||
// The client may be expecting all of the data we're sending to
|
||||
// be multiplexed, so send it through OutStream, which will
|
||||
// have been set up to handle that if needed.
|
||||
stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr)
|
||||
fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -7948,6 +7948,86 @@ paths:
|
|||
required: true
|
||||
type: "string"
|
||||
tags: ["Task"]
|
||||
/tasks/{id}/logs:
|
||||
get:
|
||||
summary: "Get task logs"
|
||||
description: |
|
||||
Get `stdout` and `stderr` logs from a task.
|
||||
|
||||
**Note**: This endpoint works only for services with the `json-file` or `journald` logging drivers.
|
||||
operationId: "TaskLogs"
|
||||
produces:
|
||||
- "application/vnd.docker.raw-stream"
|
||||
- "application/json"
|
||||
responses:
|
||||
101:
|
||||
description: "logs returned as a stream"
|
||||
schema:
|
||||
type: "string"
|
||||
format: "binary"
|
||||
200:
|
||||
description: "logs returned as a string in response body"
|
||||
schema:
|
||||
type: "string"
|
||||
404:
|
||||
description: "no such task"
|
||||
schema:
|
||||
$ref: "#/definitions/ErrorResponse"
|
||||
examples:
|
||||
application/json:
|
||||
message: "No such task: c2ada9df5af8"
|
||||
500:
|
||||
description: "server error"
|
||||
schema:
|
||||
$ref: "#/definitions/ErrorResponse"
|
||||
503:
|
||||
description: "node is not part of a swarm"
|
||||
schema:
|
||||
$ref: "#/definitions/ErrorResponse"
|
||||
parameters:
|
||||
- name: "id"
|
||||
in: "path"
|
||||
required: true
|
||||
description: "ID of the task"
|
||||
type: "string"
|
||||
- name: "details"
|
||||
in: "query"
|
||||
description: "Show extra details provided to logs."
|
||||
type: "boolean"
|
||||
default: false
|
||||
- name: "follow"
|
||||
in: "query"
|
||||
description: |
|
||||
Return the logs as a stream.
|
||||
|
||||
This will return a `101` HTTP response with a `Connection: upgrade` header, then hijack the HTTP connection to send raw output. For more information about hijacking and the stream format, [see the documentation for the attach endpoint](#operation/ContainerAttach).
|
||||
type: "boolean"
|
||||
default: false
|
||||
- name: "stdout"
|
||||
in: "query"
|
||||
description: "Return logs from `stdout`"
|
||||
type: "boolean"
|
||||
default: false
|
||||
- name: "stderr"
|
||||
in: "query"
|
||||
description: "Return logs from `stderr`"
|
||||
type: "boolean"
|
||||
default: false
|
||||
- name: "since"
|
||||
in: "query"
|
||||
description: "Only return logs since this time, as a UNIX timestamp"
|
||||
type: "integer"
|
||||
default: 0
|
||||
- name: "timestamps"
|
||||
in: "query"
|
||||
description: "Add timestamps to every log line"
|
||||
type: "boolean"
|
||||
default: false
|
||||
- name: "tail"
|
||||
in: "query"
|
||||
description: "Only return this number of log lines from the end of the logs. Specify as an integer or `all` to output all log lines."
|
||||
type: "string"
|
||||
default: "all"
|
||||
/secrets:
|
||||
get:
|
||||
summary: "List secrets"
|
||||
|
|
|
@ -32,6 +32,16 @@ type ContainerLogsConfig struct {
|
|||
OutStream io.Writer
|
||||
}
|
||||
|
||||
// LogSelector is a list of services and tasks that should be returned as part
|
||||
// of a log stream. It is similar to swarmapi.LogSelector, with the difference
|
||||
// that the names don't have to be resolved to IDs; this is mostly to avoid
|
||||
// accidents later where a swarmapi LogSelector might have been incorrectly
|
||||
// used verbatim (and to avoid the handler having to import swarmapi types)
|
||||
type LogSelector struct {
|
||||
Services []string
|
||||
Tasks []string
|
||||
}
|
||||
|
||||
// ContainerStatsConfig holds information for configuring the runtime
|
||||
// behavior of a backend.ContainerStats() call.
|
||||
type ContainerStatsConfig struct {
|
||||
|
|
|
@ -30,9 +30,14 @@ type logsOptions struct {
|
|||
timestamps bool
|
||||
tail string
|
||||
|
||||
service string
|
||||
target string
|
||||
}
|
||||
|
||||
// TODO(dperny) the whole CLI for this is kind of a mess IMHOIRL and it needs
|
||||
// to be refactored agressively. There may be changes to the implementation of
|
||||
// details, which will be need to be reflected in this code. The refactoring
|
||||
// should be put off until we make those changes, tho, because I think the
|
||||
// decisions made WRT details will impact the design of the CLI.
|
||||
func newLogsCommand(dockerCli *command.DockerCli) *cobra.Command {
|
||||
var opts logsOptions
|
||||
|
||||
|
@ -41,16 +46,16 @@ func newLogsCommand(dockerCli *command.DockerCli) *cobra.Command {
|
|||
Short: "Fetch the logs of a service",
|
||||
Args: cli.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
opts.service = args[0]
|
||||
opts.target = args[0]
|
||||
return runLogs(dockerCli, &opts)
|
||||
},
|
||||
Tags: map[string]string{"experimental": ""},
|
||||
}
|
||||
|
||||
flags := cmd.Flags()
|
||||
flags.BoolVar(&opts.noResolve, "no-resolve", false, "Do not map IDs to Names")
|
||||
flags.BoolVar(&opts.noResolve, "no-resolve", false, "Do not map IDs to Names in output")
|
||||
flags.BoolVar(&opts.noTrunc, "no-trunc", false, "Do not truncate output")
|
||||
flags.BoolVar(&opts.noTaskIDs, "no-task-ids", false, "Do not include task IDs")
|
||||
flags.BoolVar(&opts.noTaskIDs, "no-task-ids", false, "Do not include task IDs in output")
|
||||
flags.BoolVarP(&opts.follow, "follow", "f", false, "Follow log output")
|
||||
flags.StringVar(&opts.since, "since", "", "Show logs since timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)")
|
||||
flags.BoolVarP(&opts.timestamps, "timestamps", "t", false, "Show timestamps")
|
||||
|
@ -70,28 +75,44 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
|
|||
Tail: opts.tail,
|
||||
}
|
||||
|
||||
client := dockerCli.Client()
|
||||
cli := dockerCli.Client()
|
||||
|
||||
service, _, err := client.ServiceInspectWithRaw(ctx, opts.service)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var (
|
||||
maxLength = 1
|
||||
responseBody io.ReadCloser
|
||||
)
|
||||
|
||||
responseBody, err := client.ServiceLogs(ctx, opts.service, options)
|
||||
service, _, err := cli.ServiceInspectWithRaw(ctx, opts.target)
|
||||
if err != nil {
|
||||
return err
|
||||
// if it's any error other than service not found, it's Real
|
||||
if !client.IsErrServiceNotFound(err) {
|
||||
return err
|
||||
}
|
||||
task, _, err := cli.TaskInspectWithRaw(ctx, opts.target)
|
||||
if err != nil {
|
||||
if client.IsErrTaskNotFound(err) {
|
||||
// if the task ALSO isn't found, rewrite the error to be clear
|
||||
// that we looked for services AND tasks
|
||||
err = fmt.Errorf("No such task or service")
|
||||
}
|
||||
return err
|
||||
}
|
||||
maxLength = getMaxLength(task.Slot)
|
||||
responseBody, err = cli.TaskLogs(ctx, opts.target, options)
|
||||
} else {
|
||||
responseBody, err = cli.ServiceLogs(ctx, opts.target, options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
|
||||
// if replicas are initialized, figure out if we need to pad them
|
||||
replicas := *service.Spec.Mode.Replicated.Replicas
|
||||
maxLength = getMaxLength(int(replicas))
|
||||
}
|
||||
}
|
||||
defer responseBody.Close()
|
||||
|
||||
var replicas uint64
|
||||
padding := 1
|
||||
if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
|
||||
// if replicas are initialized, figure out if we need to pad them
|
||||
replicas = *service.Spec.Mode.Replicated.Replicas
|
||||
padding = len(strconv.FormatUint(replicas, 10))
|
||||
}
|
||||
|
||||
taskFormatter := newTaskFormatter(client, opts, padding)
|
||||
taskFormatter := newTaskFormatter(cli, opts, maxLength)
|
||||
|
||||
stdout := &logWriter{ctx: ctx, opts: opts, f: taskFormatter, w: dockerCli.Out()}
|
||||
stderr := &logWriter{ctx: ctx, opts: opts, f: taskFormatter, w: dockerCli.Err()}
|
||||
|
@ -101,6 +122,11 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// getMaxLength gets the maximum length of the number in base 10
|
||||
func getMaxLength(i int) int {
|
||||
return len(strconv.FormatInt(int64(i), 10))
|
||||
}
|
||||
|
||||
type taskFormatter struct {
|
||||
client client.APIClient
|
||||
opts *logsOptions
|
||||
|
@ -148,7 +174,8 @@ func (f *taskFormatter) format(ctx context.Context, logCtx logContext) (string,
|
|||
taskName += fmt.Sprintf(".%s", stringid.TruncateID(task.ID))
|
||||
}
|
||||
}
|
||||
padding := strings.Repeat(" ", f.padding-len(strconv.FormatInt(int64(task.Slot), 10)))
|
||||
|
||||
padding := strings.Repeat(" ", f.padding-getMaxLength(task.Slot))
|
||||
formatted := fmt.Sprintf("%s@%s%s", taskName, nodeName, padding)
|
||||
f.cache[logCtx] = formatted
|
||||
return formatted, nil
|
||||
|
|
|
@ -128,6 +128,7 @@ type ServiceAPIClient interface {
|
|||
ServiceRemove(ctx context.Context, serviceID string) error
|
||||
ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error)
|
||||
ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error)
|
||||
TaskLogs(ctx context.Context, taskID string, options types.ContainerLogsOptions) (io.ReadCloser, error)
|
||||
TaskInspectWithRaw(ctx context.Context, taskID string) (swarm.Task, []byte, error)
|
||||
TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
|
||||
}
|
||||
|
|
52
client/task_logs.go
Normal file
52
client/task_logs.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
timetypes "github.com/docker/docker/api/types/time"
|
||||
)
|
||||
|
||||
// TaskLogs returns the logs generated by a task in an io.ReadCloser.
|
||||
// It's up to the caller to close the stream.
|
||||
func (cli *Client) TaskLogs(ctx context.Context, taskID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
|
||||
query := url.Values{}
|
||||
if options.ShowStdout {
|
||||
query.Set("stdout", "1")
|
||||
}
|
||||
|
||||
if options.ShowStderr {
|
||||
query.Set("stderr", "1")
|
||||
}
|
||||
|
||||
if options.Since != "" {
|
||||
ts, err := timetypes.GetTimestamp(options.Since, time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
query.Set("since", ts)
|
||||
}
|
||||
|
||||
if options.Timestamps {
|
||||
query.Set("timestamps", "1")
|
||||
}
|
||||
|
||||
if options.Details {
|
||||
query.Set("details", "1")
|
||||
}
|
||||
|
||||
if options.Follow {
|
||||
query.Set("follow", "1")
|
||||
}
|
||||
query.Set("tail", options.Tail)
|
||||
|
||||
resp, err := cli.get(ctx, "/tasks/"+taskID+"/logs", query, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp.body, nil
|
||||
}
|
|
@ -303,24 +303,32 @@ func (c *Cluster) RemoveService(input string) error {
|
|||
}
|
||||
|
||||
// ServiceLogs collects service logs and writes them back to `config.OutStream`
|
||||
func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
|
||||
func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *backend.ContainerLogsConfig, started chan struct{}) error {
|
||||
c.mu.RLock()
|
||||
defer func() {
|
||||
select {
|
||||
case <-started:
|
||||
// if we've started streaming logs, we are no longer holding the
|
||||
// lock and do not have to release it
|
||||
return
|
||||
default:
|
||||
// before we start, though, we're holding this lock and it needs to
|
||||
// be released
|
||||
c.mu.RUnlock()
|
||||
}
|
||||
}()
|
||||
state := c.currentNodeState()
|
||||
if !state.IsActiveManager() {
|
||||
c.mu.RUnlock()
|
||||
return c.errNoManager(state)
|
||||
}
|
||||
|
||||
service, err := getService(ctx, state.controlClient, input)
|
||||
swarmSelector, tty, err := convertSelector(ctx, state.controlClient, selector)
|
||||
if err != nil {
|
||||
c.mu.RUnlock()
|
||||
return err
|
||||
return errors.Wrap(err, "error making log selector")
|
||||
}
|
||||
container := service.Spec.Task.GetContainer()
|
||||
if container == nil {
|
||||
return errors.New("service logs only supported for container tasks")
|
||||
}
|
||||
if container.TTY {
|
||||
|
||||
// TODO(dperny) this goes away when we support TTY logs, which is in the works
|
||||
if tty {
|
||||
return errors.New("service logs not supported on tasks with a TTY attached")
|
||||
}
|
||||
|
||||
|
@ -335,7 +343,7 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
|
|||
|
||||
// Get tail value squared away - the number of previous log lines we look at
|
||||
var tail int64
|
||||
if config.Tail == "all" {
|
||||
if config.Tail == "all" || config.Tail == "" {
|
||||
// tail of 0 means send all logs on the swarmkit side
|
||||
tail = 0
|
||||
} else {
|
||||
|
@ -372,9 +380,7 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
|
|||
}
|
||||
|
||||
stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
|
||||
Selector: &swarmapi.LogSelector{
|
||||
ServiceIDs: []string{service.ID},
|
||||
},
|
||||
Selector: swarmSelector,
|
||||
Options: &swarmapi.LogSubscriptionOptions{
|
||||
Follow: config.Follow,
|
||||
Streams: stdStreams,
|
||||
|
@ -383,20 +389,26 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
|
|||
},
|
||||
})
|
||||
if err != nil {
|
||||
c.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
wf := ioutils.NewWriteFlusher(config.OutStream)
|
||||
defer wf.Close()
|
||||
|
||||
// Release the lock before starting the stream.
|
||||
//
|
||||
// this feels like it could be racy because we would double unlock if we
|
||||
// somehow returned right after we unlocked but before we closed, but I do
|
||||
// not think such a thing is possible. i wish it were possible to atomically
|
||||
// close and unlock but that might be overkill. programming is hard.
|
||||
c.mu.RUnlock()
|
||||
close(started)
|
||||
|
||||
wf.Flush()
|
||||
|
||||
outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
|
||||
errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
|
||||
|
||||
// Release the lock before starting the stream.
|
||||
c.mu.RUnlock()
|
||||
for {
|
||||
// Check the context before doing anything.
|
||||
select {
|
||||
|
@ -442,6 +454,43 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
|
|||
}
|
||||
}
|
||||
|
||||
// convertSelector takes a backend.LogSelector, which contains raw names that
|
||||
// may or may not be valid, and converts them to an api.LogSelector proto. It
|
||||
// also returns a boolean, true if any of the services use a TTY (false
|
||||
// otherwise) and an error if something fails
|
||||
func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, bool, error) {
|
||||
// if ANY tasks use a TTY, don't mux streams
|
||||
var tty bool
|
||||
// don't rely on swarmkit to resolve IDs, do it ourselves
|
||||
swarmSelector := &swarmapi.LogSelector{}
|
||||
for _, s := range selector.Services {
|
||||
service, err := getService(ctx, cc, s)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
c := service.Spec.Task.GetContainer()
|
||||
if c == nil {
|
||||
return nil, false, errors.New("logs only supported on container tasks")
|
||||
}
|
||||
// set TTY true if we have a TTY service, or if it's already true
|
||||
tty = tty || c.TTY
|
||||
swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID)
|
||||
}
|
||||
for _, t := range selector.Tasks {
|
||||
task, err := getTask(ctx, cc, t)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
c := task.Spec.GetContainer()
|
||||
if c == nil {
|
||||
return nil, false, errors.New("logs only supported on container tasks")
|
||||
}
|
||||
tty = tty || c.TTY
|
||||
swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID)
|
||||
}
|
||||
return swarmSelector, tty, nil
|
||||
}
|
||||
|
||||
// imageWithDigestString takes an image such as name or name:tag
|
||||
// and returns the image pinned to a digest, such as name@sha256:34234
|
||||
func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
"github.com/docker/docker/integration-cli/checker"
|
||||
"github.com/docker/docker/integration-cli/daemon"
|
||||
icmd "github.com/docker/docker/pkg/testutil/cmd"
|
||||
"github.com/go-check/check"
|
||||
)
|
||||
|
||||
|
@ -56,10 +57,10 @@ func (s *DockerSwarmSuite) TestServiceLogs(c *check.C) {
|
|||
// output.
|
||||
func countLogLines(d *daemon.Swarm, name string) func(*check.C) (interface{}, check.CommentInterface) {
|
||||
return func(c *check.C) (interface{}, check.CommentInterface) {
|
||||
out, err := d.Cmd("service", "logs", "-t", name)
|
||||
c.Assert(err, checker.IsNil)
|
||||
lines := strings.Split(strings.TrimSpace(out), "\n")
|
||||
return len(lines), check.Commentf("output, %q", string(out))
|
||||
result := icmd.RunCmd(d.Command("service", "logs", "-t", name))
|
||||
result.Assert(c, icmd.Expected{})
|
||||
lines := strings.Split(strings.TrimSpace(result.Stdout()), "\n")
|
||||
return len(lines), check.Commentf("output, %q", string(result.Stdout()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -70,7 +71,7 @@ func (s *DockerSwarmSuite) TestServiceLogsCompleteness(c *check.C) {
|
|||
name := "TestServiceLogsCompleteness"
|
||||
|
||||
// make a service that prints 6 lines
|
||||
out, err := d.Cmd("service", "create", "--name", name, "busybox", "sh", "-c", "for line in $(seq 1 6); do echo log test $line; done; sleep 100000")
|
||||
out, err := d.Cmd("service", "create", "--name", name, "busybox", "sh", "-c", "for line in $(seq 0 5); do echo log test $line; done; sleep 100000")
|
||||
c.Assert(err, checker.IsNil)
|
||||
c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "")
|
||||
|
||||
|
@ -79,22 +80,15 @@ func (s *DockerSwarmSuite) TestServiceLogsCompleteness(c *check.C) {
|
|||
// and make sure we have all the log lines
|
||||
waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 6)
|
||||
|
||||
args := []string{"service", "logs", name}
|
||||
cmd := exec.Command(dockerBinary, d.PrependHostArg(args)...)
|
||||
r, w := io.Pipe()
|
||||
cmd.Stdout = w
|
||||
cmd.Stderr = w
|
||||
c.Assert(cmd.Start(), checker.IsNil)
|
||||
out, err = d.Cmd("service", "logs", name)
|
||||
c.Assert(err, checker.IsNil)
|
||||
lines := strings.Split(strings.TrimSpace(out), "\n")
|
||||
|
||||
reader := bufio.NewReader(r)
|
||||
// i have heard anecdotal reports that logs may come back from the engine
|
||||
// mis-ordered. if this tests fails, consider the possibility that that
|
||||
// might be occurring
|
||||
for i := 1; i <= 6; i++ {
|
||||
msg := &logMessage{}
|
||||
msg.data, _, msg.err = reader.ReadLine()
|
||||
c.Assert(msg.err, checker.IsNil)
|
||||
c.Assert(string(msg.data), checker.Contains, fmt.Sprintf("log test %v", i))
|
||||
for i, line := range lines {
|
||||
c.Assert(line, checker.Contains, fmt.Sprintf("log test %v", i))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -113,21 +107,13 @@ func (s *DockerSwarmSuite) TestServiceLogsTail(c *check.C) {
|
|||
waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, 1)
|
||||
waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 6)
|
||||
|
||||
args := []string{"service", "logs", "--tail=2", name}
|
||||
cmd := exec.Command(dockerBinary, d.PrependHostArg(args)...)
|
||||
r, w := io.Pipe()
|
||||
cmd.Stdout = w
|
||||
cmd.Stderr = w
|
||||
c.Assert(cmd.Start(), checker.IsNil)
|
||||
out, err = d.Cmd("service", "logs", "--tail=2", name)
|
||||
c.Assert(err, checker.IsNil)
|
||||
lines := strings.Split(strings.TrimSpace(out), "\n")
|
||||
|
||||
reader := bufio.NewReader(r)
|
||||
// see TestServiceLogsCompleteness for comments about logs being well-
|
||||
// ordered, if this flakes
|
||||
for i := 5; i <= 6; i++ {
|
||||
msg := &logMessage{}
|
||||
msg.data, _, msg.err = reader.ReadLine()
|
||||
c.Assert(msg.err, checker.IsNil)
|
||||
c.Assert(string(msg.data), checker.Contains, fmt.Sprintf("log test %v", i))
|
||||
for i, line := range lines {
|
||||
// doing i+5 is hacky but not too fragile, it's good enough. if it flakes something else is wrong
|
||||
c.Assert(line, checker.Contains, fmt.Sprintf("log test %v", i+5))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -213,3 +199,58 @@ func (s *DockerSwarmSuite) TestServiceLogsFollow(c *check.C) {
|
|||
|
||||
c.Assert(cmd.Process.Kill(), checker.IsNil)
|
||||
}
|
||||
|
||||
func (s *DockerSwarmSuite) TestServiceLogsTaskLogs(c *check.C) {
|
||||
testRequires(c, ExperimentalDaemon)
|
||||
|
||||
d := s.AddDaemon(c, true, true)
|
||||
|
||||
name := "TestServicelogsTaskLogs"
|
||||
replicas := 2
|
||||
|
||||
result := icmd.RunCmd(d.Command(
|
||||
// create a service with the name
|
||||
"service", "create", "--name", name,
|
||||
// which has some number of replicas
|
||||
fmt.Sprintf("--replicas=%v", replicas),
|
||||
// which has this the task id as an environment variable templated in
|
||||
"--env", "TASK={{.Task.ID}}",
|
||||
// and runs this command to print exaclty 6 logs lines
|
||||
"busybox", "sh", "-c", "for line in $(seq 0 5); do echo $TASK log test $line; done; sleep 100000",
|
||||
))
|
||||
result.Assert(c, icmd.Expected{})
|
||||
// ^^ verify that we get no error
|
||||
// then verify that we have an id in stdout
|
||||
id := strings.TrimSpace(result.Stdout())
|
||||
c.Assert(id, checker.Not(checker.Equals), "")
|
||||
// so, right here, we're basically inspecting by id and returning only
|
||||
// the ID. if they don't match, the service doesn't exist.
|
||||
result = icmd.RunCmd(d.Command("service", "inspect", "--format=\"{{.ID}}\"", id))
|
||||
result.Assert(c, icmd.Expected{Out: id})
|
||||
|
||||
// make sure task has been deployed.
|
||||
waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, replicas)
|
||||
waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 6*replicas)
|
||||
|
||||
// get the task ids
|
||||
result = icmd.RunCmd(d.Command("service", "ps", "-q", name))
|
||||
result.Assert(c, icmd.Expected{})
|
||||
// make sure we have two tasks
|
||||
taskIDs := strings.Split(strings.TrimSpace(result.Stdout()), "\n")
|
||||
c.Assert(taskIDs, checker.HasLen, replicas)
|
||||
|
||||
for _, taskID := range taskIDs {
|
||||
c.Logf("checking task %v", taskID)
|
||||
result := icmd.RunCmd(d.Command("service", "logs", taskID))
|
||||
result.Assert(c, icmd.Expected{})
|
||||
lines := strings.Split(strings.TrimSpace(result.Stdout()), "\n")
|
||||
|
||||
c.Logf("checking messages for %v", taskID)
|
||||
for i, line := range lines {
|
||||
// make sure the message is in order
|
||||
c.Assert(line, checker.Contains, fmt.Sprintf("log test %v", i))
|
||||
// make sure it contains the task id
|
||||
c.Assert(line, checker.Contains, taskID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue