Sfoglia il codice sorgente

Merge pull request #28089 from aluzzardi/service-logs

service logs
Andrea Luzzardi 8 anni fa
parent
commit
5f9fe54b35

+ 3 - 0
api/server/router/swarm/backend.go

@@ -2,7 +2,9 @@ package swarm
 
 import (
 	basictypes "github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/backend"
 	types "github.com/docker/docker/api/types/swarm"
+	"golang.org/x/net/context"
 )
 
 // Backend abstracts an swarm commands manager.
@@ -19,6 +21,7 @@ type Backend interface {
 	CreateService(types.ServiceSpec, string) (string, error)
 	UpdateService(string, uint64, types.ServiceSpec, string, string) error
 	RemoveService(string) error
+	ServiceLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error
 	GetNodes(basictypes.NodeListOptions) ([]types.Node, error)
 	GetNode(string) (types.Node, error)
 	UpdateNode(string, uint64, types.NodeSpec) error

+ 24 - 12
api/server/router/swarm/cluster.go

@@ -1,6 +1,9 @@
 package swarm
 
-import "github.com/docker/docker/api/server/router"
+import (
+	"github.com/docker/docker/api/server/router"
+	"github.com/docker/docker/daemon"
+)
 
 // buildRouter is a router to talk with the build controller
 type swarmRouter struct {
@@ -9,11 +12,14 @@ type swarmRouter struct {
 }
 
 // NewRouter initializes a new build router
-func NewRouter(b Backend) router.Router {
+func NewRouter(d *daemon.Daemon, b Backend) router.Router {
 	r := &swarmRouter{
 		backend: b,
 	}
 	r.initRoutes()
+	if d.HasExperimental() {
+		r.addExperimentalRoutes()
+	}
 	return r
 }
 
@@ -22,6 +28,12 @@ func (sr *swarmRouter) Routes() []router.Route {
 	return sr.routes
 }
 
+func (sr *swarmRouter) addExperimentalRoutes() {
+	sr.routes = append(sr.routes,
+		router.Cancellable(router.NewGetRoute("/services/{id}/logs", sr.getServiceLogs)),
+	)
+}
+
 func (sr *swarmRouter) initRoutes() {
 	sr.routes = []router.Route{
 		router.NewPostRoute("/swarm/init", sr.initCluster),
@@ -32,20 +44,20 @@ func (sr *swarmRouter) initRoutes() {
 		router.NewPostRoute("/swarm/update", sr.updateCluster),
 		router.NewPostRoute("/swarm/unlock", sr.unlockCluster),
 		router.NewGetRoute("/services", sr.getServices),
-		router.NewGetRoute("/services/{id:.*}", sr.getService),
+		router.NewGetRoute("/services/{id}", sr.getService),
 		router.NewPostRoute("/services/create", sr.createService),
-		router.NewPostRoute("/services/{id:.*}/update", sr.updateService),
-		router.NewDeleteRoute("/services/{id:.*}", sr.removeService),
+		router.NewPostRoute("/services/{id}/update", sr.updateService),
+		router.NewDeleteRoute("/services/{id}", sr.removeService),
 		router.NewGetRoute("/nodes", sr.getNodes),
-		router.NewGetRoute("/nodes/{id:.*}", sr.getNode),
-		router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode),
-		router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode),
+		router.NewGetRoute("/nodes/{id}", sr.getNode),
+		router.NewDeleteRoute("/nodes/{id}", sr.removeNode),
+		router.NewPostRoute("/nodes/{id}/update", sr.updateNode),
 		router.NewGetRoute("/tasks", sr.getTasks),
-		router.NewGetRoute("/tasks/{id:.*}", sr.getTask),
+		router.NewGetRoute("/tasks/{id}", sr.getTask),
 		router.NewGetRoute("/secrets", sr.getSecrets),
 		router.NewPostRoute("/secrets", sr.createSecret),
-		router.NewDeleteRoute("/secrets/{id:.*}", sr.removeSecret),
-		router.NewGetRoute("/secrets/{id:.*}", sr.getSecret),
-		router.NewPostRoute("/secrets/{id:.*}/update", sr.updateSecret),
+		router.NewDeleteRoute("/secrets/{id}", sr.removeSecret),
+		router.NewGetRoute("/secrets/{id}", sr.getSecret),
+		router.NewPostRoute("/secrets/{id}/update", sr.updateSecret),
 	}
 }

+ 54 - 0
api/server/router/swarm/cluster_routes.go

@@ -10,6 +10,7 @@ import (
 	"github.com/docker/docker/api/errors"
 	"github.com/docker/docker/api/server/httputils"
 	basictypes "github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/filters"
 	types "github.com/docker/docker/api/types/swarm"
 	"golang.org/x/net/context"
@@ -208,6 +209,59 @@ func (sr *swarmRouter) removeService(ctx context.Context, w http.ResponseWriter,
 	return nil
 }
 
+func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
+	if err := httputils.ParseForm(r); err != nil {
+		return err
+	}
+
+	// Args are validated before the stream starts because when it starts we're
+	// sending HTTP 200 by writing an empty chunk of data to tell the client that
+	// daemon is going to stream. By sending this initial HTTP 200 we can't report
+	// any error after the stream starts (i.e. container not found, wrong parameters)
+	// with the appropriate status code.
+	stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr")
+	if !(stdout || stderr) {
+		return fmt.Errorf("Bad parameters: you must choose at least one stream")
+	}
+
+	serviceName := vars["id"]
+	logsConfig := &backend.ContainerLogsConfig{
+		ContainerLogsOptions: basictypes.ContainerLogsOptions{
+			Follow:     httputils.BoolValue(r, "follow"),
+			Timestamps: httputils.BoolValue(r, "timestamps"),
+			Since:      r.Form.Get("since"),
+			Tail:       r.Form.Get("tail"),
+			ShowStdout: stdout,
+			ShowStderr: stderr,
+			Details:    httputils.BoolValue(r, "details"),
+		},
+		OutStream: w,
+	}
+
+	if !logsConfig.Follow {
+		return fmt.Errorf("Bad parameters: Only follow mode is currently supported")
+	}
+
+	if logsConfig.Details {
+		return fmt.Errorf("Bad parameters: details is not currently supported")
+	}
+
+	chStarted := make(chan struct{})
+	if err := sr.backend.ServiceLogs(ctx, serviceName, logsConfig, chStarted); err != nil {
+		select {
+		case <-chStarted:
+			// The client may be expecting all of the data we're sending to
+			// be multiplexed, so send it through OutStream, which will
+			// have been set up to handle that if needed.
+			fmt.Fprintf(logsConfig.OutStream, "Error grabbing service logs: %v\n", err)
+		default:
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (sr *swarmRouter) getNodes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 	if err := httputils.ParseForm(r); err != nil {
 		return err

+ 1 - 0
cli/command/service/cmd.go

@@ -26,6 +26,7 @@ func NewServiceCommand(dockerCli *command.DockerCli) *cobra.Command {
 		newRemoveCommand(dockerCli),
 		newScaleCommand(dockerCli),
 		newUpdateCommand(dockerCli),
+		newLogsCommand(dockerCli),
 	)
 	return cmd
 }

+ 163 - 0
cli/command/service/logs.go

@@ -0,0 +1,163 @@
+package service
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"strings"
+
+	"golang.org/x/net/context"
+
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/cli"
+	"github.com/docker/docker/cli/command"
+	"github.com/docker/docker/cli/command/idresolver"
+	"github.com/docker/docker/pkg/stdcopy"
+	"github.com/spf13/cobra"
+)
+
+type logsOptions struct {
+	noResolve  bool
+	follow     bool
+	since      string
+	timestamps bool
+	details    bool
+	tail       string
+
+	service string
+}
+
+func newLogsCommand(dockerCli *command.DockerCli) *cobra.Command {
+	var opts logsOptions
+
+	cmd := &cobra.Command{
+		Use:   "logs [OPTIONS] SERVICE",
+		Short: "Fetch the logs of a service",
+		Args:  cli.ExactArgs(1),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			opts.service = args[0]
+			return runLogs(dockerCli, &opts)
+		},
+		Tags: map[string]string{"experimental": ""},
+	}
+
+	flags := cmd.Flags()
+	flags.BoolVar(&opts.noResolve, "no-resolve", false, "Do not map IDs to Names")
+	flags.BoolVarP(&opts.follow, "follow", "f", false, "Follow log output")
+	flags.StringVar(&opts.since, "since", "", "Show logs since timestamp")
+	flags.BoolVarP(&opts.timestamps, "timestamps", "t", false, "Show timestamps")
+	flags.BoolVar(&opts.details, "details", false, "Show extra details provided to logs")
+	flags.StringVar(&opts.tail, "tail", "all", "Number of lines to show from the end of the logs")
+	return cmd
+}
+
+func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
+	ctx := context.Background()
+
+	options := types.ContainerLogsOptions{
+		ShowStdout: true,
+		ShowStderr: true,
+		Since:      opts.since,
+		Timestamps: opts.timestamps,
+		Follow:     opts.follow,
+		Tail:       opts.tail,
+		Details:    opts.details,
+	}
+
+	client := dockerCli.Client()
+	responseBody, err := client.ServiceLogs(ctx, opts.service, options)
+	if err != nil {
+		return err
+	}
+	defer responseBody.Close()
+
+	resolver := idresolver.New(client, opts.noResolve)
+
+	stdout := &logWriter{ctx: ctx, opts: opts, r: resolver, w: dockerCli.Out()}
+	stderr := &logWriter{ctx: ctx, opts: opts, r: resolver, w: dockerCli.Err()}
+
+	// TODO(aluzzardi): Do an io.Copy for services with TTY enabled.
+	_, err = stdcopy.StdCopy(stdout, stderr, responseBody)
+	return err
+}
+
+type logWriter struct {
+	ctx  context.Context
+	opts *logsOptions
+	r    *idresolver.IDResolver
+	w    io.Writer
+}
+
+func (lw *logWriter) Write(buf []byte) (int, error) {
+	contextIndex := 0
+	numParts := 2
+	if lw.opts.timestamps {
+		contextIndex++
+		numParts++
+	}
+
+	parts := bytes.SplitN(buf, []byte(" "), numParts)
+	if len(parts) != numParts {
+		return 0, fmt.Errorf("invalid context in log message: %v", string(buf))
+	}
+
+	taskName, nodeName, err := lw.parseContext(string(parts[contextIndex]))
+	if err != nil {
+		return 0, err
+	}
+
+	output := []byte{}
+	for i, part := range parts {
+		// First part doesn't get space separation.
+		if i > 0 {
+			output = append(output, []byte(" ")...)
+		}
+
+		if i == contextIndex {
+			// TODO(aluzzardi): Consider constant padding.
+			output = append(output, []byte(fmt.Sprintf("%s@%s    |", taskName, nodeName))...)
+		} else {
+			output = append(output, part...)
+		}
+	}
+	_, err = lw.w.Write(output)
+	if err != nil {
+		return 0, err
+	}
+
+	return len(buf), nil
+}
+
+func (lw *logWriter) parseContext(input string) (string, string, error) {
+	context := make(map[string]string)
+
+	components := strings.Split(input, ",")
+	for _, component := range components {
+		parts := strings.SplitN(component, "=", 2)
+		if len(parts) != 2 {
+			return "", "", fmt.Errorf("invalid context: %s", input)
+		}
+		context[parts[0]] = parts[1]
+	}
+
+	taskID, ok := context["com.docker.swarm.task.id"]
+	if !ok {
+		return "", "", fmt.Errorf("missing task id in context: %s", input)
+	}
+	taskName, err := lw.r.Resolve(lw.ctx, swarm.Task{}, taskID)
+	if err != nil {
+		return "", "", err
+	}
+
+	nodeID, ok := context["com.docker.swarm.node.id"]
+	if !ok {
+		return "", "", fmt.Errorf("missing node id in context: %s", input)
+	}
+	nodeName, err := lw.r.Resolve(lw.ctx, swarm.Node{}, nodeID)
+	if err != nil {
+		return "", "", err
+	}
+
+	return taskName, nodeName, nil
+}

+ 1 - 0
client/interface.go

@@ -111,6 +111,7 @@ type ServiceAPIClient interface {
 	ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error)
 	ServiceRemove(ctx context.Context, serviceID string) error
 	ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) error
+	ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error)
 	TaskInspectWithRaw(ctx context.Context, taskID string) (swarm.Task, []byte, error)
 	TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
 }

