diff --git a/libnetwork/agent.go b/libnetwork/agent.go index 4877df1c34..0ccdd87de7 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -741,11 +741,12 @@ func (n *network) addDriverWatches() { return } - agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool { - if nid == n.ID() { + agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool { + // skip the entries that are mark for deletion, this is safe because this function is + // called at initialization time so there is no state to delete + if nid == n.ID() && !deleted { d.EventNotify(driverapi.Create, nid, table.name, key, value) } - return false }) } diff --git a/libnetwork/drivers/overlay/ov_network.go b/libnetwork/drivers/overlay/ov_network.go index 01b53ac71c..9575350bf6 100644 --- a/libnetwork/drivers/overlay/ov_network.go +++ b/libnetwork/drivers/overlay/ov_network.go @@ -683,10 +683,12 @@ func (n *network) initSandbox(restore bool) error { return fmt.Errorf("could not get network sandbox (oper %t): %v", restore, err) } + // this is needed to let the peerAdd configure the sandbox n.setSandbox(sbox) if !restore { - n.driver.peerDbUpdateSandbox(n.id) + // Initialize the sandbox with all the peers previously received from networkdb + n.driver.initSandboxPeerDB(n.id) } var nlSock *nl.NetlinkSocket diff --git a/libnetwork/drivers/overlay/peerdb.go b/libnetwork/drivers/overlay/peerdb.go index e9f249cfa4..4b41bf7658 100644 --- a/libnetwork/drivers/overlay/peerdb.go +++ b/libnetwork/drivers/overlay/peerdb.go @@ -203,38 +203,31 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM return pEntry } -func (d *driver) peerDbUpdateSandbox(nid string) { - d.peerDb.Lock() - pMap, ok := d.peerDb.mp[nid] - if !ok { - d.peerDb.Unlock() - return - } - d.peerDb.Unlock() - - pMap.Lock() - for pKeyStr, pEntry := range pMap.mp { - var pKey peerKey - if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil { - logrus.Errorf("peer key scan failed: %v", err) - } - - if pEntry.isLocal { - continue - } - - // Go captures variables by reference. The pEntry could be - // pointing to the same memory location for every iteration. Make - // a copy of pEntry before capturing it in the following closure. - entry := pEntry - - d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, pKey.peerMac, entry.vtep, false, false, false, false) - } - pMap.Unlock() +// The overlay uses a lazy initialization approach, this means that when a network is created +// and the driver registered the overlay does not allocate resources till the moment that a +// sandbox is actually created. +// At the moment of this call, that happens when a sandbox is initialized, is possible that +// networkDB has already delivered some events of peers already available on remote nodes, +// these peers are saved into the peerDB and this function is used to properly configure +// the network sandbox with all those peers that got previously notified. +// Note also that this method sends a single message on the channel and the go routine on the +// other side, will atomically loop on the whole table of peers and will program their state +// in one single atomic operation. This is fundamental to guarantee consistency, and avoid that +// new peerAdd or peerDelete gets reordered during the sandbox init. +func (d *driver) initSandboxPeerDB(nid string) { + d.peerInit(nid) } +type peerOperationType int32 + +const ( + peerOperationINIT peerOperationType = iota + peerOperationADD + peerOperationDELETE +) + type peerOperation struct { - isAdd bool + opType peerOperationType networkID string endpointID string peerIP net.IP @@ -255,9 +248,12 @@ func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) { case <-ctx.Done(): return case op := <-ch: - if op.isAdd { + switch op.opType { + case peerOperationINIT: + err = d.peerInitOp(op.networkID) + case peerOperationADD: err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer) - } else { + case peerOperationDELETE: err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer) } if err != nil { @@ -267,11 +263,33 @@ func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) { } } +func (d *driver) peerInit(nid string) { + callerName := common.CallerName(1) + d.peerOpCh <- &peerOperation{ + opType: peerOperationINIT, + networkID: nid, + callerName: callerName, + } +} + +func (d *driver) peerInitOp(nid string) error { + return d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool { + // Local entries do not need to be added + if pEntry.isLocal { + return false + } + + d.peerAddOp(nid, pEntry.eid, pKey.peerIP, pEntry.peerIPMask, pKey.peerMac, pEntry.vtep, false, false, false, false) + // return false to loop on all entries + return false + }) +} + func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) { callerName := common.CallerName(1) d.peerOpCh <- &peerOperation{ - isAdd: true, + opType: peerOperationADD, networkID: nid, endpointID: eid, peerIP: peerIP, @@ -307,6 +325,9 @@ func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask sbox := n.sandbox() if sbox == nil { + // We are hitting this case for all the events that are arriving before that the sandbox + // is being created. The peer got already added into the database and the sanbox init will + // call the peerDbUpdateSandbox that will configure all these peers from the database return nil } @@ -350,7 +371,7 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMas peerMac net.HardwareAddr, vtep net.IP, updateDb bool) { callerName := common.CallerName(1) d.peerOpCh <- &peerOperation{ - isAdd: false, + opType: peerOperationDELETE, networkID: nid, endpointID: eid, peerIP: peerIP, diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index 6447a16357..4f1860ff91 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -524,7 +524,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) { // WalkTable walks a single table in NetworkDB and invokes the passed // function for each entry in the table passing the network, key, // value. The walk stops if the passed function returns a true. -func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error { +func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error { nDB.RLock() values := make(map[string]interface{}) nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool { @@ -537,7 +537,7 @@ func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bo params := strings.Split(k[1:], "/") nid := params[1] key := params[2] - if fn(nid, key, v.(*entry).value) { + if fn(nid, key, v.(*entry).value, v.(*entry).deleting) { return nil } }