overlay.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. package overlay
  2. //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
  3. import (
  4. "fmt"
  5. "net"
  6. "sync"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/libnetwork/datastore"
  9. "github.com/docker/libnetwork/discoverapi"
  10. "github.com/docker/libnetwork/driverapi"
  11. "github.com/docker/libnetwork/idm"
  12. "github.com/docker/libnetwork/netlabel"
  13. "github.com/docker/libnetwork/osl"
  14. "github.com/docker/libnetwork/types"
  15. "github.com/hashicorp/serf/serf"
  16. )
  17. const (
  18. networkType = "overlay"
  19. vethPrefix = "veth"
  20. vethLen = 7
  21. vxlanIDStart = 256
  22. vxlanIDEnd = (1 << 24) - 1
  23. vxlanPort = 4789
  24. vxlanEncap = 50
  25. secureOption = "encrypted"
  26. )
  27. var initVxlanIdm = make(chan (bool), 1)
// driver is the overlay network driver. It holds discovery/serf state, the
// peer and encryption databases, the network table and the datastores used
// for persistence.
type driver struct {
	// eventCh carries serf events. NOTE(review): not used in this part of the
	// file; presumably consumed by the serf event loop — confirm.
	eventCh chan serf.Event
	// notifyCh receives ovNotify messages (pushLocalEndpointEvent sends here).
	notifyCh chan ovNotify
	// exitCh is used by Fini, which sends an acknowledgement channel on it
	// and then blocks until that channel is signalled.
	exitCh chan chan struct{}
	// bindAddress and advertiseAddress are this node's addresses, recorded by
	// nodeJoin for the local node and cleared again if serf fails to start.
	bindAddress      string
	advertiseAddress string
	// neighIP is the last discovered remote node address; it is the serf
	// join target in nodeJoin.
	neighIP string
	// config is the raw driver configuration passed to Init.
	config map[string]interface{}
	// peerDb is the peer database (endpoints are re-added to it on restore).
	peerDb peerNetworkMap
	// secMap holds per-node encryption state. NOTE(review): keyed by node
	// string with []*spi values per its initialization in Init; exact
	// semantics not visible here — confirm.
	secMap *encrMap
	// serfInstance is the serf agent; nil until serf has been initialized.
	serfInstance *serf.Serf
	// networks is the table of known overlay networks.
	networks networkTable
	// store is the global (cluster) datastore. When nil, serf is not started.
	store datastore.DataStore
	// localStore holds the persisted overlay endpoints restored at Init.
	localStore datastore.DataStore
	// vxlanIdm allocates VXLAN IDs; created lazily by initializeVxlanIdm.
	vxlanIdm *idm.Idm
	// once: NOTE(review): not used in this part of the file — confirm purpose.
	once sync.Once
	// joinOnce ensures the serf neighbor join runs once; nodeJoin resets it
	// after a failed join so a later discovery event can retry.
	joinOnce sync.Once
	// localJoinOnce ensures peerDBUpdateSelf runs only once.
	localJoinOnce sync.Once
	// keys holds the encryption keys. NOTE(review): presumably maintained by
	// setKeys/updateKeys — confirm.
	keys []*key
	// Mutex is held in nodeJoin while reading/writing advertiseAddress,
	// bindAddress and neighIP, and while resetting joinOnce.
	sync.Mutex
}
  49. // Init registers a new instance of overlay driver
  50. func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
  51. c := driverapi.Capability{
  52. DataScope: datastore.GlobalScope,
  53. ConnectivityScope: datastore.GlobalScope,
  54. }
  55. d := &driver{
  56. networks: networkTable{},
  57. peerDb: peerNetworkMap{
  58. mp: map[string]*peerMap{},
  59. },
  60. secMap: &encrMap{nodes: map[string][]*spi{}},
  61. config: config,
  62. }
  63. if data, ok := config[netlabel.GlobalKVClient]; ok {
  64. var err error
  65. dsc, ok := data.(discoverapi.DatastoreConfigData)
  66. if !ok {
  67. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  68. }
  69. d.store, err = datastore.NewDataStoreFromConfig(dsc)
  70. if err != nil {
  71. return types.InternalErrorf("failed to initialize data store: %v", err)
  72. }
  73. }
  74. if data, ok := config[netlabel.LocalKVClient]; ok {
  75. var err error
  76. dsc, ok := data.(discoverapi.DatastoreConfigData)
  77. if !ok {
  78. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  79. }
  80. d.localStore, err = datastore.NewDataStoreFromConfig(dsc)
  81. if err != nil {
  82. return types.InternalErrorf("failed to initialize local data store: %v", err)
  83. }
  84. }
  85. if err := d.restoreEndpoints(); err != nil {
  86. logrus.Warnf("Failure during overlay endpoints restore: %v", err)
  87. }
  88. // If an error happened when the network join the sandbox during the endpoints restore
  89. // we should reset it now along with the once variable, so that subsequent endpoint joins
  90. // outside of the restore path can potentially fix the network join and succeed.
  91. for nid, n := range d.networks {
  92. if n.initErr != nil {
  93. logrus.Infof("resetting init error and once variable for network %s after unsuccessful endpoint restore: %v", nid, n.initErr)
  94. n.initErr = nil
  95. n.once = &sync.Once{}
  96. }
  97. }
  98. return dc.RegisterDriver(networkType, d, c)
  99. }
  100. // Endpoints are stored in the local store. Restore them and reconstruct the overlay sandbox
  101. func (d *driver) restoreEndpoints() error {
  102. if d.localStore == nil {
  103. logrus.Warn("Cannot restore overlay endpoints because local datastore is missing")
  104. return nil
  105. }
  106. kvol, err := d.localStore.List(datastore.Key(overlayEndpointPrefix), &endpoint{})
  107. if err != nil && err != datastore.ErrKeyNotFound {
  108. return fmt.Errorf("failed to read overlay endpoint from store: %v", err)
  109. }
  110. if err == datastore.ErrKeyNotFound {
  111. return nil
  112. }
  113. for _, kvo := range kvol {
  114. ep := kvo.(*endpoint)
  115. n := d.network(ep.nid)
  116. if n == nil {
  117. logrus.Debugf("Network (%s) not found for restored endpoint (%s)", ep.nid[0:7], ep.id[0:7])
  118. logrus.Debugf("Deleting stale overlay endpoint (%s) from store", ep.id[0:7])
  119. if err := d.deleteEndpointFromStore(ep); err != nil {
  120. logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7])
  121. }
  122. continue
  123. }
  124. n.addEndpoint(ep)
  125. s := n.getSubnetforIP(ep.addr)
  126. if s == nil {
  127. return fmt.Errorf("could not find subnet for endpoint %s", ep.id)
  128. }
  129. if err := n.joinSandbox(true); err != nil {
  130. return fmt.Errorf("restore network sandbox failed: %v", err)
  131. }
  132. if err := n.joinSubnetSandbox(s, true); err != nil {
  133. return fmt.Errorf("restore subnet sandbox failed for %q: %v", s.subnetIP.String(), err)
  134. }
  135. Ifaces := make(map[string][]osl.IfaceOption)
  136. vethIfaceOption := make([]osl.IfaceOption, 1)
  137. vethIfaceOption = append(vethIfaceOption, n.sbox.InterfaceOptions().Master(s.brName))
  138. Ifaces[fmt.Sprintf("%s+%s", "veth", "veth")] = vethIfaceOption
  139. err := n.sbox.Restore(Ifaces, nil, nil, nil)
  140. if err != nil {
  141. return fmt.Errorf("failed to restore overlay sandbox: %v", err)
  142. }
  143. n.incEndpointCount()
  144. d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true)
  145. }
  146. return nil
  147. }
  148. // Fini cleans up the driver resources
  149. func Fini(drv driverapi.Driver) {
  150. d := drv.(*driver)
  151. if d.exitCh != nil {
  152. waitCh := make(chan struct{})
  153. d.exitCh <- waitCh
  154. <-waitCh
  155. }
  156. }
  157. func (d *driver) configure() error {
  158. if d.store == nil {
  159. return nil
  160. }
  161. if d.vxlanIdm == nil {
  162. return d.initializeVxlanIdm()
  163. }
  164. return nil
  165. }
  166. func (d *driver) initializeVxlanIdm() error {
  167. var err error
  168. initVxlanIdm <- true
  169. defer func() { <-initVxlanIdm }()
  170. if d.vxlanIdm != nil {
  171. return nil
  172. }
  173. d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
  174. if err != nil {
  175. return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
  176. }
  177. return nil
  178. }
  179. func (d *driver) Type() string {
  180. return networkType
  181. }
  182. func (d *driver) IsBuiltIn() bool {
  183. return true
  184. }
  185. func validateSelf(node string) error {
  186. advIP := net.ParseIP(node)
  187. if advIP == nil {
  188. return fmt.Errorf("invalid self address (%s)", node)
  189. }
  190. addrs, err := net.InterfaceAddrs()
  191. if err != nil {
  192. return fmt.Errorf("Unable to get interface addresses %v", err)
  193. }
  194. for _, addr := range addrs {
  195. ip, _, err := net.ParseCIDR(addr.String())
  196. if err == nil && ip.Equal(advIP) {
  197. return nil
  198. }
  199. }
  200. return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String())
  201. }
  202. func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
  203. if self && !d.isSerfAlive() {
  204. d.Lock()
  205. d.advertiseAddress = advertiseAddress
  206. d.bindAddress = bindAddress
  207. d.Unlock()
  208. // If containers are already running on this network update the
  209. // advertiseaddress in the peerDB
  210. d.localJoinOnce.Do(func() {
  211. d.peerDBUpdateSelf()
  212. })
  213. // If there is no cluster store there is no need to start serf.
  214. if d.store != nil {
  215. if err := validateSelf(advertiseAddress); err != nil {
  216. logrus.Warnf("%s", err.Error())
  217. }
  218. err := d.serfInit()
  219. if err != nil {
  220. logrus.Errorf("initializing serf instance failed: %v", err)
  221. d.Lock()
  222. d.advertiseAddress = ""
  223. d.bindAddress = ""
  224. d.Unlock()
  225. return
  226. }
  227. }
  228. }
  229. d.Lock()
  230. if !self {
  231. d.neighIP = advertiseAddress
  232. }
  233. neighIP := d.neighIP
  234. d.Unlock()
  235. if d.serfInstance != nil && neighIP != "" {
  236. var err error
  237. d.joinOnce.Do(func() {
  238. err = d.serfJoin(neighIP)
  239. if err == nil {
  240. d.pushLocalDb()
  241. }
  242. })
  243. if err != nil {
  244. logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err)
  245. d.Lock()
  246. d.joinOnce = sync.Once{}
  247. d.Unlock()
  248. return
  249. }
  250. }
  251. }
  252. func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
  253. n := d.network(nid)
  254. if n == nil {
  255. logrus.Debugf("Error pushing local endpoint event for network %s", nid)
  256. return
  257. }
  258. ep := n.endpoint(eid)
  259. if ep == nil {
  260. logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid)
  261. return
  262. }
  263. if !d.isSerfAlive() {
  264. return
  265. }
  266. d.notifyCh <- ovNotify{
  267. action: "join",
  268. nw: n,
  269. ep: ep,
  270. }
  271. }
  272. // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
  273. func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
  274. var err error
  275. switch dType {
  276. case discoverapi.NodeDiscovery:
  277. nodeData, ok := data.(discoverapi.NodeDiscoveryData)
  278. if !ok || nodeData.Address == "" {
  279. return fmt.Errorf("invalid discovery data")
  280. }
  281. d.nodeJoin(nodeData.Address, nodeData.BindAddress, nodeData.Self)
  282. case discoverapi.DatastoreConfig:
  283. if d.store != nil {
  284. return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already")
  285. }
  286. dsc, ok := data.(discoverapi.DatastoreConfigData)
  287. if !ok {
  288. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  289. }
  290. d.store, err = datastore.NewDataStoreFromConfig(dsc)
  291. if err != nil {
  292. return types.InternalErrorf("failed to initialize data store: %v", err)
  293. }
  294. case discoverapi.EncryptionKeysConfig:
  295. encrData, ok := data.(discoverapi.DriverEncryptionConfig)
  296. if !ok {
  297. return fmt.Errorf("invalid encryption key notification data")
  298. }
  299. keys := make([]*key, 0, len(encrData.Keys))
  300. for i := 0; i < len(encrData.Keys); i++ {
  301. k := &key{
  302. value: encrData.Keys[i],
  303. tag: uint32(encrData.Tags[i]),
  304. }
  305. keys = append(keys, k)
  306. }
  307. if err := d.setKeys(keys); err != nil {
  308. logrus.Warn(err)
  309. }
  310. case discoverapi.EncryptionKeysUpdate:
  311. var newKey, delKey, priKey *key
  312. encrData, ok := data.(discoverapi.DriverEncryptionUpdate)
  313. if !ok {
  314. return fmt.Errorf("invalid encryption key notification data")
  315. }
  316. if encrData.Key != nil {
  317. newKey = &key{
  318. value: encrData.Key,
  319. tag: uint32(encrData.Tag),
  320. }
  321. }
  322. if encrData.Primary != nil {
  323. priKey = &key{
  324. value: encrData.Primary,
  325. tag: uint32(encrData.PrimaryTag),
  326. }
  327. }
  328. if encrData.Prune != nil {
  329. delKey = &key{
  330. value: encrData.Prune,
  331. tag: uint32(encrData.PruneTag),
  332. }
  333. }
  334. if err := d.updateKeys(newKey, priKey, delKey); err != nil {
  335. logrus.Warn(err)
  336. }
  337. default:
  338. }
  339. return nil
  340. }
  341. // DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
  342. func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
  343. return nil
  344. }