+ 52 - 0
client/service_logs.go

@@ -0,0 +1,52 @@
+package client
+
+import (
+	"io"
+	"net/url"
+	"time"
+
+	"golang.org/x/net/context"
+
+	"github.com/docker/docker/api/types"
+	timetypes "github.com/docker/docker/api/types/time"
+)
+
+// ServiceLogs returns the logs generated by a service in an io.ReadCloser.
+// It's up to the caller to close the stream.
+func (cli *Client) ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
+	query := url.Values{}
+	if options.ShowStdout {
+		query.Set("stdout", "1")
+	}
+
+	if options.ShowStderr {
+		query.Set("stderr", "1")
+	}
+
+	if options.Since != "" {
+		ts, err := timetypes.GetTimestamp(options.Since, time.Now())
+		if err != nil {
+			return nil, err
+		}
+		query.Set("since", ts)
+	}
+
+	if options.Timestamps {
+		query.Set("timestamps", "1")
+	}
+
+	if options.Details {
+		query.Set("details", "1")
+	}
+
+	if options.Follow {
+		query.Set("follow", "1")
+	}
+	query.Set("tail", options.Tail)
+
+	resp, err := cli.get(ctx, "/services/"+serviceID+"/logs", query, nil)
+	if err != nil {
+		return nil, err
+	}
+	return resp.body, nil
+}

