store.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. package libnetwork
  2. import (
  3. "fmt"
  4. "strings"
  5. "github.com/docker/docker/libnetwork/datastore"
  6. "github.com/docker/libkv/store/boltdb"
  7. "github.com/docker/libkv/store/consul"
  8. "github.com/docker/libkv/store/etcd"
  9. "github.com/docker/libkv/store/zookeeper"
  10. "github.com/sirupsen/logrus"
  11. )
  12. func registerKVStores() {
  13. consul.Register()
  14. zookeeper.Register()
  15. etcd.Register()
  16. boltdb.Register()
  17. }
  18. func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error {
  19. store, err := datastore.NewDataStore(scope, scfg)
  20. if err != nil {
  21. return err
  22. }
  23. c.Lock()
  24. c.stores = append(c.stores, store)
  25. c.Unlock()
  26. return nil
  27. }
  28. func (c *controller) initStores() error {
  29. registerKVStores()
  30. c.Lock()
  31. if c.cfg == nil {
  32. c.Unlock()
  33. return nil
  34. }
  35. scopeConfigs := c.cfg.Scopes
  36. c.stores = nil
  37. c.Unlock()
  38. for scope, scfg := range scopeConfigs {
  39. if err := c.initScopedStore(scope, scfg); err != nil {
  40. return err
  41. }
  42. }
  43. c.startWatch()
  44. return nil
  45. }
  46. func (c *controller) closeStores() {
  47. for _, store := range c.getStores() {
  48. store.Close()
  49. }
  50. }
  51. func (c *controller) getStore(scope string) datastore.DataStore {
  52. c.Lock()
  53. defer c.Unlock()
  54. for _, store := range c.stores {
  55. if store.Scope() == scope {
  56. return store
  57. }
  58. }
  59. return nil
  60. }
  61. func (c *controller) getStores() []datastore.DataStore {
  62. c.Lock()
  63. defer c.Unlock()
  64. return c.stores
  65. }
  66. func (c *controller) getNetworkFromStore(nid string) (*network, error) {
  67. for _, n := range c.getNetworksFromStore() {
  68. if n.id == nid {
  69. return n, nil
  70. }
  71. }
  72. return nil, ErrNoSuchNetwork(nid)
  73. }
  74. func (c *controller) getNetworksForScope(scope string) ([]*network, error) {
  75. var nl []*network
  76. store := c.getStore(scope)
  77. if store == nil {
  78. return nil, nil
  79. }
  80. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
  81. &network{ctrlr: c})
  82. if err != nil && err != datastore.ErrKeyNotFound {
  83. return nil, fmt.Errorf("failed to get networks for scope %s: %v",
  84. scope, err)
  85. }
  86. for _, kvo := range kvol {
  87. n := kvo.(*network)
  88. n.ctrlr = c
  89. ec := &endpointCnt{n: n}
  90. err = store.GetObject(datastore.Key(ec.Key()...), ec)
  91. if err != nil && !n.inDelete {
  92. logrus.Warnf("Could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
  93. continue
  94. }
  95. n.epCnt = ec
  96. if n.scope == "" {
  97. n.scope = scope
  98. }
  99. nl = append(nl, n)
  100. }
  101. return nl, nil
  102. }
  103. func (c *controller) getNetworksFromStore() []*network {
  104. var nl []*network
  105. for _, store := range c.getStores() {
  106. kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), &network{ctrlr: c})
  107. // Continue searching in the next store if no keys found in this store
  108. if err != nil {
  109. if err != datastore.ErrKeyNotFound {
  110. logrus.Debugf("failed to get networks for scope %s: %v", store.Scope(), err)
  111. }
  112. continue
  113. }
  114. kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
  115. if err != nil && err != datastore.ErrKeyNotFound {
  116. logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err)
  117. }
  118. for _, kvo := range kvol {
  119. n := kvo.(*network)
  120. n.Lock()
  121. n.ctrlr = c
  122. ec := &endpointCnt{n: n}
  123. // Trim the leading & trailing "/" to make it consistent across all stores
  124. if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
  125. ec = val.(*endpointCnt)
  126. ec.n = n
  127. n.epCnt = ec
  128. }
  129. if n.scope == "" {
  130. n.scope = store.Scope()
  131. }
  132. n.Unlock()
  133. nl = append(nl, n)
  134. }
  135. }
  136. return nl
  137. }
  138. func (n *network) getEndpointFromStore(eid string) (*endpoint, error) {
  139. var errors []string
  140. for _, store := range n.ctrlr.getStores() {
  141. ep := &endpoint{id: eid, network: n}
  142. err := store.GetObject(datastore.Key(ep.Key()...), ep)
  143. // Continue searching in the next store if the key is not found in this store
  144. if err != nil {
  145. if err != datastore.ErrKeyNotFound {
  146. errors = append(errors, fmt.Sprintf("{%s:%v}, ", store.Scope(), err))
  147. logrus.Debugf("could not find endpoint %s in %s: %v", eid, store.Scope(), err)
  148. }
  149. continue
  150. }
  151. return ep, nil
  152. }
  153. return nil, fmt.Errorf("could not find endpoint %s: %v", eid, errors)
  154. }
  155. func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
  156. var epl []*endpoint
  157. tmp := endpoint{network: n}
  158. for _, store := range n.getController().getStores() {
  159. kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n})
  160. // Continue searching in the next store if no keys found in this store
  161. if err != nil {
  162. if err != datastore.ErrKeyNotFound {
  163. logrus.Debugf("failed to get endpoints for network %s scope %s: %v",
  164. n.Name(), store.Scope(), err)
  165. }
  166. continue
  167. }
  168. for _, kvo := range kvol {
  169. ep := kvo.(*endpoint)
  170. epl = append(epl, ep)
  171. }
  172. }
  173. return epl, nil
  174. }
  175. func (c *controller) updateToStore(kvObject datastore.KVObject) error {
  176. cs := c.getStore(kvObject.DataScope())
  177. if cs == nil {
  178. return ErrDataStoreNotInitialized(kvObject.DataScope())
  179. }
  180. if err := cs.PutObjectAtomic(kvObject); err != nil {
  181. if err == datastore.ErrKeyModified {
  182. return err
  183. }
  184. return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
  185. }
  186. return nil
  187. }
  188. func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
  189. cs := c.getStore(kvObject.DataScope())
  190. if cs == nil {
  191. return ErrDataStoreNotInitialized(kvObject.DataScope())
  192. }
  193. retry:
  194. if err := cs.DeleteObjectAtomic(kvObject); err != nil {
  195. if err == datastore.ErrKeyModified {
  196. if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
  197. return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
  198. }
  199. logrus.Warnf("Error (%v) deleting object %v, retrying....", err, kvObject.Key())
  200. goto retry
  201. }
  202. return err
  203. }
  204. return nil
  205. }
  206. type netWatch struct {
  207. localEps map[string]*endpoint
  208. remoteEps map[string]*endpoint
  209. stopCh chan struct{}
  210. }
  211. func (c *controller) getLocalEps(nw *netWatch) []*endpoint {
  212. c.Lock()
  213. defer c.Unlock()
  214. var epl []*endpoint
  215. for _, ep := range nw.localEps {
  216. epl = append(epl, ep)
  217. }
  218. return epl
  219. }
  220. func (c *controller) watchSvcRecord(ep *endpoint) {
  221. c.watchCh <- ep
  222. }
  223. func (c *controller) unWatchSvcRecord(ep *endpoint) {
  224. c.unWatchCh <- ep
  225. }
  226. func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) {
  227. for {
  228. select {
  229. case <-nw.stopCh:
  230. return
  231. case o := <-ecCh:
  232. ec := o.(*endpointCnt)
  233. epl, err := ec.n.getEndpointsFromStore()
  234. if err != nil {
  235. break
  236. }
  237. c.Lock()
  238. var addEp []*endpoint
  239. delEpMap := make(map[string]*endpoint)
  240. renameEpMap := make(map[string]bool)
  241. for k, v := range nw.remoteEps {
  242. delEpMap[k] = v
  243. }
  244. for _, lEp := range epl {
  245. if _, ok := nw.localEps[lEp.ID()]; ok {
  246. continue
  247. }
  248. if ep, ok := nw.remoteEps[lEp.ID()]; ok {
  249. // On a container rename EP ID will remain
  250. // the same but the name will change. service
  251. // records should reflect the change.
  252. // Keep old EP entry in the delEpMap and add
  253. // EP from the store (which has the new name)
  254. // into the new list
  255. if lEp.name == ep.name {
  256. delete(delEpMap, lEp.ID())
  257. continue
  258. }
  259. renameEpMap[lEp.ID()] = true
  260. }
  261. nw.remoteEps[lEp.ID()] = lEp
  262. addEp = append(addEp, lEp)
  263. }
  264. // EPs whose name are to be deleted from the svc records
  265. // should also be removed from nw's remote EP list, except
  266. // the ones that are getting renamed.
  267. for _, lEp := range delEpMap {
  268. if !renameEpMap[lEp.ID()] {
  269. delete(nw.remoteEps, lEp.ID())
  270. }
  271. }
  272. c.Unlock()
  273. for _, lEp := range delEpMap {
  274. ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
  275. }
  276. for _, lEp := range addEp {
  277. ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
  278. }
  279. }
  280. }
  281. }
  282. func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) {
  283. n := ep.getNetwork()
  284. if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
  285. return
  286. }
  287. networkID := n.ID()
  288. endpointID := ep.ID()
  289. c.Lock()
  290. nw, ok := nmap[networkID]
  291. c.Unlock()
  292. if ok {
  293. // Update the svc db for the local endpoint join right away
  294. n.updateSvcRecord(ep, c.getLocalEps(nw), true)
  295. c.Lock()
  296. nw.localEps[endpointID] = ep
  297. // If we had learned that from the kv store remove it
  298. // from remote ep list now that we know that this is
  299. // indeed a local endpoint
  300. delete(nw.remoteEps, endpointID)
  301. c.Unlock()
  302. return
  303. }
  304. nw = &netWatch{
  305. localEps: make(map[string]*endpoint),
  306. remoteEps: make(map[string]*endpoint),
  307. }
  308. // Update the svc db for the local endpoint join right away
  309. // Do this before adding this ep to localEps so that we don't
  310. // try to update this ep's container's svc records
  311. n.updateSvcRecord(ep, c.getLocalEps(nw), true)
  312. c.Lock()
  313. nw.localEps[endpointID] = ep
  314. nmap[networkID] = nw
  315. nw.stopCh = make(chan struct{})
  316. c.Unlock()
  317. store := c.getStore(n.DataScope())
  318. if store == nil {
  319. return
  320. }
  321. if !store.Watchable() {
  322. return
  323. }
  324. ch, err := store.Watch(n.getEpCnt(), nw.stopCh)
  325. if err != nil {
  326. logrus.Warnf("Error creating watch for network: %v", err)
  327. return
  328. }
  329. go c.networkWatchLoop(nw, ep, ch)
  330. }
  331. func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) {
  332. n := ep.getNetwork()
  333. if !c.isDistributedControl() && n.Scope() == datastore.SwarmScope && n.driverIsMultihost() {
  334. return
  335. }
  336. networkID := n.ID()
  337. endpointID := ep.ID()
  338. c.Lock()
  339. nw, ok := nmap[networkID]
  340. if ok {
  341. delete(nw.localEps, endpointID)
  342. c.Unlock()
  343. // Update the svc db about local endpoint leave right away
  344. // Do this after we remove this ep from localEps so that we
  345. // don't try to remove this svc record from this ep's container.
  346. n.updateSvcRecord(ep, c.getLocalEps(nw), false)
  347. c.Lock()
  348. if len(nw.localEps) == 0 {
  349. close(nw.stopCh)
  350. // This is the last container going away for the network. Destroy
  351. // this network's svc db entry
  352. delete(c.svcRecords, networkID)
  353. delete(nmap, networkID)
  354. }
  355. }
  356. c.Unlock()
  357. }
  358. func (c *controller) watchLoop() {
  359. for {
  360. select {
  361. case ep := <-c.watchCh:
  362. c.processEndpointCreate(c.nmap, ep)
  363. case ep := <-c.unWatchCh:
  364. c.processEndpointDelete(c.nmap, ep)
  365. }
  366. }
  367. }
  368. func (c *controller) startWatch() {
  369. if c.watchCh != nil {
  370. return
  371. }
  372. c.watchCh = make(chan *endpoint)
  373. c.unWatchCh = make(chan *endpoint)
  374. c.nmap = make(map[string]*netWatch)
  375. go c.watchLoop()
  376. }
  377. func (c *controller) networkCleanup() {
  378. for _, n := range c.getNetworksFromStore() {
  379. if n.inDelete {
  380. logrus.Infof("Removing stale network %s (%s)", n.Name(), n.ID())
  381. if err := n.delete(true, true); err != nil {
  382. logrus.Debugf("Error while removing stale network: %v", err)
  383. }
  384. }
  385. }
  386. }
  387. var populateSpecial NetworkWalker = func(nw Network) bool {
  388. if n := nw.(*network); n.hasSpecialDriver() && !n.ConfigOnly() {
  389. if err := n.getController().addNetwork(n); err != nil {
  390. logrus.Warnf("Failed to populate network %q with driver %q", nw.Name(), nw.Type())
  391. }
  392. }
  393. return false
  394. }