diff --git a/libnetwork/drivers/overlay/overlay.go b/libnetwork/drivers/overlay/overlay.go index 2c3f4788c2..e5c364c307 100644 --- a/libnetwork/drivers/overlay/overlay.go +++ b/libnetwork/drivers/overlay/overlay.go @@ -6,7 +6,6 @@ package overlay //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto import ( - "context" "fmt" "net" "sync" @@ -53,8 +52,7 @@ type driver struct { joinOnce sync.Once localJoinOnce sync.Once keys []*key - peerOpCh chan *peerOperation - peerOpCancel context.CancelFunc + peerOpMu sync.Mutex sync.Mutex } @@ -69,16 +67,10 @@ func Register(r driverapi.Registerer, config map[string]interface{}) error { peerDb: peerNetworkMap{ mp: map[string]*peerMap{}, }, - secMap: &encrMap{nodes: map[string][]*spi{}}, - config: config, - peerOpCh: make(chan *peerOperation), + secMap: &encrMap{nodes: map[string][]*spi{}}, + config: config, } - // Launch the go routine for processing peer operations - ctx, cancel := context.WithCancel(context.Background()) - d.peerOpCancel = cancel - go d.peerOpRoutine(ctx, d.peerOpCh) - if data, ok := config[netlabel.GlobalKVClient]; ok { var err error dsc, ok := data.(discoverapi.DatastoreConfigData) @@ -166,11 +158,6 @@ func (d *driver) restoreEndpoints() error { func Fini(drv driverapi.Driver) { d := drv.(*driver) - // Notify the peer go routine to return - if d.peerOpCancel != nil { - d.peerOpCancel() - } - if d.exitCh != nil { waitCh := make(chan struct{}) diff --git a/libnetwork/drivers/overlay/peerdb.go b/libnetwork/drivers/overlay/peerdb.go index 31ea2d3445..56f58f10d0 100644 --- a/libnetwork/drivers/overlay/peerdb.go +++ b/libnetwork/drivers/overlay/peerdb.go @@ -4,13 +4,11 @@ package overlay import ( - "context" "fmt" "net" "sync" "syscall" - "github.com/docker/docker/libnetwork/internal/caller" "github.com/docker/docker/libnetwork/internal/setmatrix" "github.com/docker/docker/libnetwork/osl" "github.com/sirupsen/logrus" @@ -247,62 +245,10 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM // in one single atomic operation. This is fundamental to guarantee consistency, and avoid that // new peerAdd or peerDelete gets reordered during the sandbox init. func (d *driver) initSandboxPeerDB(nid string) { - d.peerInit(nid) -} - -type peerOperationType int32 - -const ( - peerOperationINIT peerOperationType = iota - peerOperationADD - peerOperationDELETE - peerOperationFLUSH -) - -type peerOperation struct { - opType peerOperationType - networkID string - endpointID string - peerIP net.IP - peerIPMask net.IPMask - peerMac net.HardwareAddr - vtepIP net.IP - l2Miss bool - l3Miss bool - localPeer bool - callerName string -} - -func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) { - var err error - for { - select { - case <-ctx.Done(): - return - case op := <-ch: - switch op.opType { - case peerOperationINIT: - err = d.peerInitOp(op.networkID) - case peerOperationADD: - err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.l2Miss, op.l3Miss, true, op.localPeer) - case peerOperationDELETE: - err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer) - case peerOperationFLUSH: - err = d.peerFlushOp(op.networkID) - } - if err != nil { - logrus.Warnf("Peer operation failed:%s op:%v", err, op) - } - } - } -} - -func (d *driver) peerInit(nid string) { - callerName := caller.Name(1) - d.peerOpCh <- &peerOperation{ - opType: peerOperationINIT, - networkID: nid, - callerName: callerName, + d.peerOpMu.Lock() + defer d.peerOpMu.Unlock() + if err := d.peerInitOp(nid); err != nil { + logrus.WithError(err).Warn("Peer init operation failed") } } @@ -321,18 +267,11 @@ func (d *driver) peerInitOp(nid string) error { func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, l2Miss, l3Miss, localPeer bool) { - d.peerOpCh <- &peerOperation{ - opType: peerOperationADD, - networkID: nid, - endpointID: eid, - peerIP: peerIP, - peerIPMask: peerIPMask, - peerMac: peerMac, - vtepIP: vtep, - l2Miss: l2Miss, - l3Miss: l3Miss, - localPeer: localPeer, - callerName: caller.Name(1), + d.peerOpMu.Lock() + defer d.peerOpMu.Unlock() + err := d.peerAddOp(nid, eid, peerIP, peerIPMask, peerMac, vtep, l2Miss, l3Miss, true, localPeer) + if err != nil { + logrus.WithError(err).Warn("Peer add operation failed") } } @@ -413,16 +352,11 @@ func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, localPeer bool) { - d.peerOpCh <- &peerOperation{ - opType: peerOperationDELETE, - networkID: nid, - endpointID: eid, - peerIP: peerIP, - peerIPMask: peerIPMask, - peerMac: peerMac, - vtepIP: vtep, - callerName: caller.Name(1), - localPeer: localPeer, + d.peerOpMu.Lock() + defer d.peerOpMu.Unlock() + err := d.peerDeleteOp(nid, eid, peerIP, peerIPMask, peerMac, vtep, localPeer) + if err != nil { + logrus.WithError(err).Warn("Peer delete operation failed") } } @@ -485,10 +419,10 @@ func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPM } func (d *driver) peerFlush(nid string) { - d.peerOpCh <- &peerOperation{ - opType: peerOperationFLUSH, - networkID: nid, - callerName: caller.Name(1), + d.peerOpMu.Lock() + defer d.peerOpMu.Unlock() + if err := d.peerFlushOp(nid); err != nil { + logrus.WithError(err).Warn("Peer flush operation failed") } }