+ 133 - 0
client/service_logs_test.go

@@ -0,0 +1,133 @@
+package client
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/docker/docker/api/types"
+
+	"golang.org/x/net/context"
+)
+
+func TestServiceLogsError(t *testing.T) {
+	client := &Client{
+		client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
+	}
+	_, err := client.ServiceLogs(context.Background(), "service_id", types.ContainerLogsOptions{})
+	if err == nil || err.Error() != "Error response from daemon: Server error" {
+		t.Fatalf("expected a Server Error, got %v", err)
+	}
+	_, err = client.ServiceLogs(context.Background(), "service_id", types.ContainerLogsOptions{
+		Since: "2006-01-02TZ",
+	})
+	if err == nil || !strings.Contains(err.Error(), `parsing time "2006-01-02TZ"`) {
+		t.Fatalf("expected a 'parsing time' error, got %v", err)
+	}
+}
+
+func TestServiceLogs(t *testing.T) {
+	expectedURL := "/services/service_id/logs"
+	cases := []struct {
+		options             types.ContainerLogsOptions
+		expectedQueryParams map[string]string
+	}{
+		{
+			expectedQueryParams: map[string]string{
+				"tail": "",
+			},
+		},
+		{
+			options: types.ContainerLogsOptions{
+				Tail: "any",
+			},
+			expectedQueryParams: map[string]string{
+				"tail": "any",
+			},
+		},
+		{
+			options: types.ContainerLogsOptions{
+				ShowStdout: true,
+				ShowStderr: true,
+				Timestamps: true,
+				Details:    true,
+				Follow:     true,
+			},
+			expectedQueryParams: map[string]string{
+				"tail":       "",
+				"stdout":     "1",
+				"stderr":     "1",
+				"timestamps": "1",
+				"details":    "1",
+				"follow":     "1",
+			},
+		},
+		{
+			options: types.ContainerLogsOptions{
+				// An complete invalid date, timestamp or go duration will be
+				// passed as is
+				Since: "invalid but valid",
+			},
+			expectedQueryParams: map[string]string{
+				"tail":  "",
+				"since": "invalid but valid",
+			},
+		},
+	}
+	for _, logCase := range cases {
+		client := &Client{
+			client: newMockClient(func(r *http.Request) (*http.Response, error) {
+				if !strings.HasPrefix(r.URL.Path, expectedURL) {
+					return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, r.URL)
+				}
+				// Check query parameters
+				query := r.URL.Query()
+				for key, expected := range logCase.expectedQueryParams {
+					actual := query.Get(key)
+					if actual != expected {
+						return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
+					}
+				}
+				return &http.Response{
+					StatusCode: http.StatusOK,
+					Body:       ioutil.NopCloser(bytes.NewReader([]byte("response"))),
+				}, nil
+			}),
+		}
+		body, err := client.ServiceLogs(context.Background(), "service_id", logCase.options)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer body.Close()
+		content, err := ioutil.ReadAll(body)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if string(content) != "response" {
+			t.Fatalf("expected response to contain 'response', got %s", string(content))
+		}
+	}
+}
+
+func ExampleClient_ServiceLogs_withTimeout() {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	client, _ := NewEnvClient()
+	reader, err := client.ServiceLogs(ctx, "service_id", types.ContainerLogsOptions{})
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	_, err = io.Copy(os.Stdout, reader)
+	if err != nil && err != io.EOF {
+		log.Fatal(err)
+	}
+}

