From d6bd79c18f5ab5dbdaa472ccd921ad2858117507 Mon Sep 17 00:00:00 2001 From: Josh Horwitz Date: Tue, 9 Aug 2016 10:34:07 -1000 Subject: [PATCH] Refactor to new events api Signed-off-by: Josh Horwitz --- cli/command/container/run.go | 5 +- cli/command/container/start.go | 8 +-- cli/command/container/stats.go | 24 ++++---- cli/command/container/utils.go | 42 ++++++++------ cli/command/system/events.go | 47 ++++++++------- cli/command/system/events_utils.go | 21 +------ client/events.go | 69 ++++++++++++++++++---- client/events_test.go | 93 +++++++++++++++++++++--------- client/interface.go | 3 +- 9 files changed, 194 insertions(+), 118 deletions(-) diff --git a/cli/command/container/run.go b/cli/command/container/run.go index d36ab610cf..2f1181659d 100644 --- a/cli/command/container/run.go +++ b/cli/command/container/run.go @@ -211,10 +211,7 @@ func runRun(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *runOptions }) } - statusChan, err := waitExitOrRemoved(dockerCli, context.Background(), createResponse.ID, hostConfig.AutoRemove) - if err != nil { - return fmt.Errorf("Error waiting container's exit code: %v", err) - } + statusChan := waitExitOrRemoved(dockerCli, ctx, createResponse.ID, hostConfig.AutoRemove) //start the container if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil { diff --git a/cli/command/container/start.go b/cli/command/container/start.go index 9f414a7c66..4c31f9bf97 100644 --- a/cli/command/container/start.go +++ b/cli/command/container/start.go @@ -108,7 +108,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error { // 3. We should open a channel for receiving status code of the container // no matter it's detached, removed on daemon side(--rm) or exit normally. - statusChan, statusErr := waitExitOrRemoved(dockerCli, context.Background(), c.ID, c.HostConfig.AutoRemove) + statusChan := waitExitOrRemoved(dockerCli, ctx, c.ID, c.HostConfig.AutoRemove) startOptions := types.ContainerStartOptions{ CheckpointID: opts.checkpoint, } @@ -117,7 +117,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error { if err := dockerCli.Client().ContainerStart(ctx, c.ID, startOptions); err != nil { cancelFun() <-cErr - if c.HostConfig.AutoRemove && statusErr == nil { + if c.HostConfig.AutoRemove { // wait container to be removed <-statusChan } @@ -134,10 +134,6 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error { return attchErr } - if statusErr != nil { - return fmt.Errorf("can't get container's exit code: %v", statusErr) - } - if status := <-statusChan; status != 0 { return cli.StatusError{StatusCode: status} } diff --git a/cli/command/container/stats.go b/cli/command/container/stats.go index 2bd5e3db75..394302d087 100644 --- a/cli/command/container/stats.go +++ b/cli/command/container/stats.go @@ -63,24 +63,22 @@ func runStats(dockerCli *command.DockerCli, opts *statsOptions) error { options := types.EventsOptions{ Filters: f, } - resBody, err := dockerCli.Client().Events(ctx, options) - // Whether we successfully subscribed to events or not, we can now + + eventq, errq := dockerCli.Client().Events(ctx, options) + + // Whether we successfully subscribed to eventq or not, we can now // unblock the main goroutine. close(started) - if err != nil { - closeChan <- err - return - } - defer resBody.Close() - system.DecodeEvents(resBody, func(event events.Message, err error) error { - if err != nil { + for { + select { + case event := <-eventq: + c <- event + case err := <-errq: closeChan <- err - return nil + return } - c <- event - return nil - }) + } } // waitFirst is a WaitGroup to wait first stat data's reach for each container diff --git a/cli/command/container/utils.go b/cli/command/container/utils.go index 7e895834f9..9df1d115e2 100644 --- a/cli/command/container/utils.go +++ b/cli/command/container/utils.go @@ -1,7 +1,6 @@ package container import ( - "fmt" "strconv" "golang.org/x/net/context" @@ -11,11 +10,10 @@ import ( "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/cli/command" - "github.com/docker/docker/cli/command/system" clientapi "github.com/docker/docker/client" ) -func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) (chan int, error) { +func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) chan int { if len(containerID) == 0 { // containerID can never be empty panic("Internal Error: waitExitOrRemoved needs a containerID as parameter") @@ -24,11 +22,7 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai statusChan := make(chan int) exitCode := 125 - eventProcessor := func(e events.Message, err error) error { - if err != nil { - statusChan <- exitCode - return fmt.Errorf("failed to decode event: %v", err) - } + eventProcessor := func(e events.Message) bool { stopProcessing := false switch e.Status { @@ -53,11 +47,10 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai if stopProcessing { statusChan <- exitCode - // stop the loop processing - return fmt.Errorf("done") + return true } - return nil + return false } // Get events via Events API @@ -67,14 +60,29 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai options := types.EventsOptions{ Filters: f, } - resBody, err := dockerCli.Client().Events(ctx, options) - if err != nil { - return nil, fmt.Errorf("can't get events from daemon: %v", err) - } - go system.DecodeEvents(resBody, eventProcessor) + eventCtx, cancel := context.WithCancel(ctx) + eventq, errq := dockerCli.Client().Events(eventCtx, options) - return statusChan, nil + go func() { + defer cancel() + + for { + select { + case evt := <-eventq: + if eventProcessor(evt) { + return + } + + case err := <-errq: + logrus.Errorf("error getting events from daemon: %v", err) + statusChan <- exitCode + return + } + } + }() + + return statusChan } // getExitCode performs an inspect on the container. It returns diff --git a/cli/command/system/events.go b/cli/command/system/events.go index f2946b8763..7b5fb592cb 100644 --- a/cli/command/system/events.go +++ b/cli/command/system/events.go @@ -63,13 +63,33 @@ func runEvents(dockerCli *command.DockerCli, opts *eventsOptions) error { Filters: opts.filter.Value(), } - responseBody, err := dockerCli.Client().Events(context.Background(), options) - if err != nil { - return err - } - defer responseBody.Close() + ctx, cancel := context.WithCancel(context.Background()) + events, errs := dockerCli.Client().Events(ctx, options) + defer cancel() - return streamEvents(dockerCli.Out(), responseBody, tmpl) + out := dockerCli.Out() + + for { + select { + case event := <-events: + if err := handleEvent(out, event, tmpl); err != nil { + return err + } + case err := <-errs: + if err == io.EOF { + return nil + } + return err + } + } +} + +func handleEvent(out io.Writer, event eventtypes.Message, tmpl *template.Template) error { + if tmpl == nil { + return prettyPrintEvent(out, event) + } + + return formatEvent(out, event, tmpl) } func makeTemplate(format string) (*template.Template, error) { @@ -85,21 +105,6 @@ func makeTemplate(format string) (*template.Template, error) { return tmpl, tmpl.Execute(ioutil.Discard, &eventtypes.Message{}) } -// streamEvents decodes prints the incoming events in the provided output. -func streamEvents(out io.Writer, input io.Reader, tmpl *template.Template) error { - return DecodeEvents(input, func(event eventtypes.Message, err error) error { - if err != nil { - return err - } - if tmpl == nil { - return prettyPrintEvent(out, event) - } - return formatEvent(out, event, tmpl) - }) -} - -type eventProcessor func(event eventtypes.Message, err error) error - // prettyPrintEvent prints all types of event information. // Each output includes the event type, actor id, name and action. // Actor attributes are printed at the end if the actor has any. diff --git a/cli/command/system/events_utils.go b/cli/command/system/events_utils.go index 71c1b0476b..b0dd909d15 100644 --- a/cli/command/system/events_utils.go +++ b/cli/command/system/events_utils.go @@ -1,14 +1,14 @@ package system import ( - "encoding/json" - "io" "sync" "github.com/Sirupsen/logrus" eventtypes "github.com/docker/docker/api/types/events" ) +type eventProcessor func(eventtypes.Message, error) error + // EventHandler is abstract interface for user to customize // own handle functions of each type of events type EventHandler interface { @@ -47,20 +47,3 @@ func (w *eventHandler) Watch(c <-chan eventtypes.Message) { go h(e) } } - -// DecodeEvents decodes event from input stream -func DecodeEvents(input io.Reader, ep eventProcessor) error { - dec := json.NewDecoder(input) - for { - var event eventtypes.Message - err := dec.Decode(&event) - if err != nil && err == io.EOF { - break - } - - if procErr := ep(event, err); procErr != nil { - return procErr - } - } - return nil -} diff --git a/client/events.go b/client/events.go index 0ba7114f94..c154f7dcf9 100644 --- a/client/events.go +++ b/client/events.go @@ -1,20 +1,71 @@ package client import ( - "io" + "encoding/json" "net/url" "time" "golang.org/x/net/context" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" timetypes "github.com/docker/docker/api/types/time" ) -// Events returns a stream of events in the daemon in a ReadCloser. -// It's up to the caller to close the stream. -func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) { +// Events returns a stream of events in the daemon. It's up to the caller to close the stream +// by cancelling the context. Once the stream has been completely read an io.EOF error will +// be sent over the error channel. If an error is sent all processing will be stopped. It's up +// to the caller to reopen the stream in the event of an error by reinvoking this method. +func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { + + messages := make(chan events.Message) + errs := make(chan error, 1) + + go func() { + defer close(errs) + + query, err := buildEventsQueryParams(cli.version, options) + if err != nil { + errs <- err + return + } + + resp, err := cli.get(ctx, "/events", query, nil) + if err != nil { + errs <- err + return + } + defer resp.body.Close() + + decoder := json.NewDecoder(resp.body) + + for { + select { + case <-ctx.Done(): + errs <- ctx.Err() + return + default: + var event events.Message + if err := decoder.Decode(&event); err != nil { + errs <- err + return + } + + select { + case messages <- event: + case <-ctx.Done(): + errs <- ctx.Err() + return + } + } + } + }() + + return messages, errs +} + +func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) { query := url.Values{} ref := time.Now() @@ -25,6 +76,7 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io. } query.Set("since", ts) } + if options.Until != "" { ts, err := timetypes.GetTimestamp(options.Until, ref) if err != nil { @@ -32,17 +84,14 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io. } query.Set("until", ts) } + if options.Filters.Len() > 0 { - filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filters) + filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters) if err != nil { return nil, err } query.Set("filters", filterJSON) } - serverResponse, err := cli.get(ctx, "/events", query, nil) - if err != nil { - return nil, err - } - return serverResponse.body, nil + return query, nil } diff --git a/client/events_test.go b/client/events_test.go index 6328983609..ba82d2f542 100644 --- a/client/events_test.go +++ b/client/events_test.go @@ -2,7 +2,9 @@ package client import ( "bytes" + "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "strings" @@ -11,6 +13,7 @@ import ( "golang.org/x/net/context" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" ) @@ -36,7 +39,8 @@ func TestEventsErrorInOptions(t *testing.T) { client := &Client{ client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")), } - _, err := client.Events(context.Background(), e.options) + _, errs := client.Events(context.Background(), e.options) + err := <-errs if err == nil || !strings.Contains(err.Error(), e.expectedError) { t.Fatalf("expected an error %q, got %v", e.expectedError, err) } @@ -47,39 +51,36 @@ func TestEventsErrorFromServer(t *testing.T) { client := &Client{ client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")), } - _, err := client.Events(context.Background(), types.EventsOptions{}) + _, errs := client.Events(context.Background(), types.EventsOptions{}) + err := <-errs if err == nil || err.Error() != "Error response from daemon: Server error" { t.Fatalf("expected a Server Error, got %v", err) } } func TestEvents(t *testing.T) { + expectedURL := "/events" filters := filters.NewArgs() - filters.Add("label", "label1") - filters.Add("label", "label2") - expectedFiltersJSON := `{"label":{"label1":true,"label2":true}}` + filters.Add("type", events.ContainerEventType) + expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType) eventsCases := []struct { options types.EventsOptions + events []events.Message + expectedEvents map[string]bool expectedQueryParams map[string]string }{ { options: types.EventsOptions{ - Since: "invalid but valid", + Filters: filters, }, expectedQueryParams: map[string]string{ - "since": "invalid but valid", - }, - }, - { - options: types.EventsOptions{ - Until: "invalid but valid", - }, - expectedQueryParams: map[string]string{ - "until": "invalid but valid", + "filters": expectedFiltersJSON, }, + events: []events.Message{}, + expectedEvents: make(map[string]bool), }, { options: types.EventsOptions{ @@ -88,6 +89,28 @@ func TestEvents(t *testing.T) { expectedQueryParams: map[string]string{ "filters": expectedFiltersJSON, }, + events: []events.Message{ + { + Type: "container", + ID: "1", + Action: "create", + }, + { + Type: "container", + ID: "2", + Action: "die", + }, + { + Type: "container", + ID: "3", + Action: "create", + }, + }, + expectedEvents: map[string]bool{ + "1": true, + "2": true, + "3": true, + }, }, } @@ -98,29 +121,45 @@ func TestEvents(t *testing.T) { return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL) } query := req.URL.Query() + for key, expected := range eventsCase.expectedQueryParams { actual := query.Get(key) if actual != expected { return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual) } } + + buffer := new(bytes.Buffer) + + for _, e := range eventsCase.events { + b, _ := json.Marshal(e) + buffer.Write(b) + } + return &http.Response{ StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))), + Body: ioutil.NopCloser(buffer), }, nil }), } - body, err := client.Events(context.Background(), eventsCase.options) - if err != nil { - t.Fatal(err) - } - defer body.Close() - content, err := ioutil.ReadAll(body) - if err != nil { - t.Fatal(err) - } - if string(content) != "response" { - t.Fatalf("expected response to contain 'response', got %s", string(content)) + + messages, errs := client.Events(context.Background(), eventsCase.options) + + loop: + for { + select { + case err := <-errs: + if err != nil && err != io.EOF { + t.Fatal(err) + } + + break loop + case e := <-messages: + _, ok := eventsCase.expectedEvents[e.ID] + if !ok { + t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID) + } + } } } } diff --git a/client/interface.go b/client/interface.go index 2d5555ff06..81320918b3 100644 --- a/client/interface.go +++ b/client/interface.go @@ -6,6 +6,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/registry" @@ -120,7 +121,7 @@ type SwarmAPIClient interface { // SystemAPIClient defines API client methods for the system type SystemAPIClient interface { - Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) + Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) Info(ctx context.Context) (types.Info, error) RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error) }