store.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. package libnetwork
  2. import (
  3. "fmt"
  4. "strings"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/libkv/store/boltdb"
  7. "github.com/docker/libkv/store/consul"
  8. "github.com/docker/libkv/store/etcd"
  9. "github.com/docker/libkv/store/zookeeper"
  10. "github.com/docker/libnetwork/datastore"
  11. )
  12. func registerKVStores() {
  13. consul.Register()
  14. zookeeper.Register()
  15. etcd.Register()
  16. boltdb.Register()
  17. }
  18. func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error {
  19. store, err := datastore.NewDataStore(scope, scfg)
  20. if err != nil {
  21. return err
  22. }
  23. c.Lock()
  24. c.stores = append(c.stores, store)
  25. c.Unlock()
  26. return nil
  27. }
  28. func (c *controller) initStores() error {
  29. registerKVStores()
  30. c.Lock()
  31. if c.cfg == nil {
  32. c.Unlock()
  33. return nil
  34. }
  35. scopeConfigs := c.cfg.Scopes
  36. c.stores = nil
  37. c.Unlock()
  38. for scope, scfg := range scopeConfigs {
  39. if err := c.initScopedStore(scope, scfg); err != nil {
  40. return err
  41. }
  42. }
  43. c.startWatch()
  44. return nil
  45. }
  46. func (c *controller) closeStores() {
  47. for _, store := range c.getStores() {
  48. store.Close()
  49. }
  50. }
  51. func (c *controller) getStore(scope string) datastore.DataStore {
  52. c.Lock()
  53. defer c.Unlock()
  54. for _, store := range c.stores {
  55. if store.Scope() == scope {
  56. return store
  57. }
  58. }
  59. return nil
  60. }
  61. func (c *controller) getStores() []datastore.DataStore {
  62. c.Lock()
  63. defer c.Unlock()
  64. return c.stores
  65. }
  66. func (c *controller) getNetworkFromStore(nid string) (*network, error) {
  67. for _, store := range c.getStores() {
  68. n := &network{id: nid, ctrlr: c}
  69. err := store.GetObject(datastore.Key(n.Key()...), n)
  70. // Continue searching in the next store if the key is not found in this store
  71. if err != nil {
  72. if err != datastore.ErrKeyNotFound {
  73. logrus.Debugf("could not find network %s: %v", nid, err)
  74. }
  75. continue
  76. }
  77. ec := &endpointCnt{n: n}
  78. err = store.GetObject(datastore.Key(ec.Key()...), ec)
  79. if err != nil && !n.inDelete {
  80. return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err)
  81. }
  82. n.epCnt = ec
  83. if n.scope == "" {
  84. n.scope = store.Scope()
  85. }
  86. return n, nil
  87. }
  88. return nil, fmt.Errorf("network %s not found", nid)
  89. }
  90. func (c *controller) getNetworksForScope(scope string) ([]*network, error) {
  91. var nl []*network
  92. store := c.getStore(scope)
  93. if store == nil {
  94. return nil, nil
  95. }
  96. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
  97. &network{ctrlr: c})
  98. if err != nil && err != datastore.ErrKeyNotFound {
  99. return nil, fmt.Errorf("failed to get networks for scope %s: %v",
  100. scope, err)
  101. }
  102. for _, kvo := range kvol {
  103. n := kvo.(*network)
  104. n.ctrlr = c
  105. ec := &endpointCnt{n: n}
  106. err = store.GetObject(datastore.Key(ec.Key()...), ec)
  107. if err != nil && !n.inDelete {
  108. logrus.Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
  109. continue
  110. }
  111. n.epCnt = ec
  112. if n.scope == "" {
  113. n.scope = scope
  114. }
  115. nl = append(nl, n)
  116. }
  117. return nl, nil
  118. }
  119. func (c *controller) getNetworksFromStore() ([]*network, error) {
  120. var nl []*network
  121. for _, store := range c.getStores() {
  122. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
  123. &network{ctrlr: c})
  124. // Continue searching in the next store if no keys found in this store
  125. if err != nil {
  126. if err != datastore.ErrKeyNotFound {
  127. logrus.Debugf("failed to get networks for scope %s: %v", store.Scope(), err)
  128. }
  129. continue
  130. }
  131. kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
  132. if err != nil {
  133. if err != datastore.ErrKeyNotFound {
  134. logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err)
  135. }
  136. }
  137. for _, kvo := range kvol {
  138. n := kvo.(*network)
  139. n.Lock()
  140. n.ctrlr = c
  141. ec := &endpointCnt{n: n}
  142. // Trim the leading & trailing "/" to make it consistent across all stores
  143. if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
  144. ec = val.(*endpointCnt)
  145. ec.n = n
  146. n.epCnt = ec
  147. }
  148. if n.scope == "" {
  149. n.scope = store.Scope()
  150. }
  151. n.Unlock()
  152. nl = append(nl, n)
  153. }
  154. }
  155. return nl, nil
  156. }
  157. func (n *network) getEndpointFromStore(eid string) (*endpoint, error) {
  158. var errors []string
  159. for _, store := range n.ctrlr.getStores() {
  160. ep := &endpoint{id: eid, network: n}
  161. err := store.GetObject(datastore.Key(ep.Key()...), ep)
  162. // Continue searching in the next store if the key is not found in this store
  163. if err != nil {
  164. if err != datastore.ErrKeyNotFound {
  165. errors = append(errors, fmt.Sprintf("{%s:%v}, ", store.Scope(), err))
  166. logrus.Debugf("could not find endpoint %s in %s: %v", eid, store.Scope(), err)
  167. }
  168. continue
  169. }
  170. return ep, nil
  171. }
  172. return nil, fmt.Errorf("could not find endpoint %s: %v", eid, errors)
  173. }
  174. func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
  175. var epl []*endpoint
  176. tmp := endpoint{network: n}
  177. for _, store := range n.getController().getStores() {
  178. kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n})
  179. // Continue searching in the next store if no keys found in this store
  180. if err != nil {
  181. if err != datastore.ErrKeyNotFound {
  182. logrus.Debugf("failed to get endpoints for network %s scope %s: %v",
  183. n.Name(), store.Scope(), err)
  184. }
  185. continue
  186. }
  187. for _, kvo := range kvol {
  188. ep := kvo.(*endpoint)
  189. epl = append(epl, ep)
  190. }
  191. }
  192. return epl, nil
  193. }
  194. func (c *controller) updateToStore(kvObject datastore.KVObject) error {
  195. cs := c.getStore(kvObject.DataScope())
  196. if cs == nil {
  197. return ErrDataStoreNotInitialized(kvObject.DataScope())
  198. }
  199. if err := cs.PutObjectAtomic(kvObject); err != nil {
  200. if err == datastore.ErrKeyModified {
  201. return err
  202. }
  203. return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
  204. }
  205. return nil
  206. }
  207. func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
  208. cs := c.getStore(kvObject.DataScope())
  209. if cs == nil {
  210. return ErrDataStoreNotInitialized(kvObject.DataScope())
  211. }
  212. retry:
  213. if err := cs.DeleteObjectAtomic(kvObject); err != nil {
  214. if err == datastore.ErrKeyModified {
  215. if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
  216. return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
  217. }
  218. goto retry
  219. }
  220. return err
  221. }
  222. return nil
  223. }
  224. type netWatch struct {
  225. localEps map[string]*endpoint
  226. remoteEps map[string]*endpoint
  227. stopCh chan struct{}
  228. }
  229. func (c *controller) getLocalEps(nw *netWatch) []*endpoint {
  230. c.Lock()
  231. defer c.Unlock()
  232. var epl []*endpoint
  233. for _, ep := range nw.localEps {
  234. epl = append(epl, ep)
  235. }
  236. return epl
  237. }
  238. func (c *controller) watchSvcRecord(ep *endpoint) {
  239. c.watchCh <- ep
  240. }
  241. func (c *controller) unWatchSvcRecord(ep *endpoint) {
  242. c.unWatchCh <- ep
  243. }
  244. func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) {
  245. for {
  246. select {
  247. case <-nw.stopCh:
  248. return
  249. case o := <-ecCh:
  250. ec := o.(*endpointCnt)
  251. epl, err := ec.n.getEndpointsFromStore()
  252. if err != nil {
  253. break
  254. }
  255. c.Lock()
  256. var addEp []*endpoint
  257. delEpMap := make(map[string]*endpoint)
  258. renameEpMap := make(map[string]bool)
  259. for k, v := range nw.remoteEps {
  260. delEpMap[k] = v
  261. }
  262. for _, lEp := range epl {
  263. if _, ok := nw.localEps[lEp.ID()]; ok {
  264. continue
  265. }
  266. if ep, ok := nw.remoteEps[lEp.ID()]; ok {
  267. // On a container rename EP ID will remain
  268. // the same but the name will change. service
  269. // records should reflect the change.
  270. // Keep old EP entry in the delEpMap and add
  271. // EP from the store (which has the new name)
  272. // into the new list
  273. if lEp.name == ep.name {
  274. delete(delEpMap, lEp.ID())
  275. continue
  276. }
  277. renameEpMap[lEp.ID()] = true
  278. }
  279. nw.remoteEps[lEp.ID()] = lEp
  280. addEp = append(addEp, lEp)
  281. }
  282. // EPs whose name are to be deleted from the svc records
  283. // should also be removed from nw's remote EP list, except
  284. // the ones that are getting renamed.
  285. for _, lEp := range delEpMap {
  286. if !renameEpMap[lEp.ID()] {
  287. delete(nw.remoteEps, lEp.ID())
  288. }
  289. }
  290. c.Unlock()
  291. for _, lEp := range delEpMap {
  292. ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
  293. }
  294. for _, lEp := range addEp {
  295. ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
  296. }
  297. }
  298. }
  299. }
  300. func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) {
  301. n := ep.getNetwork()
  302. if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
  303. return
  304. }
  305. c.Lock()
  306. nw, ok := nmap[n.ID()]
  307. c.Unlock()
  308. if ok {
  309. // Update the svc db for the local endpoint join right away
  310. n.updateSvcRecord(ep, c.getLocalEps(nw), true)
  311. c.Lock()
  312. nw.localEps[ep.ID()] = ep
  313. // If we had learned that from the kv store remove it
  314. // from remote ep list now that we know that this is
  315. // indeed a local endpoint
  316. delete(nw.remoteEps, ep.ID())
  317. c.Unlock()
  318. return
  319. }
  320. nw = &netWatch{
  321. localEps: make(map[string]*endpoint),
  322. remoteEps: make(map[string]*endpoint),
  323. }
  324. // Update the svc db for the local endpoint join right away
  325. // Do this before adding this ep to localEps so that we don't
  326. // try to update this ep's container's svc records
  327. n.updateSvcRecord(ep, c.getLocalEps(nw), true)
  328. c.Lock()
  329. nw.localEps[ep.ID()] = ep
  330. nmap[n.ID()] = nw
  331. nw.stopCh = make(chan struct{})
  332. c.Unlock()
  333. store := c.getStore(n.DataScope())
  334. if store == nil {
  335. return
  336. }
  337. if !store.Watchable() {
  338. return
  339. }
  340. ch, err := store.Watch(n.getEpCnt(), nw.stopCh)
  341. if err != nil {
  342. logrus.Warnf("Error creating watch for network: %v", err)
  343. return
  344. }
  345. go c.networkWatchLoop(nw, ep, ch)
  346. }
  347. func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) {
  348. n := ep.getNetwork()
  349. if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
  350. return
  351. }
  352. c.Lock()
  353. nw, ok := nmap[n.ID()]
  354. if ok {
  355. delete(nw.localEps, ep.ID())
  356. c.Unlock()
  357. // Update the svc db about local endpoint leave right away
  358. // Do this after we remove this ep from localEps so that we
  359. // don't try to remove this svc record from this ep's container.
  360. n.updateSvcRecord(ep, c.getLocalEps(nw), false)
  361. c.Lock()
  362. if len(nw.localEps) == 0 {
  363. close(nw.stopCh)
  364. // This is the last container going away for the network. Destroy
  365. // this network's svc db entry
  366. delete(c.svcRecords, n.ID())
  367. delete(nmap, n.ID())
  368. }
  369. }
  370. c.Unlock()
  371. }
  372. func (c *controller) watchLoop() {
  373. for {
  374. select {
  375. case ep := <-c.watchCh:
  376. c.processEndpointCreate(c.nmap, ep)
  377. case ep := <-c.unWatchCh:
  378. c.processEndpointDelete(c.nmap, ep)
  379. }
  380. }
  381. }
  382. func (c *controller) startWatch() {
  383. if c.watchCh != nil {
  384. return
  385. }
  386. c.watchCh = make(chan *endpoint)
  387. c.unWatchCh = make(chan *endpoint)
  388. c.nmap = make(map[string]*netWatch)
  389. go c.watchLoop()
  390. }
  391. func (c *controller) networkCleanup() {
  392. networks, err := c.getNetworksFromStore()
  393. if err != nil {
  394. logrus.Warnf("Could not retrieve networks from store(s) during network cleanup: %v", err)
  395. return
  396. }
  397. for _, n := range networks {
  398. if n.inDelete {
  399. logrus.Infof("Removing stale network %s (%s)", n.Name(), n.ID())
  400. if err := n.delete(true); err != nil {
  401. logrus.Debugf("Error while removing stale network: %v", err)
  402. }
  403. }
  404. }
  405. }
  406. var populateSpecial NetworkWalker = func(nw Network) bool {
  407. if n := nw.(*network); n.hasSpecialDriver() && !n.ConfigOnly() {
  408. if err := n.getController().addNetwork(n); err != nil {
  409. logrus.Warnf("Failed to populate network %q with driver %q", nw.Name(), nw.Type())
  410. }
  411. }
  412. return false
  413. }