+ 1 - 1
cmd/dockerd/daemon.go

@@ -456,7 +456,7 @@ func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) {
 		systemrouter.NewRouter(d, c),
 		volume.NewRouter(d),
 		build.NewRouter(dockerfile.NewBuildManager(d)),
-		swarmrouter.NewRouter(c),
+		swarmrouter.NewRouter(d, c),
 	}...)
 
 	if d.NetworkControllerEnabled() {

+ 91 - 0
daemon/cluster/cluster.go

@@ -4,6 +4,7 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net"
 	"os"
@@ -16,20 +17,24 @@ import (
 	"github.com/Sirupsen/logrus"
 	apierrors "github.com/docker/docker/api/errors"
 	apitypes "github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/api/types/network"
 	types "github.com/docker/docker/api/types/swarm"
 	"github.com/docker/docker/daemon/cluster/convert"
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
 	"github.com/docker/docker/daemon/cluster/executor/container"
+	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/opts"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/signal"
+	"github.com/docker/docker/pkg/stdcopy"
 	"github.com/docker/docker/reference"
 	"github.com/docker/docker/runconfig"
 	swarmapi "github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/manager/encryption"
 	swarmnode "github.com/docker/swarmkit/node"
+	"github.com/docker/swarmkit/protobuf/ptypes"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -45,6 +50,7 @@ const defaultAddr = "0.0.0.0:2377"
 const (
 	initialReconnectDelay = 100 * time.Millisecond
 	maxReconnectDelay     = 30 * time.Second
+	contextPrefix         = "com.docker.swarm"
 )
 
 // ErrNoSwarm is returned on leaving a cluster that was never initialized
@@ -120,6 +126,7 @@ type node struct {
 	ready          bool
 	conn           *grpc.ClientConn
 	client         swarmapi.ControlClient
+	logs           swarmapi.LogsClient
 	reconnectDelay time.Duration
 	config         nodeStartConfig
 }
@@ -371,8 +378,10 @@ func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
 			if node.conn != conn {
 				if conn == nil {
 					node.client = nil
+					node.logs = nil
 				} else {
 					node.client = swarmapi.NewControlClient(conn)
+					node.logs = swarmapi.NewLogsClient(conn)
 				}
 			}
 			node.conn = conn
@@ -1205,6 +1214,88 @@ func (c *Cluster) RemoveService(input string) error {
 	return nil
 }
 
+// ServiceLogs collects service logs and writes them back to `config.OutStream`
+func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
+	c.RLock()
+	if !c.isActiveManager() {
+		c.RUnlock()
+		return c.errNoManager()
+	}
+
+	service, err := getService(ctx, c.client, input)
+	if err != nil {
+		c.RUnlock()
+		return err
+	}
+
+	stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
+		Selector: &swarmapi.LogSelector{
+			ServiceIDs: []string{service.ID},
+		},
+		Options: &swarmapi.LogSubscriptionOptions{
+			Follow: true,
+		},
+	})
+	if err != nil {
+		c.RUnlock()
+		return err
+	}
+
+	wf := ioutils.NewWriteFlusher(config.OutStream)
+	defer wf.Close()
+	close(started)
+	wf.Flush()
+
+	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
+	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
+
+	// Release the lock before starting the stream.
+	c.RUnlock()
+	for {
+		// Check the context before doing anything.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
+		subscribeMsg, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		for _, msg := range subscribeMsg.Messages {
+			data := []byte{}
+
+			if config.Timestamps {
+				ts, err := ptypes.Timestamp(msg.Timestamp)
+				if err != nil {
+					return err
+				}
+				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
+			}
+
+			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
+				contextPrefix, msg.Context.NodeID,
+				contextPrefix, msg.Context.ServiceID,
+				contextPrefix, msg.Context.TaskID,
+			))...)
+
+			data = append(data, msg.Data...)
+
+			switch msg.Stream {
+			case swarmapi.LogStreamStdout:
+				outStream.Write(data)
+			case swarmapi.LogStreamStderr:
+				errStream.Write(data)
+			}
+		}
+	}
+}
+
 // GetNodes returns a list of all nodes known to a cluster.
 func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
 	c.RLock()

