Преглед на файлове

Merge pull request #27279 from dongluochen/lbapi_integration

Support health aware load balancing
Madhu Venugopal преди 8 години
родител
ревизия
bee2beeaad

+ 2 - 0
daemon/cluster/executor/backend.go

@@ -27,6 +27,8 @@ type Backend interface {
 	ContainerStart(name string, hostConfig *container.HostConfig, validateHostname bool, checkpoint string, checkpointDir string) error
 	ContainerStop(name string, seconds *int) error
 	ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
+	ActivateContainerServiceBinding(containerName string) error
+	DeactivateContainerServiceBinding(containerName string) error
 	UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error
 	ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error)
 	ContainerWaitWithContext(ctx context.Context, name string) error

+ 8 - 0
daemon/cluster/executor/container/adapter.go

@@ -331,6 +331,14 @@ func (c *containerAdapter) createVolumes(ctx context.Context) error {
 	return nil
 }
 
+func (c *containerAdapter) activateServiceBinding() error {
+	return c.backend.ActivateContainerServiceBinding(c.container.name())
+}
+
+func (c *containerAdapter) deactivateServiceBinding() error {
+	return c.backend.DeactivateContainerServiceBinding(c.container.name())
+}
+
 // todo: typed/wrapped errors
 func isContainerCreateNameConflict(err error) bool {
 	return strings.Contains(err.Error(), "Conflict. The name")

+ 14 - 0
daemon/cluster/executor/container/controller.go

@@ -183,6 +183,10 @@ func (r *controller) Start(ctx context.Context) error {
 
 	// no health check
 	if ctnr.Config == nil || ctnr.Config.Healthcheck == nil {
+		if err := r.adapter.activateServiceBinding(); err != nil {
+			log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s which has no healthcheck config", r.adapter.container.name())
+			return err
+		}
 		return nil
 	}
 
@@ -225,6 +229,10 @@ func (r *controller) Start(ctx context.Context) error {
 				// set health check error, and wait for container to fully exit ("die" event)
 				healthErr = ErrContainerUnhealthy
 			case "health_status: healthy":
+				if err := r.adapter.activateServiceBinding(); err != nil {
+					log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s after healthy event", r.adapter.container.name())
+					return err
+				}
 				return nil
 			}
 		case <-ctx.Done():
@@ -290,6 +298,12 @@ func (r *controller) Shutdown(ctx context.Context) error {
 		r.cancelPull()
 	}
 
+	// remove container from service binding
+	if err := r.adapter.deactivateServiceBinding(); err != nil {
+		log.G(ctx).WithError(err).Errorf("failed to deactivate service binding for container %s", r.adapter.container.name())
+		return err
+	}
+
 	if err := r.adapter.shutdown(ctx); err != nil {
 		if isUnknownContainer(err) || isStoppedContainer(err) {
 			return nil

+ 33 - 0
daemon/container_operations.go

@@ -719,6 +719,13 @@ func (daemon *Daemon) connectToNetwork(container *container.Container, idOrName
 		return err
 	}
 
+	if !container.Managed {
+		// add container name/alias to DNS
+		if err := daemon.ActivateContainerServiceBinding(container.Name); err != nil {
+			return fmt.Errorf("Activate container service binding for %s failed: %v", container.Name, err)
+		}
+	}
+
 	if err := container.UpdateJoinInfo(n, ep); err != nil {
 		return fmt.Errorf("Updating join info failed: %v", err)
 	}
@@ -987,3 +994,29 @@ func (daemon *Daemon) DisconnectFromNetwork(container *container.Container, netw
 	}
 	return nil
 }
+
+// ActivateContainerServiceBinding puts this container into load balancer active rotation and DNS response
+func (daemon *Daemon) ActivateContainerServiceBinding(containerName string) error {
+	container, err := daemon.GetContainer(containerName)
+	if err != nil {
+		return err
+	}
+	sb := daemon.getNetworkSandbox(container)
+	if sb == nil {
+		return fmt.Errorf("network sandbox not exists for container %s", containerName)
+	}
+	return sb.EnableService()
+}
+
+// DeactivateContainerServiceBinding remove this container fromload balancer active rotation, and DNS response
+func (daemon *Daemon) DeactivateContainerServiceBinding(containerName string) error {
+	container, err := daemon.GetContainer(containerName)
+	if err != nil {
+		return err
+	}
+	sb := daemon.getNetworkSandbox(container)
+	if sb == nil {
+		return fmt.Errorf("network sandbox not exists for container %s", containerName)
+	}
+	return sb.DisableService()
+}

+ 4 - 0
daemon/network.go

@@ -178,6 +178,10 @@ func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nod
 		if err := ep.Join(sb, nil); err != nil {
 			logrus.Errorf("Failed joining ingress sandbox to ingress endpoint: %v", err)
 		}
+
+		if err := sb.EnableService(); err != nil {
+			logrus.WithError(err).Error("Failed enabling service for ingress sandbox")
+		}
 	}()
 
 	return nil

+ 1 - 1
vendor.conf

@@ -23,7 +23,7 @@ github.com/RackSec/srslog 365bf33cd9acc21ae1c355209865f17228ca534e
 github.com/imdario/mergo 0.2.1
 
 #get libnetwork packages
-github.com/docker/libnetwork 9ab6e136fa628b5bb4af4a75f76609ef2c21c024
+github.com/docker/libnetwork a98901aebe7ce920b6fbf02ebe5c3afc9ca975b8
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

+ 13 - 4
vendor/github.com/docker/libnetwork/endpoint.go

@@ -74,6 +74,7 @@ type endpoint struct {
 	ingressPorts      []*PortConfig
 	dbIndex           uint64
 	dbExists          bool
+	serviceEnabled    bool
 	sync.Mutex
 }
 
@@ -303,6 +304,18 @@ func (ep *endpoint) isAnonymous() bool {
 	return ep.anonymous
 }
 
+// enableService sets ep's serviceEnabled to the passed value if it's not in the
+// current state and returns true; false otherwise.
+func (ep *endpoint) enableService(state bool) bool {
+	ep.Lock()
+	defer ep.Unlock()
+	if ep.serviceEnabled != state {
+		ep.serviceEnabled = state
+		return true
+	}
+	return false
+}
+
 func (ep *endpoint) needResolver() bool {
 	ep.Lock()
 	defer ep.Unlock()
@@ -502,10 +515,6 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
 		return err
 	}
 
-	if e := ep.addToCluster(); e != nil {
-		log.Errorf("Could not update state for endpoint %s into cluster: %v", ep.Name(), e)
-	}
-
 	if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
 		return sb.setupDefaultGW()
 	}

+ 11 - 4
vendor/github.com/docker/libnetwork/networkdb/networkdb.go

@@ -11,6 +11,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/armon/go-radix"
 	"github.com/docker/go-events"
+	"github.com/docker/libnetwork/types"
 	"github.com/hashicorp/memberlist"
 	"github.com/hashicorp/serf/serf"
 )
