Merge pull request #25853 from jhorwit2/jah/event-refactor
Refactor to new engine-api events api
This commit is contained in:
commit
8c929eeb34
9 changed files with 194 additions and 118 deletions
|
@ -211,10 +211,7 @@ func runRun(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *runOptions
|
|||
})
|
||||
}
|
||||
|
||||
statusChan, err := waitExitOrRemoved(dockerCli, context.Background(), createResponse.ID, hostConfig.AutoRemove)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error waiting container's exit code: %v", err)
|
||||
}
|
||||
statusChan := waitExitOrRemoved(dockerCli, ctx, createResponse.ID, hostConfig.AutoRemove)
|
||||
|
||||
//start the container
|
||||
if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil {
|
||||
|
|
|
@ -108,7 +108,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
|
|||
|
||||
// 3. We should open a channel for receiving status code of the container
|
||||
// no matter it's detached, removed on daemon side(--rm) or exit normally.
|
||||
statusChan, statusErr := waitExitOrRemoved(dockerCli, context.Background(), c.ID, c.HostConfig.AutoRemove)
|
||||
statusChan := waitExitOrRemoved(dockerCli, ctx, c.ID, c.HostConfig.AutoRemove)
|
||||
startOptions := types.ContainerStartOptions{
|
||||
CheckpointID: opts.checkpoint,
|
||||
}
|
||||
|
@ -117,7 +117,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
|
|||
if err := dockerCli.Client().ContainerStart(ctx, c.ID, startOptions); err != nil {
|
||||
cancelFun()
|
||||
<-cErr
|
||||
if c.HostConfig.AutoRemove && statusErr == nil {
|
||||
if c.HostConfig.AutoRemove {
|
||||
// wait container to be removed
|
||||
<-statusChan
|
||||
}
|
||||
|
@ -134,10 +134,6 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error {
|
|||
return attchErr
|
||||
}
|
||||
|
||||
if statusErr != nil {
|
||||
return fmt.Errorf("can't get container's exit code: %v", statusErr)
|
||||
}
|
||||
|
||||
if status := <-statusChan; status != 0 {
|
||||
return cli.StatusError{StatusCode: status}
|
||||
}
|
||||
|
|
|
@ -63,24 +63,22 @@ func runStats(dockerCli *command.DockerCli, opts *statsOptions) error {
|
|||
options := types.EventsOptions{
|
||||
Filters: f,
|
||||
}
|
||||
resBody, err := dockerCli.Client().Events(ctx, options)
|
||||
// Whether we successfully subscribed to events or not, we can now
|
||||
|
||||
eventq, errq := dockerCli.Client().Events(ctx, options)
|
||||
|
||||
// Whether we successfully subscribed to eventq or not, we can now
|
||||
// unblock the main goroutine.
|
||||
close(started)
|
||||
if err != nil {
|
||||
closeChan <- err
|
||||
return
|
||||
}
|
||||
defer resBody.Close()
|
||||
|
||||
system.DecodeEvents(resBody, func(event events.Message, err error) error {
|
||||
if err != nil {
|
||||
for {
|
||||
select {
|
||||
case event := <-eventq:
|
||||
c <- event
|
||||
case err := <-errq:
|
||||
closeChan <- err
|
||||
return nil
|
||||
return
|
||||
}
|
||||
c <- event
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// waitFirst is a WaitGroup to wait first stat data's reach for each container
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
@ -11,11 +10,10 @@ import (
|
|||
"github.com/docker/docker/api/types/events"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/cli/command"
|
||||
"github.com/docker/docker/cli/command/system"
|
||||
clientapi "github.com/docker/docker/client"
|
||||
)
|
||||
|
||||
func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) (chan int, error) {
|
||||
func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) chan int {
|
||||
if len(containerID) == 0 {
|
||||
// containerID can never be empty
|
||||
panic("Internal Error: waitExitOrRemoved needs a containerID as parameter")
|
||||
|
@ -24,11 +22,7 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
|
|||
statusChan := make(chan int)
|
||||
exitCode := 125
|
||||
|
||||
eventProcessor := func(e events.Message, err error) error {
|
||||
if err != nil {
|
||||
statusChan <- exitCode
|
||||
return fmt.Errorf("failed to decode event: %v", err)
|
||||
}
|
||||
eventProcessor := func(e events.Message) bool {
|
||||
|
||||
stopProcessing := false
|
||||
switch e.Status {
|
||||
|
@ -53,11 +47,10 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
|
|||
|
||||
if stopProcessing {
|
||||
statusChan <- exitCode
|
||||
// stop the loop processing
|
||||
return fmt.Errorf("done")
|
||||
return true
|
||||
}
|
||||
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
|
||||
// Get events via Events API
|
||||
|
@ -67,14 +60,29 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai
|
|||
options := types.EventsOptions{
|
||||
Filters: f,
|
||||
}
|
||||
resBody, err := dockerCli.Client().Events(ctx, options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't get events from daemon: %v", err)
|
||||
}
|
||||
|
||||
go system.DecodeEvents(resBody, eventProcessor)
|
||||
eventCtx, cancel := context.WithCancel(ctx)
|
||||
eventq, errq := dockerCli.Client().Events(eventCtx, options)
|
||||
|
||||
return statusChan, nil
|
||||
go func() {
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt := <-eventq:
|
||||
if eventProcessor(evt) {
|
||||
return
|
||||
}
|
||||
|
||||
case err := <-errq:
|
||||
logrus.Errorf("error getting events from daemon: %v", err)
|
||||
statusChan <- exitCode
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return statusChan
|
||||
}
|
||||
|
||||
// getExitCode performs an inspect on the container. It returns
|
||||
|
|
|
@ -63,13 +63,33 @@ func runEvents(dockerCli *command.DockerCli, opts *eventsOptions) error {
|
|||
Filters: opts.filter.Value(),
|
||||
}
|
||||
|
||||
responseBody, err := dockerCli.Client().Events(context.Background(), options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer responseBody.Close()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
events, errs := dockerCli.Client().Events(ctx, options)
|
||||
defer cancel()
|
||||
|
||||
return streamEvents(dockerCli.Out(), responseBody, tmpl)
|
||||
out := dockerCli.Out()
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-events:
|
||||
if err := handleEvent(out, event, tmpl); err != nil {
|
||||
return err
|
||||
}
|
||||
case err := <-errs:
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleEvent(out io.Writer, event eventtypes.Message, tmpl *template.Template) error {
|
||||
if tmpl == nil {
|
||||
return prettyPrintEvent(out, event)
|
||||
}
|
||||
|
||||
return formatEvent(out, event, tmpl)
|
||||
}
|
||||
|
||||
func makeTemplate(format string) (*template.Template, error) {
|
||||
|
@ -85,21 +105,6 @@ func makeTemplate(format string) (*template.Template, error) {
|
|||
return tmpl, tmpl.Execute(ioutil.Discard, &eventtypes.Message{})
|
||||
}
|
||||
|
||||
// streamEvents decodes prints the incoming events in the provided output.
|
||||
func streamEvents(out io.Writer, input io.Reader, tmpl *template.Template) error {
|
||||
return DecodeEvents(input, func(event eventtypes.Message, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if tmpl == nil {
|
||||
return prettyPrintEvent(out, event)
|
||||
}
|
||||
return formatEvent(out, event, tmpl)
|
||||
})
|
||||
}
|
||||
|
||||
type eventProcessor func(event eventtypes.Message, err error) error
|
||||
|
||||
// prettyPrintEvent prints all types of event information.
|
||||
// Each output includes the event type, actor id, name and action.
|
||||
// Actor attributes are printed at the end if the actor has any.
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
package system
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
eventtypes "github.com/docker/docker/api/types/events"
|
||||
)
|
||||
|
||||
type eventProcessor func(eventtypes.Message, error) error
|
||||
|
||||
// EventHandler is abstract interface for user to customize
|
||||
// own handle functions of each type of events
|
||||
type EventHandler interface {
|
||||
|
@ -47,20 +47,3 @@ func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
|
|||
go h(e)
|
||||
}
|
||||
}
|
||||
|
||||
// DecodeEvents decodes event from input stream
|
||||
func DecodeEvents(input io.Reader, ep eventProcessor) error {
|
||||
dec := json.NewDecoder(input)
|
||||
for {
|
||||
var event eventtypes.Message
|
||||
err := dec.Decode(&event)
|
||||
if err != nil && err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if procErr := ep(event, err); procErr != nil {
|
||||
return procErr
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,20 +1,71 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"io"
|
||||
"encoding/json"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/events"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
timetypes "github.com/docker/docker/api/types/time"
|
||||
)
|
||||
|
||||
// Events returns a stream of events in the daemon in a ReadCloser.
|
||||
// It's up to the caller to close the stream.
|
||||
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) {
|
||||
// Events returns a stream of events in the daemon. It's up to the caller to close the stream
|
||||
// by cancelling the context. Once the stream has been completely read an io.EOF error will
|
||||
// be sent over the error channel. If an error is sent all processing will be stopped. It's up
|
||||
// to the caller to reopen the stream in the event of an error by reinvoking this method.
|
||||
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
|
||||
|
||||
messages := make(chan events.Message)
|
||||
errs := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
defer close(errs)
|
||||
|
||||
query, err := buildEventsQueryParams(cli.version, options)
|
||||
if err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := cli.get(ctx, "/events", query, nil)
|
||||
if err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
defer resp.body.Close()
|
||||
|
||||
decoder := json.NewDecoder(resp.body)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
errs <- ctx.Err()
|
||||
return
|
||||
default:
|
||||
var event events.Message
|
||||
if err := decoder.Decode(&event); err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case messages <- event:
|
||||
case <-ctx.Done():
|
||||
errs <- ctx.Err()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return messages, errs
|
||||
}
|
||||
|
||||
func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) {
|
||||
query := url.Values{}
|
||||
ref := time.Now()
|
||||
|
||||
|
@ -25,6 +76,7 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
|
|||
}
|
||||
query.Set("since", ts)
|
||||
}
|
||||
|
||||
if options.Until != "" {
|
||||
ts, err := timetypes.GetTimestamp(options.Until, ref)
|
||||
if err != nil {
|
||||
|
@ -32,17 +84,14 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
|
|||
}
|
||||
query.Set("until", ts)
|
||||
}
|
||||
|
||||
if options.Filters.Len() > 0 {
|
||||
filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filters)
|
||||
filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
query.Set("filters", filterJSON)
|
||||
}
|
||||
|
||||
serverResponse, err := cli.get(ctx, "/events", query, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return serverResponse.body, nil
|
||||
return query, nil
|
||||
}
|
||||
|
|
|
@ -2,7 +2,9 @@ package client
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
@ -11,6 +13,7 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/events"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
)
|
||||
|
||||
|
@ -36,7 +39,8 @@ func TestEventsErrorInOptions(t *testing.T) {
|
|||
client := &Client{
|
||||
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
|
||||
}
|
||||
_, err := client.Events(context.Background(), e.options)
|
||||
_, errs := client.Events(context.Background(), e.options)
|
||||
err := <-errs
|
||||
if err == nil || !strings.Contains(err.Error(), e.expectedError) {
|
||||
t.Fatalf("expected an error %q, got %v", e.expectedError, err)
|
||||
}
|
||||
|
@ -47,39 +51,36 @@ func TestEventsErrorFromServer(t *testing.T) {
|
|||
client := &Client{
|
||||
client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")),
|
||||
}
|
||||
_, err := client.Events(context.Background(), types.EventsOptions{})
|
||||
_, errs := client.Events(context.Background(), types.EventsOptions{})
|
||||
err := <-errs
|
||||
if err == nil || err.Error() != "Error response from daemon: Server error" {
|
||||
t.Fatalf("expected a Server Error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvents(t *testing.T) {
|
||||
|
||||
expectedURL := "/events"
|
||||
|
||||
filters := filters.NewArgs()
|
||||
filters.Add("label", "label1")
|
||||
filters.Add("label", "label2")
|
||||
expectedFiltersJSON := `{"label":{"label1":true,"label2":true}}`
|
||||
filters.Add("type", events.ContainerEventType)
|
||||
expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType)
|
||||
|
||||
eventsCases := []struct {
|
||||
options types.EventsOptions
|
||||
events []events.Message
|
||||
expectedEvents map[string]bool
|
||||
expectedQueryParams map[string]string
|
||||
}{
|
||||
{
|
||||
options: types.EventsOptions{
|
||||
Since: "invalid but valid",
|
||||
Filters: filters,
|
||||
},
|
||||
expectedQueryParams: map[string]string{
|
||||
"since": "invalid but valid",
|
||||
},
|
||||
},
|
||||
{
|
||||
options: types.EventsOptions{
|
||||
Until: "invalid but valid",
|
||||
},
|
||||
expectedQueryParams: map[string]string{
|
||||
"until": "invalid but valid",
|
||||
"filters": expectedFiltersJSON,
|
||||
},
|
||||
events: []events.Message{},
|
||||
expectedEvents: make(map[string]bool),
|
||||
},
|
||||
{
|
||||
options: types.EventsOptions{
|
||||
|
@ -88,6 +89,28 @@ func TestEvents(t *testing.T) {
|
|||
expectedQueryParams: map[string]string{
|
||||
"filters": expectedFiltersJSON,
|
||||
},
|
||||
events: []events.Message{
|
||||
{
|
||||
Type: "container",
|
||||
ID: "1",
|
||||
Action: "create",
|
||||
},
|
||||
{
|
||||
Type: "container",
|
||||
ID: "2",
|
||||
Action: "die",
|
||||
},
|
||||
{
|
||||
Type: "container",
|
||||
ID: "3",
|
||||
Action: "create",
|
||||
},
|
||||
},
|
||||
expectedEvents: map[string]bool{
|
||||
"1": true,
|
||||
"2": true,
|
||||
"3": true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -98,29 +121,45 @@ func TestEvents(t *testing.T) {
|
|||
return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
|
||||
}
|
||||
query := req.URL.Query()
|
||||
|
||||
for key, expected := range eventsCase.expectedQueryParams {
|
||||
actual := query.Get(key)
|
||||
if actual != expected {
|
||||
return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
buffer := new(bytes.Buffer)
|
||||
|
||||
for _, e := range eventsCase.events {
|
||||
b, _ := json.Marshal(e)
|
||||
buffer.Write(b)
|
||||
}
|
||||
|
||||
return &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))),
|
||||
Body: ioutil.NopCloser(buffer),
|
||||
}, nil
|
||||
}),
|
||||
}
|
||||
body, err := client.Events(context.Background(), eventsCase.options)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer body.Close()
|
||||
content, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(content) != "response" {
|
||||
t.Fatalf("expected response to contain 'response', got %s", string(content))
|
||||
|
||||
messages, errs := client.Events(context.Background(), eventsCase.options)
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case err := <-errs:
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
break loop
|
||||
case e := <-messages:
|
||||
_, ok := eventsCase.expectedEvents[e.ID]
|
||||
if !ok {
|
||||
t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/events"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/api/types/network"
|
||||
"github.com/docker/docker/api/types/registry"
|
||||
|
@ -120,7 +121,7 @@ type SwarmAPIClient interface {
|
|||
|
||||
// SystemAPIClient defines API client methods for the system
|
||||
type SystemAPIClient interface {
|
||||
Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error)
|
||||
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
|
||||
Info(ctx context.Context) (types.Info, error)
|
||||
RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue