overlay.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package overlay
  2. //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
  3. import (
  4. "context"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "github.com/docker/libnetwork/datastore"
  9. "github.com/docker/libnetwork/discoverapi"
  10. "github.com/docker/libnetwork/driverapi"
  11. "github.com/docker/libnetwork/idm"
  12. "github.com/docker/libnetwork/netlabel"
  13. "github.com/docker/libnetwork/osl"
  14. "github.com/docker/libnetwork/types"
  15. "github.com/hashicorp/serf/serf"
  16. "github.com/sirupsen/logrus"
  17. )
  18. const (
  19. networkType = "overlay"
  20. vethPrefix = "veth"
  21. vethLen = 7
  22. vxlanIDStart = 256
  23. vxlanIDEnd = (1 << 24) - 1
  24. vxlanPort = 4789
  25. vxlanEncap = 50
  26. secureOption = "encrypted"
  27. )
  28. var initVxlanIdm = make(chan (bool), 1)
// driver holds the state of one overlay network driver instance. The
// embedded mutex is taken by nodeJoin around accesses to advertiseAddress,
// bindAddress, neighIP and the reset of joinOnce.
type driver struct {
	// eventCh carries serf events; it is set up outside this part of the file.
	eventCh chan serf.Event
	// notifyCh receives the endpoint notifications sent by
	// pushLocalEndpointEvent.
	notifyCh chan ovNotify
	// exitCh is used by Fini: it sends a channel on exitCh and then waits
	// on that channel before returning.
	exitCh chan chan struct{}
	// bindAddress and advertiseAddress are recorded by nodeJoin for the
	// local node (self == true) and cleared again if serf initialisation
	// fails.
	bindAddress      string
	advertiseAddress string
	// neighIP is the advertise address of the last non-self node reported
	// to nodeJoin; it is the address passed to serfJoin.
	neighIP string
	// config is the configuration map handed to Init.
	config map[string]interface{}
	// peerDb and secMap are created empty by Init.
	peerDb peerNetworkMap
	secMap *encrMap
	// serfInstance must be non-nil for nodeJoin to attempt a serf join.
	serfInstance *serf.Serf
	// networks is the table of networks known to the driver.
	networks networkTable
	// store is the datastore built from netlabel.GlobalKVClient in Init or
	// from a DatastoreConfig discovery event.
	store datastore.DataStore
	// localStore is the datastore built from netlabel.LocalKVClient; it is
	// where restoreEndpoints reads persisted endpoints from.
	localStore datastore.DataStore
	// vxlanIdm is created lazily by initializeVxlanIdm.
	vxlanIdm *idm.Idm
	// initOS guards the one-time call to applyOStweaks in configure.
	initOS sync.Once
	// joinOnce guards the serf join in nodeJoin and is reset when the join
	// fails so that it can be retried.
	joinOnce sync.Once
	// localJoinOnce guards the one-time peerDBUpdateSelf call in nodeJoin.
	localJoinOnce sync.Once
	// keys holds encryption keys.
	// NOTE(review): presumably maintained by setKeys/updateKeys - confirm.
	keys []*key
	// peerOpCh is the channel handed to peerOpRoutine.
	peerOpCh chan *peerOperation
	// peerOpCancel cancels the context peerOpRoutine runs with; Fini calls it.
	peerOpCancel context.CancelFunc
	sync.Mutex
}
  52. // Init registers a new instance of overlay driver
  53. func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
  54. c := driverapi.Capability{
  55. DataScope: datastore.GlobalScope,
  56. ConnectivityScope: datastore.GlobalScope,
  57. }
  58. d := &driver{
  59. networks: networkTable{},
  60. peerDb: peerNetworkMap{
  61. mp: map[string]*peerMap{},
  62. },
  63. secMap: &encrMap{nodes: map[string][]*spi{}},
  64. config: config,
  65. peerOpCh: make(chan *peerOperation),
  66. }
  67. // Launch the go routine for processing peer operations
  68. ctx, cancel := context.WithCancel(context.Background())
  69. d.peerOpCancel = cancel
  70. go d.peerOpRoutine(ctx, d.peerOpCh)
  71. if data, ok := config[netlabel.GlobalKVClient]; ok {
  72. var err error
  73. dsc, ok := data.(discoverapi.DatastoreConfigData)
  74. if !ok {
  75. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  76. }
  77. d.store, err = datastore.NewDataStoreFromConfig(dsc)
  78. if err != nil {
  79. return types.InternalErrorf("failed to initialize data store: %v", err)
  80. }
  81. }
  82. if data, ok := config[netlabel.LocalKVClient]; ok {
  83. var err error
  84. dsc, ok := data.(discoverapi.DatastoreConfigData)
  85. if !ok {
  86. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  87. }
  88. d.localStore, err = datastore.NewDataStoreFromConfig(dsc)
  89. if err != nil {
  90. return types.InternalErrorf("failed to initialize local data store: %v", err)
  91. }
  92. }
  93. if err := d.restoreEndpoints(); err != nil {
  94. logrus.Warnf("Failure during overlay endpoints restore: %v", err)
  95. }
  96. // If an error happened when the network join the sandbox during the endpoints restore
  97. // we should reset it now along with the once variable, so that subsequent endpoint joins
  98. // outside of the restore path can potentially fix the network join and succeed.
  99. for nid, n := range d.networks {
  100. if n.initErr != nil {
  101. logrus.Infof("resetting init error and once variable for network %s after unsuccessful endpoint restore: %v", nid, n.initErr)
  102. n.initErr = nil
  103. n.once = &sync.Once{}
  104. }
  105. }
  106. return dc.RegisterDriver(networkType, d, c)
  107. }
  108. // Endpoints are stored in the local store. Restore them and reconstruct the overlay sandbox
  109. func (d *driver) restoreEndpoints() error {
  110. if d.localStore == nil {
  111. logrus.Warn("Cannot restore overlay endpoints because local datastore is missing")
  112. return nil
  113. }
  114. kvol, err := d.localStore.List(datastore.Key(overlayEndpointPrefix), &endpoint{})
  115. if err != nil && err != datastore.ErrKeyNotFound {
  116. return fmt.Errorf("failed to read overlay endpoint from store: %v", err)
  117. }
  118. if err == datastore.ErrKeyNotFound {
  119. return nil
  120. }
  121. for _, kvo := range kvol {
  122. ep := kvo.(*endpoint)
  123. n := d.network(ep.nid)
  124. if n == nil {
  125. logrus.Debugf("Network (%s) not found for restored endpoint (%s)", ep.nid[0:7], ep.id[0:7])
  126. logrus.Debugf("Deleting stale overlay endpoint (%s) from store", ep.id[0:7])
  127. if err := d.deleteEndpointFromStore(ep); err != nil {
  128. logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7])
  129. }
  130. continue
  131. }
  132. n.addEndpoint(ep)
  133. s := n.getSubnetforIP(ep.addr)
  134. if s == nil {
  135. return fmt.Errorf("could not find subnet for endpoint %s", ep.id)
  136. }
  137. if err := n.joinSandbox(true); err != nil {
  138. return fmt.Errorf("restore network sandbox failed: %v", err)
  139. }
  140. if err := n.joinSubnetSandbox(s, true); err != nil {
  141. return fmt.Errorf("restore subnet sandbox failed for %q: %v", s.subnetIP.String(), err)
  142. }
  143. Ifaces := make(map[string][]osl.IfaceOption)
  144. vethIfaceOption := make([]osl.IfaceOption, 1)
  145. vethIfaceOption = append(vethIfaceOption, n.sbox.InterfaceOptions().Master(s.brName))
  146. Ifaces[fmt.Sprintf("%s+%s", "veth", "veth")] = vethIfaceOption
  147. err := n.sbox.Restore(Ifaces, nil, nil, nil)
  148. if err != nil {
  149. return fmt.Errorf("failed to restore overlay sandbox: %v", err)
  150. }
  151. n.incEndpointCount()
  152. d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
  153. }
  154. return nil
  155. }
  156. // Fini cleans up the driver resources
  157. func Fini(drv driverapi.Driver) {
  158. d := drv.(*driver)
  159. // Notify the peer go routine to return
  160. if d.peerOpCancel != nil {
  161. d.peerOpCancel()
  162. }
  163. if d.exitCh != nil {
  164. waitCh := make(chan struct{})
  165. d.exitCh <- waitCh
  166. <-waitCh
  167. }
  168. }
  169. func (d *driver) configure() error {
  170. // Apply OS specific kernel configs if needed
  171. d.initOS.Do(applyOStweaks)
  172. if d.store == nil {
  173. return nil
  174. }
  175. if d.vxlanIdm == nil {
  176. return d.initializeVxlanIdm()
  177. }
  178. return nil
  179. }
  180. func (d *driver) initializeVxlanIdm() error {
  181. var err error
  182. initVxlanIdm <- true
  183. defer func() { <-initVxlanIdm }()
  184. if d.vxlanIdm != nil {
  185. return nil
  186. }
  187. d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
  188. if err != nil {
  189. return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
  190. }
  191. return nil
  192. }
// Type returns the network type handled by this driver, i.e. the networkType
// constant ("overlay").
func (d *driver) Type() string {
	return networkType
}
// IsBuiltIn reports whether the driver is a built-in one; for the overlay
// driver it always returns true.
func (d *driver) IsBuiltIn() bool {
	return true
}
  199. func validateSelf(node string) error {
  200. advIP := net.ParseIP(node)
  201. if advIP == nil {
  202. return fmt.Errorf("invalid self address (%s)", node)
  203. }
  204. addrs, err := net.InterfaceAddrs()
  205. if err != nil {
  206. return fmt.Errorf("Unable to get interface addresses %v", err)
  207. }
  208. for _, addr := range addrs {
  209. ip, _, err := net.ParseCIDR(addr.String())
  210. if err == nil && ip.Equal(advIP) {
  211. return nil
  212. }
  213. }
  214. return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String())
  215. }
  216. func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
  217. if self && !d.isSerfAlive() {
  218. d.Lock()
  219. d.advertiseAddress = advertiseAddress
  220. d.bindAddress = bindAddress
  221. d.Unlock()
  222. // If containers are already running on this network update the
  223. // advertiseaddress in the peerDB
  224. d.localJoinOnce.Do(func() {
  225. d.peerDBUpdateSelf()
  226. })
  227. // If there is no cluster store there is no need to start serf.
  228. if d.store != nil {
  229. if err := validateSelf(advertiseAddress); err != nil {
  230. logrus.Warnf("%s", err.Error())
  231. }
  232. err := d.serfInit()
  233. if err != nil {
  234. logrus.Errorf("initializing serf instance failed: %v", err)
  235. d.Lock()
  236. d.advertiseAddress = ""
  237. d.bindAddress = ""
  238. d.Unlock()
  239. return
  240. }
  241. }
  242. }
  243. d.Lock()
  244. if !self {
  245. d.neighIP = advertiseAddress
  246. }
  247. neighIP := d.neighIP
  248. d.Unlock()
  249. if d.serfInstance != nil && neighIP != "" {
  250. var err error
  251. d.joinOnce.Do(func() {
  252. err = d.serfJoin(neighIP)
  253. if err == nil {
  254. d.pushLocalDb()
  255. }
  256. })
  257. if err != nil {
  258. logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err)
  259. d.Lock()
  260. d.joinOnce = sync.Once{}
  261. d.Unlock()
  262. return
  263. }
  264. }
  265. }
  266. func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
  267. n := d.network(nid)
  268. if n == nil {
  269. logrus.Debugf("Error pushing local endpoint event for network %s", nid)
  270. return
  271. }
  272. ep := n.endpoint(eid)
  273. if ep == nil {
  274. logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid)
  275. return
  276. }
  277. if !d.isSerfAlive() {
  278. return
  279. }
  280. d.notifyCh <- ovNotify{
  281. action: "join",
  282. nw: n,
  283. ep: ep,
  284. }
  285. }
  286. // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
  287. func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
  288. var err error
  289. switch dType {
  290. case discoverapi.NodeDiscovery:
  291. nodeData, ok := data.(discoverapi.NodeDiscoveryData)
  292. if !ok || nodeData.Address == "" {
  293. return fmt.Errorf("invalid discovery data")
  294. }
  295. d.nodeJoin(nodeData.Address, nodeData.BindAddress, nodeData.Self)
  296. case discoverapi.DatastoreConfig:
  297. if d.store != nil {
  298. return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already")
  299. }
  300. dsc, ok := data.(discoverapi.DatastoreConfigData)
  301. if !ok {
  302. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  303. }
  304. d.store, err = datastore.NewDataStoreFromConfig(dsc)
  305. if err != nil {
  306. return types.InternalErrorf("failed to initialize data store: %v", err)
  307. }
  308. case discoverapi.EncryptionKeysConfig:
  309. encrData, ok := data.(discoverapi.DriverEncryptionConfig)
  310. if !ok {
  311. return fmt.Errorf("invalid encryption key notification data")
  312. }
  313. keys := make([]*key, 0, len(encrData.Keys))
  314. for i := 0; i < len(encrData.Keys); i++ {
  315. k := &key{
  316. value: encrData.Keys[i],
  317. tag: uint32(encrData.Tags[i]),
  318. }
  319. keys = append(keys, k)
  320. }
  321. if err := d.setKeys(keys); err != nil {
  322. logrus.Warn(err)
  323. }
  324. case discoverapi.EncryptionKeysUpdate:
  325. var newKey, delKey, priKey *key
  326. encrData, ok := data.(discoverapi.DriverEncryptionUpdate)
  327. if !ok {
  328. return fmt.Errorf("invalid encryption key notification data")
  329. }
  330. if encrData.Key != nil {
  331. newKey = &key{
  332. value: encrData.Key,
  333. tag: uint32(encrData.Tag),
  334. }
  335. }
  336. if encrData.Primary != nil {
  337. priKey = &key{
  338. value: encrData.Primary,
  339. tag: uint32(encrData.PrimaryTag),
  340. }
  341. }
  342. if encrData.Prune != nil {
  343. delKey = &key{
  344. value: encrData.Prune,
  345. tag: uint32(encrData.PruneTag),
  346. }
  347. }
  348. if err := d.updateKeys(newKey, priKey, delKey); err != nil {
  349. logrus.Warn(err)
  350. }
  351. default:
  352. }
  353. return nil
  354. }
// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster.
// The overlay driver takes no action on such events: both arguments are
// ignored and nil is always returned.
func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
	return nil
}