diff --git a/libnetwork/controller.go b/libnetwork/controller.go
index 2b07705627..e21e7d05c6 100644
--- a/libnetwork/controller.go
+++ b/libnetwork/controller.go
@@ -355,24 +355,30 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
 		}
 	}()
 
-	// addNetwork can be called for local scope network lazily when
-	// an endpoint is created after a restart and the network was
-	// created in previous life. Make sure you wrap around the driver
-	// notification of network creation in once call so that the driver
-	// invoked only once in case both the network and endpoint creation
-	// happens in the same lifetime.
-	network.drvOnce.Do(func() {
-		err = c.addNetwork(network)
-	})
-	if err != nil {
+	if err := c.addNetwork(network); err != nil {
 		return nil, err
 	}
+	defer func() {
+		if err != nil {
+			if e := network.deleteNetwork(); e != nil {
+				log.Warnf("couldn't roll back driver network on network %s creation failure (%v): %v", network.name, err, e)
+			}
+		}
+	}()
 
 	if err = c.updateToStore(network); err != nil {
-		log.Warnf("couldnt create network %s: %v", network.name, err)
-		if e := network.Delete(); e != nil {
-			log.Warnf("couldnt cleanup network %s on network create failure (%v): %v", network.name, err, e)
+		return nil, err
+	}
+	defer func() {
+		if err != nil {
+			if e := c.deleteFromStore(network); e != nil {
+				log.Warnf("couldnt rollback from store, network %s on failure (%v): %v", network.name, err, e)
+			}
 		}
+	}()
+
+	network.epCnt = &endpointCnt{n: network}
+	if err = c.updateToStore(network.epCnt); err != nil {
 		return nil, err
 	}
 
diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go
index bebb29dfbb..027fe48df3 100644
--- a/libnetwork/datastore/datastore.go
+++ b/libnetwork/datastore/datastore.go
@@ -5,6 +5,7 @@ import (
 	"log"
 	"reflect"
 	"strings"
+	"sync"
 
 	"github.com/docker/libkv"
 	"github.com/docker/libkv/store"
@@ -55,6 +56,7 @@ type datastore struct {
 	scope string
 	store store.Store
 	cache *cache
+	sync.Mutex
 }
 
 // KVObject is Key/Value interface used by objects to be part of the DataStore
@@ -287,6 +289,8 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
 		pair     *store.KVPair
 		err      error
 	)
+	ds.Lock()
+	defer ds.Unlock()
 
 	if kvObject == nil {
 		return types.BadRequestErrorf("invalid KV Object : nil")
@@ -325,6 +329,9 @@ add_cache:
 
 // PutObject adds a new Record based on an object into the datastore
 func (ds *datastore) PutObject(kvObject KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	if kvObject == nil {
 		return types.BadRequestErrorf("invalid KV Object : nil")
 	}
@@ -356,6 +363,9 @@ func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
 
 // GetObject returns a record matching the key
 func (ds *datastore) GetObject(key string, o KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	if ds.cache != nil {
 		return ds.cache.get(key, o)
 	}
@@ -387,6 +397,9 @@ func (ds *datastore) ensureKey(key string) error {
 }
 
 func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
+	ds.Lock()
+	defer ds.Unlock()
+
 	if ds.cache != nil {
 		return ds.cache.list(kvObject)
 	}
@@ -430,6 +443,9 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
 
 // DeleteObject unconditionally deletes a record from the store
 func (ds *datastore) DeleteObject(kvObject KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	// cleaup the cache first
 	if ds.cache != nil {
 		ds.cache.del(kvObject)
@@ -444,6 +460,9 @@ func (ds *datastore) DeleteObject(kvObject KVObject) error {
 
 // DeleteObjectAtomic performs atomic delete on a record
 func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	if kvObject == nil {
 		return types.BadRequestErrorf("invalid KV Object : nil")
 	}
@@ -469,6 +488,9 @@ del_cache:
 
 // DeleteTree unconditionally deletes a record from the store
 func (ds *datastore) DeleteTree(kvObject KVObject) error {
+	ds.Lock()
+	defer ds.Unlock()
+
 	// cleaup the cache first
 	if ds.cache != nil {
 		ds.cache.del(kvObject)
diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go
index 1dbfe44a31..40459369f7 100644
--- a/libnetwork/endpoint.go
+++ b/libnetwork/endpoint.go
@@ -484,12 +484,12 @@ func (ep *endpoint) Delete() error {
 	}
 	ep.Unlock()
 
-	if err = n.DecEndpointCnt(); err != nil {
+	if err = n.getEpCnt().DecEndpointCnt(); err != nil {
 		return err
 	}
 	defer func() {
 		if err != nil {
-			if e := n.IncEndpointCnt(); e != nil {
+			if e := n.getEpCnt().IncEndpointCnt(); e != nil {
 				log.Warnf("failed to update network %s : %v", n.name, e)
 			}
 		}
diff --git a/libnetwork/endpoint_cnt.go b/libnetwork/endpoint_cnt.go
new file mode 100644
index 0000000000..550a2a3cfd
--- /dev/null
+++ b/libnetwork/endpoint_cnt.go
@@ -0,0 +1,156 @@
+package libnetwork
+
+import (
+	"encoding/json"
+	"fmt"
+	"sync"
+
+	"github.com/docker/libnetwork/datastore"
+)
+
+// endpointCnt is the datastore object that holds a network's endpoint count
+// under its own key, separately from the network object itself.
+type endpointCnt struct {
+	n        *network
+	Count    uint64
+	dbIndex  uint64
+	dbExists bool
+	sync.Mutex
+}
+
+const epCntKeyPrefix = "endpoint_count"
+
+func (ec *endpointCnt) Key() []string {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return []string{epCntKeyPrefix, ec.n.id}
+}
+
+func (ec *endpointCnt) KeyPrefix() []string {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return []string{epCntKeyPrefix, ec.n.id}
+}
+
+func (ec *endpointCnt) Value() []byte {
+	ec.Lock()
+	defer ec.Unlock()
+
+	b, err := json.Marshal(ec)
+	if err != nil {
+		return nil
+	}
+	return b
+}
+
+func (ec *endpointCnt) SetValue(value []byte) error {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return json.Unmarshal(value, ec)
+}
+
+func (ec *endpointCnt) Index() uint64 {
+	ec.Lock()
+	defer ec.Unlock()
+	return ec.dbIndex
+}
+
+func (ec *endpointCnt) SetIndex(index uint64) {
+	ec.Lock()
+	ec.dbIndex = index
+	ec.dbExists = true
+	ec.Unlock()
+}
+
+func (ec *endpointCnt) Exists() bool {
+	ec.Lock()
+	defer ec.Unlock()
+	return ec.dbExists
+}
+
+func (ec *endpointCnt) Skip() bool {
+	ec.Lock()
+	defer ec.Unlock()
+	return !ec.n.persist
+}
+
+func (ec *endpointCnt) New() datastore.KVObject {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return &endpointCnt{
+		n: ec.n,
+	}
+}
+
+func (ec *endpointCnt) CopyTo(o datastore.KVObject) error {
+	ec.Lock()
+	defer ec.Unlock()
+
+	dstEc := o.(*endpointCnt)
+	dstEc.n = ec.n
+	dstEc.Count = ec.Count
+	dstEc.dbExists = ec.dbExists
+	dstEc.dbIndex = ec.dbIndex
+
+	return nil
+}
+
+func (ec *endpointCnt) DataScope() string {
+	return ec.n.DataScope()
+}
+
+func (ec *endpointCnt) EndpointCnt() uint64 {
+	ec.Lock()
+	defer ec.Unlock()
+
+	return ec.Count
+}
+
+// atomicIncDecEpCnt adjusts the in-memory count by one and writes it to the
+// store, re-reading the stored object and retrying whenever the write is
+// rejected with datastore.ErrKeyModified.
+func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
+	// Resolve the store before touching the count so that a missing store
+	// does not leave the in-memory value modified.
+	store := ec.n.getController().getStore(ec.DataScope())
+	if store == nil {
+		return fmt.Errorf("store not found for scope %s", ec.DataScope())
+	}
+
+retry:
+	ec.Lock()
+	if inc {
+		ec.Count++
+	} else if ec.Count > 0 {
+		// Count is unsigned: decrementing at zero would wrap to MaxUint64
+		// and network.Delete would then always report active endpoints.
+		ec.Count--
+	}
+	ec.Unlock()
+
+	if err := ec.n.getController().updateToStore(ec); err != nil {
+		if err == datastore.ErrKeyModified {
+			if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
+				return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
+			}
+
+			goto retry
+		}
+
+		return err
+	}
+
+	return nil
+}
+
+func (ec *endpointCnt) IncEndpointCnt() error {
+	return ec.atomicIncDecEpCnt(true)
+}
+
+func (ec *endpointCnt) DecEndpointCnt() error {
+	return ec.atomicIncDecEpCnt(false)
+}
diff --git a/libnetwork/libnetwork_internal_test.go b/libnetwork/libnetwork_internal_test.go
index 909b809552..2847a5b3fc 100644
--- a/libnetwork/libnetwork_internal_test.go
+++ b/libnetwork/libnetwork_internal_test.go
@@ -39,7 +39,6 @@ func TestNetworkMarshalling(t *testing.T) {
 		ipamType:    "default",
 		addrSpace:   "viola",
 		networkType: "bridge",
-		endpointCnt: 27,
 		enableIPv6:  true,
 		persist:     true,
 		ipamV4Config: []*IpamConf{
@@ -142,7 +141,7 @@ func TestNetworkMarshalling(t *testing.T) {
 	}
 
 	if n.name != nn.name || n.id != nn.id || n.networkType != nn.networkType || n.ipamType != nn.ipamType ||
-		n.addrSpace != nn.addrSpace || n.endpointCnt != nn.endpointCnt || n.enableIPv6 != nn.enableIPv6 ||
+		n.addrSpace != nn.addrSpace || n.enableIPv6 != nn.enableIPv6 ||
 		n.persist != nn.persist || !compareIpamConfList(n.ipamV4Config, nn.ipamV4Config) ||
 		!compareIpamInfoList(n.ipamV4Info, nn.ipamV4Info) || !compareIpamConfList(n.ipamV6Config, nn.ipamV6Config) ||
 		!compareIpamInfoList(n.ipamV6Info, nn.ipamV6Info) {
diff --git a/libnetwork/network.go b/libnetwork/network.go
index bd545c3667..6e90dde68e 100644
--- a/libnetwork/network.go
+++ b/libnetwork/network.go
@@ -152,7 +152,7 @@ type network struct {
 	ipamV4Info   []*IpamInfo
 	ipamV6Info   []*IpamInfo
 	enableIPv6   bool
-	endpointCnt  uint64
+	epCnt        *endpointCnt
 	generic      options.Generic
 	dbIndex      uint64
 	svcRecords   svcMap
@@ -296,7 +296,6 @@ func (n *network) CopyTo(o datastore.KVObject) error {
 	dstN.id = n.id
 	dstN.networkType = n.networkType
 	dstN.ipamType = n.ipamType
-	dstN.endpointCnt = n.endpointCnt
 	dstN.enableIPv6 = n.enableIPv6
 	dstN.persist = n.persist
 	dstN.dbIndex = n.dbIndex
@@ -339,48 +338,11 @@ func (n *network) DataScope() string {
 	return n.driverScope()
 }
 
-func (n *network) EndpointCnt() uint64 {
+func (n *network) getEpCnt() *endpointCnt {
 	n.Lock()
 	defer n.Unlock()
-	return n.endpointCnt
-}
 
-func (n *network) atomicIncDecEpCnt(inc bool) error {
-retry:
-	n.Lock()
-	if inc {
-		n.endpointCnt++
-	} else {
-		n.endpointCnt--
-	}
-	n.Unlock()
-
-	store := n.getController().getStore(n.DataScope())
-	if store == nil {
-		return fmt.Errorf("store not found for scope %s", n.DataScope())
-	}
-
-	if err := n.getController().updateToStore(n); err != nil {
-		if err == datastore.ErrKeyModified {
-			if err := store.GetObject(datastore.Key(n.Key()...), n); err != nil {
-				return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
-			}
-
-			goto retry
-		}
-
-		return err
-	}
-
-	return nil
-}
-
-func (n *network) IncEndpointCnt() error {
-	return n.atomicIncDecEpCnt(true)
-}
-
-func (n *network) DecEndpointCnt() error {
-	return n.atomicIncDecEpCnt(false)
+	return n.epCnt
 }
 
 // TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
@@ -391,7 +353,6 @@ func (n *network) MarshalJSON() ([]byte, error) {
 	netMap["networkType"] = n.networkType
 	netMap["ipamType"] = n.ipamType
 	netMap["addrSpace"] = n.addrSpace
-	netMap["endpointCnt"] = n.endpointCnt
 	netMap["enableIPv6"] = n.enableIPv6
 	if n.generic != nil {
 		netMap["generic"] = n.generic
@@ -437,7 +398,6 @@ func (n *network) UnmarshalJSON(b []byte) (err error) {
 	n.name = netMap["name"].(string)
 	n.id = netMap["id"].(string)
 	n.networkType = netMap["networkType"].(string)
-	n.endpointCnt = uint64(netMap["endpointCnt"].(float64))
 	n.enableIPv6 = netMap["enableIPv6"].(bool)
 
 	if v, ok := netMap["generic"]; ok {
@@ -604,7 +564,7 @@ func (n *network) Delete() error {
 		return &UnknownNetworkError{name: name, id: id}
 	}
 
-	numEps := n.EndpointCnt()
+	numEps := n.getEpCnt().EndpointCnt()
 	if numEps != 0 {
 		return &ActiveEndpointsError{name: n.name, id: n.id}
 	}
@@ -622,23 +582,14 @@ func (n *network) Delete() error {
 	}()
 
 	// deleteFromStore performs an atomic delete operation and the
-	// network.endpointCnt field will help prevent any possible
+	// network.epCnt will help prevent any possible
 	// race between endpoint join and network delete
-	if err = n.getController().deleteFromStore(n); err != nil {
-		if err == datastore.ErrKeyModified {
-			return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.")
-		}
-		return err
+	if err = n.getController().deleteFromStore(n.getEpCnt()); err != nil {
+		return fmt.Errorf("error deleting network endpoint count from store: %v", err)
+	}
+	if err = n.getController().deleteFromStore(n); err != nil {
+		return fmt.Errorf("error deleting network from store: %v", err)
 	}
-
-	defer func() {
-		if err != nil {
-			n.dbExists = false
-			if e := n.getController().updateToStore(n); e != nil {
-				log.Warnf("failed to recreate network in store %s : %v", n.name, e)
-			}
-		}
-	}()
 
 	n.ipamRelease()
 
@@ -736,7 +687,7 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
 	}()
 
 	// Increment endpoint count to indicate completion of endpoint addition
-	if err = n.IncEndpointCnt(); err != nil {
+	if err = n.getEpCnt().IncEndpointCnt(); err != nil {
 		return nil, err
 	}
 
diff --git a/libnetwork/store.go b/libnetwork/store.go
index c308423839..d5eca874a4 100644
--- a/libnetwork/store.go
+++ b/libnetwork/store.go
@@ -69,6 +69,13 @@ func (c *controller) getNetworkFromStore(nid string) (*network, error) {
 			continue
 		}
 
+		ec := &endpointCnt{n: n}
+		err = store.GetObject(datastore.Key(ec.Key()...), ec)
+		if err != nil {
+			return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err)
+		}
+
+		n.epCnt = ec
 		return n, nil
 	}
 
@@ -94,6 +101,14 @@ func (c *controller) getNetworksFromStore() ([]*network, error) {
 		for _, kvo := range kvol {
 			n := kvo.(*network)
 			n.ctrlr = c
+
+			ec := &endpointCnt{n: n}
+			err = store.GetObject(datastore.Key(ec.Key()...), ec)
+			if err != nil {
+				return nil, fmt.Errorf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
+			}
+
+			n.epCnt = ec
 			nl = append(nl, n)
 		}
 	}
@@ -211,15 +226,15 @@ func (c *controller) unWatchSvcRecord(ep *endpoint) {
 	c.unWatchCh <- ep
 }
 
-func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, nCh <-chan datastore.KVObject) {
+func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) {
 	for {
 		select {
 		case <-nw.stopCh:
 			return
-		case o := <-nCh:
-			n := o.(*network)
+		case o := <-ecCh:
+			ec := o.(*endpointCnt)
 
-			epl, err := n.getEndpointsFromStore()
+			epl, err := ec.n.getEndpointsFromStore()
 			if err != nil {
 				break
 			}
@@ -300,7 +315,7 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi
 		return
 	}
 
-	ch, err := store.Watch(ep.getNetwork(), nw.stopCh)
+	ch, err := store.Watch(ep.getNetwork().getEpCnt(), nw.stopCh)
 	if err != nil {
 		log.Warnf("Error creating watch for network: %v", err)
 		return