@@ -237,7 +238,7 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
 
 	e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
 	if !ok {
-		return nil, fmt.Errorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
+		return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
 	}
 
 	return e.(*entry), nil
@@ -247,10 +248,16 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
 // table, key) tuple and if the NetworkDB is part of the cluster
 // propogates this event to the cluster. It is an error to create an
 // entry for the same tuple for which there is already an existing
-// entry.
+// entry unless the current entry is deleting state.
 func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
-	if _, err := nDB.GetEntry(tname, nid, key); err == nil {
-		return fmt.Errorf("cannot create entry as the entry in table %s with network id %s and key %s already exists", tname, nid, key)
+	oldEntry, err := nDB.getEntry(tname, nid, key)
+	if err != nil {
+		if _, ok := err.(types.NotFoundError); !ok {
+			return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
+		}
+	}
+	if oldEntry != nil && !oldEntry.deleting {
+		return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
 	}
 
 	entry := &entry{

+ 30 - 0
vendor/github.com/docker/libnetwork/sandbox.go

@@ -42,6 +42,12 @@ type Sandbox interface {
 	// ResolveService returns all the backend details about the containers or hosts
 	// backing a service. Its purpose is to satisfy an SRV query
 	ResolveService(name string) ([]*net.SRV, []net.IP)
+	// EnableService  makes a managed container's service available by adding the
+	// endpoint to the service load balancer and service discovery
+	EnableService() error
+	// DisableService removes a managed contianer's endpoints from the load balancer
+	// and service discovery
+	DisableService() error
 }
 
 // SandboxOption is an option setter function type used to pass various options to
@@ -655,6 +661,30 @@ func (sb *sandbox) SetKey(basePath string) error {
 	return nil
 }
 
+func (sb *sandbox) EnableService() error {
+	for _, ep := range sb.getConnectedEndpoints() {
+		if ep.enableService(true) {
+			if err := ep.addToCluster(); err != nil {
+				ep.enableService(false)
+				return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err)
+			}
+		}
+	}
+	return nil
+}
+
+func (sb *sandbox) DisableService() error {
+	for _, ep := range sb.getConnectedEndpoints() {
+		if ep.enableService(false) {
+			if err := ep.deleteFromCluster(); err != nil {
+				ep.enableService(true)
+				return fmt.Errorf("could not delete state for endpoint %s from cluster: %v", ep.Name(), err)
+			}
+		}
+	}
+	return nil
+}
+
 func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {
 	for _, i := range osSbox.Info().Interfaces() {
 		// Only remove the interfaces owned by this endpoint from the sandbox.