store.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package libnetwork
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "github.com/containerd/containerd/log"
  7. "github.com/docker/docker/libnetwork/datastore"
  8. )
  9. func (c *Controller) initStores() error {
  10. c.mu.Lock()
  11. defer c.mu.Unlock()
  12. if c.cfg == nil {
  13. return nil
  14. }
  15. var err error
  16. c.store, err = datastore.New(c.cfg.Scope)
  17. if err != nil {
  18. return err
  19. }
  20. c.startWatch()
  21. return nil
  22. }
  23. func (c *Controller) closeStores() {
  24. if store := c.store; store != nil {
  25. store.Close()
  26. }
  27. }
  28. func (c *Controller) getStore() *datastore.Store {
  29. c.mu.Lock()
  30. defer c.mu.Unlock()
  31. return c.store
  32. }
  33. func (c *Controller) getNetworkFromStore(nid string) (*Network, error) {
  34. for _, n := range c.getNetworksFromStore() {
  35. if n.id == nid {
  36. return n, nil
  37. }
  38. }
  39. return nil, ErrNoSuchNetwork(nid)
  40. }
  41. func (c *Controller) getNetworks() ([]*Network, error) {
  42. var nl []*Network
  43. store := c.getStore()
  44. if store == nil {
  45. return nil, nil
  46. }
  47. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
  48. &Network{ctrlr: c})
  49. if err != nil && err != datastore.ErrKeyNotFound {
  50. return nil, fmt.Errorf("failed to get networks: %w", err)
  51. }
  52. for _, kvo := range kvol {
  53. n := kvo.(*Network)
  54. n.ctrlr = c
  55. ec := &endpointCnt{n: n}
  56. err = store.GetObject(datastore.Key(ec.Key()...), ec)
  57. if err != nil && !n.inDelete {
  58. log.G(context.TODO()).Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
  59. continue
  60. }
  61. n.epCnt = ec
  62. if n.scope == "" {
  63. n.scope = store.Scope()
  64. }
  65. nl = append(nl, n)
  66. }
  67. return nl, nil
  68. }
  69. func (c *Controller) getNetworksFromStore() []*Network { // FIXME: unify with c.getNetworks()
  70. var nl []*Network
  71. store := c.getStore()
  72. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &Network{ctrlr: c})
  73. if err != nil {
  74. if err != datastore.ErrKeyNotFound {
  75. log.G(context.TODO()).Debugf("failed to get networks from store: %v", err)
  76. }
  77. return nil
  78. }
  79. kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
  80. if err != nil && err != datastore.ErrKeyNotFound {
  81. log.G(context.TODO()).Warnf("failed to get endpoint_count map from store: %v", err)
  82. }
  83. for _, kvo := range kvol {
  84. n := kvo.(*Network)
  85. n.mu.Lock()
  86. n.ctrlr = c
  87. ec := &endpointCnt{n: n}
  88. // Trim the leading & trailing "/" to make it consistent across all stores
  89. if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
  90. ec = val.(*endpointCnt)
  91. ec.n = n
  92. n.epCnt = ec
  93. }
  94. if n.scope == "" {
  95. n.scope = store.Scope()
  96. }
  97. n.mu.Unlock()
  98. nl = append(nl, n)
  99. }
  100. return nl
  101. }
  102. func (n *Network) getEndpointFromStore(eid string) (*Endpoint, error) {
  103. store := n.ctrlr.getStore()
  104. ep := &Endpoint{id: eid, network: n}
  105. err := store.GetObject(datastore.Key(ep.Key()...), ep)
  106. if err != nil {
  107. return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err)
  108. }
  109. return ep, nil
  110. }
  111. func (n *Network) getEndpointsFromStore() ([]*Endpoint, error) {
  112. var epl []*Endpoint
  113. tmp := Endpoint{network: n}
  114. store := n.getController().getStore()
  115. kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n})
  116. if err != nil {
  117. if err != datastore.ErrKeyNotFound {
  118. return nil, fmt.Errorf("failed to get endpoints for network %s scope %s: %w",
  119. n.Name(), store.Scope(), err)
  120. }
  121. return nil, nil
  122. }
  123. for _, kvo := range kvol {
  124. ep := kvo.(*Endpoint)
  125. epl = append(epl, ep)
  126. }
  127. return epl, nil
  128. }
  129. func (c *Controller) updateToStore(kvObject datastore.KVObject) error {
  130. cs := c.getStore()
  131. if cs == nil {
  132. return ErrDataStoreNotInitialized
  133. }
  134. if err := cs.PutObjectAtomic(kvObject); err != nil {
  135. if err == datastore.ErrKeyModified {
  136. return err
  137. }
  138. return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
  139. }
  140. return nil
  141. }
  142. func (c *Controller) deleteFromStore(kvObject datastore.KVObject) error {
  143. cs := c.getStore()
  144. if cs == nil {
  145. return ErrDataStoreNotInitialized
  146. }
  147. retry:
  148. if err := cs.DeleteObjectAtomic(kvObject); err != nil {
  149. if err == datastore.ErrKeyModified {
  150. if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
  151. return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
  152. }
  153. log.G(context.TODO()).Warnf("Error (%v) deleting object %v, retrying....", err, kvObject.Key())
  154. goto retry
  155. }
  156. return err
  157. }
  158. return nil
  159. }
  160. type netWatch struct {
  161. localEps map[string]*Endpoint
  162. remoteEps map[string]*Endpoint
  163. }
  164. func (c *Controller) getLocalEps(nw *netWatch) []*Endpoint {
  165. c.mu.Lock()
  166. defer c.mu.Unlock()
  167. var epl []*Endpoint
  168. for _, ep := range nw.localEps {
  169. epl = append(epl, ep)
  170. }
  171. return epl
  172. }
  173. func (c *Controller) watchSvcRecord(ep *Endpoint) {
  174. c.watchCh <- ep
  175. }
  176. func (c *Controller) unWatchSvcRecord(ep *Endpoint) {
  177. c.unWatchCh <- ep
  178. }
  179. func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoint) {
  180. n := ep.getNetwork()
  181. if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
  182. return
  183. }
  184. networkID := n.ID()
  185. endpointID := ep.ID()
  186. c.mu.Lock()
  187. nw, ok := nmap[networkID]
  188. c.mu.Unlock()
  189. if ok {
  190. // Update the svc db for the local endpoint join right away
  191. n.updateSvcRecord(ep, c.getLocalEps(nw), true)
  192. c.mu.Lock()
  193. nw.localEps[endpointID] = ep
  194. // If we had learned that from the kv store remove it
  195. // from remote ep list now that we know that this is
  196. // indeed a local endpoint
  197. delete(nw.remoteEps, endpointID)
  198. c.mu.Unlock()
  199. return
  200. }
  201. nw = &netWatch{
  202. localEps: make(map[string]*Endpoint),
  203. remoteEps: make(map[string]*Endpoint),
  204. }
  205. // Update the svc db for the local endpoint join right away
  206. // Do this before adding this ep to localEps so that we don't
  207. // try to update this ep's container's svc records
  208. n.updateSvcRecord(ep, c.getLocalEps(nw), true)
  209. c.mu.Lock()
  210. nw.localEps[endpointID] = ep
  211. nmap[networkID] = nw
  212. c.mu.Unlock()
  213. }
  214. func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoint) {
  215. n := ep.getNetwork()
  216. if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
  217. return
  218. }
  219. networkID := n.ID()
  220. endpointID := ep.ID()
  221. c.mu.Lock()
  222. nw, ok := nmap[networkID]
  223. if ok {
  224. delete(nw.localEps, endpointID)
  225. c.mu.Unlock()
  226. // Update the svc db about local endpoint leave right away
  227. // Do this after we remove this ep from localEps so that we
  228. // don't try to remove this svc record from this ep's container.
  229. n.updateSvcRecord(ep, c.getLocalEps(nw), false)
  230. c.mu.Lock()
  231. if len(nw.localEps) == 0 {
  232. // This is the last container going away for the network. Destroy
  233. // this network's svc db entry
  234. delete(c.svcRecords, networkID)
  235. delete(nmap, networkID)
  236. }
  237. }
  238. c.mu.Unlock()
  239. }
  240. func (c *Controller) watchLoop() {
  241. for {
  242. select {
  243. case ep := <-c.watchCh:
  244. c.processEndpointCreate(c.nmap, ep)
  245. case ep := <-c.unWatchCh:
  246. c.processEndpointDelete(c.nmap, ep)
  247. }
  248. }
  249. }
  250. func (c *Controller) startWatch() {
  251. if c.watchCh != nil {
  252. return
  253. }
  254. c.watchCh = make(chan *Endpoint)
  255. c.unWatchCh = make(chan *Endpoint)
  256. c.nmap = make(map[string]*netWatch)
  257. go c.watchLoop()
  258. }
  259. func (c *Controller) networkCleanup() {
  260. for _, n := range c.getNetworksFromStore() {
  261. if n.inDelete {
  262. log.G(context.TODO()).Infof("Removing stale network %s (%s)", n.Name(), n.ID())
  263. if err := n.delete(true, true); err != nil {
  264. log.G(context.TODO()).Debugf("Error while removing stale network: %v", err)
  265. }
  266. }
  267. }
  268. }
  269. var populateSpecial NetworkWalker = func(nw *Network) bool {
  270. if n := nw; n.hasSpecialDriver() && !n.ConfigOnly() {
  271. if err := n.getController().addNetwork(n); err != nil {
  272. log.G(context.TODO()).Warnf("Failed to populate network %q with driver %q", nw.Name(), nw.Type())
  273. }
  274. }
  275. return false
  276. }