+ 2 - 0
daemon/cluster/executor/backend.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/docker/distribution"
 	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/api/types/events"
 	"github.com/docker/docker/api/types/filters"
@@ -28,6 +29,7 @@ type Backend interface {
 	CreateManagedContainer(config types.ContainerCreateConfig, validateHostname bool) (container.ContainerCreateCreatedBody, error)
 	ContainerStart(name string, hostConfig *container.HostConfig, validateHostname bool, checkpoint string, checkpointDir string) error
 	ContainerStop(name string, seconds *int) error
+	ContainerLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error
 	ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
 	ActivateContainerServiceBinding(containerName string) error
 	DeactivateContainerServiceBinding(containerName string) error

+ 52 - 0
daemon/cluster/executor/container/adapter.go

@@ -12,6 +12,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/api/server/httputils"
 	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/backend"
 	containertypes "github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/api/types/events"
 	"github.com/docker/docker/api/types/versions"
@@ -20,6 +21,7 @@ import (
 	"github.com/docker/swarmkit/agent/exec"
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/log"
+	"github.com/docker/swarmkit/protobuf/ptypes"
 	"golang.org/x/net/context"
 	"golang.org/x/time/rate"
 )
@@ -376,6 +378,56 @@ func (c *containerAdapter) deactivateServiceBinding() error {
 	return c.backend.DeactivateContainerServiceBinding(c.container.name())
 }
 
+func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
+	reader, writer := io.Pipe()
+
+	apiOptions := &backend.ContainerLogsConfig{
+		ContainerLogsOptions: types.ContainerLogsOptions{
+			Follow: options.Follow,
+
+			// TODO(stevvooe): Parse timestamp out of message. This
+			// absolutely needs to be done before going to production with
+			// this, at it is completely redundant.
+			Timestamps: true,
+			Details:    false, // no clue what to do with this, let's just deprecate it.
+		},
+		OutStream: writer,
+	}
+
+	if options.Since != nil {
+		since, err := ptypes.Timestamp(options.Since)
+		if err != nil {
+			return nil, err
+		}
+		apiOptions.Since = since.Format(time.RFC3339Nano)
+	}
+
+	if options.Tail < 0 {
+		// See protobuf documentation for details of how this works.
+		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
+	} else if options.Tail > 0 {
+		return nil, fmt.Errorf("tail relative to start of logs not supported via docker API")
+	}
+
+	if len(options.Streams) == 0 {
+		// empty == all
+		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
+	} else {
+		for _, stream := range options.Streams {
+			switch stream {
+			case api.LogStreamStdout:
+				apiOptions.ShowStdout = true
+			case api.LogStreamStderr:
+				apiOptions.ShowStderr = true
+			}
+		}
+	}
+
+	chStarted := make(chan struct{})
+	go c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted)
+	return reader, nil
+}
+
 // todo: typed/wrapped errors
 func isContainerCreateNameConflict(err error) bool {
 	return strings.Contains(err.Error(), "Conflict. The name")

+ 129 - 0
daemon/cluster/executor/container/controller.go

@@ -1,8 +1,13 @@
 package container
 
 import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
 	"fmt"
+	"io"
 	"os"
+	"time"
 
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/events"
@@ -11,8 +16,10 @@ import (
 	"github.com/docker/swarmkit/agent/exec"
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/log"
+	"github.com/docker/swarmkit/protobuf/ptypes"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 )
 
 // controller implements agent.Controller against docker's API.
@@ -374,6 +381,128 @@ func (r *controller) Remove(ctx context.Context) error {
 	return nil
 }
 
+// waitReady waits for a container to be "ready".
+// Ready means it's past the started state.
+func (r *controller) waitReady(pctx context.Context) error {
+	if err := r.checkClosed(); err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithCancel(pctx)
+	defer cancel()
+
+	eventq := r.adapter.events(ctx)
+
+	ctnr, err := r.adapter.inspect(ctx)
+	if err != nil {
+		if !isUnknownContainer(err) {
+			return errors.Wrap(err, "inspect container failed")
+		}
+	} else {
+		switch ctnr.State.Status {
+		case "running", "exited", "dead":
+			return nil
+		}
+	}
+
+	for {
+		select {
+		case event := <-eventq:
+			if !r.matchevent(event) {
+				continue
+			}
+
+			switch event.Action {
+			case "start":
+				return nil
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-r.closed:
+			return r.err
+		}
+	}
+}
+
+func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
+	if err := r.checkClosed(); err != nil {
+		return err
+	}
+
+	if err := r.waitReady(ctx); err != nil {
+		return errors.Wrap(err, "container not ready for logs")
+	}
+
+	rc, err := r.adapter.logs(ctx, options)
+	if err != nil {
+		return errors.Wrap(err, "failed getting container logs")
+	}
+	defer rc.Close()
+
+	var (
+		// use a rate limiter to keep things under control but also provides some
+		// ability coalesce messages.
+		limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
+		msgctx  = api.LogContext{
+			NodeID:    r.task.NodeID,
+			ServiceID: r.task.ServiceID,
+			TaskID:    r.task.ID,
+		}
+	)
+
+	brd := bufio.NewReader(rc)
+	for {
+		// so, message header is 8 bytes, treat as uint64, pull stream off MSB
+		var header uint64
+		if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
+			if err == io.EOF {
+				return nil
+			}
+
+			return errors.Wrap(err, "failed reading log header")
+		}
+
+		stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3))
+
+		// limit here to decrease allocation back pressure.
+		if err := limiter.WaitN(ctx, int(size)); err != nil {
+			return errors.Wrap(err, "failed rate limiter")
+		}
+
+		buf := make([]byte, size)
+		_, err := io.ReadFull(brd, buf)
+		if err != nil {
+			return errors.Wrap(err, "failed reading buffer")
+		}
+
+		// Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish
+		parts := bytes.SplitN(buf, []byte(" "), 2)
+		if len(parts) != 2 {
+			return fmt.Errorf("invalid timestamp in log message: %v", buf)
+		}
+
+		ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
+		if err != nil {
+			return errors.Wrap(err, "failed to parse timestamp")
+		}
+
+		tsp, err := ptypes.TimestampProto(ts)
+		if err != nil {
+			return errors.Wrap(err, "failed to convert timestamp")
+		}
+
+		if err := publisher.Publish(ctx, api.LogMessage{
+			Context:   msgctx,
+			Timestamp: tsp,
+			Stream:    api.LogStream(stream),
+
+			Data: parts[1],
+		}); err != nil {
+			return errors.Wrap(err, "failed to publish log message")
+		}
+	}
+}
+
 // Close the runner and clean up any ephemeral resources.
 func (r *controller) Close() error {
 	select {

+ 1 - 1
docs/reference/api/docker_remote_api.md

@@ -149,7 +149,7 @@ This section lists each version from latest to oldest.  Each listing includes a
 * `POST /containers/create` now takes `AutoRemove` in HostConfig, to enable auto-removal of the container on daemon side when the container's process exits.
 * `GET /containers/json` and `GET /containers/(id or name)/json` now return `"removing"` as a value for the `State.Status` field if the container is being removed. Previously, "exited" was returned as status.
 * `GET /containers/json` now accepts `removing` as a valid value for the `status` filter.
-* `GET /containers/json` now supports filtering containers by `health` status. 
+* `GET /containers/json` now supports filtering containers by `health` status.
 * `DELETE /volumes/(name)` now accepts a `force` query parameter to force removal of volumes that were already removed out of band by the volume driver plugin.
 * `POST /containers/create/` and `POST /containers/(name)/update` now validates restart policies.
 * `POST /containers/create` now validates IPAMConfig in NetworkingConfig, and returns error for invalid IPv4 and IPv6 addresses (`--ip` and `--ip6` in `docker create/run`).

+ 43 - 0
docs/reference/api/docker_remote_api_v1.25.md

@@ -5631,6 +5631,49 @@ image](#create-an-image) section for more details.
 -   **404** – no such service
 -   **500** – server error
 
+### Get service logs
+
+`GET /services/(id or name)/logs`
+
+Get `stdout` and `stderr` logs from the service ``id``
+
+> **Note**:
+> This endpoint works only for services with the `json-file` or `journald` logging drivers.
+
+**Example request**:
+
+     GET /services/4fa6e0f0c678/logs?stderr=1&stdout=1&timestamps=1&follow=1&tail=10&since=1428990821 HTTP/1.1
+
+**Example response**:
+
+     HTTP/1.1 101 UPGRADED
+     Content-Type: application/vnd.docker.raw-stream
+     Connection: Upgrade
+     Upgrade: tcp
+
+     {% raw %}
+     {{ STREAM }}
+     {% endraw %}
+
+**Query parameters**:
+
+-   **details** - 1/True/true or 0/False/flase, Show extra details provided to logs. Default `false`.
+-   **follow** – 1/True/true or 0/False/false, return stream. Default `false`.
+-   **stdout** – 1/True/true or 0/False/false, show `stdout` log. Default `false`.
+-   **stderr** – 1/True/true or 0/False/false, show `stderr` log. Default `false`.
+-   **since** – UNIX timestamp (integer) to filter logs. Specifying a timestamp
+    will only output log-entries since that timestamp. Default: 0 (unfiltered)
+-   **timestamps** – 1/True/true or 0/False/false, print timestamps for
+        every log line. Default `false`.
+-   **tail** – Output specified number of lines at the end of logs: `all` or `<number>`. Default all.
+
+**Status codes**:
+
+-   **101** – no error, hints proxy about hijacking
+-   **200** – no error, no upgrade header found
+-   **404** – no such service
+-   **500** – server error
+
 ## 3.10 Tasks
 
 **Note**: Task operations require the engine to be part of a swarm.

+ 67 - 0
docs/reference/commandline/service_logs.md

@@ -0,0 +1,67 @@
+---
+title: "service logs (experimental)"
+description: "The service logs command description and usage"
+keywords: "service, logs"
+advisory: "experimental"
+---
+
+<!-- This file is maintained within the docker/docker Github
+     repository at https://github.com/docker/docker/. Make all
+     pull requests against that repo. If you see this file in
+     another repository, consider it read-only there, as it will
+     periodically be overwritten by the definitive file. Pull
+     requests which include edits to this file in other repositories
+     will be rejected.
+-->
+
+# service logs
+
+```Markdown
+Usage:  docker service logs [OPTIONS] SERVICE
+
+Fetch the logs of a service
+
+Options:
+      --details        Show extra details provided to logs
+  -f, --follow         Follow log output
+      --help           Print usage
+      --since string   Show logs since timestamp
+      --tail string    Number of lines to show from the end of the logs (default "all")
+  -t, --timestamps     Show timestamps
+```
+
+The `docker service logs` command batch-retrieves logs present at the time of execution.
+
+> **Note**: this command is only functional for services that are started with
+> the `json-file` or `journald` logging driver.
+
+For more information about selecting and configuring login-drivers, refer to
+[Configure logging drivers](https://docs.docker.com/engine/admin/logging/overview/).
+
+The `docker service logs --follow` command will continue streaming the new output from
+the service's `STDOUT` and `STDERR`.
+
+Passing a negative number or a non-integer to `--tail` is invalid and the
+value is set to `all` in that case.
+
+The `docker service logs --timestamps` command will add an [RFC3339Nano timestamp](https://golang.org/pkg/time/#pkg-constants)
+, for example `2014-09-16T06:17:46.000000000Z`, to each
+log entry. To ensure that the timestamps are aligned the
+nano-second part of the timestamp will be padded with zero when necessary.
+
+The `docker service logs --details` command will add on extra attributes, such as
+environment variables and labels, provided to `--log-opt` when creating the
+service.
+
+The `--since` option shows only the service logs generated after
+a given date. You can specify the date as an RFC 3339 date, a UNIX
+timestamp, or a Go duration string (e.g. `1m30s`, `3h`). Besides RFC3339 date
+format you may also use RFC3339Nano, `2006-01-02T15:04:05`,
+`2006-01-02T15:04:05.999999999`, `2006-01-02Z07:00`, and `2006-01-02`. The local
+timezone on the client will be used if you do not provide either a `Z` or a
+`+-00:00` timezone offset at the end of the timestamp. When providing Unix
+timestamps enter seconds[.nanoseconds], where seconds is the number of seconds
+that have elapsed since January 1, 1970 (midnight UTC/GMT), not counting leap
+seconds (aka Unix epoch or Unix time), and the optional .nanoseconds field is a
+fraction of a second no more than nine digits long. You can combine the
+`--since` option with either or both of the `--follow` or `--tail` options.

+ 55 - 0
integration-cli/docker_cli_service_logs_experimental_test.go

@@ -0,0 +1,55 @@
+// +build !windows
+
+package main
+
+import (
+	"bufio"
+	"io"
+	"os/exec"
+	"strings"
+
+	"github.com/docker/docker/pkg/integration/checker"
+	"github.com/go-check/check"
+)
+
+type logMessage struct {
+	err  error
+	data []byte
+}
+
+func (s *DockerSwarmSuite) TestServiceLogs(c *check.C) {
+	testRequires(c, ExperimentalDaemon)
+
+	d := s.AddDaemon(c, true, true)
+
+	name := "TestServiceLogs"
+
+	out, err := d.Cmd("service", "create", "--name", name, "busybox", "sh", "-c", "while true; do echo log test; sleep 1; done")
+	c.Assert(err, checker.IsNil)
+	c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "")
+
+	// make sure task has been deployed.
+	waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, 1)
+
+	args := []string{"service", "logs", "-f", name}
+	cmd := exec.Command(dockerBinary, d.prependHostArg(args)...)
+	r, w := io.Pipe()
+	cmd.Stdout = w
+	cmd.Stderr = w
+	c.Assert(cmd.Start(), checker.IsNil)
+
+	// Make sure pipe is written to
+	ch := make(chan *logMessage)
+	go func() {
+		reader := bufio.NewReader(r)
+		msg := &logMessage{}
+		msg.data, _, msg.err = reader.ReadLine()
+		ch <- msg
+	}()
+
+	msg := <-ch
+	c.Assert(msg.err, checker.IsNil)
+	c.Assert(string(msg.data), checker.Contains, "log test")
+
+	c.Assert(cmd.Process.Kill(), checker.IsNil)
+}