Merge pull request #1519 from sanimej/newlb

Add sandbox API for task insertion to service LB and service discovery
This commit is contained in:
Alessandro Boch 2016-11-03 13:31:46 -07:00 committed by GitHub
commit c5ca82daf4
4 changed files with 62 additions and 8 deletions

View file

@ -74,6 +74,7 @@ type endpoint struct {
ingressPorts []*PortConfig
dbIndex uint64
dbExists bool
serviceEnabled bool
sync.Mutex
}
@ -303,6 +304,18 @@ func (ep *endpoint) isAnonymous() bool {
return ep.anonymous
}
// enableService sets ep's serviceEnabled to the passed value if it's not in the
// current state and returns true; false otherwise.
func (ep *endpoint) enableService(state bool) bool {
ep.Lock()
defer ep.Unlock()
if ep.serviceEnabled != state {
ep.serviceEnabled = state
return true
}
return false
}
func (ep *endpoint) needResolver() bool {
ep.Lock()
defer ep.Unlock()
@ -502,10 +515,6 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
return err
}
if e := ep.addToCluster(); e != nil {
log.Errorf("Could not update state for endpoint %s into cluster: %v", ep.Name(), e)
}
if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
return sb.setupDefaultGW()
}

View file

@ -855,6 +855,14 @@ func (f *fakeSandbox) Endpoints() []libnetwork.Endpoint {
return nil
}
func (f *fakeSandbox) EnableService() error {
return nil
}
func (f *fakeSandbox) DisableService() error {
return nil
}
func TestEndpointDeleteWithActiveContainer(t *testing.T) {
if !testutils.IsRunningInContainer() {
defer testutils.SetupTestOSContext(t)()

View file

@ -11,6 +11,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/armon/go-radix"
"github.com/docker/go-events"
"github.com/docker/libnetwork/types"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)
@ -237,7 +238,7 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
if !ok {
return nil, fmt.Errorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
}
return e.(*entry), nil
@ -247,10 +248,16 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
// table, key) tuple and if the NetworkDB is part of the cluster
// propogates this event to the cluster. It is an error to create an
// entry for the same tuple for which there is already an existing
// entry.
// entry unless the current entry is deleting state.
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
if _, err := nDB.GetEntry(tname, nid, key); err == nil {
return fmt.Errorf("cannot create entry as the entry in table %s with network id %s and key %s already exists", tname, nid, key)
oldEntry, err := nDB.getEntry(tname, nid, key)
if err != nil {
if _, ok := err.(types.NotFoundError); !ok {
return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
}
}
if oldEntry != nil && !oldEntry.deleting {
return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
}
entry := &entry{

View file

@ -42,6 +42,12 @@ type Sandbox interface {
// ResolveService returns all the backend details about the containers or hosts
// backing a service. Its purpose is to satisfy an SRV query
ResolveService(name string) ([]*net.SRV, []net.IP)
// EnableService makes a managed container's service available by adding the
// endpoint to the service load balancer and service discovery
EnableService() error
// DisableService removes a managed contianer's endpoints from the load balancer
// and service discovery
DisableService() error
}
// SandboxOption is an option setter function type used to pass various options to
@ -655,6 +661,30 @@ func (sb *sandbox) SetKey(basePath string) error {
return nil
}
func (sb *sandbox) EnableService() error {
for _, ep := range sb.getConnectedEndpoints() {
if ep.enableService(true) {
if err := ep.addToCluster(); err != nil {
ep.enableService(false)
return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err)
}
}
}
return nil
}
func (sb *sandbox) DisableService() error {
for _, ep := range sb.getConnectedEndpoints() {
if ep.enableService(false) {
if err := ep.deleteFromCluster(); err != nil {
ep.enableService(true)
return fmt.Errorf("could not delete state for endpoint %s from cluster: %v", ep.Name(), err)
}
}
}
return nil
}
func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {
for _, i := range osSbox.Info().Interfaces() {
// Only remove the interfaces owned by this endpoint from the sandbox.