libnetwork: don't embed mutex in controller
Embedded structs are part of the exported surface of a struct type. Boxing a struct value into an interface value does not erase that; any code could gain access to the embedded struct value with a simple type assertion. The mutex is supposed to be a private implementation detail, but *controller implements sync.Locker because the mutex is embedded. c, _ := libnetwork.New() c.(sync.Locker).Lock() Change the mutex to an unexported field so *controller no longer spuriously implements the sync.Locker interface. Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
parent
483b03562a
commit
ae09fe3da7
9 changed files with 128 additions and 128 deletions
|
@ -108,7 +108,7 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
|
|||
|
||||
// Find the deleted key. If the deleted key was the primary key,
|
||||
// a new primary key should be set before removing if from keyring.
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
added := []byte{}
|
||||
deleted := []byte{}
|
||||
j := len(c.keys)
|
||||
|
@ -157,7 +157,7 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
if len(added) > 0 {
|
||||
a.networkDB.SetKey(added)
|
||||
|
@ -249,8 +249,8 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
|
|||
// For a given subsystem getKeys sorts the keys by lamport time and returns
|
||||
// slice of keys and lamport time which can used as a unique tag for the keys
|
||||
func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
sort.Sort(ByTime(c.keys))
|
||||
|
||||
|
@ -271,8 +271,8 @@ func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
|
|||
// getPrimaryKeyTag returns the primary key for a given subsystem from the
|
||||
// list of sorted key and the associated tag
|
||||
func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
sort.Sort(ByTime(c.keys))
|
||||
keys := []*types.EncryptionKey{}
|
||||
for _, key := range c.keys {
|
||||
|
@ -316,7 +316,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
|
|||
nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
|
||||
cancelList = append(cancelList, cancel)
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
c.agent = &agent{
|
||||
networkDB: nDB,
|
||||
bindAddr: bindAddr,
|
||||
|
@ -325,7 +325,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
|
|||
coreCancelFuncs: cancelList,
|
||||
driverCancelFuncs: make(map[string][]func()),
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
go c.handleTableEvents(ch, c.handleEpTableEvent)
|
||||
go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
|
||||
|
@ -383,10 +383,10 @@ func (c *controller) agentDriverNotify(d driverapi.Driver) {
|
|||
func (c *controller) agentClose() {
|
||||
// Acquire current agent instance and reset its pointer
|
||||
// then run closing functions
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
agent := c.agent
|
||||
c.agent = nil
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
// when the agent is closed the cluster provider should be cleaned up
|
||||
c.SetClusterProvider(nil)
|
||||
|
|
|
@ -174,7 +174,7 @@ type controller struct {
|
|||
agentStopDone chan struct{}
|
||||
keys []*types.EncryptionKey
|
||||
DiagnosticServer *diagnostic.Server
|
||||
sync.Mutex
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
type initializer struct {
|
||||
|
@ -247,7 +247,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
|
|||
|
||||
func (c *controller) SetClusterProvider(provider cluster.Provider) {
|
||||
var sameProvider bool
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
// Avoids to spawn multiple goroutine for the same cluster provider
|
||||
if c.cfg.ClusterProvider == provider {
|
||||
// If the cluster provider is already set, there is already a go routine spawned
|
||||
|
@ -256,7 +256,7 @@ func (c *controller) SetClusterProvider(provider cluster.Provider) {
|
|||
} else {
|
||||
c.cfg.ClusterProvider = provider
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
if provider == nil || sameProvider {
|
||||
return
|
||||
|
@ -284,17 +284,17 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
|
|||
}
|
||||
|
||||
if c.getAgent() == nil {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
c.keys = keys
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
return c.handleKeyChange(keys)
|
||||
}
|
||||
|
||||
func (c *controller) getAgent() *agent {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.agent
|
||||
}
|
||||
|
||||
|
@ -309,9 +309,9 @@ func (c *controller) clusterAgentInit() {
|
|||
case cluster.EventNetworkKeysAvailable:
|
||||
// Validates that the keys are actually available before starting the initialization
|
||||
// This will handle old spurious messages left on the channel
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
keysAvailable = c.keys != nil
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
fallthrough
|
||||
case cluster.EventSocketChange, cluster.EventNodeReady:
|
||||
if keysAvailable && !c.isDistributedControl() {
|
||||
|
@ -324,9 +324,9 @@ func (c *controller) clusterAgentInit() {
|
|||
}
|
||||
case cluster.EventNodeLeave:
|
||||
c.agentOperationStart()
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
c.keys = nil
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
// We are leaving the cluster. Make sure we
|
||||
// close the gossip so that we stop all
|
||||
|
@ -348,9 +348,9 @@ func (c *controller) clusterAgentInit() {
|
|||
|
||||
// AgentInitWait waits for agent initialization to be completed in the controller.
|
||||
func (c *controller) AgentInitWait() {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
agentInitDone := c.agentInitDone
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
if agentInitDone != nil {
|
||||
<-agentInitDone
|
||||
|
@ -359,9 +359,9 @@ func (c *controller) AgentInitWait() {
|
|||
|
||||
// AgentStopWait waits for the Agent stop to be completed in the controller
|
||||
func (c *controller) AgentStopWait() {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
agentStopDone := c.agentStopDone
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
if agentStopDone != nil {
|
||||
<-agentStopDone
|
||||
}
|
||||
|
@ -369,34 +369,34 @@ func (c *controller) AgentStopWait() {
|
|||
|
||||
// agentOperationStart marks the start of an Agent Init or Agent Stop
|
||||
func (c *controller) agentOperationStart() {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if c.agentInitDone == nil {
|
||||
c.agentInitDone = make(chan struct{})
|
||||
}
|
||||
if c.agentStopDone == nil {
|
||||
c.agentStopDone = make(chan struct{})
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// agentInitComplete notifies the successful completion of the Agent initialization
|
||||
func (c *controller) agentInitComplete() {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if c.agentInitDone != nil {
|
||||
close(c.agentInitDone)
|
||||
c.agentInitDone = nil
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// agentStopComplete notifies the successful completion of the Agent stop
|
||||
func (c *controller) agentStopComplete() {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if c.agentStopDone != nil {
|
||||
close(c.agentStopDone)
|
||||
c.agentStopDone = nil
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
|
||||
|
@ -469,9 +469,9 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
c.cfg = cfg
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
var dsConfig *discoverapi.DatastoreConfigData
|
||||
for scope, sCfg := range cfg.Scopes {
|
||||
|
@ -567,8 +567,8 @@ func (c *controller) pushNodeDiscovery(d driverapi.Driver, cap driverapi.Capabil
|
|||
}
|
||||
|
||||
func (c *controller) Config() config.Config {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.cfg == nil {
|
||||
return config.Config{}
|
||||
}
|
||||
|
@ -576,8 +576,8 @@ func (c *controller) Config() config.Config {
|
|||
}
|
||||
|
||||
func (c *controller) isManager() bool {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.cfg == nil || c.cfg.ClusterProvider == nil {
|
||||
return false
|
||||
}
|
||||
|
@ -585,8 +585,8 @@ func (c *controller) isManager() bool {
|
|||
}
|
||||
|
||||
func (c *controller) isAgent() bool {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.cfg == nil || c.cfg.ClusterProvider == nil {
|
||||
return false
|
||||
}
|
||||
|
@ -811,9 +811,9 @@ addToStore:
|
|||
}
|
||||
|
||||
if !c.isDistributedControl() {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
arrangeIngressFilterRule()
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
arrangeUserFilterRule()
|
||||
|
||||
|
@ -985,13 +985,13 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (S
|
|||
}
|
||||
|
||||
var sb *sandbox
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
for _, s := range c.sandboxes {
|
||||
if s.containerID == containerID {
|
||||
// If not a stub, then we already have a complete sandbox.
|
||||
if !s.isStub {
|
||||
sbID := s.ID()
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
return nil, types.ForbiddenErrorf("container %s is already present in sandbox %s", containerID, sbID)
|
||||
}
|
||||
|
||||
|
@ -1004,7 +1004,7 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (S
|
|||
break
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
sandboxID := stringid.GenerateRandomID()
|
||||
if runtime.GOOS == "windows" {
|
||||
|
@ -1027,9 +1027,9 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (S
|
|||
|
||||
sb.processOptions(options...)
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if sb.ingress && c.ingressSandbox != nil {
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
return nil, types.ForbiddenErrorf("ingress sandbox already present")
|
||||
}
|
||||
|
||||
|
@ -1041,16 +1041,16 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (S
|
|||
} else if sb.loadBalancerNID != "" {
|
||||
sb.id = "lb_" + sb.loadBalancerNID
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if sb.ingress {
|
||||
c.ingressSandbox = nil
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -1090,14 +1090,14 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (S
|
|||
sb.osSbox.ApplyOSTweaks(sb.oslTypes)
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
c.sandboxes[sb.id] = sb
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
delete(c.sandboxes, sb.id)
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -1110,8 +1110,8 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (S
|
|||
}
|
||||
|
||||
func (c *controller) Sandboxes() []Sandbox {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
list := make([]Sandbox, 0, len(c.sandboxes))
|
||||
for _, s := range c.sandboxes {
|
||||
|
@ -1138,9 +1138,9 @@ func (c *controller) SandboxByID(id string) (Sandbox, error) {
|
|||
if id == "" {
|
||||
return nil, ErrInvalidID(id)
|
||||
}
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
s, ok := c.sandboxes[id]
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
if !ok {
|
||||
return nil, types.NotFoundErrorf("sandbox %s not found", id)
|
||||
}
|
||||
|
@ -1150,14 +1150,14 @@ func (c *controller) SandboxByID(id string) (Sandbox, error) {
|
|||
// SandboxDestroy destroys a sandbox given a container ID
|
||||
func (c *controller) SandboxDestroy(id string) error {
|
||||
var sb *sandbox
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
for _, s := range c.sandboxes {
|
||||
if s.containerID == id {
|
||||
sb = s
|
||||
break
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
// It is not an error if sandbox is not available
|
||||
if sb == nil {
|
||||
|
@ -1253,32 +1253,32 @@ func (c *controller) Stop() {
|
|||
|
||||
// StartDiagnostic start the network dias mode
|
||||
func (c *controller) StartDiagnostic(port int) {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if !c.DiagnosticServer.IsDiagnosticEnabled() {
|
||||
c.DiagnosticServer.EnableDiagnostic("127.0.0.1", port)
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// StopDiagnostic start the network dias mode
|
||||
func (c *controller) StopDiagnostic() {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if c.DiagnosticServer.IsDiagnosticEnabled() {
|
||||
c.DiagnosticServer.DisableDiagnostic()
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// IsDiagnosticEnabled returns true if the dias is enabled
|
||||
func (c *controller) IsDiagnosticEnabled() bool {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.DiagnosticServer.IsDiagnosticEnabled()
|
||||
}
|
||||
|
||||
func (c *controller) iptablesEnabled() bool {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.cfg == nil {
|
||||
return false
|
||||
|
|
|
@ -616,9 +616,9 @@ func (ep *endpoint) rename(name string) error {
|
|||
return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
|
||||
}
|
||||
} else {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
netWatch, ok = c.nmap[n.ID()]
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("watch null for network %q", n.Name())
|
||||
}
|
||||
|
@ -898,9 +898,9 @@ func (ep *endpoint) getSandbox() (*sandbox, bool) {
|
|||
sid := ep.sandboxID
|
||||
ep.Unlock()
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
ps, ok := c.sandboxes[sid]
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
return ps, ok
|
||||
}
|
||||
|
@ -1049,9 +1049,9 @@ func JoinOptionPriority(prio int) EndpointOption {
|
|||
return func(ep *endpoint) {
|
||||
// ep lock already acquired
|
||||
c := ep.network.getController()
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
sb, ok := c.sandboxes[ep.sandboxID]
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
if !ok {
|
||||
logrus.Errorf("Could not set endpoint priority value during Join to endpoint %s: No sandbox id present in endpoint", ep.id)
|
||||
return
|
||||
|
|
|
@ -1413,8 +1413,8 @@ func (n *network) addSvcRecords(eID, name, serviceID string, epIP, epIPv6 net.IP
|
|||
logrus.Debugf("%s (%.7s).addSvcRecords(%s, %s, %s, %t) %s sid:%s", eID, networkID, name, epIP, epIPv6, ipMapUpdate, method, serviceID)
|
||||
|
||||
c := n.getController()
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
sr, ok := c.svcRecords[networkID]
|
||||
if !ok {
|
||||
|
@ -1449,8 +1449,8 @@ func (n *network) deleteSvcRecords(eID, name, serviceID string, epIP net.IP, epI
|
|||
logrus.Debugf("%s (%.7s).deleteSvcRecords(%s, %s, %s, %t) %s sid:%s ", eID, networkID, name, epIP, epIPv6, ipMapUpdate, method, serviceID)
|
||||
|
||||
c := n.getController()
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
sr, ok := c.svcRecords[networkID]
|
||||
if !ok {
|
||||
|
@ -1484,8 +1484,8 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record {
|
|||
|
||||
epName := ep.Name()
|
||||
|
||||
n.ctrlr.Lock()
|
||||
defer n.ctrlr.Unlock()
|
||||
n.ctrlr.mu.Lock()
|
||||
defer n.ctrlr.mu.Unlock()
|
||||
sr, ok := n.ctrlr.svcRecords[n.id]
|
||||
if !ok || sr.svcMap == nil {
|
||||
return nil
|
||||
|
@ -1980,8 +1980,8 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) {
|
|||
|
||||
c := n.getController()
|
||||
networkID := n.ID()
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
sr, ok := c.svcRecords[networkID]
|
||||
|
||||
if !ok {
|
||||
|
@ -2022,8 +2022,8 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) {
|
|||
func (n *network) HandleQueryResp(name string, ip net.IP) {
|
||||
networkID := n.ID()
|
||||
c := n.getController()
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
sr, ok := c.svcRecords[networkID]
|
||||
|
||||
if !ok {
|
||||
|
@ -2042,8 +2042,8 @@ func (n *network) HandleQueryResp(name string, ip net.IP) {
|
|||
func (n *network) ResolveIP(ip string) string {
|
||||
networkID := n.ID()
|
||||
c := n.getController()
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
sr, ok := c.svcRecords[networkID]
|
||||
|
||||
if !ok {
|
||||
|
@ -2096,8 +2096,8 @@ func (n *network) ResolveService(name string) ([]*net.SRV, []net.IP) {
|
|||
svcName := strings.Join(parts[2:], ".")
|
||||
|
||||
networkID := n.ID()
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
sr, ok := c.svcRecords[networkID]
|
||||
|
||||
if !ok {
|
||||
|
|
|
@ -253,12 +253,12 @@ func (sb *sandbox) delete(force bool) error {
|
|||
logrus.Warnf("Failed to delete sandbox %s from store: %v", sb.ID(), err)
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if sb.ingress {
|
||||
c.ingressSandbox = nil
|
||||
}
|
||||
delete(c.sandboxes, sb.ID())
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -131,9 +131,9 @@ func (c *controller) startExternalKeyListener() error {
|
|||
l.Close()
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
c.extKeyListener = l
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
go c.acceptClientConnections(uds, l)
|
||||
return nil
|
||||
|
|
|
@ -248,9 +248,9 @@ func (c *controller) sandboxCleanup(activeSandboxes map[string]interface{}) {
|
|||
continue
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
c.sandboxes[sb.id] = sb
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
for _, eps := range sbs.Eps {
|
||||
n, err := c.getNetworkFromStore(eps.Nid)
|
||||
|
|
|
@ -153,9 +153,9 @@ func (c *controller) getLBIndex(sid, nid string, ingressPorts []*PortConfig) int
|
|||
id: sid,
|
||||
ports: portConfigs(ingressPorts).String(),
|
||||
}
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
s, ok := c.serviceBindings[skey]
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
if !ok {
|
||||
return 0
|
||||
|
@ -170,8 +170,8 @@ func (c *controller) getLBIndex(sid, nid string, ingressPorts []*PortConfig) int
|
|||
|
||||
// cleanupServiceDiscovery when the network is being deleted, erase all the associated service discovery records
|
||||
func (c *controller) cleanupServiceDiscovery(cleanupNID string) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if cleanupNID == "" {
|
||||
logrus.Debugf("cleanupServiceDiscovery for all networks")
|
||||
c.svcRecords = make(map[string]svcInfo)
|
||||
|
@ -185,12 +185,12 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
|
|||
var cleanupFuncs []func()
|
||||
|
||||
logrus.Debugf("cleanupServiceBindings for %s", cleanupNID)
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
services := make([]*service, 0, len(c.serviceBindings))
|
||||
for _, s := range c.serviceBindings {
|
||||
services = append(services, s)
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
for _, s := range services {
|
||||
s.Lock()
|
||||
|
@ -248,7 +248,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
|
|||
|
||||
var s *service
|
||||
for {
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
var ok bool
|
||||
s, ok = c.serviceBindings[skey]
|
||||
if !ok {
|
||||
|
@ -257,7 +257,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
|
|||
s = newService(svcName, svcID, ingressPorts, serviceAliases)
|
||||
c.serviceBindings[skey] = s
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
s.Lock()
|
||||
if !s.deleted {
|
||||
// ok the object is good to be used
|
||||
|
@ -321,9 +321,9 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
|
|||
ports: portConfigs(ingressPorts).String(),
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
s, ok := c.serviceBindings[skey]
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
if !ok {
|
||||
logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID)
|
||||
return nil
|
||||
|
@ -398,14 +398,14 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
|
|||
if len(s.loadBalancers) == 0 {
|
||||
// All loadbalancers for the service removed. Time to
|
||||
// remove the service itself.
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
|
||||
// Mark the object as deleted so that the add won't use it wrongly
|
||||
s.deleted = true
|
||||
// NOTE The delete from the serviceBindings map has to be the last operation else we are allowing a race between this service
|
||||
// that is getting deleted and a new service that will be created if the entry is not anymore there
|
||||
delete(c.serviceBindings, skey)
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID)
|
||||
|
|
|
@ -18,9 +18,9 @@ func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) err
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
c.stores = append(c.stores, store)
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -28,14 +28,14 @@ func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) err
|
|||
func (c *controller) initStores() error {
|
||||
registerKVStores()
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if c.cfg == nil {
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
scopeConfigs := c.cfg.Scopes
|
||||
c.stores = nil
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
for scope, scfg := range scopeConfigs {
|
||||
if err := c.initScopedStore(scope, scfg); err != nil {
|
||||
|
@ -54,8 +54,8 @@ func (c *controller) closeStores() {
|
|||
}
|
||||
|
||||
func (c *controller) getStore(scope string) datastore.DataStore {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
for _, store := range c.stores {
|
||||
if store.Scope() == scope {
|
||||
|
@ -67,8 +67,8 @@ func (c *controller) getStore(scope string) datastore.DataStore {
|
|||
}
|
||||
|
||||
func (c *controller) getStores() []datastore.DataStore {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
return c.stores
|
||||
}
|
||||
|
@ -244,8 +244,8 @@ type netWatch struct {
|
|||
}
|
||||
|
||||
func (c *controller) getLocalEps(nw *netWatch) []*endpoint {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
var epl []*endpoint
|
||||
for _, ep := range nw.localEps {
|
||||
|
@ -276,7 +276,7 @@ func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan da
|
|||
break
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
var addEp []*endpoint
|
||||
|
||||
delEpMap := make(map[string]*endpoint)
|
||||
|
@ -315,7 +315,7 @@ func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan da
|
|||
delete(nw.remoteEps, lEp.ID())
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
for _, lEp := range delEpMap {
|
||||
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
|
||||
|
@ -336,22 +336,22 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi
|
|||
networkID := n.ID()
|
||||
endpointID := ep.ID()
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
nw, ok := nmap[networkID]
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
if ok {
|
||||
// Update the svc db for the local endpoint join right away
|
||||
n.updateSvcRecord(ep, c.getLocalEps(nw), true)
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
nw.localEps[endpointID] = ep
|
||||
|
||||
// If we had learned that from the kv store remove it
|
||||
// from remote ep list now that we know that this is
|
||||
// indeed a local endpoint
|
||||
delete(nw.remoteEps, endpointID)
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -365,11 +365,11 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi
|
|||
// try to update this ep's container's svc records
|
||||
n.updateSvcRecord(ep, c.getLocalEps(nw), true)
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
nw.localEps[endpointID] = ep
|
||||
nmap[networkID] = nw
|
||||
nw.stopCh = make(chan struct{})
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
store := c.getStore(n.DataScope())
|
||||
if store == nil {
|
||||
|
@ -398,19 +398,19 @@ func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoi
|
|||
networkID := n.ID()
|
||||
endpointID := ep.ID()
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
nw, ok := nmap[networkID]
|
||||
|
||||
if ok {
|
||||
delete(nw.localEps, endpointID)
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
// Update the svc db about local endpoint leave right away
|
||||
// Do this after we remove this ep from localEps so that we
|
||||
// don't try to remove this svc record from this ep's container.
|
||||
n.updateSvcRecord(ep, c.getLocalEps(nw), false)
|
||||
|
||||
c.Lock()
|
||||
c.mu.Lock()
|
||||
if len(nw.localEps) == 0 {
|
||||
close(nw.stopCh)
|
||||
|
||||
|
@ -421,7 +421,7 @@ func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoi
|
|||
delete(nmap, networkID)
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *controller) watchLoop() {
|
||||
|
|
Loading…
Add table
Reference in a new issue