Merge pull request #712 from sanimej/bfix

Handling container rename in libnetwork
This commit is contained in:
Jana Radhakrishnan 2015-10-26 16:12:37 -07:00
commit bd77346032
6 changed files with 119 additions and 14 deletions

View file

@ -143,6 +143,7 @@ type controller struct {
watchCh chan *endpoint
unWatchCh chan *endpoint
svcDb map[string]svcMap
nmap map[string]*netWatch
sync.Mutex
}

View file

@ -431,6 +431,51 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
return sb.clearDefaultGW()
}
func (ep *endpoint) rename(name string) error {
var err error
n := ep.getNetwork()
if n == nil {
return fmt.Errorf("network not connected for ep %q", ep.name)
}
n.getController().Lock()
netWatch, ok := n.getController().nmap[n.ID()]
n.getController().Unlock()
if !ok {
return fmt.Errorf("watch null for network %q", n.Name())
}
n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), false)
oldName := ep.name
ep.name = name
n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), true)
defer func() {
if err != nil {
n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), false)
ep.name = oldName
n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), true)
}
}()
// Update the store with the updated name
if err = n.getController().updateToStore(ep); err != nil {
return err
}
// After the name change do a dummy endpoint count update to
// trigger the service record update in the peer nodes
// Ignore the error because updateStore fail for EpCnt is a
// benign error. Besides there is no meaningful recovery that
// we can do. When the cluster recovers subsequent EpCnt update
// will force the peers to get the correct EP name.
_ = n.getEpCnt().updateStore()
return err
}
func (ep *endpoint) hasInterface(iName string) bool {
ep.Lock()
defer ep.Unlock()

View file

@ -108,6 +108,25 @@ func (ec *endpointCnt) EndpointCnt() uint64 {
return ec.Count
}
func (ec *endpointCnt) updateStore() error {
retry:
store := ec.n.getController().getStore(ec.DataScope())
if store == nil {
return fmt.Errorf("store not found for scope %s", ec.DataScope())
}
if err := ec.n.getController().updateToStore(ec); err != nil {
if err == datastore.ErrKeyModified {
if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
return fmt.Errorf("could not update the kvobject to latest on rename: %v", err)
}
goto retry
}
return err
}
return nil
}
func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
retry:
ec.Lock()

View file

@ -1134,6 +1134,10 @@ func (f *fakeSandbox) Delete() error {
return nil
}
func (f *fakeSandbox) Rename(name string) error {
return nil
}
func (f *fakeSandbox) SetKey(key string) error {
return nil
}

View file

@ -34,6 +34,8 @@ type Sandbox interface {
Refresh(options ...SandboxOption) error
// SetKey updates the Sandbox Key
SetKey(key string) error
// Rename changes the name of all attached Endpoints
Rename(name string) error
// Delete destroys this container after detaching it from all connected endpoints.
Delete() error
}
@ -201,6 +203,35 @@ func (sb *sandbox) Delete() error {
return nil
}
func (sb *sandbox) Rename(name string) error {
var err error
undo := []func(){}
for _, ep := range sb.getConnectedEndpoints() {
if ep.endpointInGWNetwork() {
continue
}
oldName := ep.Name()
lEp := ep
if err = ep.rename(name); err != nil {
break
}
undo = append(undo,
func() {
// Ignore the error while undoing
_ = lEp.rename(oldName)
})
}
if err != nil {
for _, f := range undo {
f()
}
}
return err
}
func (sb *sandbox) Refresh(options ...SandboxOption) error {
// Store connected endpoints
epList := sb.getConnectedEndpoints()

View file

@ -274,25 +274,30 @@ func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan da
continue
}
if _, ok := nw.remoteEps[lEp.ID()]; ok {
delete(delEpMap, lEp.ID())
continue
if ep, ok := nw.remoteEps[lEp.ID()]; ok {
// On a container rename EP ID will remain
// the same but the name will change. service
// records should reflect the change.
// Keep old EP entry in the delEpMap and add
// EP from the store (which has the new name)
// into the new list
if lEp.name == ep.name {
delete(delEpMap, lEp.ID())
continue
}
}
nw.remoteEps[lEp.ID()] = lEp
addEp = append(addEp, lEp)
}
c.Unlock()
for _, lEp := range addEp {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
}
for _, lEp := range delEpMap {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
}
for _, lEp := range addEp {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
}
}
}
}
@ -378,13 +383,13 @@ func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoi
c.Unlock()
}
func (c *controller) watchLoop(nmap map[string]*netWatch) {
func (c *controller) watchLoop() {
for {
select {
case ep := <-c.watchCh:
c.processEndpointCreate(nmap, ep)
c.processEndpointCreate(c.nmap, ep)
case ep := <-c.unWatchCh:
c.processEndpointDelete(nmap, ep)
c.processEndpointDelete(c.nmap, ep)
}
}
}
@ -392,7 +397,7 @@ func (c *controller) watchLoop(nmap map[string]*netWatch) {
func (c *controller) startWatch() {
c.watchCh = make(chan *endpoint)
c.unWatchCh = make(chan *endpoint)
nmap := make(map[string]*netWatch)
c.nmap = make(map[string]*netWatch)
go c.watchLoop(nmap)
go c.watchLoop()
}