diff --git a/daemon/cluster/executor/backend.go b/daemon/cluster/executor/backend.go index 5fe953ac05..ee7a367fb8 100644 --- a/daemon/cluster/executor/backend.go +++ b/daemon/cluster/executor/backend.go @@ -27,8 +27,8 @@ type Backend interface { CreateManagedNetwork(clustertypes.NetworkCreateRequest) error DeleteManagedNetwork(name string) error FindNetwork(idName string) (libnetwork.Network, error) - SetupIngress(req clustertypes.NetworkCreateRequest, nodeIP string) error - ReleaseIngress() error + SetupIngress(clustertypes.NetworkCreateRequest, string) (<-chan struct{}, error) + ReleaseIngress() (<-chan struct{}, error) PullImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error) ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error diff --git a/daemon/cluster/executor/container/executor.go b/daemon/cluster/executor/container/executor.go index 4af8bc8f10..6be0f3156c 100644 --- a/daemon/cluster/executor/container/executor.go +++ b/daemon/cluster/executor/container/executor.go @@ -139,13 +139,15 @@ func (e *executor) Configure(ctx context.Context, node *api.Node) error { options.IPAM.Config = append(options.IPAM.Config, c) } - return e.backend.SetupIngress(clustertypes.NetworkCreateRequest{ + _, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{ ID: na.Network.ID, NetworkCreateRequest: types.NetworkCreateRequest{ Name: na.Network.Spec.Annotations.Name, NetworkCreate: options, }, }, na.Addresses[0]) + + return err } // Controller returns a docker container runner. diff --git a/daemon/daemon.go b/daemon/daemon.go index 71e235a1da..3e2ed115c8 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -445,7 +445,25 @@ func (daemon *Daemon) DaemonLeavesCluster() { // Daemon is in charge of removing the attachable networks with // connected containers when the node leaves the swarm daemon.clearAttachableNetworks() + // We no longer need the cluster provider, stop it now so that + // the network agent will stop listening to cluster events. daemon.setClusterProvider(nil) + // Wait for the networking cluster agent to stop + daemon.netController.AgentStopWait() + // Daemon is in charge of removing the ingress network when the + // node leaves the swarm. Wait for job to be done or timeout. + // This is called also on graceful daemon shutdown. We need to + // wait, because the ingress release has to happen before the + // network controller is stopped. + if done, err := daemon.ReleaseIngress(); err == nil { + select { + case <-done: + case <-time.After(5 * time.Second): + logrus.Warnf("timeout while waiting for ingress network removal") + } + } else { + logrus.Warnf("failed to initiate ingress network removal: %v", err) + } } // setClusterProvider sets a component for querying the current cluster state. @@ -832,6 +850,12 @@ func (daemon *Daemon) Shutdown() error { } } + // If we are part of a cluster, clean up cluster's stuff + if daemon.clusterProvider != nil { + logrus.Debugf("start clean shutdown of cluster resources...") + daemon.DaemonLeavesCluster() + } + // Shutdown plugins after containers and layerstore. Don't change the order. daemon.pluginShutdown() diff --git a/daemon/network.go b/daemon/network.go index d72fbb6c57..06d3b3eb87 100644 --- a/daemon/network.go +++ b/daemon/network.go @@ -101,8 +101,9 @@ func (daemon *Daemon) getAllNetworks() []libnetwork.Network { } type ingressJob struct { - create *clustertypes.NetworkCreateRequest - ip net.IP + create *clustertypes.NetworkCreateRequest + ip net.IP + jobDone chan struct{} } var ( @@ -124,6 +125,7 @@ func (daemon *Daemon) startIngressWorker() { daemon.releaseIngress(ingressID) ingressID = "" } + close(r.jobDone) } } }() @@ -137,19 +139,23 @@ func (daemon *Daemon) enqueueIngressJob(job *ingressJob) { } // SetupIngress setups ingress networking. -func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) error { +// The function returns a channel which will signal the caller when the programming is completed. +func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) (<-chan struct{}, error) { ip, _, err := net.ParseCIDR(nodeIP) if err != nil { - return err + return nil, err } - daemon.enqueueIngressJob(&ingressJob{&create, ip}) - return nil + done := make(chan struct{}) + daemon.enqueueIngressJob(&ingressJob{&create, ip, done}) + return done, nil } // ReleaseIngress releases the ingress networking. -func (daemon *Daemon) ReleaseIngress() error { - daemon.enqueueIngressJob(&ingressJob{nil, nil}) - return nil +// The function returns a channel which will signal the caller when the programming is completed. +func (daemon *Daemon) ReleaseIngress() (<-chan struct{}, error) { + done := make(chan struct{}) + daemon.enqueueIngressJob(&ingressJob{nil, nil, done}) + return done, nil } func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip net.IP, staleID string) { diff --git a/vendor.conf b/vendor.conf index 29db3350c1..61a947ecfb 100644 --- a/vendor.conf +++ b/vendor.conf @@ -24,7 +24,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5 github.com/imdario/mergo 0.2.1 #get libnetwork packages -github.com/docker/libnetwork b6cb1eee1e7fc27ee05f0eb830d3e60e67a88565 +github.com/docker/libnetwork f3c4ca8ce5c128e071bab198c4ed9fd0d08384eb github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/vendor/github.com/docker/libnetwork/agent.go b/vendor/github.com/docker/libnetwork/agent.go index feda2c2846..ff1869055f 100644 --- a/vendor/github.com/docker/libnetwork/agent.go +++ b/vendor/github.com/docker/libnetwork/agent.go @@ -218,6 +218,7 @@ func (c *controller) agentSetup() error { if c.agent != nil && c.agentInitDone != nil { close(c.agentInitDone) c.agentInitDone = nil + c.agentStopDone = make(chan struct{}) } c.Unlock() diff --git a/vendor/github.com/docker/libnetwork/controller.go b/vendor/github.com/docker/libnetwork/controller.go index c3d25ee720..0674af516d 100644 --- a/vendor/github.com/docker/libnetwork/controller.go +++ b/vendor/github.com/docker/libnetwork/controller.go @@ -127,6 +127,9 @@ type NetworkController interface { // Wait for agent initialization complete in libnetwork controller AgentInitWait() + // Wait for agent to stop if running + AgentStopWait() + // SetKeys configures the encryption key for gossip and overlay data path SetKeys(keys []*types.EncryptionKey) error } @@ -160,6 +163,7 @@ type controller struct { agent *agent networkLocker *locker.Locker agentInitDone chan struct{} + agentStopDone chan struct{} keys []*types.EncryptionKey clusterConfigAvailable bool sync.Mutex @@ -338,7 +342,12 @@ func (c *controller) clusterAgentInit() { c.agentClose() c.cleanupServiceBindings("") - c.clearIngress(true) + c.Lock() + if c.agentStopDone != nil { + close(c.agentStopDone) + c.agentStopDone = nil + } + c.Unlock() return } @@ -357,6 +366,15 @@ func (c *controller) AgentInitWait() { } } +func (c *controller) AgentStopWait() { + c.Lock() + agentStopDone := c.agentStopDone + c.Unlock() + if agentStopDone != nil { + <-agentStopDone + } +} + func (c *controller) makeDriverConfig(ntype string) map[string]interface{} { if c.cfg == nil { return nil @@ -1153,46 +1171,7 @@ func (c *controller) getIPAMDriver(name string) (ipamapi.Ipam, *ipamapi.Capabili } func (c *controller) Stop() { - c.clearIngress(false) c.closeStores() c.stopExternalKeyListener() osl.GC() } - -func (c *controller) clearIngress(clusterLeave bool) { - c.Lock() - ingressSandbox := c.ingressSandbox - c.ingressSandbox = nil - c.Unlock() - - var n *network - if ingressSandbox != nil { - for _, ep := range ingressSandbox.getConnectedEndpoints() { - if nw := ep.getNetwork(); nw.ingress { - n = nw - break - } - } - if err := ingressSandbox.Delete(); err != nil { - logrus.Warnf("Could not delete ingress sandbox while leaving: %v", err) - } - } - - if n == nil { - for _, nw := range c.Networks() { - if nw.Info().Ingress() { - n = nw.(*network) - break - } - } - } - if n == nil && clusterLeave { - logrus.Warnf("Could not find ingress network while leaving") - } - - if n != nil { - if err := n.Delete(); err != nil { - logrus.Warnf("Could not delete ingress network while leaving: %v", err) - } - } -} diff --git a/vendor/github.com/docker/libnetwork/drivers/overlay/joinleave.go b/vendor/github.com/docker/libnetwork/drivers/overlay/joinleave.go index 0af09b71ba..cdbb428281 100644 --- a/vendor/github.com/docker/libnetwork/drivers/overlay/joinleave.go +++ b/vendor/github.com/docker/libnetwork/drivers/overlay/joinleave.go @@ -205,7 +205,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri return } - d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true) + d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false) } // Leave method is invoked when a Sandbox detaches from an endpoint. diff --git a/vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go b/vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go index d2c6f6784f..64e5744403 100644 --- a/vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go +++ b/vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go @@ -645,19 +645,28 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) { continue } - if neigh.IP.To4() == nil { - if neigh.HardwareAddr != nil { - logrus.Debugf("Miss notification, l2 mac %v", neigh.HardwareAddr) - } + var ( + ip net.IP + mac net.HardwareAddr + l2Miss, l3Miss bool + ) + if neigh.IP.To4() != nil { + ip = neigh.IP + l3Miss = true + } else if neigh.HardwareAddr != nil { + mac = []byte(neigh.HardwareAddr) + ip = net.IP(mac[2:]) + l2Miss = true + } else { continue } // Not any of the network's subnets. Ignore. - if !n.contains(neigh.IP) { + if !n.contains(ip) { continue } - logrus.Debugf("miss notification for dest IP, %v", neigh.IP.String()) + logrus.Debugf("miss notification: dest IP %v, dest MAC %v", ip, mac) if neigh.State&(netlink.NUD_STALE|netlink.NUD_INCOMPLETE) == 0 { continue @@ -667,14 +676,14 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) { continue } - mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, neigh.IP) + mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip) if err != nil { - logrus.Errorf("could not resolve peer %q: %v", neigh.IP, err) + logrus.Errorf("could not resolve peer %q: %v", ip, err) continue } - if err := n.driver.peerAdd(n.id, "dummy", neigh.IP, IPmask, mac, vtep, true); err != nil { - logrus.Errorf("could not add neighbor entry for missed peer %q: %v", neigh.IP, err) + if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil { + logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err) } } } diff --git a/vendor/github.com/docker/libnetwork/drivers/overlay/ov_serf.go b/vendor/github.com/docker/libnetwork/drivers/overlay/ov_serf.go index 15c71a3d30..9002bce6b7 100644 --- a/vendor/github.com/docker/libnetwork/drivers/overlay/ov_serf.go +++ b/vendor/github.com/docker/libnetwork/drivers/overlay/ov_serf.go @@ -121,7 +121,7 @@ func (d *driver) processEvent(u serf.UserEvent) { switch action { case "join": if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, - net.ParseIP(vtepStr), true); err != nil { + net.ParseIP(vtepStr), true, false, false); err != nil { logrus.Errorf("Peer add failed in the driver: %v\n", err) } case "leave": diff --git a/vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go b/vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go index adc370ff0f..6551e7bcad 100644 --- a/vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go +++ b/vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go @@ -236,7 +236,7 @@ func (d *driver) peerDbUpdateSandbox(nid string) { op := func() { if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, pKey.peerMac, entry.vtep, - false); err != nil { + false, false, false); err != nil { fmt.Printf("peerdbupdate in sandbox failed for ip %s and mac %s: %v", pKey.peerIP, pKey.peerMac, err) } @@ -254,7 +254,7 @@ func (d *driver) peerDbUpdateSandbox(nid string) { } func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, - peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { + peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error { if err := validateID(nid, eid); err != nil { return err @@ -297,12 +297,12 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, } // Add neighbor entry for the peer IP - if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil { + if err := sbox.AddNeighbor(peerIP, peerMac, l3Miss, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil { return fmt.Errorf("could not add neighbor entry into the sandbox: %v", err) } // Add fdb entry to the bridge for the peer mac - if err := sbox.AddNeighbor(vtep, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName), + if err := sbox.AddNeighbor(vtep, peerMac, l2Miss, sbox.NeighborOptions().LinkName(s.vxlanName), sbox.NeighborOptions().Family(syscall.AF_BRIDGE)); err != nil { return fmt.Errorf("could not add fdb entry into the sandbox: %v", err) } diff --git a/vendor/github.com/docker/libnetwork/drivers/solaris/overlay/peerdb.go b/vendor/github.com/docker/libnetwork/drivers/solaris/overlay/peerdb.go index d4b5e8e399..d1499e2132 100644 --- a/vendor/github.com/docker/libnetwork/drivers/solaris/overlay/peerdb.go +++ b/vendor/github.com/docker/libnetwork/drivers/solaris/overlay/peerdb.go @@ -279,7 +279,7 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, } // Add neighbor entry for the peer IP - if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil { + if err := sbox.AddNeighbor(peerIP, peerMac, false, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil { return fmt.Errorf("could not add neigbor entry into the sandbox: %v", err) } diff --git a/vendor/github.com/docker/libnetwork/endpoint.go b/vendor/github.com/docker/libnetwork/endpoint.go index f47ea4e18f..a9008e4653 100644 --- a/vendor/github.com/docker/libnetwork/endpoint.go +++ b/vendor/github.com/docker/libnetwork/endpoint.go @@ -519,6 +519,14 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error { return err } + defer func() { + if err != nil { + if e := ep.deleteDriverInfoFromCluster(); e != nil { + logrus.Errorf("Could not delete endpoint state for endpoint %s from cluster on join failure: %v", ep.Name(), e) + } + } + }() + if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil { return sb.setupDefaultGW() } @@ -576,39 +584,64 @@ func doUpdateHostsFile(n *network, sb *sandbox) bool { } func (ep *endpoint) rename(name string) error { - var err error + var ( + err error + netWatch *netWatch + ok bool + ) + n := ep.getNetwork() if n == nil { return fmt.Errorf("network not connected for ep %q", ep.name) } - n.getController().Lock() - netWatch, ok := n.getController().nmap[n.ID()] - n.getController().Unlock() + c := n.getController() - if !ok { - return fmt.Errorf("watch null for network %q", n.Name()) + if c.isAgent() { + if err = ep.deleteServiceInfoFromCluster(); err != nil { + return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err) + } + } else { + c.Lock() + netWatch, ok = c.nmap[n.ID()] + c.Unlock() + if !ok { + return fmt.Errorf("watch null for network %q", n.Name()) + } + n.updateSvcRecord(ep, c.getLocalEps(netWatch), false) } - n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), false) - oldName := ep.name oldAnonymous := ep.anonymous ep.name = name ep.anonymous = false - n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), true) - defer func() { - if err != nil { - n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), false) - ep.name = oldName - ep.anonymous = oldAnonymous - n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), true) + if c.isAgent() { + if err = ep.addServiceInfoToCluster(); err != nil { + return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err) } - }() + defer func() { + if err != nil { + ep.deleteServiceInfoFromCluster() + ep.name = oldName + ep.anonymous = oldAnonymous + ep.addServiceInfoToCluster() + } + }() + } else { + n.updateSvcRecord(ep, c.getLocalEps(netWatch), true) + defer func() { + if err != nil { + n.updateSvcRecord(ep, c.getLocalEps(netWatch), false) + ep.name = oldName + ep.anonymous = oldAnonymous + n.updateSvcRecord(ep, c.getLocalEps(netWatch), true) + } + }() + } // Update the store with the updated name - if err = n.getController().updateToStore(ep); err != nil { + if err = c.updateToStore(ep); err != nil { return err } // After the name change do a dummy endpoint count update to diff --git a/vendor/github.com/docker/libnetwork/error.go b/vendor/github.com/docker/libnetwork/error.go index c0054ce70c..f62ac0caa5 100644 --- a/vendor/github.com/docker/libnetwork/error.go +++ b/vendor/github.com/docker/libnetwork/error.go @@ -183,3 +183,11 @@ func (mr ManagerRedirectError) Error() string { // Maskable denotes the type of this error func (mr ManagerRedirectError) Maskable() {} + +// ErrDataStoreNotInitialized is returned if an invalid data scope is passed +// for getting data store +type ErrDataStoreNotInitialized string + +func (dsni ErrDataStoreNotInitialized) Error() string { + return fmt.Sprintf("datastore for scope %q is not initialized", string(dsni)) +} diff --git a/vendor/github.com/docker/libnetwork/networkdb/broadcast.go b/vendor/github.com/docker/libnetwork/networkdb/broadcast.go index a9330cb3a2..5555634179 100644 --- a/vendor/github.com/docker/libnetwork/networkdb/broadcast.go +++ b/vendor/github.com/docker/libnetwork/networkdb/broadcast.go @@ -105,8 +105,7 @@ type tableEventMessage struct { } func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool { - otherm := other.(*tableEventMessage) - return m.id == otherm.id && m.tname == otherm.tname && m.key == otherm.key + return false } func (m *tableEventMessage) Message() []byte { diff --git a/vendor/github.com/docker/libnetwork/osl/neigh_linux.go b/vendor/github.com/docker/libnetwork/osl/neigh_linux.go index c881f5df48..81b6cd9c1c 100644 --- a/vendor/github.com/docker/libnetwork/osl/neigh_linux.go +++ b/vendor/github.com/docker/libnetwork/osl/neigh_linux.go @@ -72,8 +72,11 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, nlnh.LinkIndex = iface.Attrs().Index } + // If the kernel deletion fails for the neighbor entry still remote it + // from the namespace cache. Otherwise if the neighbor moves back to the + // same host again, kernel update can fail. if err := nlh.NeighDel(nlnh); err != nil { - return fmt.Errorf("could not delete neighbor entry: %v", err) + logrus.Warnf("Deleting neighbor IP %s, mac %s failed, %v", dstIP, dstMac, err) } } @@ -85,21 +88,26 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, } } n.Unlock() + logrus.Debugf("Neighbor entry deleted for IP %v, mac %v", dstIP, dstMac) return nil } -func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) error { +func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, force bool, options ...NeighOption) error { var ( iface netlink.Link err error ) + // If the namespace already has the neighbor entry but the AddNeighbor is called + // because of a miss notification (force flag) program the kernel anyway. nh := n.findNeighbor(dstIP, dstMac) if nh != nil { - logrus.Debugf("Neighbor entry already present for IP %v, mac %v", dstIP, dstMac) - // If it exists silently return - return nil + if !force { + logrus.Warnf("Neighbor entry already present for IP %v, mac %v", dstIP, dstMac) + return nil + } + logrus.Warnf("Force kernel update, Neighbor entry already present for IP %v, mac %v", dstIP, dstMac) } nh = &neigh{ @@ -150,6 +158,7 @@ func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, op n.Lock() n.neighbors = append(n.neighbors, nh) n.Unlock() + logrus.Debugf("Neighbor entry added for IP %v, mac %v", dstIP, dstMac) return nil } diff --git a/vendor/github.com/docker/libnetwork/osl/sandbox.go b/vendor/github.com/docker/libnetwork/osl/sandbox.go index 18085c9082..64288f9307 100644 --- a/vendor/github.com/docker/libnetwork/osl/sandbox.go +++ b/vendor/github.com/docker/libnetwork/osl/sandbox.go @@ -39,7 +39,7 @@ type Sandbox interface { RemoveStaticRoute(*types.StaticRoute) error // AddNeighbor adds a neighbor entry into the sandbox. - AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, option ...NeighOption) error + AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, force bool, option ...NeighOption) error // DeleteNeighbor deletes neighbor entry from the sandbox. DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, osDelete bool) error diff --git a/vendor/github.com/docker/libnetwork/store.go b/vendor/github.com/docker/libnetwork/store.go index 8df4f6918a..58e1d852f1 100644 --- a/vendor/github.com/docker/libnetwork/store.go +++ b/vendor/github.com/docker/libnetwork/store.go @@ -225,7 +225,7 @@ func (n *network) getEndpointsFromStore() ([]*endpoint, error) { func (c *controller) updateToStore(kvObject datastore.KVObject) error { cs := c.getStore(kvObject.DataScope()) if cs == nil { - return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope()) + return ErrDataStoreNotInitialized(kvObject.DataScope()) } if err := cs.PutObjectAtomic(kvObject); err != nil { @@ -241,7 +241,7 @@ func (c *controller) updateToStore(kvObject datastore.KVObject) error { func (c *controller) deleteFromStore(kvObject datastore.KVObject) error { cs := c.getStore(kvObject.DataScope()) if cs == nil { - return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope()) + return ErrDataStoreNotInitialized(kvObject.DataScope()) } retry: