Reworked endpoint store operation to address a few cases

* Removed network from being marshalled (it is part of the key anyways)
* Reworked the watch function to handle container-id on endpoints
* Included ContainerInfo to be marshalled which needs to be synchronized
* Resolved multiple race issues by introducing data locks

Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
Madhu Venugopal 2015-06-05 13:31:12 -07:00
parent 47a3f3690d
commit f88824fb8a
11 changed files with 427 additions and 130 deletions

View file

@ -12,6 +12,7 @@ import (
flag "github.com/docker/docker/pkg/mflag"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/reexec"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/term"
@ -31,6 +32,10 @@ var (
)
func main() {
if reexec.Init() {
return
}
_, stdout, stderr := term.StdStreams()
logrus.SetOutput(stderr)

View file

@ -102,7 +102,6 @@ type controller struct {
sandboxes sandboxTable
cfg *config.Config
store datastore.DataStore
stopChan chan struct{}
sync.Mutex
}
@ -132,7 +131,6 @@ func New(configFile string) (NetworkController, error) {
// But without that, datastore cannot be initialized.
log.Debugf("Unable to Parse LibNetwork Config file : %v", err)
}
c.stopChan = make(chan struct{})
return c, nil
}
@ -226,7 +224,10 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
return nil, err
}
if err := c.addNetworkToStore(network); err != nil {
if err := c.updateNetworkToStore(network); err != nil {
if e := network.Delete(); e != nil {
log.Warnf("couldnt cleanup network %s: %v", network.name, err)
}
return nil, err
}

View file

@ -2,10 +2,11 @@ package datastore
import (
"encoding/json"
"errors"
"reflect"
"strings"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/types"
"github.com/docker/swarm/pkg/store"
)
@ -52,17 +53,25 @@ const (
EndpointKeyPrefix = "endpoint"
)
var rootChain = []string{"docker", "libnetwork"}
//Key provides convenient method to create a Key
func Key(key ...string) string {
keychain := []string{"docker", "libnetwork"}
keychain = append(keychain, key...)
keychain := append(rootChain, key...)
str := strings.Join(keychain, "/")
return str + "/"
}
var errNewDatastore = errors.New("Error creating new Datastore")
var errInvalidConfiguration = errors.New("Invalid Configuration passed to Datastore")
var errInvalidAtomicRequest = errors.New("Invalid Atomic Request")
//ParseKey provides convenient method to unpack the key to complement the Key function
func ParseKey(key string) ([]string, error) {
chain := strings.Split(strings.Trim(key, "/"), "/")
// The key must atleast be equal to the rootChain in order to be considered as valid
if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) {
return nil, types.BadRequestErrorf("invalid Key : %s", key)
}
return chain[len(rootChain):], nil
}
// newClient used to connect to KV Store
func newClient(kv string, addrs string) (DataStore, error) {
@ -77,7 +86,7 @@ func newClient(kv string, addrs string) (DataStore, error) {
// NewDataStore creates a new instance of LibKV data store
func NewDataStore(cfg *config.DatastoreCfg) (DataStore, error) {
if cfg == nil {
return nil, errInvalidConfiguration
return nil, types.BadRequestErrorf("invalid configuration passed to datastore")
}
// TODO : cfg.Embedded case
return newClient(cfg.Client.Provider, cfg.Client.Address)
@ -95,12 +104,12 @@ func (ds *datastore) KVStore() store.Store {
// PutObjectAtomic adds a new Record based on an object into the datastore
func (ds *datastore) PutObjectAtomic(kvObject KV) error {
if kvObject == nil {
return errors.New("kvObject is nil")
return types.BadRequestErrorf("invalid KV Object : nil")
}
kvObjValue := kvObject.Value()
if kvObjValue == nil {
return errInvalidAtomicRequest
return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
}
previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
@ -116,7 +125,7 @@ func (ds *datastore) PutObjectAtomic(kvObject KV) error {
// PutObject adds a new Record based on an object into the datastore
func (ds *datastore) PutObject(kvObject KV) error {
if kvObject == nil {
return errors.New("kvObject is nil")
return types.BadRequestErrorf("invalid KV Object : nil")
}
return ds.putObjectWithKey(kvObject, kvObject.Key()...)
}
@ -125,7 +134,7 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
kvObjValue := kvObject.Value()
if kvObjValue == nil {
return errors.New("Object must provide marshalled data for key : " + Key(kvObject.Key()...))
return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
}
return ds.store.Put(Key(key...), kvObjValue, nil)
}
@ -147,16 +156,12 @@ func (ds *datastore) DeleteObject(kvObject KV) error {
// DeleteObjectAtomic performs atomic delete on a record
func (ds *datastore) DeleteObjectAtomic(kvObject KV) error {
if kvObject == nil {
return errors.New("kvObject is nil")
return types.BadRequestErrorf("invalid KV Object : nil")
}
previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
_, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous)
if err != nil {
return err
}
return nil
return err
}
// DeleteTree unconditionally deletes a record from the store

View file

@ -2,6 +2,7 @@ package datastore
import (
"encoding/json"
"reflect"
"testing"
"github.com/docker/libnetwork/config"
@ -16,6 +17,25 @@ func NewTestDataStore() DataStore {
return &datastore{store: NewMockStore()}
}
func TestKey(t *testing.T) {
eKey := []string{"hello", "world"}
sKey := Key(eKey...)
if sKey != "docker/libnetwork/hello/world/" {
t.Fatalf("unexpected key : %s", sKey)
}
}
func TestParseKey(t *testing.T) {
keySlice, err := ParseKey("/docker/libnetwork/hello/world/")
if err != nil {
t.Fatal(err)
}
eKey := []string{"hello", "world"}
if len(keySlice) < 2 || !reflect.DeepEqual(eKey, keySlice) {
t.Fatalf("unexpected unkey : %s", keySlice)
}
}
func TestInvalidDataStore(t *testing.T) {
config := &config.DatastoreCfg{}
config.Embedded = false
@ -94,6 +114,11 @@ type dummyObject struct {
func (n *dummyObject) Key() []string {
return []string{dummyKey, n.ID}
}
func (n *dummyObject) KeyPrefix() []string {
return []string{dummyKey}
}
func (n *dummyObject) Value() []byte {
if !n.ReturnValue {
return nil

View file

@ -3,6 +3,7 @@ package datastore
import (
"errors"
"github.com/docker/libnetwork/types"
"github.com/docker/swarm/pkg/store"
)
@ -69,7 +70,8 @@ func (s *MockStore) List(prefix string) ([]*store.KVPair, error) {
// DeleteTree deletes a range of values at "directory"
func (s *MockStore) DeleteTree(prefix string) error {
return ErrNotImplmented
delete(s.db, prefix)
return nil
}
// Watch a single key for modifications
@ -92,7 +94,7 @@ func (s *MockStore) NewLock(key string, options *store.LockOptions) (store.Locke
func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
mData := s.db[key]
if mData != nil && mData.Index != previous.LastIndex {
return false, nil, errInvalidAtomicRequest
return false, nil, types.BadRequestErrorf("atomic put failed due to mismatched Index")
}
err := s.Put(key, newValue, nil)
if err != nil {
@ -106,7 +108,7 @@ func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPai
func (s *MockStore) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
mData := s.db[key]
if mData != nil && mData.Index != previous.LastIndex {
return false, errInvalidAtomicRequest
return false, types.BadRequestErrorf("atomic delete failed due to mismatched Index")
}
return true, s.Delete(key)
}

View file

@ -110,7 +110,6 @@ func Init(dc driverapi.DriverCallback) error {
if out, err := exec.Command("modprobe", "-va", "bridge", "nf_nat", "br_netfilter").Output(); err != nil {
logrus.Warnf("Running modprobe bridge nf_nat failed with message: %s, error: %v", out, err)
}
return dc.RegisterDriver(networkType, newDriver())
}

View file

@ -3,6 +3,7 @@ package libnetwork
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
@ -98,6 +99,7 @@ type containerInfo struct {
id string
config containerConfig
data ContainerData
sync.Mutex
}
type endpoint struct {
@ -114,11 +116,33 @@ type endpoint struct {
sync.Mutex
}
func (ci *containerInfo) MarshalJSON() ([]byte, error) {
ci.Lock()
defer ci.Unlock()
// We are just interested in the container ID. This can be expanded to include all of containerInfo if there is a need
return json.Marshal(ci.id)
}
func (ci *containerInfo) UnmarshalJSON(b []byte) (err error) {
ci.Lock()
defer ci.Unlock()
var id string
if err := json.Unmarshal(b, &id); err != nil {
return err
}
ci.id = id
return nil
}
func (ep *endpoint) MarshalJSON() ([]byte, error) {
ep.Lock()
defer ep.Unlock()
epMap := make(map[string]interface{})
epMap["name"] = ep.name
epMap["id"] = string(ep.id)
epMap["network"] = ep.network
epMap["ep_iface"] = ep.iFaces
epMap["exposed_ports"] = ep.exposedPorts
epMap["generic"] = ep.generic
@ -129,6 +153,9 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) {
}
func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
ep.Lock()
defer ep.Unlock()
var epMap map[string]interface{}
if err := json.Unmarshal(b, &epMap); err != nil {
return err
@ -136,11 +163,6 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
ep.name = epMap["name"].(string)
ep.id = types.UUID(epMap["id"].(string))
nb, _ := json.Marshal(epMap["network"])
var n network
json.Unmarshal(nb, &n)
ep.network = &n
ib, _ := json.Marshal(epMap["ep_iface"])
var ifaces []endpointInterface
json.Unmarshal(ib, &ifaces)
@ -191,12 +213,30 @@ func (ep *endpoint) Network() string {
return ep.network.name
}
// endpoint Key structure : endpoint/network-id/endpoint-id
func (ep *endpoint) Key() []string {
return []string{datastore.EndpointKeyPrefix, string(ep.network.id), string(ep.id)}
ep.Lock()
n := ep.network
defer ep.Unlock()
return []string{datastore.EndpointKeyPrefix, string(n.id), string(ep.id)}
}
func (ep *endpoint) KeyPrefix() []string {
return []string{datastore.EndpointKeyPrefix, string(ep.network.id)}
ep.Lock()
n := ep.network
defer ep.Unlock()
return []string{datastore.EndpointKeyPrefix, string(n.id)}
}
func (ep *endpoint) networkIDFromKey(key []string) (types.UUID, error) {
// endpoint Key structure : endpoint/network-id/endpoint-id
// its an invalid key if the key doesnt have all the 3 key elements above
if key == nil || len(key) < 3 || key[0] != datastore.EndpointKeyPrefix {
return types.UUID(""), fmt.Errorf("invalid endpoint key : %v", key)
}
// network-id is placed at index=1. pls refer to endpoint.Key() method
return types.UUID(key[1]), nil
}
func (ep *endpoint) Value() []byte {
@ -208,10 +248,14 @@ func (ep *endpoint) Value() []byte {
}
func (ep *endpoint) Index() uint64 {
ep.Lock()
defer ep.Unlock()
return ep.dbIndex
}
func (ep *endpoint) SetIndex(index uint64) {
ep.Lock()
defer ep.Unlock()
ep.dbIndex = index
}
@ -292,7 +336,14 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error {
}
ep.joinLeaveStart()
defer ep.joinLeaveEnd()
defer func() {
ep.joinLeaveEnd()
if err != nil {
if e := ep.Leave(containerID, options...); e != nil {
log.Warnf("couldnt leave endpoint : %v", ep.name, err)
}
}
}()
ep.Lock()
if ep.container != nil {
@ -321,8 +372,6 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error {
ep.Lock()
ep.container = nil
ep.Unlock()
} else {
ep.network.ctrlr.addEndpointToStore(ep)
}
}()
@ -369,8 +418,11 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error {
}
}()
container.data.SandboxKey = sb.Key()
if err := network.ctrlr.updateEndpointToStore(ep); err != nil {
return err
}
container.data.SandboxKey = sb.Key()
return nil
}
@ -399,7 +451,7 @@ func (ep *endpoint) Leave(containerID string, options ...EndpointOption) error {
container := ep.container
n := ep.network
if container == nil || container.id == "" ||
if container == nil || container.id == "" || container.data.SandboxKey == "" ||
containerID == "" || container.id != containerID {
if container == nil {
err = ErrNoContainer{}
@ -418,6 +470,13 @@ func (ep *endpoint) Leave(containerID string, options ...EndpointOption) error {
ctrlr := n.ctrlr
n.Unlock()
if err := ctrlr.updateEndpointToStore(ep); err != nil {
ep.Lock()
ep.container = container
ep.Unlock()
return err
}
err = driver.Leave(n.id, ep.id)
ctrlr.sandboxRm(container.data.SandboxKey, ep)
@ -426,25 +485,54 @@ func (ep *endpoint) Leave(containerID string, options ...EndpointOption) error {
}
func (ep *endpoint) Delete() error {
var err error
ep.Lock()
epid := ep.id
name := ep.name
n := ep.network
if ep.container != nil {
ep.Unlock()
return &ActiveContainerError{name: ep.name, id: string(ep.id)}
return &ActiveContainerError{name: name, id: string(epid)}
}
n.Lock()
ctrlr := n.ctrlr
n.Unlock()
ep.Unlock()
if err := ep.deleteEndpoint(); err != nil {
if err = ctrlr.deleteEndpointFromStore(ep); err != nil {
return err
}
defer func() {
if err != nil {
ep.SetIndex(0)
if e := ctrlr.updateEndpointToStore(ep); e != nil {
log.Warnf("failed to recreate endpoint in store %s : %v", name, err)
}
}
}()
// Update the endpoint count in network and update it in the datastore
n.DecEndpointCnt()
if err = ctrlr.updateNetworkToStore(n); err != nil {
return err
}
defer func() {
if err != nil {
n.IncEndpointCnt()
if e := ctrlr.updateNetworkToStore(n); e != nil {
log.Warnf("failed to update network %s : %v", n.name, e)
}
}
}()
if err = ep.deleteEndpoint(); err != nil {
return err
}
if err := ep.network.ctrlr.deleteEndpointFromStore(ep); err != nil {
return err
}
return nil
}
func (ep *endpoint) deleteEndpoint() error {
var err error
ep.Lock()
n := ep.network
name := ep.name

View file

@ -1,6 +1,7 @@
package libnetwork
import (
"encoding/json"
"net"
"github.com/docker/libnetwork/driverapi"
@ -49,6 +50,59 @@ type endpointInterface struct {
routes []*net.IPNet
}
func (epi *endpointInterface) MarshalJSON() ([]byte, error) {
epMap := make(map[string]interface{})
epMap["id"] = epi.id
epMap["mac"] = epi.mac.String()
epMap["addr"] = epi.addr.String()
epMap["addrv6"] = epi.addrv6.String()
epMap["srcName"] = epi.srcName
epMap["dstPrefix"] = epi.dstPrefix
var routes []string
for _, route := range epi.routes {
routes = append(routes, route.String())
}
epMap["routes"] = routes
return json.Marshal(epMap)
}
func (epi *endpointInterface) UnmarshalJSON(b []byte) (err error) {
var epMap map[string]interface{}
if err := json.Unmarshal(b, &epMap); err != nil {
return err
}
epi.id = int(epMap["id"].(float64))
mac, _ := net.ParseMAC(epMap["mac"].(string))
epi.mac = mac
_, ipnet, _ := net.ParseCIDR(epMap["addr"].(string))
if ipnet != nil {
epi.addr = *ipnet
}
_, ipnet, _ = net.ParseCIDR(epMap["addrv6"].(string))
if ipnet != nil {
epi.addrv6 = *ipnet
}
epi.srcName = epMap["srcName"].(string)
epi.dstPrefix = epMap["dstPrefix"].(string)
rb, _ := json.Marshal(epMap["routes"])
var routes []string
json.Unmarshal(rb, &routes)
epi.routes = make([]*net.IPNet, 0)
for _, route := range routes {
_, ipr, err := net.ParseCIDR(route)
if err == nil {
epi.routes = append(epi.routes, ipr)
}
}
return nil
}
type endpointJoinInfo struct {
gw net.IP
gw6 net.IP
@ -116,25 +170,25 @@ func (ep *endpoint) AddInterface(id int, mac net.HardwareAddr, ipv4 net.IPNet, i
return nil
}
func (i *endpointInterface) ID() int {
return i.id
func (epi *endpointInterface) ID() int {
return epi.id
}
func (i *endpointInterface) MacAddress() net.HardwareAddr {
return types.GetMacCopy(i.mac)
func (epi *endpointInterface) MacAddress() net.HardwareAddr {
return types.GetMacCopy(epi.mac)
}
func (i *endpointInterface) Address() net.IPNet {
return (*types.GetIPNetCopy(&i.addr))
func (epi *endpointInterface) Address() net.IPNet {
return (*types.GetIPNetCopy(&epi.addr))
}
func (i *endpointInterface) AddressIPv6() net.IPNet {
return (*types.GetIPNetCopy(&i.addrv6))
func (epi *endpointInterface) AddressIPv6() net.IPNet {
return (*types.GetIPNetCopy(&epi.addrv6))
}
func (i *endpointInterface) SetNames(srcName string, dstPrefix string) error {
i.srcName = srcName
i.dstPrefix = dstPrefix
func (epi *endpointInterface) SetNames(srcName string, dstPrefix string) error {
epi.srcName = srcName
epi.dstPrefix = dstPrefix
return nil
}

View file

@ -25,6 +25,7 @@ import (
)
const defaultHeartbeat = time.Duration(10) * time.Second
const TTLFactor = 3
type hostDiscovery struct {
discovery discovery.Discovery
@ -47,7 +48,7 @@ func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback Join
if hb == 0 {
hb = defaultHeartbeat
}
d, err := discovery.New(cfg.Discovery, hb, 3*hb)
d, err := discovery.New(cfg.Discovery, hb, TTLFactor*hb)
if err != nil {
return err
}

View file

@ -12,6 +12,7 @@ import (
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
"github.com/docker/libnetwork/types"
"github.com/docker/swarm/pkg/store"
)
// A Network represents a logical connectivity zone that containers may
@ -58,6 +59,7 @@ type network struct {
id types.UUID
driver driverapi.Driver
enableIPv6 bool
endpointCnt uint64
endpoints endpointTable
generic options.Generic
dbIndex uint64
@ -90,6 +92,8 @@ func (n *network) Type() string {
}
func (n *network) Key() []string {
n.Lock()
defer n.Unlock()
return []string{datastore.NetworkKeyPrefix, string(n.id)}
}
@ -98,6 +102,8 @@ func (n *network) KeyPrefix() []string {
}
func (n *network) Value() []byte {
n.Lock()
defer n.Unlock()
b, err := json.Marshal(n)
if err != nil {
return nil
@ -106,11 +112,33 @@ func (n *network) Value() []byte {
}
func (n *network) Index() uint64 {
n.Lock()
defer n.Unlock()
return n.dbIndex
}
func (n *network) SetIndex(index uint64) {
n.Lock()
n.dbIndex = index
n.Unlock()
}
func (n *network) EndpointCnt() uint64 {
n.Lock()
defer n.Unlock()
return n.endpointCnt
}
func (n *network) IncEndpointCnt() {
n.Lock()
n.endpointCnt++
n.Unlock()
}
func (n *network) DecEndpointCnt() {
n.Lock()
n.endpointCnt--
n.Unlock()
}
// TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
@ -119,6 +147,7 @@ func (n *network) MarshalJSON() ([]byte, error) {
netMap["name"] = n.name
netMap["id"] = string(n.id)
netMap["networkType"] = n.networkType
netMap["endpointCnt"] = n.endpointCnt
netMap["enableIPv6"] = n.enableIPv6
netMap["generic"] = n.generic
return json.Marshal(netMap)
@ -133,6 +162,7 @@ func (n *network) UnmarshalJSON(b []byte) (err error) {
n.name = netMap["name"].(string)
n.id = types.UUID(netMap["id"].(string))
n.networkType = netMap["networkType"].(string)
n.endpointCnt = uint64(netMap["endpointCnt"].(float64))
n.enableIPv6 = netMap["enableIPv6"].(bool)
if netMap["generic"] != nil {
n.generic = netMap["generic"].(map[string]interface{})
@ -165,39 +195,51 @@ func (n *network) processOptions(options ...NetworkOption) {
}
func (n *network) Delete() error {
n.ctrlr.Lock()
_, ok := n.ctrlr.networks[n.id]
var err error
n.Lock()
ctrlr := n.ctrlr
n.Unlock()
ctrlr.Lock()
_, ok := ctrlr.networks[n.id]
ctrlr.Unlock()
if !ok {
n.ctrlr.Unlock()
return &UnknownNetworkError{name: n.name, id: string(n.id)}
}
n.Lock()
numEps := len(n.endpoints)
n.Unlock()
numEps := n.EndpointCnt()
if numEps != 0 {
n.ctrlr.Unlock()
return &ActiveEndpointsError{name: n.name, id: string(n.id)}
}
n.ctrlr.Unlock()
if err = n.deleteNetwork(); err != nil {
// deleteNetworkFromStore performs an atomic delete operation and the network.endpointCnt field will help
// prevent any possible race between endpoint join and network delete
if err = ctrlr.deleteNetworkFromStore(n); err != nil {
if err == store.ErrKeyModified {
return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.")
}
return err
}
if err = n.ctrlr.deleteNetworkFromStore(n); err != nil {
log.Warnf("Delete network (%s - %v) failed from datastore : %v", n.name, n.id, err)
if err = n.deleteNetwork(); err != nil {
return err
}
return nil
}
func (n *network) deleteNetwork() error {
var err error
n.Lock()
id := n.id
d := n.driver
n.ctrlr.Lock()
delete(n.ctrlr.networks, n.id)
delete(n.ctrlr.networks, id)
n.ctrlr.Unlock()
if err := n.driver.DeleteNetwork(n.id); err != nil {
n.Unlock()
if err := d.DeleteNetwork(n.id); err != nil {
// Forbidden Errors should be honored
if _, ok := err.(types.ForbiddenError); ok {
n.ctrlr.Lock()
@ -233,11 +275,12 @@ func (n *network) addEndpoint(ep *endpoint) error {
}
func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoint, error) {
var err error
if name == "" {
return nil, ErrInvalidName(name)
}
if _, err := n.EndpointByName(name); err == nil {
if _, err = n.EndpointByName(name); err == nil {
return nil, types.ForbiddenErrorf("service endpoint with name %s already exists", name)
}
@ -246,11 +289,34 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
ep.network = n
ep.processOptions(options...)
if err := n.addEndpoint(ep); err != nil {
n.Lock()
ctrlr := n.ctrlr
n.Unlock()
n.IncEndpointCnt()
if err = ctrlr.updateNetworkToStore(n); err != nil {
return nil, err
}
defer func() {
if err != nil {
n.DecEndpointCnt()
if err = ctrlr.updateNetworkToStore(n); err != nil {
log.Warnf("endpoint count cleanup failed when updating network for %s : %v", name, err)
}
}
}()
if err = n.addEndpoint(ep); err != nil {
return nil, err
}
defer func() {
if err != nil {
if e := ep.Delete(); ep != nil {
log.Warnf("cleaning up endpoint failed %s : %v", name, e)
}
}
}()
if err := n.ctrlr.addEndpointToStore(ep); err != nil {
if err = ctrlr.updateEndpointToStore(ep); err != nil {
return nil, err
}

View file

@ -10,11 +10,14 @@ import (
)
func (c *controller) initDataStore() error {
if c.cfg == nil {
c.Lock()
cfg := c.cfg
c.Unlock()
if cfg == nil {
return fmt.Errorf("datastore initialization requires a valid configuration")
}
store, err := datastore.NewDataStore(&c.cfg.Datastore)
store, err := datastore.NewDataStore(&cfg.Datastore)
if err != nil {
return err
}
@ -25,15 +28,15 @@ func (c *controller) initDataStore() error {
}
func (c *controller) newNetworkFromStore(n *network) error {
c.Lock()
n.Lock()
n.ctrlr = c
c.Unlock()
n.endpoints = endpointTable{}
n.Unlock()
return c.addNetwork(n)
}
func (c *controller) addNetworkToStore(n *network) error {
func (c *controller) updateNetworkToStore(n *network) error {
if isReservedNetwork(n.Name()) {
return nil
}
@ -45,11 +48,7 @@ func (c *controller) addNetworkToStore(n *network) error {
return nil
}
// Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875,
// Also Network object is Keyed with UUID & hence an Atomic put is not mandatory.
// return cs.PutObjectAtomic(n)
return cs.PutObject(n)
return cs.PutObjectAtomic(n)
}
func (c *controller) deleteNetworkFromStore(n *network) error {
@ -64,11 +63,7 @@ func (c *controller) deleteNetworkFromStore(n *network) error {
return nil
}
if err := cs.DeleteObject(n); err != nil {
return err
}
if err := cs.DeleteTree(&endpoint{network: n}); err != nil {
if err := cs.DeleteObjectAtomic(n); err != nil {
return err
}
@ -83,36 +78,42 @@ func (c *controller) getNetworkFromStore(nid types.UUID) (*network, error) {
return &n, nil
}
func (c *controller) newEndpointFromStore(ep *endpoint) {
c.Lock()
n, ok := c.networks[ep.network.id]
c.Unlock()
if !ok {
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
ep.Lock()
n := ep.network
id := ep.id
ep.Unlock()
if n == nil {
// Possibly the watch event for the network has not shown up yet
// Try to get network from the store
var err error
n, err = c.getNetworkFromStore(ep.network.id)
nid, err := networkIDFromEndpointKey(key, ep)
if err != nil {
log.Warnf("Network (%s) unavailable for endpoint=%s", ep.network.id, ep.name)
return
return err
}
n, err = c.getNetworkFromStore(nid)
if err != nil {
return err
}
if err := c.newNetworkFromStore(n); err != nil {
log.Warnf("Failed to add Network (%s - %s) from store", n.name, n.id)
return
return err
}
n = c.networks[nid]
}
_, err := n.EndpointByID(string(id))
if err != nil {
if _, ok := err.(ErrNoSuchEndpoint); ok {
return n.addEndpoint(ep)
}
}
ep.network = n
_, err := n.EndpointByID(string(ep.id))
if _, ok := err.(ErrNoSuchEndpoint); ok {
n.addEndpoint(ep)
}
return err
}
func (c *controller) addEndpointToStore(ep *endpoint) error {
func (c *controller) updateEndpointToStore(ep *endpoint) error {
ep.Lock()
name := ep.name
if isReservedNetwork(ep.network.name) {
ep.Unlock()
return nil
}
ep.Unlock()
@ -120,15 +121,11 @@ func (c *controller) addEndpointToStore(ep *endpoint) error {
cs := c.store
c.Unlock()
if cs == nil {
log.Debugf("datastore not initialized. endpoint %s is not added to the store", ep.name)
log.Debugf("datastore not initialized. endpoint %s is not added to the store", name)
return nil
}
// Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875,
// Also Network object is Keyed with UUID & hence an Atomic put is not mandatory.
// return cs.PutObjectAtomic(ep)
return cs.PutObject(ep)
return cs.PutObjectAtomic(ep)
}
func (c *controller) getEndpointFromStore(eid types.UUID) (*endpoint, error) {
@ -151,7 +148,7 @@ func (c *controller) deleteEndpointFromStore(ep *endpoint) error {
return nil
}
if err := cs.DeleteObject(ep); err != nil {
if err := cs.DeleteObjectAtomic(ep); err != nil {
return err
}
@ -163,11 +160,11 @@ func (c *controller) watchStore() error {
cs := c.store
c.Unlock()
nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), c.stopChan)
nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), nil)
if err != nil {
return err
}
epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), c.stopChan)
epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), nil)
if err != nil {
return err
}
@ -187,16 +184,18 @@ func (c *controller) watchStore() error {
existing, ok := c.networks[n.id]
c.Unlock()
if ok {
existing.Lock()
// Skip existing network update
if existing.dbIndex != n.dbIndex {
existing.dbIndex = n.dbIndex
existing.endpointCnt = n.endpointCnt
}
existing.Unlock()
continue
}
if err = c.newNetworkFromStore(&n); err != nil {
log.Error(err)
continue
}
}
case eps := <-epPairs:
@ -208,26 +207,78 @@ func (c *controller) watchStore() error {
continue
}
ep.dbIndex = epe.LastIndex
c.Lock()
n, ok := c.networks[ep.network.id]
c.Unlock()
if ok {
existing, _ := n.EndpointByID(string(ep.id))
if existing != nil {
ee := existing.(*endpoint)
// Skip existing endpoint update
if ee.dbIndex != ep.dbIndex {
ee.dbIndex = ep.dbIndex
ee.container = ep.container
}
n, err := c.networkFromEndpointKey(epe.Key, &ep)
if err != nil {
if _, ok := err.(ErrNoSuchNetwork); !ok {
log.Error(err)
continue
}
}
c.newEndpointFromStore(&ep)
if n != nil {
ep.network = n.(*network)
}
if c.processEndpointUpdate(&ep) {
err = c.newEndpointFromStore(epe.Key, &ep)
if err != nil {
log.Error(err)
}
}
}
}
}
}()
return nil
}
func (c *controller) networkFromEndpointKey(key string, ep *endpoint) (Network, error) {
nid, err := networkIDFromEndpointKey(key, ep)
if err != nil {
return nil, err
}
return c.NetworkByID(string(nid))
}
func networkIDFromEndpointKey(key string, ep *endpoint) (types.UUID, error) {
eKey, err := datastore.ParseKey(key)
if err != nil {
return types.UUID(""), err
}
return ep.networkIDFromKey(eKey)
}
func (c *controller) processEndpointUpdate(ep *endpoint) bool {
nw := ep.network
if nw == nil {
return true
}
nw.Lock()
id := nw.id
nw.Unlock()
c.Lock()
n, ok := c.networks[id]
c.Unlock()
if !ok {
return true
}
existing, _ := n.EndpointByID(string(ep.id))
if existing == nil {
return true
}
ee := existing.(*endpoint)
ee.Lock()
if ee.dbIndex != ep.dbIndex {
ee.dbIndex = ep.dbIndex
if ee.container != nil && ep.container != nil {
// we care only about the container id
ee.container.id = ep.container.id
} else {
// we still care only about the container id, but this is a short-cut to communicate join or leave operation
ee.container = ep.container
}
}
ee.Unlock()
return false
}