浏览代码

add health check in docker cluster

Signed-off-by: runshenzhu <runshen.zhu@gmail.com>
(cherry picked from commit 1ded1f26e154e283ab26f347971d4d4a51edc94f)
Signed-off-by: Tibor Vass <tibor@docker.com>
runshenzhu 9 年之前
父节点
当前提交
2e37061278

+ 5 - 0
daemon/cluster/executor/backend.go

@@ -2,10 +2,13 @@ package executor
 
 import (
 	"io"
+	"time"
 
 	clustertypes "github.com/docker/docker/daemon/cluster/provider"
 	"github.com/docker/engine-api/types"
 	"github.com/docker/engine-api/types/container"
+	"github.com/docker/engine-api/types/events"
+	"github.com/docker/engine-api/types/filters"
 	"github.com/docker/engine-api/types/network"
 	"github.com/docker/libnetwork/cluster"
 	networktypes "github.com/docker/libnetwork/types"
@@ -33,4 +36,6 @@ type Backend interface {
 	SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) error
 	SetClusterProvider(provider cluster.Provider)
 	IsSwarmCompatible() error
+	SubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan interface{})
+	UnsubscribeFromEvents(listener chan interface{})
 }

+ 36 - 3
daemon/cluster/executor/container/adapter.go

@@ -7,11 +7,13 @@ import (
 	"io"
 	"strings"
 	"syscall"
+	"time"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/api/server/httputils"
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
 	"github.com/docker/engine-api/types"
+	"github.com/docker/engine-api/types/events"
 	"github.com/docker/engine-api/types/versions"
 	"github.com/docker/libnetwork"
 	"github.com/docker/swarmkit/api"
@@ -168,9 +170,40 @@ func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, er
 
 // events issues a call to the events API and returns a channel with all
 // events. The stream of events can be shutdown by cancelling the context.
-//
-// A chan struct{} is returned that will be closed if the event processing
-// fails and needs to be restarted.
+func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
+	log.G(ctx).Debugf("waiting on events")
+	buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
+	eventsq := make(chan events.Message, len(buffer))
+
+	for _, event := range buffer {
+		eventsq <- event
+	}
+
+	go func() {
+		defer c.backend.UnsubscribeFromEvents(l)
+
+		for {
+			select {
+			case ev := <-l:
+				jev, ok := ev.(events.Message)
+				if !ok {
+					log.G(ctx).Warnf("unexpected event message: %q", ev)
+					continue
+				}
+				select {
+				case eventsq <- jev:
+				case <-ctx.Done():
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return eventsq
+}
+
 func (c *containerAdapter) wait(ctx context.Context) error {
 	return c.backend.ContainerWaitWithContext(ctx, c.container.name())
 }

+ 10 - 0
daemon/cluster/executor/container/container.go

@@ -13,6 +13,8 @@ import (
 	"github.com/docker/docker/reference"
 	"github.com/docker/engine-api/types"
 	enginecontainer "github.com/docker/engine-api/types/container"
+	"github.com/docker/engine-api/types/events"
+	"github.com/docker/engine-api/types/filters"
 	"github.com/docker/engine-api/types/network"
 	"github.com/docker/swarmkit/agent/exec"
 	"github.com/docker/swarmkit/api"
@@ -492,3 +494,11 @@ func (c *containerConfig) networkCreateRequest(name string) (clustertypes.Networ
 
 	return clustertypes.NetworkCreateRequest{na.Network.ID, types.NetworkCreateRequest{Name: name, NetworkCreate: options}}, nil
 }
+
+func (c containerConfig) eventFilter() filters.Args {
+	filter := filters.NewArgs()
+	filter.Add("type", events.ContainerEventType)
+	filter.Add("name", c.name())
+	filter.Add("label", fmt.Sprintf("%v.task.id=%v", systemLabelPrefix, c.task.ID))
+	return filter
+}

+ 61 - 3
daemon/cluster/executor/container/controller.go

@@ -5,6 +5,7 @@ import (
 
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
 	"github.com/docker/engine-api/types"
+	"github.com/docker/engine-api/types/events"
 	"github.com/docker/swarmkit/agent/exec"
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/log"
@@ -150,20 +151,39 @@ func (r *controller) Wait(pctx context.Context) error {
 	ctx, cancel := context.WithCancel(pctx)
 	defer cancel()
 
+	healthErr := make(chan error, 1)
+	go func() {
+		ectx, cancel := context.WithCancel(ctx) // cancel event context on first event
+		defer cancel()
+		if err := r.checkHealth(ectx); err == ErrContainerUnhealthy {
+			healthErr <- ErrContainerUnhealthy
+			if err := r.Shutdown(ectx); err != nil {
+				log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy")
+			}
+		}
+	}()
+
 	err := r.adapter.wait(ctx)
 	if ctx.Err() != nil {
 		return ctx.Err()
 	}
+
 	if err != nil {
 		ee := &exitError{}
-		if err.Error() != "" {
-			ee.cause = err
-		}
 		if ec, ok := err.(exec.ExitCoder); ok {
 			ee.code = ec.ExitCode()
 		}
+		select {
+		case e := <-healthErr:
+			ee.cause = e
+		default:
+			if err.Error() != "" {
+				ee.cause = err
+			}
+		}
 		return ee
 	}
+
 	return nil
 }
 
@@ -247,6 +267,21 @@ func (r *controller) Close() error {
 	return nil
 }
 
+func (r *controller) matchevent(event events.Message) bool {
+	if event.Type != events.ContainerEventType {
+		return false
+	}
+
+	// TODO(stevvooe): Filter based on ID matching, in addition to name.
+
+	// Make sure the events are for this container.
+	if event.Actor.Attributes["name"] != r.adapter.container.name() {
+		return false
+	}
+
+	return true
+}
+
 func (r *controller) checkClosed() error {
 	select {
 	case <-r.closed:
@@ -286,3 +321,26 @@ func (e *exitError) ExitCode() int {
 func (e *exitError) Cause() error {
 	return e.cause
 }
+
+// checkHealth blocks until unhealthy container is detected or ctx exits
+func (r *controller) checkHealth(ctx context.Context) error {
+	eventq := r.adapter.events(ctx)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-r.closed:
+			return nil
+		case event := <-eventq:
+			if !r.matchevent(event) {
+				continue
+			}
+
+			switch event.Action {
+			case "health_status: unhealthy":
+				return ErrContainerUnhealthy
+			}
+		}
+	}
+}

+ 3 - 0
daemon/cluster/executor/container/errors.go

@@ -9,4 +9,7 @@ var (
 	// ErrContainerDestroyed returned when a container is prematurely destroyed
 	// during a wait call.
 	ErrContainerDestroyed = fmt.Errorf("dockerexec: container destroyed")
+
+	// ErrContainerUnhealthy returned if controller detects the health check failure
+	ErrContainerUnhealthy = fmt.Errorf("dockerexec: unhealthy container")
 )

+ 102 - 0
daemon/cluster/executor/container/health_test.go

@@ -0,0 +1,102 @@
+// +build !windows
+
+package container
+
+import (
+	"testing"
+	"time"
+
+	"github.com/docker/docker/container"
+	"github.com/docker/docker/daemon"
+	"github.com/docker/docker/daemon/events"
+	containertypes "github.com/docker/engine-api/types/container"
+	"github.com/docker/swarmkit/api"
+	"golang.org/x/net/context"
+)
+
+func TestHealthStates(t *testing.T) {
+
+	// set up environment: events, task, container ....
+	e := events.New()
+	_, l, _ := e.Subscribe()
+	defer e.Evict(l)
+
+	task := &api.Task{
+		ID:        "id",
+		ServiceID: "sid",
+		Spec: api.TaskSpec{
+			Runtime: &api.TaskSpec_Container{
+				Container: &api.ContainerSpec{
+					Image: "image_name",
+					Labels: map[string]string{
+						"com.docker.swarm.task.id": "id",
+					},
+				},
+			},
+		},
+		Annotations: api.Annotations{Name: "name"},
+	}
+
+	c := &container.Container{
+		CommonContainer: container.CommonContainer{
+			ID:   "id",
+			Name: "name",
+			Config: &containertypes.Config{
+				Image: "image_name",
+				Labels: map[string]string{
+					"com.docker.swarm.task.id": "id",
+				},
+			},
+		},
+	}
+
+	daemon := &daemon.Daemon{
+		EventsService: e,
+	}
+
+	controller, err := newController(daemon, task)
+	if err != nil {
+		t.Fatalf("create controller fail %v", err)
+	}
+
+	errChan := make(chan error, 1)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// fire checkHealth
+	go func() {
+		err := controller.checkHealth(ctx)
+		select {
+		case errChan <- err:
+		case <-ctx.Done():
+		}
+	}()
+
+	// send an event and expect to get expectedErr
+	// if expectedErr is nil, shouldn't get any error
+	logAndExpect := func(msg string, expectedErr error) {
+		daemon.LogContainerEvent(c, msg)
+
+		timer := time.NewTimer(1 * time.Second)
+		defer timer.Stop()
+
+		select {
+		case err := <-errChan:
+			if err != expectedErr {
+				t.Fatalf("expect error %v, but get %v", expectedErr, err)
+			}
+		case <-timer.C:
+			if expectedErr != nil {
+				t.Fatalf("time limit exceeded, didn't get expected error")
+			}
+		}
+	}
+
+	// events that are ignored by checkHealth
+	logAndExpect("health_status: running", nil)
+	logAndExpect("health_status: healthy", nil)
+	logAndExpect("die", nil)
+
+	// unhealthy event will be caught by checkHealth
+	logAndExpect("health_status: unhealthy", ErrContainerUnhealthy)
+}