Merge pull request #46668 from corhere/libn/svc-record-update-without-store
libnetwork: svc record update without store
This commit is contained in:
commit
5b19725de2
6 changed files with 27 additions and 127 deletions
|
@ -87,10 +87,7 @@ type Controller struct {
|
|||
cfg *config.Config
|
||||
store *datastore.Store
|
||||
extKeyListener net.Listener
|
||||
watchCh chan *Endpoint
|
||||
unWatchCh chan *Endpoint
|
||||
svcRecords map[string]*svcInfo
|
||||
nmap map[string]*netWatch
|
||||
serviceBindings map[serviceKey]*service
|
||||
ingressSandbox *Sandbox
|
||||
agent *nwAgent
|
||||
|
@ -242,7 +239,7 @@ func (c *Controller) clusterAgentInit() {
|
|||
c.mu.Unlock()
|
||||
fallthrough
|
||||
case cluster.EventSocketChange, cluster.EventNodeReady:
|
||||
if keysAvailable && !c.isDistributedControl() {
|
||||
if keysAvailable && c.isSwarmNode() {
|
||||
c.agentOperationStart()
|
||||
if err := c.agentSetup(clusterProvider); err != nil {
|
||||
c.agentStopComplete()
|
||||
|
@ -452,8 +449,8 @@ func (c *Controller) isAgent() bool {
|
|||
return c.cfg.ClusterProvider.IsAgent()
|
||||
}
|
||||
|
||||
func (c *Controller) isDistributedControl() bool {
|
||||
return !c.isManager() && !c.isAgent()
|
||||
func (c *Controller) isSwarmNode() bool {
|
||||
return c.isManager() || c.isAgent()
|
||||
}
|
||||
|
||||
func (c *Controller) GetPluginGetter() plugingetter.PluginGetter {
|
||||
|
@ -554,7 +551,7 @@ func (c *Controller) NewNetwork(networkType, name string, id string, options ...
|
|||
|
||||
// At this point the network scope is still unknown if not set by user
|
||||
if (caps.DataScope == scope.Global || nw.scope == scope.Swarm) &&
|
||||
!c.isDistributedControl() && !nw.dynamic {
|
||||
c.isSwarmNode() && !nw.dynamic {
|
||||
if c.isManager() {
|
||||
// For non-distributed controlled environments, global-scoped non-dynamic networks are redirected to the manager
|
||||
return nil, ManagerRedirectError(name)
|
||||
|
@ -562,7 +559,7 @@ func (c *Controller) NewNetwork(networkType, name string, id string, options ...
|
|||
return nil, types.ForbiddenErrorf("Cannot create a multi-host network from a worker node. Please create the network from a manager node.")
|
||||
}
|
||||
|
||||
if nw.scope == scope.Swarm && c.isDistributedControl() {
|
||||
if nw.scope == scope.Swarm && !c.isSwarmNode() {
|
||||
return nil, types.ForbiddenErrorf("cannot create a swarm scoped network when swarm is not active")
|
||||
}
|
||||
|
||||
|
@ -705,7 +702,7 @@ addToStore:
|
|||
}
|
||||
}
|
||||
|
||||
if !c.isDistributedControl() {
|
||||
if c.isSwarmNode() {
|
||||
c.mu.Lock()
|
||||
arrangeIngressFilterRule()
|
||||
c.mu.Unlock()
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/docker/docker/libnetwork/ipamapi"
|
||||
"github.com/docker/docker/libnetwork/netlabel"
|
||||
"github.com/docker/docker/libnetwork/options"
|
||||
"github.com/docker/docker/libnetwork/scope"
|
||||
"github.com/docker/docker/libnetwork/types"
|
||||
)
|
||||
|
||||
|
@ -456,9 +457,10 @@ func (ep *Endpoint) sbJoin(sb *Sandbox, options ...EndpointOption) (err error) {
|
|||
}
|
||||
}()
|
||||
|
||||
// Watch for service records
|
||||
if !n.getController().isAgent() {
|
||||
n.getController().watchSvcRecord(ep)
|
||||
if !n.getController().isSwarmNode() || n.Scope() != scope.Swarm || !n.driverIsMultihost() {
|
||||
n.updateSvcRecord(ep, true)
|
||||
}
|
||||
}
|
||||
|
||||
// Do not update hosts file with internal networks endpoint IP
|
||||
|
@ -589,13 +591,6 @@ func (ep *Endpoint) rename(name string) error {
|
|||
return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
|
||||
}
|
||||
} else {
|
||||
c.mu.Lock()
|
||||
_, ok = c.nmap[n.ID()]
|
||||
c.mu.Unlock()
|
||||
if !ok {
|
||||
// FIXME(thaJeztah): what is this check for, or is this to prevent a race condition (network removed)?
|
||||
return fmt.Errorf("watch null for network %q", n.Name())
|
||||
}
|
||||
n.updateSvcRecord(ep, false)
|
||||
}
|
||||
|
||||
|
@ -636,14 +631,6 @@ func (ep *Endpoint) rename(name string) error {
|
|||
if err = c.updateToStore(ep); err != nil {
|
||||
return err
|
||||
}
|
||||
// After the name change do a dummy endpoint count update to
|
||||
// trigger the service record update in the peer nodes
|
||||
|
||||
// Ignore the error because updateStore fail for EpCnt is a
|
||||
// benign error. Besides there is no meaningful recovery that
|
||||
// we can do. When the cluster recovers subsequent EpCnt update
|
||||
// will force the peers to get the correct EP name.
|
||||
_ = n.getEpCnt().updateStore()
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -818,8 +805,9 @@ func (ep *Endpoint) Delete(force bool) error {
|
|||
}
|
||||
}()
|
||||
|
||||
// unwatch for service records
|
||||
n.getController().unWatchSvcRecord(ep)
|
||||
if !n.getController().isSwarmNode() || n.Scope() != scope.Swarm || !n.driverIsMultihost() {
|
||||
n.updateSvcRecord(ep, false)
|
||||
}
|
||||
|
||||
if err = ep.deleteEndpoint(force); err != nil && !force {
|
||||
return err
|
||||
|
|
|
@ -1225,13 +1225,14 @@ func (n *Network) createEndpoint(name string, options ...EndpointOption) (*Endpo
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Watch for service records
|
||||
n.getController().watchSvcRecord(ep)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
n.getController().unWatchSvcRecord(ep)
|
||||
}
|
||||
}()
|
||||
if !n.getController().isSwarmNode() || n.Scope() != scope.Swarm || !n.driverIsMultihost() {
|
||||
n.updateSvcRecord(ep, true)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
n.updateSvcRecord(ep, false)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Increment endpoint count to indicate completion of endpoint addition
|
||||
if err = n.getEpCnt().IncEndpointCnt(); err != nil {
|
||||
|
|
|
@ -162,7 +162,7 @@ func (sb *Sandbox) delete(force bool) error {
|
|||
}
|
||||
// Retain the sandbox if we can't obtain the network from store.
|
||||
if _, err := c.getNetworkFromStore(ep.getNetwork().ID()); err != nil {
|
||||
if c.isDistributedControl() {
|
||||
if !c.isSwarmNode() {
|
||||
retain = true
|
||||
}
|
||||
log.G(context.TODO()).Warnf("Failed getting network for ep %s during sandbox %s delete: %v", ep.ID(), sb.ID(), err)
|
||||
|
@ -459,7 +459,7 @@ func (sb *Sandbox) ResolveName(ctx context.Context, name string, ipType int) ([]
|
|||
// network, ingress network and docker_gwbridge network. Name resolution
|
||||
// should prioritize returning the VIP/IPs on user overlay network.
|
||||
newList := []*Endpoint{}
|
||||
if !sb.controller.isDistributedControl() {
|
||||
if sb.controller.isSwarmNode() {
|
||||
newList = append(newList, getDynamicNwEndpoints(epList)...)
|
||||
ingressEP := getIngressNwEndpoint(epList)
|
||||
if ingressEP != nil {
|
||||
|
|
|
@ -278,9 +278,11 @@ func (c *Controller) sandboxCleanup(activeSandboxes map[string]interface{}) erro
|
|||
}
|
||||
|
||||
for _, ep := range sb.endpoints {
|
||||
// Watch for service records
|
||||
if !c.isAgent() {
|
||||
c.watchSvcRecord(ep)
|
||||
n := ep.getNetwork()
|
||||
if !c.isSwarmNode() || n.Scope() != scope.Swarm || !n.driverIsMultihost() {
|
||||
n.updateSvcRecord(ep, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
"github.com/containerd/log"
|
||||
"github.com/docker/docker/libnetwork/datastore"
|
||||
"github.com/docker/docker/libnetwork/scope"
|
||||
)
|
||||
|
||||
func (c *Controller) initStores() error {
|
||||
|
@ -20,7 +19,6 @@ func (c *Controller) initStores() error {
|
|||
return err
|
||||
}
|
||||
|
||||
c.startWatch()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -185,92 +183,6 @@ retry:
|
|||
return nil
|
||||
}
|
||||
|
||||
// netWatch records the endpoints being watched on a single network. The
// controller keeps one netWatch per network ID in its nmap.
type netWatch struct {
|
||||
// localEps maps an endpoint ID to the endpoint registered under it.
localEps map[string]*Endpoint
|
||||
}
|
||||
|
||||
// watchSvcRecord hands ep to watchLoop over watchCh, which in turn runs
// processEndpointCreate for it. startWatch creates watchCh unbuffered, so
// this call blocks until watchLoop has received the endpoint.
func (c *Controller) watchSvcRecord(ep *Endpoint) {
|
||||
c.watchCh <- ep
|
||||
}
|
||||
|
||||
// unWatchSvcRecord hands ep to watchLoop over unWatchCh, which in turn runs
// processEndpointDelete for it. startWatch creates unWatchCh unbuffered, so
// this call blocks until watchLoop has received the endpoint.
func (c *Controller) unWatchSvcRecord(ep *Endpoint) {
|
||||
c.unWatchCh <- ep
|
||||
}
|
||||
|
||||
// processEndpointCreate adds ep's service record to its network and then
// registers ep in the controller's per-network watch table (c.nmap).
//
// It does nothing for swarm-scoped networks backed by a multihost driver when
// the controller is not in distributed-control mode.
func (c *Controller) processEndpointCreate(ep *Endpoint) {
|
||||
n := ep.getNetwork()
|
||||
// Skip swarm-scoped multihost networks unless running in distributed-control mode.
if !c.isDistributedControl() && n.Scope() == scope.Swarm && n.driverIsMultihost() {
|
||||
return
|
||||
}
|
||||
|
||||
networkID := n.ID()
|
||||
endpointID := ep.ID()
|
||||
|
||||
// Update the svc db for the local endpoint join right away
|
||||
// Do this before adding this ep to localEps so that we don't
|
||||
// try to update this ep's container's svc records
|
||||
n.updateSvcRecord(ep, true)
|
||||
// c.mu guards the lookup, the lazy creation of the network's netWatch
// entry, and the insertion of ep into it.
c.mu.Lock()
|
||||
_, ok := c.nmap[networkID]
|
||||
if !ok {
|
||||
c.nmap[networkID] = &netWatch{localEps: make(map[string]*Endpoint)}
|
||||
}
|
||||
c.nmap[networkID].localEps[endpointID] = ep
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// processEndpointDelete unregisters ep from the controller's per-network
// watch table (c.nmap) and removes its service record. When ep was the last
// endpoint tracked for the network, the network's entries in c.svcRecords
// and c.nmap are dropped as well.
//
// It does nothing for swarm-scoped networks backed by a multihost driver when
// the controller is not in distributed-control mode, and it leaves the
// service records untouched when the network has no entry in c.nmap.
func (c *Controller) processEndpointDelete(ep *Endpoint) {
|
||||
n := ep.getNetwork()
|
||||
// Skip swarm-scoped multihost networks unless running in distributed-control mode.
if !c.isDistributedControl() && n.Scope() == scope.Swarm && n.driverIsMultihost() {
|
||||
return
|
||||
}
|
||||
|
||||
networkID := n.ID()
|
||||
endpointID := ep.ID()
|
||||
|
||||
c.mu.Lock()
|
||||
if nw, ok := c.nmap[networkID]; ok {
|
||||
delete(nw.localEps, endpointID)
|
||||
// c.mu is released for the duration of updateSvcRecord and re-acquired
// afterwards.
// NOTE(review): presumably updateSvcRecord takes locks of its own — confirm.
c.mu.Unlock()
|
||||
|
||||
// Update the svc db about local endpoint leave right away
|
||||
// Do this after we remove this ep from localEps so that we
|
||||
// don't try to remove this svc record from this ep's container.
|
||||
n.updateSvcRecord(ep, false)
|
||||
|
||||
c.mu.Lock()
|
||||
if len(nw.localEps) == 0 {
|
||||
// This is the last container going away for the network. Destroy
|
||||
// this network's svc db entry
|
||||
delete(c.svcRecords, networkID)
|
||||
delete(c.nmap, networkID)
|
||||
}
|
||||
}
|
||||
// Pairs with whichever c.mu.Lock() was taken last: the initial one when the
// network had no nmap entry, otherwise the re-lock after updateSvcRecord.
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// watchLoop receives endpoints from watchCh and unWatchCh and dispatches them
// to processEndpointCreate and processEndpointDelete respectively, handling
// one endpoint at a time. The loop has no exit condition, so it never returns.
func (c *Controller) watchLoop() {
|
||||
for {
|
||||
select {
|
||||
case ep := <-c.watchCh:
|
||||
c.processEndpointCreate(ep)
|
||||
case ep := <-c.unWatchCh:
|
||||
c.processEndpointDelete(ep)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// startWatch creates the watch/unwatch channels and the per-network watch
// table, then starts watchLoop in a new goroutine. It is a no-op when watchCh
// has already been set, so repeated calls do not start a second loop.
//
// NOTE(review): the watchCh check is not guarded by c.mu here — confirm that
// callers never invoke startWatch concurrently.
func (c *Controller) startWatch() {
|
||||
if c.watchCh != nil {
|
||||
return
|
||||
}
|
||||
// Both channels are unbuffered: senders block until watchLoop receives.
c.watchCh = make(chan *Endpoint)
|
||||
c.unWatchCh = make(chan *Endpoint)
|
||||
c.nmap = make(map[string]*netWatch)
|
||||
|
||||
// The goroutine runs for the remaining lifetime of the process; watchLoop
// has no stop signal.
go c.watchLoop()
|
||||
}
|
||||
|
||||
func (c *Controller) networkCleanup() {
|
||||
for _, n := range c.getNetworksFromStore() {
|
||||
if n.inDelete {
|
||||
|
|
Loading…
Reference in a new issue