store.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. package libnetwork
  2. import (
  3. "fmt"
  4. log "github.com/Sirupsen/logrus"
  5. "github.com/docker/libnetwork/datastore"
  6. )
  7. func (c *controller) initStores() error {
  8. c.Lock()
  9. if c.cfg == nil {
  10. c.Unlock()
  11. return nil
  12. }
  13. scopeConfigs := c.cfg.Scopes
  14. c.Unlock()
  15. for scope, scfg := range scopeConfigs {
  16. store, err := datastore.NewDataStore(scope, scfg)
  17. if err != nil {
  18. return err
  19. }
  20. c.Lock()
  21. c.stores = append(c.stores, store)
  22. c.Unlock()
  23. }
  24. c.startWatch()
  25. return nil
  26. }
  27. func (c *controller) closeStores() {
  28. for _, store := range c.getStores() {
  29. store.Close()
  30. }
  31. }
  32. func (c *controller) getStore(scope string) datastore.DataStore {
  33. c.Lock()
  34. defer c.Unlock()
  35. for _, store := range c.stores {
  36. if store.Scope() == scope {
  37. return store
  38. }
  39. }
  40. return nil
  41. }
  42. func (c *controller) getStores() []datastore.DataStore {
  43. c.Lock()
  44. defer c.Unlock()
  45. return c.stores
  46. }
  47. func (c *controller) getNetworkFromStore(nid string) (*network, error) {
  48. for _, store := range c.getStores() {
  49. n := &network{id: nid, ctrlr: c}
  50. err := store.GetObject(datastore.Key(n.Key()...), n)
  51. if err != nil && err != datastore.ErrKeyNotFound {
  52. return nil, fmt.Errorf("could not find network %s: %v", nid, err)
  53. }
  54. // Continue searching in the next store if the key is not found in this store
  55. if err == datastore.ErrKeyNotFound {
  56. continue
  57. }
  58. ec := &endpointCnt{n: n}
  59. err = store.GetObject(datastore.Key(ec.Key()...), ec)
  60. if err != nil {
  61. return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err)
  62. }
  63. n.epCnt = ec
  64. return n, nil
  65. }
  66. return nil, fmt.Errorf("network %s not found", nid)
  67. }
  68. func (c *controller) getNetworksFromStore() ([]*network, error) {
  69. var nl []*network
  70. for _, store := range c.getStores() {
  71. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
  72. &network{ctrlr: c})
  73. if err != nil && err != datastore.ErrKeyNotFound {
  74. return nil, fmt.Errorf("failed to get networks for scope %s: %v",
  75. store.Scope(), err)
  76. }
  77. // Continue searching in the next store if no keys found in this store
  78. if err == datastore.ErrKeyNotFound {
  79. continue
  80. }
  81. for _, kvo := range kvol {
  82. n := kvo.(*network)
  83. n.ctrlr = c
  84. ec := &endpointCnt{n: n}
  85. err = store.GetObject(datastore.Key(ec.Key()...), ec)
  86. if err != nil {
  87. return nil, fmt.Errorf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
  88. }
  89. n.epCnt = ec
  90. nl = append(nl, n)
  91. }
  92. }
  93. return nl, nil
  94. }
  95. func (n *network) getEndpointFromStore(eid string) (*endpoint, error) {
  96. for _, store := range n.ctrlr.getStores() {
  97. ep := &endpoint{id: eid, network: n}
  98. err := store.GetObject(datastore.Key(ep.Key()...), ep)
  99. if err != nil && err != datastore.ErrKeyNotFound {
  100. return nil, fmt.Errorf("could not find endpoint %s: %v", eid, err)
  101. }
  102. // Continue searching in the next store if the key is not found in this store
  103. if err == datastore.ErrKeyNotFound {
  104. continue
  105. }
  106. return ep, nil
  107. }
  108. return nil, fmt.Errorf("endpoint %s not found", eid)
  109. }
  110. func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
  111. var epl []*endpoint
  112. tmp := endpoint{network: n}
  113. for _, store := range n.getController().getStores() {
  114. kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n})
  115. if err != nil && err != datastore.ErrKeyNotFound {
  116. return nil,
  117. fmt.Errorf("failed to get endpoints for network %s scope %s: %v",
  118. n.Name(), store.Scope(), err)
  119. }
  120. // Continue searching in the next store if no keys found in this store
  121. if err == datastore.ErrKeyNotFound {
  122. continue
  123. }
  124. for _, kvo := range kvol {
  125. ep := kvo.(*endpoint)
  126. ep.network = n
  127. epl = append(epl, ep)
  128. }
  129. }
  130. return epl, nil
  131. }
  132. func (c *controller) updateToStore(kvObject datastore.KVObject) error {
  133. cs := c.getStore(kvObject.DataScope())
  134. if cs == nil {
  135. log.Warnf("datastore for scope %s not initialized. kv object %s is not added to the store", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
  136. return nil
  137. }
  138. if err := cs.PutObjectAtomic(kvObject); err != nil {
  139. if err == datastore.ErrKeyModified {
  140. return err
  141. }
  142. return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
  143. }
  144. return nil
  145. }
  146. func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
  147. cs := c.getStore(kvObject.DataScope())
  148. if cs == nil {
  149. log.Debugf("datastore for scope %s not initialized. kv object %s is not deleted from datastore", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
  150. return nil
  151. }
  152. retry:
  153. if err := cs.DeleteObjectAtomic(kvObject); err != nil {
  154. if err == datastore.ErrKeyModified {
  155. if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
  156. return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
  157. }
  158. goto retry
  159. }
  160. return err
  161. }
  162. return nil
  163. }
  164. type netWatch struct {
  165. localEps map[string]*endpoint
  166. remoteEps map[string]*endpoint
  167. stopCh chan struct{}
  168. }
  169. func (c *controller) getLocalEps(nw *netWatch) []*endpoint {
  170. c.Lock()
  171. defer c.Unlock()
  172. var epl []*endpoint
  173. for _, ep := range nw.localEps {
  174. epl = append(epl, ep)
  175. }
  176. return epl
  177. }
  178. func (c *controller) watchSvcRecord(ep *endpoint) {
  179. c.watchCh <- ep
  180. }
  181. func (c *controller) unWatchSvcRecord(ep *endpoint) {
  182. c.unWatchCh <- ep
  183. }
  184. func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) {
  185. for {
  186. select {
  187. case <-nw.stopCh:
  188. return
  189. case o := <-ecCh:
  190. ec := o.(*endpointCnt)
  191. epl, err := ec.n.getEndpointsFromStore()
  192. if err != nil {
  193. break
  194. }
  195. c.Lock()
  196. var addEp []*endpoint
  197. delEpMap := make(map[string]*endpoint)
  198. for k, v := range nw.remoteEps {
  199. delEpMap[k] = v
  200. }
  201. for _, lEp := range epl {
  202. if _, ok := nw.localEps[lEp.ID()]; ok {
  203. continue
  204. }
  205. if _, ok := nw.remoteEps[lEp.ID()]; ok {
  206. delete(delEpMap, lEp.ID())
  207. continue
  208. }
  209. nw.remoteEps[lEp.ID()] = lEp
  210. addEp = append(addEp, lEp)
  211. }
  212. c.Unlock()
  213. for _, lEp := range addEp {
  214. ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
  215. }
  216. for _, lEp := range delEpMap {
  217. ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
  218. }
  219. }
  220. }
  221. }
  222. func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) {
  223. c.Lock()
  224. nw, ok := nmap[ep.getNetwork().ID()]
  225. c.Unlock()
  226. if ok {
  227. // Update the svc db for the local endpoint join right away
  228. ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true)
  229. c.Lock()
  230. nw.localEps[ep.ID()] = ep
  231. c.Unlock()
  232. return
  233. }
  234. nw = &netWatch{
  235. localEps: make(map[string]*endpoint),
  236. remoteEps: make(map[string]*endpoint),
  237. }
  238. // Update the svc db for the local endpoint join right away
  239. // Do this before adding this ep to localEps so that we don't
  240. // try to update this ep's container's svc records
  241. ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true)
  242. c.Lock()
  243. nw.localEps[ep.ID()] = ep
  244. nmap[ep.getNetwork().ID()] = nw
  245. nw.stopCh = make(chan struct{})
  246. c.Unlock()
  247. store := c.getStore(ep.getNetwork().DataScope())
  248. if store == nil {
  249. return
  250. }
  251. if !store.Watchable() {
  252. return
  253. }
  254. ch, err := store.Watch(ep.getNetwork().getEpCnt(), nw.stopCh)
  255. if err != nil {
  256. log.Warnf("Error creating watch for network: %v", err)
  257. return
  258. }
  259. go c.networkWatchLoop(nw, ep, ch)
  260. }
  261. func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) {
  262. c.Lock()
  263. nw, ok := nmap[ep.getNetwork().ID()]
  264. if ok {
  265. delete(nw.localEps, ep.ID())
  266. c.Unlock()
  267. // Update the svc db about local endpoint leave right away
  268. // Do this after we remove this ep from localEps so that we
  269. // don't try to remove this svc record from this ep's container.
  270. ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), false)
  271. c.Lock()
  272. if len(nw.localEps) == 0 {
  273. close(nw.stopCh)
  274. delete(nmap, ep.getNetwork().ID())
  275. }
  276. }
  277. c.Unlock()
  278. }
  279. func (c *controller) watchLoop(nmap map[string]*netWatch) {
  280. for {
  281. select {
  282. case ep := <-c.watchCh:
  283. c.processEndpointCreate(nmap, ep)
  284. case ep := <-c.unWatchCh:
  285. c.processEndpointDelete(nmap, ep)
  286. }
  287. }
  288. }
  289. func (c *controller) startWatch() {
  290. c.watchCh = make(chan *endpoint)
  291. c.unWatchCh = make(chan *endpoint)
  292. nmap := make(map[string]*netWatch)
  293. go c.watchLoop(nmap)
  294. }