123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- package libnetwork
- import (
- "context"
- "fmt"
- "strings"
- "github.com/containerd/containerd/log"
- "github.com/docker/docker/libnetwork/datastore"
- )
- func (c *Controller) initStores() error {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.cfg == nil {
- return nil
- }
- var err error
- c.store, err = datastore.New(c.cfg.Scope)
- if err != nil {
- return err
- }
- c.startWatch()
- return nil
- }
- func (c *Controller) closeStores() {
- if store := c.store; store != nil {
- store.Close()
- }
- }
- func (c *Controller) getStore() *datastore.Store {
- c.mu.Lock()
- defer c.mu.Unlock()
- return c.store
- }
- func (c *Controller) getNetworkFromStore(nid string) (*Network, error) {
- for _, n := range c.getNetworksFromStore() {
- if n.id == nid {
- return n, nil
- }
- }
- return nil, ErrNoSuchNetwork(nid)
- }
- func (c *Controller) getNetworks() ([]*Network, error) {
- var nl []*Network
- store := c.getStore()
- if store == nil {
- return nil, nil
- }
- kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
- &Network{ctrlr: c})
- if err != nil && err != datastore.ErrKeyNotFound {
- return nil, fmt.Errorf("failed to get networks: %w", err)
- }
- for _, kvo := range kvol {
- n := kvo.(*Network)
- n.ctrlr = c
- ec := &endpointCnt{n: n}
- err = store.GetObject(datastore.Key(ec.Key()...), ec)
- if err != nil && !n.inDelete {
- log.G(context.TODO()).Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
- continue
- }
- n.epCnt = ec
- if n.scope == "" {
- n.scope = store.Scope()
- }
- nl = append(nl, n)
- }
- return nl, nil
- }
- func (c *Controller) getNetworksFromStore() []*Network { // FIXME: unify with c.getNetworks()
- var nl []*Network
- store := c.getStore()
- kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &Network{ctrlr: c})
- if err != nil {
- if err != datastore.ErrKeyNotFound {
- log.G(context.TODO()).Debugf("failed to get networks from store: %v", err)
- }
- return nil
- }
- kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
- if err != nil && err != datastore.ErrKeyNotFound {
- log.G(context.TODO()).Warnf("failed to get endpoint_count map from store: %v", err)
- }
- for _, kvo := range kvol {
- n := kvo.(*Network)
- n.mu.Lock()
- n.ctrlr = c
- ec := &endpointCnt{n: n}
- // Trim the leading & trailing "/" to make it consistent across all stores
- if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
- ec = val.(*endpointCnt)
- ec.n = n
- n.epCnt = ec
- }
- if n.scope == "" {
- n.scope = store.Scope()
- }
- n.mu.Unlock()
- nl = append(nl, n)
- }
- return nl
- }
- func (n *Network) getEndpointFromStore(eid string) (*Endpoint, error) {
- store := n.ctrlr.getStore()
- ep := &Endpoint{id: eid, network: n}
- err := store.GetObject(datastore.Key(ep.Key()...), ep)
- if err != nil {
- return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err)
- }
- return ep, nil
- }
- func (n *Network) getEndpointsFromStore() ([]*Endpoint, error) {
- var epl []*Endpoint
- tmp := Endpoint{network: n}
- store := n.getController().getStore()
- kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n})
- if err != nil {
- if err != datastore.ErrKeyNotFound {
- return nil, fmt.Errorf("failed to get endpoints for network %s scope %s: %w",
- n.Name(), store.Scope(), err)
- }
- return nil, nil
- }
- for _, kvo := range kvol {
- ep := kvo.(*Endpoint)
- epl = append(epl, ep)
- }
- return epl, nil
- }
- func (c *Controller) updateToStore(kvObject datastore.KVObject) error {
- cs := c.getStore()
- if cs == nil {
- return ErrDataStoreNotInitialized
- }
- if err := cs.PutObjectAtomic(kvObject); err != nil {
- if err == datastore.ErrKeyModified {
- return err
- }
- return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
- }
- return nil
- }
- func (c *Controller) deleteFromStore(kvObject datastore.KVObject) error {
- cs := c.getStore()
- if cs == nil {
- return ErrDataStoreNotInitialized
- }
- retry:
- if err := cs.DeleteObjectAtomic(kvObject); err != nil {
- if err == datastore.ErrKeyModified {
- if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
- return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
- }
- log.G(context.TODO()).Warnf("Error (%v) deleting object %v, retrying....", err, kvObject.Key())
- goto retry
- }
- return err
- }
- return nil
- }
- type netWatch struct {
- localEps map[string]*Endpoint
- remoteEps map[string]*Endpoint
- }
- func (c *Controller) getLocalEps(nw *netWatch) []*Endpoint {
- c.mu.Lock()
- defer c.mu.Unlock()
- var epl []*Endpoint
- for _, ep := range nw.localEps {
- epl = append(epl, ep)
- }
- return epl
- }
- func (c *Controller) watchSvcRecord(ep *Endpoint) {
- c.watchCh <- ep
- }
- func (c *Controller) unWatchSvcRecord(ep *Endpoint) {
- c.unWatchCh <- ep
- }
- func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoint) {
- n := ep.getNetwork()
- if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
- return
- }
- networkID := n.ID()
- endpointID := ep.ID()
- c.mu.Lock()
- nw, ok := nmap[networkID]
- c.mu.Unlock()
- if ok {
- // Update the svc db for the local endpoint join right away
- n.updateSvcRecord(ep, c.getLocalEps(nw), true)
- c.mu.Lock()
- nw.localEps[endpointID] = ep
- // If we had learned that from the kv store remove it
- // from remote ep list now that we know that this is
- // indeed a local endpoint
- delete(nw.remoteEps, endpointID)
- c.mu.Unlock()
- return
- }
- nw = &netWatch{
- localEps: make(map[string]*Endpoint),
- remoteEps: make(map[string]*Endpoint),
- }
- // Update the svc db for the local endpoint join right away
- // Do this before adding this ep to localEps so that we don't
- // try to update this ep's container's svc records
- n.updateSvcRecord(ep, c.getLocalEps(nw), true)
- c.mu.Lock()
- nw.localEps[endpointID] = ep
- nmap[networkID] = nw
- c.mu.Unlock()
- }
- func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoint) {
- n := ep.getNetwork()
- if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
- return
- }
- networkID := n.ID()
- endpointID := ep.ID()
- c.mu.Lock()
- nw, ok := nmap[networkID]
- if ok {
- delete(nw.localEps, endpointID)
- c.mu.Unlock()
- // Update the svc db about local endpoint leave right away
- // Do this after we remove this ep from localEps so that we
- // don't try to remove this svc record from this ep's container.
- n.updateSvcRecord(ep, c.getLocalEps(nw), false)
- c.mu.Lock()
- if len(nw.localEps) == 0 {
- // This is the last container going away for the network. Destroy
- // this network's svc db entry
- delete(c.svcRecords, networkID)
- delete(nmap, networkID)
- }
- }
- c.mu.Unlock()
- }
- func (c *Controller) watchLoop() {
- for {
- select {
- case ep := <-c.watchCh:
- c.processEndpointCreate(c.nmap, ep)
- case ep := <-c.unWatchCh:
- c.processEndpointDelete(c.nmap, ep)
- }
- }
- }
- func (c *Controller) startWatch() {
- if c.watchCh != nil {
- return
- }
- c.watchCh = make(chan *Endpoint)
- c.unWatchCh = make(chan *Endpoint)
- c.nmap = make(map[string]*netWatch)
- go c.watchLoop()
- }
- func (c *Controller) networkCleanup() {
- for _, n := range c.getNetworksFromStore() {
- if n.inDelete {
- log.G(context.TODO()).Infof("Removing stale network %s (%s)", n.Name(), n.ID())
- if err := n.delete(true, true); err != nil {
- log.G(context.TODO()).Debugf("Error while removing stale network: %v", err)
- }
- }
- }
- }
- var populateSpecial NetworkWalker = func(nw *Network) bool {
- if n := nw; n.hasSpecialDriver() && !n.ConfigOnly() {
- if err := n.getController().addNetwork(n); err != nil {
- log.G(context.TODO()).Warnf("Failed to populate network %q with driver %q", nw.Name(), nw.Type())
- }
- }
- return false
- }
|