
Merge pull request #32015 from dperny/service-logs-support-task-logs

Add Support for Service Task Logs
Victor Vieux committed 8 years ago
Commit 170be9c267

+ 1 - 1
api/server/router/swarm/backend.go

@@ -21,7 +21,7 @@ type Backend interface {
 	CreateService(types.ServiceSpec, string) (*basictypes.ServiceCreateResponse, error)
 	UpdateService(string, uint64, types.ServiceSpec, basictypes.ServiceUpdateOptions) (*basictypes.ServiceUpdateResponse, error)
 	RemoveService(string) error
-	ServiceLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error
+	ServiceLogs(context.Context, *backend.LogSelector, *backend.ContainerLogsConfig, chan struct{}) error
 	GetNodes(basictypes.NodeListOptions) ([]types.Node, error)
 	GetNode(string) (types.Node, error)
 	UpdateNode(string, uint64, types.NodeSpec) error
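With this change a single backend method serves both the service and the task log endpoints; the selector, not the URL, decides what gets streamed. A minimal sketch of a call under the new signature (names are illustrative, not part of the PR):

```go
// Sketch only: driving the new Backend.ServiceLogs signature. The anonymous
// interface mirrors the method above; "sometaskid" is a placeholder.
package sketch

import (
	"github.com/docker/docker/api/types/backend"
	"golang.org/x/net/context"
)

type logsBackend interface {
	ServiceLogs(context.Context, *backend.LogSelector, *backend.ContainerLogsConfig, chan struct{}) error
}

func streamOneTask(ctx context.Context, b logsBackend, cfg *backend.ContainerLogsConfig) error {
	started := make(chan struct{}) // closed by the backend once streaming begins
	sel := &backend.LogSelector{Tasks: []string{"sometaskid"}}
	return b.ServiceLogs(ctx, sel, cfg, started)
}
```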

+ 1 - 0
api/server/router/swarm/cluster.go

@@ -43,6 +43,7 @@ func (sr *swarmRouter) initRoutes() {
 		router.NewPostRoute("/nodes/{id}/update", sr.updateNode),
 		router.NewPostRoute("/nodes/{id}/update", sr.updateNode),
 		router.NewGetRoute("/tasks", sr.getTasks),
 		router.NewGetRoute("/tasks", sr.getTasks),
 		router.NewGetRoute("/tasks/{id}", sr.getTask),
 		router.NewGetRoute("/tasks/{id}", sr.getTask),
+		router.Experimental(router.Cancellable(router.NewGetRoute("/tasks/{id}/logs", sr.getTaskLogs))),
 		router.NewGetRoute("/secrets", sr.getSecrets),
 		router.NewGetRoute("/secrets", sr.getSecrets),
 		router.NewPostRoute("/secrets/create", sr.createSecret),
 		router.NewPostRoute("/secrets/create", sr.createSecret),
 		router.NewDeleteRoute("/secrets/{id}", sr.removeSecret),
 		router.NewDeleteRoute("/secrets/{id}", sr.removeSecret),

+ 16 - 43
api/server/router/swarm/cluster_routes.go

@@ -13,7 +13,6 @@ import (
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/api/types/filters"
 	types "github.com/docker/docker/api/types/swarm"
 	types "github.com/docker/docker/api/types/swarm"
-	"github.com/docker/docker/pkg/stdcopy"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
 )
 )
 
 
@@ -215,54 +214,28 @@ func (sr *swarmRouter) removeService(ctx context.Context, w http.ResponseWriter,
 	return nil
 }
 
-func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
+func (sr *swarmRouter) getTaskLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 	if err := httputils.ParseForm(r); err != nil {
 		return err
 	}
 
-	// Args are validated before the stream starts because when it starts we're
-	// sending HTTP 200 by writing an empty chunk of data to tell the client that
-	// daemon is going to stream. By sending this initial HTTP 200 we can't report
-	// any error after the stream starts (i.e. container not found, wrong parameters)
-	// with the appropriate status code.
-	stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr")
-	if !(stdout || stderr) {
-		return fmt.Errorf("Bad parameters: you must choose at least one stream")
-	}
-
-	serviceName := vars["id"]
-	logsConfig := &backend.ContainerLogsConfig{
-		ContainerLogsOptions: basictypes.ContainerLogsOptions{
-			Follow:     httputils.BoolValue(r, "follow"),
-			Timestamps: httputils.BoolValue(r, "timestamps"),
-			Since:      r.Form.Get("since"),
-			Tail:       r.Form.Get("tail"),
-			ShowStdout: stdout,
-			ShowStderr: stderr,
-			Details:    httputils.BoolValue(r, "details"),
-		},
-		OutStream: w,
-	}
-
-	if logsConfig.Details {
-		return fmt.Errorf("Bad parameters: details is not currently supported")
-	}
-
-	chStarted := make(chan struct{})
-	if err := sr.backend.ServiceLogs(ctx, serviceName, logsConfig, chStarted); err != nil {
-		select {
-		case <-chStarted:
-			// The client may be expecting all of the data we're sending to
-			// be multiplexed, so send it through OutStream, which will
-			// have been set up to handle that if needed.
-			stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr)
-			fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err)
-		default:
-			return err
-		}
+	// make a selector to pass to the helper function
+	selector := &backend.LogSelector{
+		Tasks: []string{vars["id"]},
 	}
+	return sr.swarmLogs(ctx, w, r, selector)
+}
 
-	return nil
+func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
+	if err := httputils.ParseForm(r); err != nil {
+		return err
+	}
+
+	// make a selector to pass to the helper function
+	selector := &backend.LogSelector{
+		Services: []string{vars["id"]},
+	}
+	return sr.swarmLogs(ctx, w, r, selector)
 }
 
 func (sr *swarmRouter) getNodes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
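Both handlers now reduce to building a one-element LogSelector and delegating to the shared swarmLogs helper below. Note that the type itself (defined in api/types/backend) permits mixing targets, even though neither route exercises that; a hypothetical example:

```go
// Hypothetical, not built by either route above: a single selector may name
// services and tasks at once, and the backend merges the streams.
func mixedSelector() *backend.LogSelector {
	return &backend.LogSelector{
		Services: []string{"web"},
		Tasks:    []string{"c2ada9df5af8"},
	}
}
```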

+ 55 - 0
api/server/router/swarm/helpers.go

@@ -0,0 +1,55 @@
+package swarm
+
+import (
+	"fmt"
+	"net/http"
+
+	"github.com/docker/docker/api/server/httputils"
+	basictypes "github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/backend"
+	"github.com/docker/docker/pkg/stdcopy"
+	"golang.org/x/net/context"
+)
+
+// swarmLogs takes an http response, request, and selector, and writes the logs
+// specified by the selector to the response
+func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, selector *backend.LogSelector) error {
+	// Args are validated before the stream starts because when it starts we're
+	// sending HTTP 200 by writing an empty chunk of data to tell the client that
+	// daemon is going to stream. By sending this initial HTTP 200 we can't report
+	// any error after the stream starts (i.e. container not found, wrong parameters)
+	// with the appropriate status code.
+	stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr")
+	if !(stdout || stderr) {
+		return fmt.Errorf("Bad parameters: you must choose at least one stream")
+	}
+
+	logsConfig := &backend.ContainerLogsConfig{
+		ContainerLogsOptions: basictypes.ContainerLogsOptions{
+			Follow:     httputils.BoolValue(r, "follow"),
+			Timestamps: httputils.BoolValue(r, "timestamps"),
+			Since:      r.Form.Get("since"),
+			Tail:       r.Form.Get("tail"),
+			ShowStdout: stdout,
+			ShowStderr: stderr,
+			Details:    httputils.BoolValue(r, "details"),
+		},
+		OutStream: w,
+	}
+
+	chStarted := make(chan struct{})
+	if err := sr.backend.ServiceLogs(ctx, selector, logsConfig, chStarted); err != nil {
+		select {
+		case <-chStarted:
+			// The client may be expecting all of the data we're sending to
+			// be multiplexed, so send it through OutStream, which will
+			// have been set up to handle that if needed.
+			stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr)
+			fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err)
+		default:
+			return err
+		}
+	}
+
+	return nil
+}
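On the wire this means a late failure arrives inside the log stream itself. A client-side sketch of consuming that stream, assuming stdcopy's usual behavior of surfacing a Systemerr frame as the returned error:

```go
package sketch

import (
	"io"
	"os"

	"github.com/docker/docker/pkg/stdcopy"
)

// demux splits a multiplexed log stream into stdout and stderr. If the
// daemon wrote a Systemerr frame after the stream started (as swarmLogs
// does on error), StdCopy should hand it back as the returned error.
func demux(logs io.Reader) error {
	_, err := stdcopy.StdCopy(os.Stdout, os.Stderr, logs)
	return err
}
```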

+ 80 - 0
api/swagger.yaml

@@ -7948,6 +7948,86 @@ paths:
           required: true
           type: "string"
       tags: ["Task"]
+  /tasks/{id}/logs:
+    get:
+      summary: "Get task logs"
+      description: |
+        Get `stdout` and `stderr` logs from a task.
+
+        **Note**: This endpoint works only for services with the `json-file` or `journald` logging drivers.
+      operationId: "TaskLogs"
+      produces:
+        - "application/vnd.docker.raw-stream"
+        - "application/json"
+      responses:
+        101:
+          description: "logs returned as a stream"
+          schema:
+            type: "string"
+            format: "binary"
+        200:
+          description: "logs returned as a string in response body"
+          schema:
+            type: "string"
+        404:
+          description: "no such task"
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+          examples:
+            application/json:
+              message: "No such task: c2ada9df5af8"
+        500:
+          description: "server error"
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+        503:
+          description: "node is not part of a swarm"
+          schema:
+            $ref: "#/definitions/ErrorResponse"
+      parameters:
+        - name: "id"
+          in: "path"
+          required: true
+          description: "ID of the task"
+          type: "string"
+        - name: "details"
+          in: "query"
+          description: "Show extra details provided to logs."
+          type: "boolean"
+          default: false
+        - name: "follow"
+          in: "query"
+          description: |
+            Return the logs as a stream.
+
+            This will return a `101` HTTP response with a `Connection: upgrade` header, then hijack the HTTP connection to send raw output. For more information about hijacking and the stream format, [see the documentation for the attach endpoint](#operation/ContainerAttach).
+          type: "boolean"
+          default: false
+        - name: "stdout"
+          in: "query"
+          description: "Return logs from `stdout`"
+          type: "boolean"
+          default: false
+        - name: "stderr"
+          in: "query"
+          description: "Return logs from `stderr`"
+          type: "boolean"
+          default: false
+        - name: "since"
+          in: "query"
+          description: "Only return logs since this time, as a UNIX timestamp"
+          type: "integer"
+          default: 0
+        - name: "timestamps"
+          in: "query"
+          description: "Add timestamps to every log line"
+          type: "boolean"
+          default: false
+        - name: "tail"
+          in: "query"
+          description: "Only return this number of log lines from the end of the logs. Specify as an integer or `all` to output all log lines."
+          type: "string"
+          default: "all"
   /secrets:
     get:
       summary: "List secrets"

+ 10 - 0
api/types/backend/backend.go

@@ -32,6 +32,16 @@ type ContainerLogsConfig struct {
 	OutStream io.Writer
 }
 
+// LogSelector is a list of services and tasks that should be returned as part
+// of a log stream. It is similar to swarmapi.LogSelector, with the difference
+// that the names don't have to be resolved to IDs; this is mostly to avoid
+// accidents later where a swarmapi LogSelector might have been incorrectly
+// used verbatim (and to avoid the handler having to import swarmapi types)
+type LogSelector struct {
+	Services []string
+	Tasks    []string
+}
+
 // ContainerStatsConfig holds information for configuring the runtime
 // behavior of a backend.ContainerStats() call.
 type ContainerStatsConfig struct {

+ 48 - 21
cli/command/service/logs.go

@@ -30,9 +30,14 @@ type logsOptions struct {
 	timestamps bool
 	tail       string
 
-	service string
+	target string
 }
 
+// TODO(dperny) the whole CLI for this is kind of a mess IMHOIRL and it needs
+// to be refactored aggressively. There may be changes to the implementation of
+// details, which will need to be reflected in this code. The refactoring
+// should be put off until we make those changes, though, because I think the
+// decisions made WRT details will impact the design of the CLI.
 func newLogsCommand(dockerCli *command.DockerCli) *cobra.Command {
 	var opts logsOptions
 
@@ -41,16 +46,16 @@ func newLogsCommand(dockerCli *command.DockerCli) *cobra.Command {
 		Short: "Fetch the logs of a service",
 		Short: "Fetch the logs of a service",
 		Args:  cli.ExactArgs(1),
 		Args:  cli.ExactArgs(1),
 		RunE: func(cmd *cobra.Command, args []string) error {
 		RunE: func(cmd *cobra.Command, args []string) error {
-			opts.service = args[0]
+			opts.target = args[0]
 			return runLogs(dockerCli, &opts)
 			return runLogs(dockerCli, &opts)
 		},
 		},
 		Tags: map[string]string{"experimental": ""},
 		Tags: map[string]string{"experimental": ""},
 	}
 	}
 
 
 	flags := cmd.Flags()
 	flags := cmd.Flags()
-	flags.BoolVar(&opts.noResolve, "no-resolve", false, "Do not map IDs to Names")
+	flags.BoolVar(&opts.noResolve, "no-resolve", false, "Do not map IDs to Names in output")
 	flags.BoolVar(&opts.noTrunc, "no-trunc", false, "Do not truncate output")
 	flags.BoolVar(&opts.noTrunc, "no-trunc", false, "Do not truncate output")
-	flags.BoolVar(&opts.noTaskIDs, "no-task-ids", false, "Do not include task IDs")
+	flags.BoolVar(&opts.noTaskIDs, "no-task-ids", false, "Do not include task IDs in output")
 	flags.BoolVarP(&opts.follow, "follow", "f", false, "Follow log output")
 	flags.BoolVarP(&opts.follow, "follow", "f", false, "Follow log output")
 	flags.StringVar(&opts.since, "since", "", "Show logs since timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)")
 	flags.StringVar(&opts.since, "since", "", "Show logs since timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)")
 	flags.BoolVarP(&opts.timestamps, "timestamps", "t", false, "Show timestamps")
 	flags.BoolVarP(&opts.timestamps, "timestamps", "t", false, "Show timestamps")
@@ -70,28 +75,44 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
 		Tail:       opts.tail,
 	}
 
-	client := dockerCli.Client()
+	cli := dockerCli.Client()
 
-	service, _, err := client.ServiceInspectWithRaw(ctx, opts.service)
-	if err != nil {
-		return err
-	}
+	var (
+		maxLength    = 1
+		responseBody io.ReadCloser
+	)
 
-	responseBody, err := client.ServiceLogs(ctx, opts.service, options)
+	service, _, err := cli.ServiceInspectWithRaw(ctx, opts.target)
 	if err != nil {
-		return err
+		// if it's any error other than service not found, it's Real
+		if !client.IsErrServiceNotFound(err) {
+			return err
+		}
+		task, _, err := cli.TaskInspectWithRaw(ctx, opts.target)
+		if err != nil {
+			if client.IsErrTaskNotFound(err) {
+				// if the task ALSO isn't found, rewrite the error to be clear
+				// that we looked for services AND tasks
+				err = fmt.Errorf("No such task or service")
+			}
+			return err
+		}
+		maxLength = getMaxLength(task.Slot)
+		responseBody, err = cli.TaskLogs(ctx, opts.target, options)
+	} else {
+		responseBody, err = cli.ServiceLogs(ctx, opts.target, options)
+		if err != nil {
+			return err
+		}
+		if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
+			// if replicas are initialized, figure out if we need to pad them
+			replicas := *service.Spec.Mode.Replicated.Replicas
+			maxLength = getMaxLength(int(replicas))
+		}
 	}
 	defer responseBody.Close()
 
-	var replicas uint64
-	padding := 1
-	if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
-		// if replicas are initialized, figure out if we need to pad them
-		replicas = *service.Spec.Mode.Replicated.Replicas
-		padding = len(strconv.FormatUint(replicas, 10))
-	}
-
-	taskFormatter := newTaskFormatter(client, opts, padding)
+	taskFormatter := newTaskFormatter(cli, opts, maxLength)
 
 	stdout := &logWriter{ctx: ctx, opts: opts, f: taskFormatter, w: dockerCli.Out()}
 	stderr := &logWriter{ctx: ctx, opts: opts, f: taskFormatter, w: dockerCli.Err()}
@@ -101,6 +122,11 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
 	return err
 }
 
+// getMaxLength gets the maximum length of the number in base 10
+func getMaxLength(i int) int {
+	return len(strconv.FormatInt(int64(i), 10))
+}
+
 type taskFormatter struct {
 	client  client.APIClient
 	opts    *logsOptions
@@ -148,7 +174,8 @@ func (f *taskFormatter) format(ctx context.Context, logCtx logContext) (string,
 			taskName += fmt.Sprintf(".%s", stringid.TruncateID(task.ID))
 		}
 	}
-	padding := strings.Repeat(" ", f.padding-len(strconv.FormatInt(int64(task.Slot), 10)))
+
+	padding := strings.Repeat(" ", f.padding-getMaxLength(task.Slot))
 	formatted := fmt.Sprintf("%s@%s%s", taskName, nodeName, padding)
 	f.cache[logCtx] = formatted
 	return formatted, nil
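The padding math deserves a concrete example. A sketch assuming the functions above (padFor is a hypothetical helper, not part of the PR):

```go
// Illustrative only: with 12 replicas the padding width is len("12") == 2,
// so slot 7 is padded with one trailing space and slot 10 with none,
// keeping the task-name column aligned in the log output.
func padFor(slot, replicas int) string {
	return strings.Repeat(" ", getMaxLength(replicas)-getMaxLength(slot))
}
```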

+ 1 - 0
client/interface.go

@@ -128,6 +128,7 @@ type ServiceAPIClient interface {
 	ServiceRemove(ctx context.Context, serviceID string) error
 	ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error)
 	ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error)
+	TaskLogs(ctx context.Context, taskID string, options types.ContainerLogsOptions) (io.ReadCloser, error)
 	TaskInspectWithRaw(ctx context.Context, taskID string) (swarm.Task, []byte, error)
 	TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
 }

+ 52 - 0
client/task_logs.go

@@ -0,0 +1,52 @@
+package client
+
+import (
+	"io"
+	"net/url"
+	"time"
+
+	"golang.org/x/net/context"
+
+	"github.com/docker/docker/api/types"
+	timetypes "github.com/docker/docker/api/types/time"
+)
+
+// TaskLogs returns the logs generated by a task in an io.ReadCloser.
+// It's up to the caller to close the stream.
+func (cli *Client) TaskLogs(ctx context.Context, taskID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
+	query := url.Values{}
+	if options.ShowStdout {
+		query.Set("stdout", "1")
+	}
+
+	if options.ShowStderr {
+		query.Set("stderr", "1")
+	}
+
+	if options.Since != "" {
+		ts, err := timetypes.GetTimestamp(options.Since, time.Now())
+		if err != nil {
+			return nil, err
+		}
+		query.Set("since", ts)
+	}
+
+	if options.Timestamps {
+		query.Set("timestamps", "1")
+	}
+
+	if options.Details {
+		query.Set("details", "1")
+	}
+
+	if options.Follow {
+		query.Set("follow", "1")
+	}
+	query.Set("tail", options.Tail)
+
+	resp, err := cli.get(ctx, "/tasks/"+taskID+"/logs", query, nil)
+	if err != nil {
+		return nil, err
+	}
+	return resp.body, nil
+}
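A minimal end-to-end sketch of the new client method, assuming a daemon with the experimental API enabled; the task ID is a placeholder:

```go
package main

import (
	"os"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"golang.org/x/net/context"
)

func main() {
	cli, err := client.NewEnvClient()
	if err != nil {
		panic(err)
	}
	// Fetch the last 20 log lines of a task; "sometaskid" is a placeholder.
	rc, err := cli.TaskLogs(context.Background(), "sometaskid", types.ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Tail:       "20",
	})
	if err != nil {
		panic(err)
	}
	defer rc.Close()
	// Without a TTY the stream is multiplexed, so demux it with stdcopy.
	stdcopy.StdCopy(os.Stdout, os.Stderr, rc)
}
```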

+ 66 - 17
daemon/cluster/services.go

@@ -303,24 +303,32 @@ func (c *Cluster) RemoveService(input string) error {
 }
 
 // ServiceLogs collects service logs and writes them back to `config.OutStream`
-func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
+func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *backend.ContainerLogsConfig, started chan struct{}) error {
 	c.mu.RLock()
+	defer func() {
+		select {
+		case <-started:
+			// if we've started streaming logs, we are no longer holding the
+			// lock and do not have to release it
+			return
+		default:
+			// before we start, though, we're holding this lock and it needs to
+			// be released
+			c.mu.RUnlock()
+		}
+	}()
 	state := c.currentNodeState()
 	if !state.IsActiveManager() {
-		c.mu.RUnlock()
 		return c.errNoManager(state)
 	}
 
-	service, err := getService(ctx, state.controlClient, input)
+	swarmSelector, tty, err := convertSelector(ctx, state.controlClient, selector)
 	if err != nil {
-		c.mu.RUnlock()
-		return err
-	}
-	container := service.Spec.Task.GetContainer()
-	if container == nil {
-		return errors.New("service logs only supported for container tasks")
+		return errors.Wrap(err, "error making log selector")
 	}
-	if container.TTY {
+
+	// TODO(dperny) this goes away when we support TTY logs, which is in the works
+	if tty {
 		return errors.New("service logs not supported on tasks with a TTY attached")
 	}
 
@@ -335,7 +343,7 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
 
 	// Get tail value squared away - the number of previous log lines we look at
 	var tail int64
-	if config.Tail == "all" {
+	if config.Tail == "all" || config.Tail == "" {
 		// tail of 0 means send all logs on the swarmkit side
 		tail = 0
 	} else {
@@ -372,9 +380,7 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
 	}
 
 	stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
-		Selector: &swarmapi.LogSelector{
-			ServiceIDs: []string{service.ID},
-		},
+		Selector: swarmSelector,
 		Options: &swarmapi.LogSubscriptionOptions{
 			Follow:  config.Follow,
 			Streams: stdStreams,
@@ -383,20 +389,26 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
 		},
 	})
 	if err != nil {
-		c.mu.RUnlock()
 		return err
 	}
 
 	wf := ioutils.NewWriteFlusher(config.OutStream)
 	defer wf.Close()
+
+	// Release the lock before starting the stream.
+	//
+	// this feels like it could be racy because we would double unlock if we
+	// somehow returned right after we unlocked but before we closed, but I do
+	// not think such a thing is possible. i wish it were possible to atomically
+	// close and unlock but that might be overkill. programming is hard.
+	c.mu.RUnlock()
 	close(started)
+
 	wf.Flush()
 
 	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
 	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
 
-	// Release the lock before starting the stream.
-	c.mu.RUnlock()
 	for {
 		// Check the context before doing anything.
 		select {
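The double-unlock worry in the comment above is the kind of thing usually settled with sync.Once; a hedged sketch of that alternative (not what this PR does):

```go
// Alternative sketch, not the PR's code: let either path release the
// read lock at most once.
var once sync.Once
unlock := func() { once.Do(c.mu.RUnlock) }
defer unlock() // covers every early-return path
// ... validation while holding the read lock ...
unlock() // explicit release before blocking on the log stream
close(started)
```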
@@ -442,6 +454,43 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend
 	}
 }
 
+// convertSelector takes a backend.LogSelector, which contains raw names that
+// may or may not be valid, and converts them to an api.LogSelector proto. It
+// also returns a boolean that is true if any of the selected services or
+// tasks use a TTY (false otherwise), and an error if something fails.
+func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, bool, error) {
+	// if ANY tasks use a TTY, don't mux streams
+	var tty bool
+	// don't rely on swarmkit to resolve IDs, do it ourselves
+	swarmSelector := &swarmapi.LogSelector{}
+	for _, s := range selector.Services {
+		service, err := getService(ctx, cc, s)
+		if err != nil {
+			return nil, false, err
+		}
+		c := service.Spec.Task.GetContainer()
+		if c == nil {
+			return nil, false, errors.New("logs only supported on container tasks")
+		}
+		// set TTY true if we have a TTY service, or if it's already true
+		tty = tty || c.TTY
+		swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID)
+	}
+	for _, t := range selector.Tasks {
+		task, err := getTask(ctx, cc, t)
+		if err != nil {
+			return nil, false, err
+		}
+		c := task.Spec.GetContainer()
+		if c == nil {
+			return nil, false, errors.New("logs only supported on container tasks")
+		}
+		tty = tty || c.TTY
+		swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID)
+	}
+	return swarmSelector, tty, nil
+}
+
 // imageWithDigestString takes an image such as name or name:tag
 // and returns the image pinned to a digest, such as name@sha256:34234
 func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
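To make the resolution step concrete, an illustrative call into convertSelector (names and IDs are placeholders):

```go
// Illustrative only - the names below are invented.
sel := &backend.LogSelector{Services: []string{"web"}, Tasks: []string{"c2ada9df5af8"}}
swarmSel, tty, err := convertSelector(ctx, state.controlClient, sel)
// On success, swarmSel.ServiceIDs and swarmSel.TaskIDs hold resolved
// swarmkit IDs, and tty reports whether any selected service or task
// allocates a TTY (which would disable stream multiplexing).
```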

+ 72 - 31
integration-cli/docker_cli_service_logs_experimental_test.go

@@ -12,6 +12,7 @@ import (
 
 	"github.com/docker/docker/integration-cli/checker"
 	"github.com/docker/docker/integration-cli/daemon"
+	icmd "github.com/docker/docker/pkg/testutil/cmd"
 	"github.com/go-check/check"
 )
 
@@ -56,10 +57,10 @@ func (s *DockerSwarmSuite) TestServiceLogs(c *check.C) {
 // output.
 func countLogLines(d *daemon.Swarm, name string) func(*check.C) (interface{}, check.CommentInterface) {
 	return func(c *check.C) (interface{}, check.CommentInterface) {
-		out, err := d.Cmd("service", "logs", "-t", name)
-		c.Assert(err, checker.IsNil)
-		lines := strings.Split(strings.TrimSpace(out), "\n")
-		return len(lines), check.Commentf("output, %q", string(out))
+		result := icmd.RunCmd(d.Command("service", "logs", "-t", name))
+		result.Assert(c, icmd.Expected{})
+		lines := strings.Split(strings.TrimSpace(result.Stdout()), "\n")
+		return len(lines), check.Commentf("output, %q", string(result.Stdout()))
 	}
 }
 
@@ -70,7 +71,7 @@ func (s *DockerSwarmSuite) TestServiceLogsCompleteness(c *check.C) {
 	name := "TestServiceLogsCompleteness"
 	name := "TestServiceLogsCompleteness"
 
 
 	// make a service that prints 6 lines
 	// make a service that prints 6 lines
-	out, err := d.Cmd("service", "create", "--name", name, "busybox", "sh", "-c", "for line in $(seq 1 6); do echo log test $line; done; sleep 100000")
+	out, err := d.Cmd("service", "create", "--name", name, "busybox", "sh", "-c", "for line in $(seq 0 5); do echo log test $line; done; sleep 100000")
 	c.Assert(err, checker.IsNil)
 	c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "")
 
@@ -79,22 +80,15 @@ func (s *DockerSwarmSuite) TestServiceLogsCompleteness(c *check.C) {
 	// and make sure we have all the log lines
 	waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 6)
 
-	args := []string{"service", "logs", name}
-	cmd := exec.Command(dockerBinary, d.PrependHostArg(args)...)
-	r, w := io.Pipe()
-	cmd.Stdout = w
-	cmd.Stderr = w
-	c.Assert(cmd.Start(), checker.IsNil)
+	out, err = d.Cmd("service", "logs", name)
+	c.Assert(err, checker.IsNil)
+	lines := strings.Split(strings.TrimSpace(out), "\n")
 
-	reader := bufio.NewReader(r)
 	// i have heard anecdotal reports that logs may come back from the engine
 	// mis-ordered. if this test fails, consider the possibility that that
 	// might be occurring
-	for i := 1; i <= 6; i++ {
-		msg := &logMessage{}
-		msg.data, _, msg.err = reader.ReadLine()
-		c.Assert(msg.err, checker.IsNil)
-		c.Assert(string(msg.data), checker.Contains, fmt.Sprintf("log test %v", i))
+	for i, line := range lines {
+		c.Assert(line, checker.Contains, fmt.Sprintf("log test %v", i))
 	}
 }
 
@@ -113,21 +107,13 @@ func (s *DockerSwarmSuite) TestServiceLogsTail(c *check.C) {
 	waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, 1)
 	waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 6)
 
-	args := []string{"service", "logs", "--tail=2", name}
-	cmd := exec.Command(dockerBinary, d.PrependHostArg(args)...)
-	r, w := io.Pipe()
-	cmd.Stdout = w
-	cmd.Stderr = w
-	c.Assert(cmd.Start(), checker.IsNil)
+	out, err = d.Cmd("service", "logs", "--tail=2", name)
+	c.Assert(err, checker.IsNil)
+	lines := strings.Split(strings.TrimSpace(out), "\n")
 
-	reader := bufio.NewReader(r)
-	// see TestServiceLogsCompleteness for comments about logs being well-
-	// ordered, if this flakes
-	for i := 5; i <= 6; i++ {
-		msg := &logMessage{}
-		msg.data, _, msg.err = reader.ReadLine()
-		c.Assert(msg.err, checker.IsNil)
-		c.Assert(string(msg.data), checker.Contains, fmt.Sprintf("log test %v", i))
+	for i, line := range lines {
+		// doing i+5 is hacky but not too fragile, it's good enough. if it flakes something else is wrong
+		c.Assert(line, checker.Contains, fmt.Sprintf("log test %v", i+5))
 	}
 }
 
@@ -213,3 +199,58 @@ func (s *DockerSwarmSuite) TestServiceLogsFollow(c *check.C) {
 
 	c.Assert(cmd.Process.Kill(), checker.IsNil)
 }
+
+func (s *DockerSwarmSuite) TestServiceLogsTaskLogs(c *check.C) {
+	testRequires(c, ExperimentalDaemon)
+
+	d := s.AddDaemon(c, true, true)
+
+	name := "TestServicelogsTaskLogs"
+	replicas := 2
+
+	result := icmd.RunCmd(d.Command(
+		// create a service with the name
+		"service", "create", "--name", name,
+		// which has some number of replicas
+		fmt.Sprintf("--replicas=%v", replicas),
+		// which has the task id as an environment variable templated in
+		"--env", "TASK={{.Task.ID}}",
+		// and runs this command to print exactly 6 log lines
+		"busybox", "sh", "-c", "for line in $(seq 0 5); do echo $TASK log test $line; done; sleep 100000",
+	))
+	result.Assert(c, icmd.Expected{})
+	// ^^ verify that we get no error
+	// then verify that we have an id in stdout
+	id := strings.TrimSpace(result.Stdout())
+	c.Assert(id, checker.Not(checker.Equals), "")
+	// so, right here, we're basically inspecting by id and returning only
+	// the ID. if they don't match, the service doesn't exist.
+	result = icmd.RunCmd(d.Command("service", "inspect", "--format=\"{{.ID}}\"", id))
+	result.Assert(c, icmd.Expected{Out: id})
+
+	// make sure task has been deployed.
+	waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, replicas)
+	waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 6*replicas)
+
+	// get the task ids
+	result = icmd.RunCmd(d.Command("service", "ps", "-q", name))
+	result.Assert(c, icmd.Expected{})
+	// make sure we have two tasks
+	taskIDs := strings.Split(strings.TrimSpace(result.Stdout()), "\n")
+	c.Assert(taskIDs, checker.HasLen, replicas)
+
+	for _, taskID := range taskIDs {
+		c.Logf("checking task %v", taskID)
+		result := icmd.RunCmd(d.Command("service", "logs", taskID))
+		result.Assert(c, icmd.Expected{})
+		lines := strings.Split(strings.TrimSpace(result.Stdout()), "\n")
+
+		c.Logf("checking messages for %v", taskID)
+		for i, line := range lines {
+			// make sure the message is in order
+			c.Assert(line, checker.Contains, fmt.Sprintf("log test %v", i))
+			// make sure it contains the task id
+			c.Assert(line, checker.Contains, taskID)
+		}
+	}
+}