Serialize the endpoint join/leave at the sandbox level

Signed-off-by: Santhosh Manohar <santhosh@docker.com>
This commit is contained in:
Santhosh Manohar 2015-09-18 17:33:55 -07:00
parent eb54ed5d42
commit 1cd9c4dcaa
3 changed files with 86 additions and 59 deletions

View file

@ -68,8 +68,9 @@ func (sb *sandbox) setupDefaultGW(srcEp *endpoint) error {
if err != nil {
return fmt.Errorf("container %s: endpoint create on GW Network failed: %v", sb.containerID, err)
}
epLocal := newEp.(*endpoint)
if err := newEp.Join(sb); err != nil {
if err := epLocal.sbJoin(sb); err != nil {
return fmt.Errorf("container %s: endpoint join on GW Network failed: %v", sb.containerID, err)
}
return nil
@ -82,7 +83,7 @@ func (sb *sandbox) clearDefaultGW() error {
return nil
}
if err := ep.Leave(sb); err != nil {
if err := ep.sbLeave(sb); err != nil {
return fmt.Errorf("container %s: endpoint leaving GW Network failed: %v", sb.containerID, err)
}
if err := ep.Delete(); err != nil {

View file

@ -183,40 +183,7 @@ func (ep *endpoint) processOptions(options ...EndpointOption) {
}
}
// joinLeaveStart waits to ensure there are no joins or leaves in progress and
// marks this join/leave in progress without race
func (ep *endpoint) joinLeaveStart() {
ep.Lock()
defer ep.Unlock()
for ep.joinLeaveDone != nil {
joinLeaveDone := ep.joinLeaveDone
ep.Unlock()
select {
case <-joinLeaveDone:
}
ep.Lock()
}
ep.joinLeaveDone = make(chan struct{})
}
// joinLeaveEnd marks the end of this join/leave operation and
// signals the same without race to other join and leave waiters
func (ep *endpoint) joinLeaveEnd() {
ep.Lock()
defer ep.Unlock()
if ep.joinLeaveDone != nil {
close(ep.joinLeaveDone)
ep.joinLeaveDone = nil
}
}
func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
var err error
if sbox == nil {
return types.BadRequestErrorf("endpoint cannot be joined by nil container")
@ -227,8 +194,18 @@ func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
return types.BadRequestErrorf("not a valid Sandbox interface")
}
ep.joinLeaveStart()
defer ep.joinLeaveEnd()
sb.joinLeaveStart()
defer sb.joinLeaveEnd()
return ep.sbJoin(sbox, options...)
}
func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
var err error
sb, ok := sbox.(*sandbox)
if !ok {
return types.BadRequestErrorf("not a valid Sandbox interface")
}
ep.Lock()
if ep.sandboxID != "" {
@ -319,9 +296,6 @@ func (ep *endpoint) hasInterface(iName string) bool {
}
func (ep *endpoint) Leave(sbox Sandbox, options ...EndpointOption) error {
ep.joinLeaveStart()
defer ep.joinLeaveEnd()
if sbox == nil || sbox.ID() == "" || sbox.Key() == "" {
return types.BadRequestErrorf("invalid Sandbox passed to enpoint leave: %v", sbox)
}
@ -331,6 +305,18 @@ func (ep *endpoint) Leave(sbox Sandbox, options ...EndpointOption) error {
return types.BadRequestErrorf("not a valid Sandbox interface")
}
sb.joinLeaveStart()
defer sb.joinLeaveEnd()
return ep.sbLeave(sbox, options...)
}
func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
sb, ok := sbox.(*sandbox)
if !ok {
return types.BadRequestErrorf("not a valid Sandbox interface")
}
ep.Lock()
sid := ep.sandboxID
ep.Unlock()

View file

@ -347,15 +347,16 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
}
}
sb.Lock()
highEp := sb.endpoints[0]
sb.Unlock()
if ep == highEp {
if err := sb.updateGateway(ep); err != nil {
return err
for _, gwep := range sb.getConnectedEndpoints() {
if len(gwep.Gateway()) > 0 {
if gwep != ep {
return nil
}
if err := sb.updateGateway(gwep); err != nil {
return err
}
}
}
return nil
}
@ -394,26 +395,33 @@ func (sb *sandbox) clearNetworkResources(ep *endpoint) error {
return nil
}
highEpBefore := sb.endpoints[0]
var (
i int
e *endpoint
gwepBefore, gwepAfter *endpoint
index = -1
)
for i, e = range sb.endpoints {
for i, e := range sb.endpoints {
if e == ep {
index = i
}
if len(e.Gateway()) > 0 && gwepBefore == nil {
gwepBefore = e
}
if index != -1 && gwepBefore != nil {
break
}
}
heap.Remove(&sb.endpoints, i)
var highEpAfter *endpoint
if len(sb.endpoints) > 0 {
highEpAfter = sb.endpoints[0]
heap.Remove(&sb.endpoints, index)
for _, e := range sb.endpoints {
if len(e.Gateway()) > 0 {
gwepAfter = e
break
}
}
delete(sb.epPriority, ep.ID())
sb.Unlock()
if highEpBefore != highEpAfter {
sb.updateGateway(highEpAfter)
if gwepAfter != nil && gwepBefore != gwepAfter {
sb.updateGateway(gwepAfter)
}
return nil
@ -634,6 +642,38 @@ func (sb *sandbox) updateDNS(ipv6Enabled bool) error {
return os.Rename(tmpResolvFile.Name(), sb.config.resolvConfPath)
}
// joinLeaveStart waits to ensure there are no joins or leaves in progress and
// marks this join/leave in progress without race
func (sb *sandbox) joinLeaveStart() {
sb.Lock()
defer sb.Unlock()
for sb.joinLeaveDone != nil {
joinLeaveDone := sb.joinLeaveDone
sb.Unlock()
select {
case <-joinLeaveDone:
}
sb.Lock()
}
sb.joinLeaveDone = make(chan struct{})
}
// joinLeaveEnd marks the end of this join/leave operation and
// signals the same without race to other join and leave waiters
func (sb *sandbox) joinLeaveEnd() {
sb.Lock()
defer sb.Unlock()
if sb.joinLeaveDone != nil {
close(sb.joinLeaveDone)
sb.joinLeaveDone = nil
}
}
// OptionHostname function returns an option setter for hostname option to
// be passed to NewSandbox method.
func OptionHostname(name string) SandboxOption {
@ -757,11 +797,11 @@ func (eh epHeap) Less(i, j int) bool {
epj := eh[j]
if epi.endpointInGWNetwork() {
return true
return false
}
if epj.endpointInGWNetwork() {
return false
return true
}
cip, ok := ci.epPriority[eh[i].ID()]