Browse Source

Merge pull request #45164 from corhere/libnet/peer-op-function-call

libnetwork/d/overlay: handle peer ops directly
Brian Goff 2 years ago
parent
commit
0a334ea081
2 changed files with 21 additions and 100 deletions
  1. 3 16
      libnetwork/drivers/overlay/overlay.go
  2. 18 84
      libnetwork/drivers/overlay/peerdb.go

+ 3 - 16
libnetwork/drivers/overlay/overlay.go

@@ -6,7 +6,6 @@ package overlay
 //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
 
 import (
-	"context"
 	"fmt"
 	"net"
 	"sync"
@@ -53,8 +52,7 @@ type driver struct {
 	joinOnce         sync.Once
 	localJoinOnce    sync.Once
 	keys             []*key
-	peerOpCh         chan *peerOperation
-	peerOpCancel     context.CancelFunc
+	peerOpMu         sync.Mutex
 	sync.Mutex
 }
 
@@ -69,16 +67,10 @@ func Register(r driverapi.Registerer, config map[string]interface{}) error {
 		peerDb: peerNetworkMap{
 			mp: map[string]*peerMap{},
 		},
-		secMap:   &encrMap{nodes: map[string][]*spi{}},
-		config:   config,
-		peerOpCh: make(chan *peerOperation),
+		secMap: &encrMap{nodes: map[string][]*spi{}},
+		config: config,
 	}
 
-	// Launch the go routine for processing peer operations
-	ctx, cancel := context.WithCancel(context.Background())
-	d.peerOpCancel = cancel
-	go d.peerOpRoutine(ctx, d.peerOpCh)
-
 	if data, ok := config[netlabel.GlobalKVClient]; ok {
 		var err error
 		dsc, ok := data.(discoverapi.DatastoreConfigData)
@@ -166,11 +158,6 @@ func (d *driver) restoreEndpoints() error {
 func Fini(drv driverapi.Driver) {
 	d := drv.(*driver)
 
-	// Notify the peer go routine to return
-	if d.peerOpCancel != nil {
-		d.peerOpCancel()
-	}
-
 	if d.exitCh != nil {
 		waitCh := make(chan struct{})
 

+ 18 - 84
libnetwork/drivers/overlay/peerdb.go

@@ -4,13 +4,11 @@
 package overlay
 
 import (
-	"context"
 	"fmt"
 	"net"
 	"sync"
 	"syscall"
 
-	"github.com/docker/docker/libnetwork/internal/caller"
 	"github.com/docker/docker/libnetwork/internal/setmatrix"
 	"github.com/docker/docker/libnetwork/osl"
 	"github.com/sirupsen/logrus"
@@ -247,62 +245,10 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM
 // in one single atomic operation. This is fundamental to guarantee consistency, and avoid that
 // new peerAdd or peerDelete gets reordered during the sandbox init.
 func (d *driver) initSandboxPeerDB(nid string) {
-	d.peerInit(nid)
-}
-
-type peerOperationType int32
-
-const (
-	peerOperationINIT peerOperationType = iota
-	peerOperationADD
-	peerOperationDELETE
-	peerOperationFLUSH
-)
-
-type peerOperation struct {
-	opType     peerOperationType
-	networkID  string
-	endpointID string
-	peerIP     net.IP
-	peerIPMask net.IPMask
-	peerMac    net.HardwareAddr
-	vtepIP     net.IP
-	l2Miss     bool
-	l3Miss     bool
-	localPeer  bool
-	callerName string
-}
-
-func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) {
-	var err error
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case op := <-ch:
-			switch op.opType {
-			case peerOperationINIT:
-				err = d.peerInitOp(op.networkID)
-			case peerOperationADD:
-				err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.l2Miss, op.l3Miss, true, op.localPeer)
-			case peerOperationDELETE:
-				err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer)
-			case peerOperationFLUSH:
-				err = d.peerFlushOp(op.networkID)
-			}
-			if err != nil {
-				logrus.Warnf("Peer operation failed:%s op:%v", err, op)
-			}
-		}
-	}
-}
-
-func (d *driver) peerInit(nid string) {
-	callerName := caller.Name(1)
-	d.peerOpCh <- &peerOperation{
-		opType:     peerOperationINIT,
-		networkID:  nid,
-		callerName: callerName,
+	d.peerOpMu.Lock()
+	defer d.peerOpMu.Unlock()
+	if err := d.peerInitOp(nid); err != nil {
+		logrus.WithError(err).Warn("Peer init operation failed")
 	}
 }
 
@@ -321,18 +267,11 @@ func (d *driver) peerInitOp(nid string) error {
 
 func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	peerMac net.HardwareAddr, vtep net.IP, l2Miss, l3Miss, localPeer bool) {
-	d.peerOpCh <- &peerOperation{
-		opType:     peerOperationADD,
-		networkID:  nid,
-		endpointID: eid,
-		peerIP:     peerIP,
-		peerIPMask: peerIPMask,
-		peerMac:    peerMac,
-		vtepIP:     vtep,
-		l2Miss:     l2Miss,
-		l3Miss:     l3Miss,
-		localPeer:  localPeer,
-		callerName: caller.Name(1),
+	d.peerOpMu.Lock()
+	defer d.peerOpMu.Unlock()
+	err := d.peerAddOp(nid, eid, peerIP, peerIPMask, peerMac, vtep, l2Miss, l3Miss, true, localPeer)
+	if err != nil {
+		logrus.WithError(err).Warn("Peer add operation failed")
 	}
 }
 
@@ -413,16 +352,11 @@ func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
 
 func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	peerMac net.HardwareAddr, vtep net.IP, localPeer bool) {
-	d.peerOpCh <- &peerOperation{
-		opType:     peerOperationDELETE,
-		networkID:  nid,
-		endpointID: eid,
-		peerIP:     peerIP,
-		peerIPMask: peerIPMask,
-		peerMac:    peerMac,
-		vtepIP:     vtep,
-		callerName: caller.Name(1),
-		localPeer:  localPeer,
+	d.peerOpMu.Lock()
+	defer d.peerOpMu.Unlock()
+	err := d.peerDeleteOp(nid, eid, peerIP, peerIPMask, peerMac, vtep, localPeer)
+	if err != nil {
+		logrus.WithError(err).Warn("Peer delete operation failed")
 	}
 }
 
@@ -485,10 +419,10 @@ func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPM
 }
 
 func (d *driver) peerFlush(nid string) {
-	d.peerOpCh <- &peerOperation{
-		opType:     peerOperationFLUSH,
-		networkID:  nid,
-		callerName: caller.Name(1),
+	d.peerOpMu.Lock()
+	defer d.peerOpMu.Unlock()
+	if err := d.peerFlushOp(nid); err != nil {
+		logrus.WithError(err).Warn("Peer flush operation failed")
 	}
 }