agent.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. package libnetwork
  2. //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
  3. import (
  4. "fmt"
  5. "net"
  6. "os"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/go-events"
  9. "github.com/docker/libnetwork/datastore"
  10. "github.com/docker/libnetwork/discoverapi"
  11. "github.com/docker/libnetwork/driverapi"
  12. "github.com/docker/libnetwork/networkdb"
  13. "github.com/gogo/protobuf/proto"
  14. )
  15. type agent struct {
  16. networkDB *networkdb.NetworkDB
  17. bindAddr string
  18. epTblCancel func()
  19. driverCancelFuncs map[string][]func()
  20. }
  21. func getBindAddr(ifaceName string) (string, error) {
  22. iface, err := net.InterfaceByName(ifaceName)
  23. if err != nil {
  24. return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
  25. }
  26. addrs, err := iface.Addrs()
  27. if err != nil {
  28. return "", fmt.Errorf("failed to get interface addresses: %v", err)
  29. }
  30. for _, a := range addrs {
  31. addr, ok := a.(*net.IPNet)
  32. if !ok {
  33. continue
  34. }
  35. addrIP := addr.IP
  36. if addrIP.IsLinkLocalUnicast() {
  37. continue
  38. }
  39. return addrIP.String(), nil
  40. }
  41. return "", fmt.Errorf("failed to get bind address")
  42. }
  43. func resolveAddr(addrOrInterface string) (string, error) {
  44. // Try and see if this is a valid IP address
  45. if net.ParseIP(addrOrInterface) != nil {
  46. return addrOrInterface, nil
  47. }
  48. // If not a valid IP address, it should be a valid interface
  49. return getBindAddr(addrOrInterface)
  50. }
  51. func (c *controller) agentInit(bindAddrOrInterface string) error {
  52. if !c.cfg.Daemon.IsAgent {
  53. return nil
  54. }
  55. bindAddr, err := resolveAddr(bindAddrOrInterface)
  56. if err != nil {
  57. return err
  58. }
  59. hostname, _ := os.Hostname()
  60. nDB, err := networkdb.New(&networkdb.Config{
  61. BindAddr: bindAddr,
  62. NodeName: hostname,
  63. })
  64. if err != nil {
  65. return err
  66. }
  67. ch, cancel := nDB.Watch("endpoint_table", "", "")
  68. c.agent = &agent{
  69. networkDB: nDB,
  70. bindAddr: bindAddr,
  71. epTblCancel: cancel,
  72. driverCancelFuncs: make(map[string][]func()),
  73. }
  74. go c.handleTableEvents(ch, c.handleEpTableEvent)
  75. return nil
  76. }
  77. func (c *controller) agentJoin(remotes []string) error {
  78. if c.agent == nil {
  79. return nil
  80. }
  81. return c.agent.networkDB.Join(remotes)
  82. }
  83. func (c *controller) agentDriverNotify(d driverapi.Driver) {
  84. if c.agent == nil {
  85. return
  86. }
  87. d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
  88. Address: c.agent.bindAddr,
  89. Self: true,
  90. })
  91. }
  92. func (c *controller) agentClose() {
  93. if c.agent == nil {
  94. return
  95. }
  96. for _, cancelFuncs := range c.agent.driverCancelFuncs {
  97. for _, cancel := range cancelFuncs {
  98. cancel()
  99. }
  100. }
  101. c.agent.epTblCancel()
  102. c.agent.networkDB.Close()
  103. }
  104. func (n *network) isClusterEligible() bool {
  105. if n.driverScope() != datastore.GlobalScope {
  106. return false
  107. }
  108. c := n.getController()
  109. if c.agent == nil {
  110. return false
  111. }
  112. return true
  113. }
  114. func (n *network) joinCluster() error {
  115. if !n.isClusterEligible() {
  116. return nil
  117. }
  118. c := n.getController()
  119. return c.agent.networkDB.JoinNetwork(n.ID())
  120. }
  121. func (n *network) leaveCluster() error {
  122. if !n.isClusterEligible() {
  123. return nil
  124. }
  125. c := n.getController()
  126. return c.agent.networkDB.LeaveNetwork(n.ID())
  127. }
  128. func (ep *endpoint) addToCluster() error {
  129. n := ep.getNetwork()
  130. if !n.isClusterEligible() {
  131. return nil
  132. }
  133. c := n.getController()
  134. if !ep.isAnonymous() && ep.Iface().Address() != nil {
  135. if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
  136. return err
  137. }
  138. buf, err := proto.Marshal(&EndpointRecord{
  139. Name: ep.Name(),
  140. ServiceName: ep.svcName,
  141. ServiceID: ep.svcID,
  142. EndpointIP: ep.Iface().Address().IP.String(),
  143. })
  144. if err != nil {
  145. return err
  146. }
  147. if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
  148. return err
  149. }
  150. }
  151. for _, te := range ep.joinInfo.driverTableEntries {
  152. if err := c.agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
  153. return err
  154. }
  155. }
  156. return nil
  157. }
  158. func (ep *endpoint) deleteFromCluster() error {
  159. n := ep.getNetwork()
  160. if !n.isClusterEligible() {
  161. return nil
  162. }
  163. c := n.getController()
  164. if !ep.isAnonymous() {
  165. if ep.Iface().Address() != nil {
  166. if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
  167. return err
  168. }
  169. }
  170. if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
  171. return err
  172. }
  173. }
  174. if ep.joinInfo == nil {
  175. return nil
  176. }
  177. for _, te := range ep.joinInfo.driverTableEntries {
  178. if err := c.agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
  179. return err
  180. }
  181. }
  182. return nil
  183. }
  184. func (n *network) addDriverWatches() {
  185. if !n.isClusterEligible() {
  186. return
  187. }
  188. c := n.getController()
  189. for _, tableName := range n.driverTables {
  190. ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
  191. c.Lock()
  192. c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
  193. c.Unlock()
  194. go c.handleTableEvents(ch, n.handleDriverTableEvent)
  195. d, err := n.driver(false)
  196. if err != nil {
  197. logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
  198. return
  199. }
  200. c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
  201. d.EventNotify(driverapi.Create, n.ID(), tableName, key, value)
  202. return false
  203. })
  204. }
  205. }
  206. func (n *network) cancelDriverWatches() {
  207. if !n.isClusterEligible() {
  208. return
  209. }
  210. c := n.getController()
  211. c.Lock()
  212. cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
  213. delete(c.agent.driverCancelFuncs, n.ID())
  214. c.Unlock()
  215. for _, cancel := range cancelFuncs {
  216. cancel()
  217. }
  218. }
  219. func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
  220. for {
  221. select {
  222. case ev, ok := <-ch:
  223. if !ok {
  224. return
  225. }
  226. fn(ev)
  227. }
  228. }
  229. }
  230. func (n *network) handleDriverTableEvent(ev events.Event) {
  231. d, err := n.driver(false)
  232. if err != nil {
  233. logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
  234. return
  235. }
  236. var (
  237. etype driverapi.EventType
  238. tname string
  239. key string
  240. value []byte
  241. )
  242. switch event := ev.(type) {
  243. case networkdb.CreateEvent:
  244. tname = event.Table
  245. key = event.Key
  246. value = event.Value
  247. etype = driverapi.Create
  248. case networkdb.DeleteEvent:
  249. tname = event.Table
  250. key = event.Key
  251. value = event.Value
  252. etype = driverapi.Delete
  253. case networkdb.UpdateEvent:
  254. tname = event.Table
  255. key = event.Key
  256. value = event.Value
  257. etype = driverapi.Delete
  258. }
  259. d.EventNotify(etype, n.ID(), tname, key, value)
  260. }
  261. func (c *controller) handleEpTableEvent(ev events.Event) {
  262. var (
  263. nid string
  264. eid string
  265. value []byte
  266. isAdd bool
  267. epRec EndpointRecord
  268. )
  269. switch event := ev.(type) {
  270. case networkdb.CreateEvent:
  271. nid = event.NetworkID
  272. eid = event.Key
  273. value = event.Value
  274. isAdd = true
  275. case networkdb.DeleteEvent:
  276. nid = event.NetworkID
  277. eid = event.Key
  278. value = event.Value
  279. case networkdb.UpdateEvent:
  280. logrus.Errorf("Unexpected update service table event = %#v", event)
  281. }
  282. nw, err := c.NetworkByID(nid)
  283. if err != nil {
  284. logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
  285. return
  286. }
  287. n := nw.(*network)
  288. err = proto.Unmarshal(value, &epRec)
  289. if err != nil {
  290. logrus.Errorf("Failed to unmarshal service table value: %v", err)
  291. return
  292. }
  293. name := epRec.Name
  294. svcName := epRec.ServiceName
  295. svcID := epRec.ServiceID
  296. ip := net.ParseIP(epRec.EndpointIP)
  297. if name == "" || ip == nil {
  298. logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
  299. return
  300. }
  301. if isAdd {
  302. if err := c.addServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
  303. logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
  304. return
  305. }
  306. n.addSvcRecords(name, ip, nil, true)
  307. } else {
  308. if err := c.rmServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
  309. logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
  310. return
  311. }
  312. n.deleteSvcRecords(name, ip, nil, true)
  313. }
  314. }