store.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package libnetwork
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "github.com/containerd/log"
  7. "github.com/docker/docker/libnetwork/datastore"
  8. "github.com/docker/docker/libnetwork/scope"
  9. )
  10. func (c *Controller) initStores() error {
  11. if c.cfg == nil {
  12. return nil
  13. }
  14. var err error
  15. c.store, err = datastore.New(c.cfg.Scope)
  16. if err != nil {
  17. return err
  18. }
  19. return nil
  20. }
  21. func (c *Controller) closeStores() {
  22. if store := c.store; store != nil {
  23. store.Close()
  24. }
  25. }
  26. func (c *Controller) getStore() *datastore.Store {
  27. return c.store
  28. }
  29. func (c *Controller) getNetworkFromStore(nid string) (*Network, error) {
  30. for _, n := range c.getNetworksFromStore(context.TODO()) {
  31. if n.id == nid {
  32. return n, nil
  33. }
  34. }
  35. return nil, ErrNoSuchNetwork(nid)
  36. }
  37. func (c *Controller) getNetworks() ([]*Network, error) {
  38. var nl []*Network
  39. store := c.getStore()
  40. if store == nil {
  41. return nil, nil
  42. }
  43. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
  44. &Network{ctrlr: c})
  45. if err != nil && err != datastore.ErrKeyNotFound {
  46. return nil, fmt.Errorf("failed to get networks: %w", err)
  47. }
  48. for _, kvo := range kvol {
  49. n := kvo.(*Network)
  50. n.ctrlr = c
  51. ec := &endpointCnt{n: n}
  52. err = store.GetObject(datastore.Key(ec.Key()...), ec)
  53. if err != nil && !n.inDelete {
  54. log.G(context.TODO()).Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
  55. continue
  56. }
  57. n.epCnt = ec
  58. if n.scope == "" {
  59. n.scope = scope.Local
  60. }
  61. nl = append(nl, n)
  62. }
  63. return nl, nil
  64. }
  65. func (c *Controller) getNetworksFromStore(ctx context.Context) []*Network { // FIXME: unify with c.getNetworks()
  66. var nl []*Network
  67. store := c.getStore()
  68. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &Network{ctrlr: c})
  69. if err != nil {
  70. if err != datastore.ErrKeyNotFound {
  71. log.G(ctx).Debugf("failed to get networks from store: %v", err)
  72. }
  73. return nil
  74. }
  75. kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
  76. if err != nil && err != datastore.ErrKeyNotFound {
  77. log.G(ctx).Warnf("failed to get endpoint_count map from store: %v", err)
  78. }
  79. for _, kvo := range kvol {
  80. n := kvo.(*Network)
  81. n.mu.Lock()
  82. n.ctrlr = c
  83. ec := &endpointCnt{n: n}
  84. // Trim the leading & trailing "/" to make it consistent across all stores
  85. if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
  86. ec = val.(*endpointCnt)
  87. ec.n = n
  88. n.epCnt = ec
  89. }
  90. if n.scope == "" {
  91. n.scope = scope.Local
  92. }
  93. n.mu.Unlock()
  94. nl = append(nl, n)
  95. }
  96. return nl
  97. }
  98. func (n *Network) getEndpointFromStore(eid string) (*Endpoint, error) {
  99. store := n.ctrlr.getStore()
  100. ep := &Endpoint{id: eid, network: n}
  101. err := store.GetObject(datastore.Key(ep.Key()...), ep)
  102. if err != nil {
  103. return nil, fmt.Errorf("could not find endpoint %s: %w", eid, err)
  104. }
  105. return ep, nil
  106. }
  107. func (n *Network) getEndpointsFromStore() ([]*Endpoint, error) {
  108. var epl []*Endpoint
  109. tmp := Endpoint{network: n}
  110. store := n.getController().getStore()
  111. kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &Endpoint{network: n})
  112. if err != nil {
  113. if err != datastore.ErrKeyNotFound {
  114. return nil, fmt.Errorf("failed to get endpoints for network %s: %w",
  115. n.Name(), err)
  116. }
  117. return nil, nil
  118. }
  119. for _, kvo := range kvol {
  120. ep := kvo.(*Endpoint)
  121. epl = append(epl, ep)
  122. }
  123. return epl, nil
  124. }
  125. func (c *Controller) updateToStore(kvObject datastore.KVObject) error {
  126. cs := c.getStore()
  127. if cs == nil {
  128. return fmt.Errorf("datastore is not initialized")
  129. }
  130. if err := cs.PutObjectAtomic(kvObject); err != nil {
  131. if err == datastore.ErrKeyModified {
  132. return err
  133. }
  134. return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
  135. }
  136. return nil
  137. }
  138. func (c *Controller) deleteFromStore(kvObject datastore.KVObject) error {
  139. cs := c.getStore()
  140. if cs == nil {
  141. return fmt.Errorf("datastore is not initialized")
  142. }
  143. retry:
  144. if err := cs.DeleteObjectAtomic(kvObject); err != nil {
  145. if err == datastore.ErrKeyModified {
  146. if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
  147. return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
  148. }
  149. log.G(context.TODO()).Warnf("Error (%v) deleting object %v, retrying....", err, kvObject.Key())
  150. goto retry
  151. }
  152. return err
  153. }
  154. return nil
  155. }
  156. func (c *Controller) networkCleanup() {
  157. for _, n := range c.getNetworksFromStore(context.TODO()) {
  158. if n.inDelete {
  159. log.G(context.TODO()).Infof("Removing stale network %s (%s)", n.Name(), n.ID())
  160. if err := n.delete(true, true); err != nil {
  161. log.G(context.TODO()).Debugf("Error while removing stale network: %v", err)
  162. }
  163. }
  164. }
  165. }