Procházet zdrojové kódy

Merge pull request #1885 from fcrisciani/wait-grp-rev

Waitgroup crash fix
Madhu Venugopal před 8 roky
rodič
revize
6454dd3c17

+ 0 - 29
libnetwork/common/caller.go

@@ -1,29 +0,0 @@
-package common
-
-import (
-	"runtime"
-	"strings"
-)
-
-func callerInfo(i int) string {
-	ptr, _, _, ok := runtime.Caller(i)
-	fName := "unknown"
-	if ok {
-		f := runtime.FuncForPC(ptr)
-		if f != nil {
-			// f.Name() is like: github.com/docker/libnetwork/common.MethodName
-			tmp := strings.Split(f.Name(), ".")
-			if len(tmp) > 0 {
-				fName = tmp[len(tmp)-1]
-			}
-		}
-	}
-
-	return fName
-}
-
-// CallerName returns the name of the function at the specified level
-// level == 0 means current method name
-func CallerName(level int) string {
-	return callerInfo(2 + level)
-}

+ 0 - 49
libnetwork/common/caller_test.go

@@ -1,49 +0,0 @@
-package common
-
-import "testing"
-
-func fun1() string {
-	return CallerName(0)
-}
-
-func fun2() string {
-	return CallerName(1)
-}
-
-func fun3() string {
-	return fun4()
-}
-
-func fun4() string {
-	return CallerName(0)
-}
-
-func fun5() string {
-	return fun6()
-}
-
-func fun6() string {
-	return CallerName(1)
-}
-
-func TestCaller(t *testing.T) {
-	funName := fun1()
-	if funName != "fun1" {
-		t.Fatalf("error on fun1 caller %s", funName)
-	}
-
-	funName = fun2()
-	if funName != "TestCaller" {
-		t.Fatalf("error on fun2 caller %s", funName)
-	}
-
-	funName = fun3()
-	if funName != "fun4" {
-		t.Fatalf("error on fun2 caller %s", funName)
-	}
-
-	funName = fun5()
-	if funName != "fun5" {
-		t.Fatalf("error on fun5 caller %s", funName)
-	}
-}

+ 3 - 2
libnetwork/drivers/overlay/joinleave.go

@@ -120,7 +120,8 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
 		}
 	}
 
-	d.peerAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
+	d.peerDbAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac,
+		net.ParseIP(d.advertiseAddress), true)
 
 	if err := d.checkEncryption(nid, nil, n.vxlanID(s), true, true); err != nil {
 		logrus.Warn(err)
@@ -204,7 +205,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
 		return
 	}
 
-	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false, false)
+	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false)
 }
 
 // Leave method is invoked when a Sandbox detaches from an endpoint.

+ 4 - 1
libnetwork/drivers/overlay/ov_network.go

@@ -765,7 +765,10 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
 					logrus.Errorf("could not resolve peer %q: %v", ip, err)
 					continue
 				}
-				n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss, false)
+
+				if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil {
+					logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err)
+				}
 			} else {
 				// If the gc_thresh values are lower kernel might knock off the neighor entries.
 				// When we get a L3 miss check if its a valid peer and reprogram the neighbor

+ 8 - 3
libnetwork/drivers/overlay/ov_serf.go

@@ -120,10 +120,15 @@ func (d *driver) processEvent(u serf.UserEvent) {
 
 	switch action {
 	case "join":
-		d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr),
-			true, false, false, false)
+		if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
+			net.ParseIP(vtepStr), true, false, false); err != nil {
+			logrus.Errorf("Peer add failed in the driver: %v\n", err)
+		}
 	case "leave":
-		d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), true)
+		if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
+			net.ParseIP(vtepStr), true); err != nil {
+			logrus.Errorf("Peer delete failed in the driver: %v\n", err)
+		}
 	}
 }
 

+ 3 - 17
libnetwork/drivers/overlay/overlay.go

@@ -3,7 +3,6 @@ package overlay
 //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
 
 import (
-	"context"
 	"fmt"
 	"net"
 	"sync"
@@ -51,8 +50,6 @@ type driver struct {
 	joinOnce         sync.Once
 	localJoinOnce    sync.Once
 	keys             []*key
-	peerOpCh         chan *peerOperation
-	peerOpCancel     context.CancelFunc
 	sync.Mutex
 }
 
@@ -67,16 +64,10 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
 		peerDb: peerNetworkMap{
 			mp: map[string]*peerMap{},
 		},
-		secMap:   &encrMap{nodes: map[string][]*spi{}},
-		config:   config,
-		peerOpCh: make(chan *peerOperation),
+		secMap: &encrMap{nodes: map[string][]*spi{}},
+		config: config,
 	}
 
-	// Launch the go routine for processing peer operations
-	ctx, cancel := context.WithCancel(context.Background())
-	d.peerOpCancel = cancel
-	go d.peerOpRoutine(ctx, d.peerOpCh)
-
 	if data, ok := config[netlabel.GlobalKVClient]; ok {
 		var err error
 		dsc, ok := data.(discoverapi.DatastoreConfigData)
@@ -170,7 +161,7 @@ func (d *driver) restoreEndpoints() error {
 		}
 
 		n.incEndpointCount()
-		d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
+		d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true)
 	}
 	return nil
 }
