
Merge pull request #1893 from fcrisciani/service-issue

Service connectivity issue
Madhu Venugopal, 8 years ago
Commit 392231e6ce

+ 4 - 3
libnetwork/agent.go

@@ -741,11 +741,12 @@ func (n *network) addDriverWatches() {
 			return
 		}
 
-		agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
-			if nid == n.ID() {
+		agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
+			// skip the entries that are marked for deletion; this is safe because this function is
+			// called at initialization time so there is no state to delete
+			if nid == n.ID() && !deleted {
 				d.EventNotify(driverapi.Create, nid, table.name, key, value)
 			}
-
 			return false
 		})
 	}

+ 29 - 0
libnetwork/common/caller.go

@@ -0,0 +1,29 @@
+package common
+
+import (
+	"runtime"
+	"strings"
+)
+
+func callerInfo(i int) string {
+	ptr, _, _, ok := runtime.Caller(i)
+	fName := "unknown"
+	if ok {
+		f := runtime.FuncForPC(ptr)
+		if f != nil {
+			// f.Name() is like: github.com/docker/libnetwork/common.MethodName
+			tmp := strings.Split(f.Name(), ".")
+			if len(tmp) > 0 {
+				fName = tmp[len(tmp)-1]
+			}
+		}
+	}
+
+	return fName
+}
+
+// CallerName returns the name of the function at the specified level
+// level == 0 means current method name
+func CallerName(level int) string {
+	return callerInfo(2 + level)
+}
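
Note: the 2 + level skip in callerInfo accounts for the two intermediate frames between runtime.Caller and the function being asked about (callerInfo itself plus CallerName). A minimal standalone sketch of that frame arithmetic, with hypothetical names and only the standard library (not code from this commit):

// runtime.Caller(skip) counts frames above its own call site, so a helper that
// reports its caller must skip its own frame.
package main

import (
	"fmt"
	"runtime"
	"strings"
)

func whoCalledMe() string {
	// skip == 1: frame 0 is whoCalledMe itself, frame 1 is its caller
	pc, _, _, ok := runtime.Caller(1)
	if !ok {
		return "unknown"
	}
	parts := strings.Split(runtime.FuncForPC(pc).Name(), ".")
	return parts[len(parts)-1]
}

func main() {
	fmt.Println(whoCalledMe()) // prints "main"
}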

+ 49 - 0
libnetwork/common/caller_test.go

@@ -0,0 +1,49 @@
+package common
+
+import "testing"
+
+func fun1() string {
+	return CallerName(0)
+}
+
+func fun2() string {
+	return CallerName(1)
+}
+
+func fun3() string {
+	return fun4()
+}
+
+func fun4() string {
+	return CallerName(0)
+}
+
+func fun5() string {
+	return fun6()
+}
+
+func fun6() string {
+	return CallerName(1)
+}
+
+func TestCaller(t *testing.T) {
+	funName := fun1()
+	if funName != "fun1" {
+		t.Fatalf("error on fun1 caller %s", funName)
+	}
+
+	funName = fun2()
+	if funName != "TestCaller" {
+		t.Fatalf("error on fun2 caller %s", funName)
+	}
+
+	funName = fun3()
+	if funName != "fun4" {
+		t.Fatalf("error on fun2 caller %s", funName)
+	}
+
+	funName = fun5()
+	if funName != "fun5" {
+		t.Fatalf("error on fun5 caller %s", funName)
+	}
+}

+ 2 - 3
libnetwork/drivers/overlay/joinleave.go

@@ -120,8 +120,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
 		}
 	}
 
-	d.peerDbAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac,
-		net.ParseIP(d.advertiseAddress), true)
+	d.peerAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
 
 	if err := d.checkEncryption(nid, nil, n.vxlanID(s), true, true); err != nil {
 		logrus.Warn(err)
@@ -205,7 +204,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
 		return
 	}
 
-	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false)
+	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false, false)
 }
 
 // Leave method is invoked when a Sandbox detaches from an endpoint.

+ 4 - 5
libnetwork/drivers/overlay/ov_network.go

@@ -683,10 +683,12 @@ func (n *network) initSandbox(restore bool) error {
 		return fmt.Errorf("could not get network sandbox (oper %t): %v", restore, err)
 	}
 
+	// this is needed to let peerAdd configure the sandbox
 	n.setSandbox(sbox)
 
 	if !restore {
-		n.driver.peerDbUpdateSandbox(n.id)
+		// Initialize the sandbox with all the peers previously received from networkdb
+		n.driver.initSandboxPeerDB(n.id)
 	}
 
 	var nlSock *nl.NetlinkSocket
@@ -765,10 +767,7 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
 					logrus.Errorf("could not resolve peer %q: %v", ip, err)
 					continue
 				}
-
-				if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil {
-					logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err)
-				}
+				n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss, false)
 			} else {
 				// If the gc_thresh values are lower kernel might knock off the neighor entries.
 				// When we get a L3 miss check if its a valid peer and reprogram the neighbor

+ 3 - 8
libnetwork/drivers/overlay/ov_serf.go

@@ -120,15 +120,10 @@ func (d *driver) processEvent(u serf.UserEvent) {
 
 	switch action {
 	case "join":
-		if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
-			net.ParseIP(vtepStr), true, false, false); err != nil {
-			logrus.Errorf("Peer add failed in the driver: %v\n", err)
-		}
+		d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr),
+			true, false, false, false)
 	case "leave":
-		if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
-			net.ParseIP(vtepStr), true); err != nil {
-			logrus.Errorf("Peer delete failed in the driver: %v\n", err)
-		}
+		d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), true)
 	}
 }
 

+ 17 - 3
libnetwork/drivers/overlay/overlay.go

@@ -3,6 +3,7 @@ package overlay
 //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"sync"
@@ -50,6 +51,8 @@ type driver struct {
 	joinOnce         sync.Once
 	localJoinOnce    sync.Once
 	keys             []*key
+	peerOpCh         chan *peerOperation
+	peerOpCancel     context.CancelFunc
 	sync.Mutex
 }
 
@@ -64,10 +67,16 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
 		peerDb: peerNetworkMap{
 			mp: map[string]*peerMap{},
 		},
-		secMap: &encrMap{nodes: map[string][]*spi{}},
-		config: config,
+		secMap:   &encrMap{nodes: map[string][]*spi{}},
+		config:   config,
+		peerOpCh: make(chan *peerOperation),
 	}
 
+	// Launch the goroutine for processing peer operations
+	ctx, cancel := context.WithCancel(context.Background())
+	d.peerOpCancel = cancel
+	go d.peerOpRoutine(ctx, d.peerOpCh)
+
 	if data, ok := config[netlabel.GlobalKVClient]; ok {
 		var err error
 		dsc, ok := data.(discoverapi.DatastoreConfigData)
@@ -161,7 +170,7 @@ func (d *driver) restoreEndpoints() error {
 		}
 
 		n.incEndpointCount()
-		d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true)
+		d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
 	}
 	return nil
 }
@@ -170,6 +179,11 @@ func (d *driver) restoreEndpoints() error {
 func Fini(drv driverapi.Driver) {
 	d := drv.(*driver)
 
+	// Notify the peer goroutine to return
+	if d.peerOpCancel != nil {
+		d.peerOpCancel()
+	}
+
 	if d.exitCh != nil {
 		waitCh := make(chan struct{})
 

+ 114 - 47
libnetwork/drivers/overlay/peerdb.go

@@ -1,12 +1,14 @@
 package overlay
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"sync"
 	"syscall"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/libnetwork/common"
 )
 
 const ovPeerTable = "overlay_peer_table"
@@ -59,8 +61,6 @@ func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error {
 	return nil
 }
 
-var peerDbWg sync.WaitGroup
-
 func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error {
 	d.peerDb.Lock()
 	nids := []string{}
@@ -141,8 +141,6 @@ func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net.
 func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	peerMac net.HardwareAddr, vtep net.IP, isLocal bool) {
 
-	peerDbWg.Wait()
-
 	d.peerDb.Lock()
 	pMap, ok := d.peerDb.mp[nid]
 	if !ok {
@@ -173,7 +171,6 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
 
 func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	peerMac net.HardwareAddr, vtep net.IP) peerEntry {
-	peerDbWg.Wait()
 
 	d.peerDb.Lock()
 	pMap, ok := d.peerDb.mp[nid]
@@ -206,61 +203,109 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM
 	return pEntry
 }
 
-func (d *driver) peerDbUpdateSandbox(nid string) {
-	// The following logic is useful only in non swarm mode
-	// In swarm mode instead the programmig will come directly from networkDB
-	if !d.isSerfAlive() {
-		return
-	}
-
-	d.peerDb.Lock()
-	pMap, ok := d.peerDb.mp[nid]
-	if !ok {
-		d.peerDb.Unlock()
-		return
-	}
-	d.peerDb.Unlock()
+// The overlay driver uses a lazy initialization approach: when a network is created
+// and the driver is registered, the overlay does not allocate resources until the
+// moment a sandbox is actually created.
+// By the time this call happens (when a sandbox is initialized), networkDB may have
+// already delivered events for peers available on remote nodes. These peers are
+// saved into the peerDB, and this function is used to properly configure the
+// network sandbox with all the peers that were previously notified.
+// Note also that this method sends a single message on the channel, and the goroutine
+// on the other side will atomically loop over the whole table of peers and program
+// their state in one single atomic operation. This is fundamental to guarantee
+// consistency and to avoid that a new peerAdd or peerDelete gets reordered during
+// the sandbox init.
+func (d *driver) initSandboxPeerDB(nid string) {
+	d.peerInit(nid)
+}
 
-	peerDbWg.Add(1)
+type peerOperationType int32
 
-	var peerOps []func()
-	pMap.Lock()
-	for pKeyStr, pEntry := range pMap.mp {
-		var pKey peerKey
-		if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil {
-			logrus.Errorf("peer key scan failed: %v", err)
-		}
+const (
+	peerOperationINIT peerOperationType = iota
+	peerOperationADD
+	peerOperationDELETE
+)
 
-		if pEntry.isLocal {
-			continue
-		}
+type peerOperation struct {
+	opType     peerOperationType
+	networkID  string
+	endpointID string
+	peerIP     net.IP
+	peerIPMask net.IPMask
+	peerMac    net.HardwareAddr
+	vtepIP     net.IP
+	updateDB   bool
+	l2Miss     bool
+	l3Miss     bool
+	localPeer  bool
+	callerName string
+}
 
-		// Go captures variables by reference. The pEntry could be
-		// pointing to the same memory location for every iteration. Make
-		// a copy of pEntry before capturing it in the following closure.
-		entry := pEntry
-		op := func() {
-			if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask,
-				pKey.peerMac, entry.vtep,
-				false, false, false); err != nil {
-				logrus.Errorf("peerdbupdate in sandbox failed for ip %s and mac %s: %v",
-					pKey.peerIP, pKey.peerMac, err)
+func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) {
+	var err error
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case op := <-ch:
+			switch op.opType {
+			case peerOperationINIT:
+				err = d.peerInitOp(op.networkID)
+			case peerOperationADD:
+				err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer)
+			case peerOperationDELETE:
+				err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer)
+			}
+			if err != nil {
+				logrus.Warnf("Peer operation failed:%s op:%v", err, op)
 			}
 		}
-
-		peerOps = append(peerOps, op)
 	}
-	pMap.Unlock()
+}
 
-	for _, op := range peerOps {
-		op()
+func (d *driver) peerInit(nid string) {
+	callerName := common.CallerName(1)
+	d.peerOpCh <- &peerOperation{
+		opType:     peerOperationINIT,
+		networkID:  nid,
+		callerName: callerName,
 	}
+}
 
-	peerDbWg.Done()
+func (d *driver) peerInitOp(nid string) error {
+	return d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
+		// Local entries do not need to be added
+		if pEntry.isLocal {
+			return false
+		}
+
+		d.peerAddOp(nid, pEntry.eid, pKey.peerIP, pEntry.peerIPMask, pKey.peerMac, pEntry.vtep, false, false, false, false)
+		// return false to loop on all entries
+		return false
+	})
 }
 
 func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
-	peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error {
+	peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) {
+	callerName := common.CallerName(1)
+	d.peerOpCh <- &peerOperation{
+		opType:     peerOperationADD,
+		networkID:  nid,
+		endpointID: eid,
+		peerIP:     peerIP,
+		peerIPMask: peerIPMask,
+		peerMac:    peerMac,
+		vtepIP:     vtep,
+		updateDB:   updateDb,
+		l2Miss:     l2Miss,
+		l3Miss:     l3Miss,
+		localPeer:  localPeer,
+		callerName: callerName,
+	}
+}
+
+func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
+	peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, updateOnlyDB bool) error {
 
 	if err := validateID(nid, eid); err != nil {
 		return err
@@ -268,6 +313,9 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 
 	if updateDb {
 		d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, false)
+		if updateOnlyDB {
+			return nil
+		}
 	}
 
 	n := d.network(nid)
@@ -277,6 +325,9 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 
 	sbox := n.sandbox()
 	if sbox == nil {
+		// We hit this case for all the events that arrive before the sandbox is created.
+		// The peer has already been added into the database, and the sandbox init will
+		// call initSandboxPeerDB, which will configure all these peers from the database
 		return nil
 	}
 
@@ -317,6 +368,22 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 }
 
 func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
+	peerMac net.HardwareAddr, vtep net.IP, updateDb bool) {
+	callerName := common.CallerName(1)
+	d.peerOpCh <- &peerOperation{
+		opType:     peerOperationDELETE,
+		networkID:  nid,
+		endpointID: eid,
+		peerIP:     peerIP,
+		peerIPMask: peerIPMask,
+		peerMac:    peerMac,
+		vtepIP:     vtep,
+		updateDB:   updateDb,
+		callerName: callerName,
+	}
+}
+
+func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
 
 	if err := validateID(nid, eid); err != nil {
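
Note: the substance of the fix is that every peer mutation is now funneled through a single worker goroutine, so peerInit/peerAdd/peerDelete become fire-and-forget enqueues and the bulk INIT walk cannot interleave with later add/delete events. A condensed sketch of that pattern, with hypothetical names and a simplified operation struct rather than the driver's actual types, wired to a cancellable context the way Init and Fini do:

package main

import (
	"context"
	"fmt"
)

type opType int

const (
	opInit opType = iota
	opAdd
	opDelete
)

// peerOp is a simplified stand-in for the driver's peerOperation struct.
type peerOp struct {
	kind opType
	nid  string // network ID the operation applies to
}

// serializer is a hypothetical cut-down version of the driver: one channel,
// one worker goroutine, one cancel function.
type serializer struct {
	opCh   chan *peerOp
	cancel context.CancelFunc
}

// newSerializer mirrors the Init wiring: create the channel, bind the worker
// goroutine to a cancellable context, and launch it.
func newSerializer() *serializer {
	s := &serializer{opCh: make(chan *peerOp)}
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	go s.loop(ctx)
	return s
}

// loop processes operations strictly one at a time, so an INIT that walks the
// whole peer table cannot interleave with a later ADD or DELETE.
func (s *serializer) loop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return // Fini-style shutdown via the cancel function
		case op := <-s.opCh:
			if err := s.apply(op); err != nil {
				fmt.Printf("peer operation failed: %v op:%+v\n", err, op)
			}
		}
	}
}

func (s *serializer) apply(op *peerOp) error {
	switch op.kind {
	case opInit:
		fmt.Println("programming all known peers for network", op.nid)
	case opAdd:
		fmt.Println("adding one peer on network", op.nid)
	case opDelete:
		fmt.Println("deleting one peer on network", op.nid)
	}
	return nil
}

// enqueue is the fire-and-forget front end: callers no longer get an error
// back; failures are logged by the worker instead.
func (s *serializer) enqueue(kind opType, nid string) {
	s.opCh <- &peerOp{kind: kind, nid: nid}
}

func main() {
	s := newSerializer()
	s.enqueue(opInit, "net1")
	s.enqueue(opAdd, "net1")
	s.cancel() // what Fini does through peerOpCancel
}

Because failures now surface in the worker's log line rather than at the call sites, the error handling around the peerAdd/peerDelete callers could be dropped in joinleave.go, ov_serf.go and ov_network.go above.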

+ 2 - 2
libnetwork/networkdb/networkdb.go

@@ -524,7 +524,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
 // WalkTable walks a single table in NetworkDB and invokes the passed
 // function for each entry in the table passing the network, key,
 // value. The walk stops if the passed function returns a true.
-func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
+func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
 	nDB.RLock()
 	values := make(map[string]interface{})
 	nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
@@ -537,7 +537,7 @@ func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bo
 		params := strings.Split(k[1:], "/")
 		nid := params[1]
 		key := params[2]
-		if fn(nid, key, v.(*entry).value) {
+		if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
 			return nil
 		}
 	}
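
Note: a hedged sketch of how a consumer of the extended WalkTable signature can filter tombstoned entries via the new fourth argument, along the lines of what agent.go does above at watch initialization; the function name and table handling here are illustrative only:

package example

import (
	"fmt"

	"github.com/docker/libnetwork/networkdb"
)

// dumpLiveEntries walks one table and skips entries that are queued for
// reaping, which is what the new deleted flag reports.
func dumpLiveEntries(nDB *networkdb.NetworkDB, table string) error {
	return nDB.WalkTable(table, func(nid, key string, value []byte, deleted bool) bool {
		if deleted {
			return false // skip tombstoned entries but keep walking
		}
		fmt.Printf("network %s key %s carries %d bytes\n", nid, key, len(value))
		return false // returning true stops the walk
	})
}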