From a7e1718800825ffe389ca86c01ed2568fd6f88e2 Mon Sep 17 00:00:00 2001 From: Santhosh Manohar Date: Sat, 20 Aug 2016 22:55:00 -0700 Subject: [PATCH] Add sandbox API for task insertion to service LB and service discovery Signed-off-by: Santhosh Manohar --- libnetwork/endpoint.go | 17 +++++++++++++---- libnetwork/libnetwork_test.go | 8 ++++++++ libnetwork/networkdb/networkdb.go | 15 +++++++++++---- libnetwork/sandbox.go | 30 ++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 8 deletions(-) diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 1c75b6fea5..ee849bb12d 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -74,6 +74,7 @@ type endpoint struct { ingressPorts []*PortConfig dbIndex uint64 dbExists bool + serviceEnabled bool sync.Mutex } @@ -303,6 +304,18 @@ func (ep *endpoint) isAnonymous() bool { return ep.anonymous } +// enableService sets ep's serviceEnabled to the passed value if it's not in the +// current state and returns true; false otherwise. +func (ep *endpoint) enableService(state bool) bool { + ep.Lock() + defer ep.Unlock() + if ep.serviceEnabled != state { + ep.serviceEnabled = state + return true + } + return false +} + func (ep *endpoint) needResolver() bool { ep.Lock() defer ep.Unlock() @@ -500,10 +513,6 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error { return err } - if e := ep.addToCluster(); e != nil { - log.Errorf("Could not update state for endpoint %s into cluster: %v", ep.Name(), e) - } - if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil { return sb.setupDefaultGW() } diff --git a/libnetwork/libnetwork_test.go b/libnetwork/libnetwork_test.go index 1eee7fa6b8..4510174d8b 100644 --- a/libnetwork/libnetwork_test.go +++ b/libnetwork/libnetwork_test.go @@ -855,6 +855,14 @@ func (f *fakeSandbox) Endpoints() []libnetwork.Endpoint { return nil } +func (f *fakeSandbox) EnableService() error { + return nil +} + +func (f *fakeSandbox) DisableService() error { + return nil +} + func TestEndpointDeleteWithActiveContainer(t *testing.T) { if !testutils.IsRunningInContainer() { defer testutils.SetupTestOSContext(t)() diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index a79b4231d2..2074edb92b 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -11,6 +11,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/armon/go-radix" "github.com/docker/go-events" + "github.com/docker/libnetwork/types" "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" ) @@ -217,7 +218,7 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) { e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) if !ok { - return nil, fmt.Errorf("could not get entry in table %s with network id %s and key %s", tname, nid, key) + return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key) } return e.(*entry), nil @@ -227,10 +228,16 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) { // table, key) tuple and if the NetworkDB is part of the cluster // propogates this event to the cluster. It is an error to create an // entry for the same tuple for which there is already an existing -// entry. +// entry unless the current entry is deleting state. func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { - if _, err := nDB.GetEntry(tname, nid, key); err == nil { - return fmt.Errorf("cannot create entry as the entry in table %s with network id %s and key %s already exists", tname, nid, key) + oldEntry, err := nDB.getEntry(tname, nid, key) + if err != nil { + if _, ok := err.(types.NotFoundError); !ok { + return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err) + } + } + if oldEntry != nil && !oldEntry.deleting { + return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key) } entry := &entry{ diff --git a/libnetwork/sandbox.go b/libnetwork/sandbox.go index 7d1eef5b73..49121f351e 100644 --- a/libnetwork/sandbox.go +++ b/libnetwork/sandbox.go @@ -42,6 +42,12 @@ type Sandbox interface { // ResolveService returns all the backend details about the containers or hosts // backing a service. Its purpose is to satisfy an SRV query ResolveService(name string) ([]*net.SRV, []net.IP) + // EnableService makes a managed container's service available by adding the + // endpoint to the service load balancer and service discovery + EnableService() error + // DisableService removes a managed contianer's endpoints from the load balancer + // and service discovery + DisableService() error } // SandboxOption is an option setter function type used to pass various options to @@ -655,6 +661,30 @@ func (sb *sandbox) SetKey(basePath string) error { return nil } +func (sb *sandbox) EnableService() error { + for _, ep := range sb.getConnectedEndpoints() { + if ep.enableService(true) { + if err := ep.addToCluster(); err != nil { + ep.enableService(false) + return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err) + } + } + } + return nil +} + +func (sb *sandbox) DisableService() error { + for _, ep := range sb.getConnectedEndpoints() { + if ep.enableService(false) { + if err := ep.deleteFromCluster(); err != nil { + ep.enableService(true) + return fmt.Errorf("could not delete state for endpoint %s from cluster: %v", ep.Name(), err) + } + } + } + return nil +} + func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) { for _, i := range osSbox.Info().Interfaces() { // Only remove the interfaces owned by this endpoint from the sandbox.