@@ -179,11 +170,6 @@ func (d *driver) restoreEndpoints() error {
 func Fini(drv driverapi.Driver) {
 	d := drv.(*driver)
 
-	// Notify the peer go routine to return
-	if d.peerOpCancel != nil {
-		d.peerOpCancel()
-	}
-
 	if d.exitCh != nil {
 		waitCh := make(chan struct{})
 

+ 28 - 74
libnetwork/drivers/overlay/peerdb.go

@@ -1,14 +1,12 @@
 package overlay
 
 import (
-	"context"
 	"fmt"
 	"net"
 	"sync"
 	"syscall"
 
 	"github.com/Sirupsen/logrus"
-	"github.com/docker/libnetwork/common"
 )
 
 const ovPeerTable = "overlay_peer_table"
@@ -61,6 +59,8 @@ func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error {
 	return nil
 }
 
+var peerDbWg sync.WaitGroup
+
 func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error {
 	d.peerDb.Lock()
 	nids := []string{}
@@ -141,6 +141,8 @@ func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net.
 func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	peerMac net.HardwareAddr, vtep net.IP, isLocal bool) {
 
+	peerDbWg.Wait()
+
 	d.peerDb.Lock()
 	pMap, ok := d.peerDb.mp[nid]
 	if !ok {
@@ -171,6 +173,7 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
 
 func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	peerMac net.HardwareAddr, vtep net.IP) peerEntry {
+	peerDbWg.Wait()
 
 	d.peerDb.Lock()
 	pMap, ok := d.peerDb.mp[nid]
@@ -204,6 +207,12 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM
 }
 
 func (d *driver) peerDbUpdateSandbox(nid string) {
+	// The following logic is useful only in non swarm mode
+	// In swarm mode instead the programmig will come directly from networkDB
+	if !d.isSerfAlive() {
+		return
+	}
+
 	d.peerDb.Lock()
 	pMap, ok := d.peerDb.mp[nid]
 	if !ok {
@@ -212,6 +221,9 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
 	}
 	d.peerDb.Unlock()
 
+	peerDbWg.Add(1)
+
+	var peerOps []func()
 	pMap.Lock()
 	for pKeyStr, pEntry := range pMap.mp {
 		var pKey peerKey
@@ -227,67 +239,28 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
 		// pointing to the same memory location for every iteration. Make
 		// a copy of pEntry before capturing it in the following closure.
 		entry := pEntry
+		op := func() {
+			if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask,
+				pKey.peerMac, entry.vtep,
+				false, false, false); err != nil {
+				logrus.Errorf("peerdbupdate in sandbox failed for ip %s and mac %s: %v",
+					pKey.peerIP, pKey.peerMac, err)
+			}
+		}
 
-		d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, pKey.peerMac, entry.vtep, false, false, false, false)
+		peerOps = append(peerOps, op)
 	}
 	pMap.Unlock()
-}
-
-type peerOperation struct {
-	isAdd      bool
-	networkID  string
-	endpointID string
-	peerIP     net.IP
-	peerIPMask net.IPMask
-	peerMac    net.HardwareAddr
-	vtepIP     net.IP
-	updateDB   bool
-	l2Miss     bool
-	l3Miss     bool
-	localPeer  bool
-	callerName string
-}
 
-func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) {
-	var err error
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case op := <-ch:
-			if op.isAdd {
-				err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer)
-			} else {
-				err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer)
-			}
-			if err != nil {
-				logrus.Warnf("Peer operation failed:%s op:%v", err, op)
-			}
-		}
+	for _, op := range peerOps {
+		op()
 	}
-}
 
-func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
-	peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) {
-	callerName := common.CallerName(1)
-	d.peerOpCh <- &peerOperation{
-		isAdd:      true,
-		networkID:  nid,
-		endpointID: eid,
-		peerIP:     peerIP,
-		peerIPMask: peerIPMask,
-		peerMac:    peerMac,
-		vtepIP:     vtep,
-		updateDB:   updateDb,
-		l2Miss:     l2Miss,
-		l3Miss:     l3Miss,
-		localPeer:  localPeer,
-		callerName: callerName,
-	}
+	peerDbWg.Done()
 }
 
-func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
-	peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, updateOnlyDB bool) error {
+func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
+	peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error {
 
 	if err := validateID(nid, eid); err != nil {
 		return err
@@ -295,9 +268,6 @@ func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
 
 	if updateDb {
 		d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, false)
-		if updateOnlyDB {
-			return nil
-		}
 	}
 
 	n := d.network(nid)
@@ -347,22 +317,6 @@ func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
 }
 
 func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
-	peerMac net.HardwareAddr, vtep net.IP, updateDb bool) {
-	callerName := common.CallerName(1)
-	d.peerOpCh <- &peerOperation{
-		isAdd:      false,
-		networkID:  nid,
-		endpointID: eid,
-		peerIP:     peerIP,
-		peerIPMask: peerIPMask,
-		peerMac:    peerMac,
-		vtepIP:     vtep,
-		updateDB:   updateDb,
-		callerName: callerName,
-	}
-}
-
-func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
 
 	if err := validateID(nid, eid); err != nil {