store.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package libnetwork
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "github.com/containerd/containerd/log"
  7. "github.com/docker/docker/libnetwork/datastore"
  8. "github.com/docker/docker/libnetwork/internal/kvstore/boltdb"
  9. )
  10. func registerKVStores() {
  11. boltdb.Register()
  12. }
  13. func (c *Controller) initStores() error {
  14. registerKVStores()
  15. c.mu.Lock()
  16. defer c.mu.Unlock()
  17. if c.cfg == nil {
  18. return nil
  19. }
  20. var err error
  21. c.store, err = datastore.NewDataStore(c.cfg.Scope)
  22. if err != nil {
  23. return err
  24. }
  25. c.startWatch()
  26. return nil
  27. }
  28. func (c *Controller) closeStores() {
  29. if store := c.store; store != nil {
  30. store.Close()
  31. }
  32. }
  33. func (c *Controller) getStore() datastore.DataStore {
  34. c.mu.Lock()
  35. defer c.mu.Unlock()
  36. return c.store
  37. }
  38. func (c *Controller) getNetworkFromStore(nid string) (*network, error) {
  39. for _, n := range c.getNetworksFromStore() {
  40. if n.id == nid {
  41. return n, nil
  42. }
  43. }
  44. return nil, ErrNoSuchNetwork(nid)
  45. }
  46. func (c *Controller) getNetworks() ([]*network, error) {
  47. var nl []*network
  48. store := c.getStore()
  49. if store == nil {
  50. return nil, nil
  51. }
  52. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
  53. &network{ctrlr: c})
  54. if err != nil && err != datastore.ErrKeyNotFound {
  55. return nil, fmt.Errorf("failed to get networks: %w", err)
  56. }
  57. for _, kvo := range kvol {
  58. n := kvo.(*network)
  59. n.ctrlr = c
  60. ec := &endpointCnt{n: n}
  61. err = store.GetObject(datastore.Key(ec.Key()...), ec)
  62. if err != nil && !n.inDelete {
  63. log.G(context.TODO()).Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
  64. continue
  65. }
  66. n.epCnt = ec
  67. if n.scope == "" {
  68. n.scope = store.Scope()
  69. }
  70. nl = append(nl, n)
  71. }
  72. return nl, nil
  73. }
  74. func (c *Controller) getNetworksFromStore() []*network { // FIXME: unify with c.getNetworks()
  75. var nl []*network
  76. store := c.getStore()
  77. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c})
  78. if err != nil {
  79. if err != datastore.ErrKeyNotFound {
  80. log.G(context.TODO()).Debugf("failed to get networks from store: %v", err)
  81. }
  82. return nil
  83. }
  84. kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
  85. if err != nil && err != datastore.ErrKeyNotFound {
  86. log.G(context.TODO()).Warnf("failed to get endpoint_count map from store: %v", err)
  87. }
  88. for _, kvo := range kvol {
  89. n := kvo.(*network)
  90. n.mu.Lock()
  91. n.ctrlr = c
  92. ec := &endpointCnt{n: n}
  93. // Trim the leading & trailing "/" to make it consistent across all stores
  94. if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
  95. ec = val.(*endpointCnt)
  96. ec.n = n
  97. n.epCnt = ec
  98. }
  99. if n.scope == "" {
  100. n.scope = store.Scope()
  101. }
  102. n.mu.Unlock()
  103. nl = append(nl, n)
  104. }
  105. return nl
  106. }
  107. func (n *network) getEndpointFromStore(eid string) (*Endpoint, error) {
  108. store := n.ctrlr.getStore()
  109. ep := &Endpoint{id: eid, network: n}
  110. err := store.GetObject(datastore.Key(ep.Key()...), ep)
  111. if err != nil {
  112. return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err)
  113. }
  114. return ep, nil
  115. }
  116. func (n *network) getEndpointsFromStore() ([]*Endpoint, error) {
  117. var epl []*Endpoint
  118. tmp := Endpoint{network: n}
  119. store := n.getController().getStore()
  120. kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n})
  121. if err != nil {
  122. if err != datastore.ErrKeyNotFound {
  123. return nil, fmt.Errorf("failed to get endpoints for network %s scope %s: %w",
  124. n.Name(), store.Scope(), err)
  125. }
  126. return nil, nil
  127. }
  128. for _, kvo := range kvol {
  129. ep := kvo.(*Endpoint)
  130. epl = append(epl, ep)
  131. }
  132. return epl, nil
  133. }
  134. func (c *Controller) updateToStore(kvObject datastore.KVObject) error {
  135. cs := c.getStore()
  136. if cs == nil {
  137. return ErrDataStoreNotInitialized
  138. }
  139. if err := cs.PutObjectAtomic(kvObject); err != nil {
  140. if err == datastore.ErrKeyModified {
  141. return err
  142. }
  143. return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
  144. }
  145. return nil
  146. }
  147. func (c *Controller) deleteFromStore(kvObject datastore.KVObject) error {
  148. cs := c.getStore()
  149. if cs == nil {
  150. return ErrDataStoreNotInitialized
  151. }
  152. retry:
  153. if err := cs.DeleteObjectAtomic(kvObject); err != nil {
  154. if err == datastore.ErrKeyModified {
  155. if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
  156. return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
  157. }
  158. log.G(context.TODO()).Warnf("Error (%v) deleting object %v, retrying....", err, kvObject.Key())
  159. goto retry
  160. }
  161. return err
  162. }
  163. return nil
  164. }
  165. type netWatch struct {
  166. localEps map[string]*Endpoint
  167. remoteEps map[string]*Endpoint
  168. }
  169. func (c *Controller) getLocalEps(nw *netWatch) []*Endpoint {
  170. c.mu.Lock()
  171. defer c.mu.Unlock()
  172. var epl []*Endpoint
  173. for _, ep := range nw.localEps {
  174. epl = append(epl, ep)
  175. }
  176. return epl
  177. }
  178. func (c *Controller) watchSvcRecord(ep *Endpoint) {
  179. c.watchCh <- ep
  180. }
  181. func (c *Controller) unWatchSvcRecord(ep *Endpoint) {
  182. c.unWatchCh <- ep
  183. }
  184. func (c *Controller) processEndpointCreate(nmap map[string]*netWatch, ep *Endpoint) {
  185. n := ep.getNetwork()
  186. if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
  187. return
  188. }
  189. networkID := n.ID()
  190. endpointID := ep.ID()
  191. c.mu.Lock()
  192. nw, ok := nmap[networkID]
  193. c.mu.Unlock()
  194. if ok {
  195. // Update the svc db for the local endpoint join right away
  196. n.updateSvcRecord(ep, c.getLocalEps(nw), true)
  197. c.mu.Lock()
  198. nw.localEps[endpointID] = ep
  199. // If we had learned that from the kv store remove it
  200. // from remote ep list now that we know that this is
  201. // indeed a local endpoint
  202. delete(nw.remoteEps, endpointID)
  203. c.mu.Unlock()
  204. return
  205. }
  206. nw = &netWatch{
  207. localEps: make(map[string]*Endpoint),
  208. remoteEps: make(map[string]*Endpoint),
  209. }
  210. // Update the svc db for the local endpoint join right away
  211. // Do this before adding this ep to localEps so that we don't
  212. // try to update this ep's container's svc records
  213. n.updateSvcRecord(ep, c.getLocalEps(nw), true)
  214. c.mu.Lock()
  215. nw.localEps[endpointID] = ep
  216. nmap[networkID] = nw
  217. c.mu.Unlock()
  218. }
  219. func (c *Controller) processEndpointDelete(nmap map[string]*netWatch, ep *Endpoint) {
  220. n := ep.getNetwork()
  221. if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
  222. return
  223. }
  224. networkID := n.ID()
  225. endpointID := ep.ID()
  226. c.mu.Lock()
  227. nw, ok := nmap[networkID]
  228. if ok {
  229. delete(nw.localEps, endpointID)
  230. c.mu.Unlock()
  231. // Update the svc db about local endpoint leave right away
  232. // Do this after we remove this ep from localEps so that we
  233. // don't try to remove this svc record from this ep's container.
  234. n.updateSvcRecord(ep, c.getLocalEps(nw), false)
  235. c.mu.Lock()
  236. if len(nw.localEps) == 0 {
  237. // This is the last container going away for the network. Destroy
  238. // this network's svc db entry
  239. delete(c.svcRecords, networkID)
  240. delete(nmap, networkID)
  241. }
  242. }
  243. c.mu.Unlock()
  244. }
  245. func (c *Controller) watchLoop() {
  246. for {
  247. select {
  248. case ep := <-c.watchCh:
  249. c.processEndpointCreate(c.nmap, ep)
  250. case ep := <-c.unWatchCh:
  251. c.processEndpointDelete(c.nmap, ep)
  252. }
  253. }
  254. }
  255. func (c *Controller) startWatch() {
  256. if c.watchCh != nil {
  257. return
  258. }
  259. c.watchCh = make(chan *Endpoint)
  260. c.unWatchCh = make(chan *Endpoint)
  261. c.nmap = make(map[string]*netWatch)
  262. go c.watchLoop()
  263. }
  264. func (c *Controller) networkCleanup() {
  265. for _, n := range c.getNetworksFromStore() {
  266. if n.inDelete {
  267. log.G(context.TODO()).Infof("Removing stale network %s (%s)", n.Name(), n.ID())
  268. if err := n.delete(true, true); err != nil {
  269. log.G(context.TODO()).Debugf("Error while removing stale network: %v", err)
  270. }
  271. }
  272. }
  273. }
  274. var populateSpecial NetworkWalker = func(nw Network) bool {
  275. if n := nw.(*network); n.hasSpecialDriver() && !n.ConfigOnly() {
  276. if err := n.getController().addNetwork(n); err != nil {
  277. log.G(context.TODO()).Warnf("Failed to populate network %q with driver %q", nw.Name(), nw.Type())
  278. }
  279. }
  280. return false
  281. }