overlay.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package overlay
  2. //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
  3. import (
  4. "fmt"
  5. "net"
  6. "sync"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/libnetwork/datastore"
  9. "github.com/docker/libnetwork/discoverapi"
  10. "github.com/docker/libnetwork/driverapi"
  11. "github.com/docker/libnetwork/idm"
  12. "github.com/docker/libnetwork/netlabel"
  13. "github.com/docker/libnetwork/osl"
  14. "github.com/docker/libnetwork/types"
  15. "github.com/hashicorp/serf/serf"
  16. )
  17. // XXX OVERLAY_SOLARIS
  18. // Might need changes for names/constant values in solaris
  19. const (
  20. networkType = "overlay"
  21. vethPrefix = "veth"
  22. vethLen = 7
  23. vxlanIDStart = 256
  24. vxlanIDEnd = (1 << 24) - 1
  25. vxlanPort = 4789
  26. vxlanEncap = 50
  27. secureOption = "encrypted"
  28. )
  29. var initVxlanIdm = make(chan (bool), 1)
  30. type driver struct {
  31. eventCh chan serf.Event
  32. notifyCh chan ovNotify
  33. exitCh chan chan struct{}
  34. bindAddress string
  35. advertiseAddress string
  36. neighIP string
  37. config map[string]interface{}
  38. peerDb peerNetworkMap
  39. secMap *encrMap
  40. serfInstance *serf.Serf
  41. networks networkTable
  42. store datastore.DataStore
  43. localStore datastore.DataStore
  44. vxlanIdm *idm.Idm
  45. once sync.Once
  46. joinOnce sync.Once
  47. keys []*key
  48. sync.Mutex
  49. }
  50. // Init registers a new instance of overlay driver
  51. func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
  52. c := driverapi.Capability{
  53. DataScope: datastore.GlobalScope,
  54. ConnectivityScope: datastore.GlobalScope,
  55. }
  56. d := &driver{
  57. networks: networkTable{},
  58. peerDb: peerNetworkMap{
  59. mp: map[string]*peerMap{},
  60. },
  61. secMap: &encrMap{nodes: map[string][]*spi{}},
  62. config: config,
  63. }
  64. if data, ok := config[netlabel.GlobalKVClient]; ok {
  65. var err error
  66. dsc, ok := data.(discoverapi.DatastoreConfigData)
  67. if !ok {
  68. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  69. }
  70. d.store, err = datastore.NewDataStoreFromConfig(dsc)
  71. if err != nil {
  72. return types.InternalErrorf("failed to initialize data store: %v", err)
  73. }
  74. }
  75. if data, ok := config[netlabel.LocalKVClient]; ok {
  76. var err error
  77. dsc, ok := data.(discoverapi.DatastoreConfigData)
  78. if !ok {
  79. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  80. }
  81. d.localStore, err = datastore.NewDataStoreFromConfig(dsc)
  82. if err != nil {
  83. return types.InternalErrorf("failed to initialize local data store: %v", err)
  84. }
  85. }
  86. d.restoreEndpoints()
  87. return dc.RegisterDriver(networkType, d, c)
  88. }
  89. // Endpoints are stored in the local store. Restore them and reconstruct the overlay sandbox
  90. func (d *driver) restoreEndpoints() error {
  91. if d.localStore == nil {
  92. logrus.Warnf("Cannot restore overlay endpoints because local datastore is missing")
  93. return nil
  94. }
  95. kvol, err := d.localStore.List(datastore.Key(overlayEndpointPrefix), &endpoint{})
  96. if err != nil && err != datastore.ErrKeyNotFound {
  97. return fmt.Errorf("failed to read overlay endpoint from store: %v", err)
  98. }
  99. if err == datastore.ErrKeyNotFound {
  100. return nil
  101. }
  102. for _, kvo := range kvol {
  103. ep := kvo.(*endpoint)
  104. n := d.network(ep.nid)
  105. if n == nil {
  106. logrus.Debugf("Network (%s) not found for restored endpoint (%s)", ep.nid[0:7], ep.id[0:7])
  107. logrus.Debugf("Deleting stale overlay endpoint (%s) from store", ep.id[0:7])
  108. if err := d.deleteEndpointFromStore(ep); err != nil {
  109. logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7])
  110. }
  111. continue
  112. }
  113. n.addEndpoint(ep)
  114. s := n.getSubnetforIP(ep.addr)
  115. if s == nil {
  116. return fmt.Errorf("could not find subnet for endpoint %s", ep.id)
  117. }
  118. if err := n.joinSandbox(true); err != nil {
  119. return fmt.Errorf("restore network sandbox failed: %v", err)
  120. }
  121. if err := n.joinSubnetSandbox(s, true); err != nil {
  122. return fmt.Errorf("restore subnet sandbox failed for %q: %v", s.subnetIP.String(), err)
  123. }
  124. Ifaces := make(map[string][]osl.IfaceOption)
  125. vethIfaceOption := make([]osl.IfaceOption, 1)
  126. vethIfaceOption = append(vethIfaceOption, n.sbox.InterfaceOptions().Master(s.brName))
  127. Ifaces[fmt.Sprintf("%s+%s", "veth", "veth")] = vethIfaceOption
  128. err := n.sbox.Restore(Ifaces, nil, nil, nil)
  129. if err != nil {
  130. return fmt.Errorf("failed to restore overlay sandbox: %v", err)
  131. }
  132. n.incEndpointCount()
  133. d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true)
  134. }
  135. return nil
  136. }
  137. // Fini cleans up the driver resources
  138. func Fini(drv driverapi.Driver) {
  139. d := drv.(*driver)
  140. if d.exitCh != nil {
  141. waitCh := make(chan struct{})
  142. d.exitCh <- waitCh
  143. <-waitCh
  144. }
  145. }
  146. func (d *driver) configure() error {
  147. if d.store == nil {
  148. return nil
  149. }
  150. if d.vxlanIdm == nil {
  151. return d.initializeVxlanIdm()
  152. }
  153. return nil
  154. }
  155. func (d *driver) initializeVxlanIdm() error {
  156. var err error
  157. initVxlanIdm <- true
  158. defer func() { <-initVxlanIdm }()
  159. if d.vxlanIdm != nil {
  160. return nil
  161. }
  162. d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
  163. if err != nil {
  164. return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
  165. }
  166. return nil
  167. }
  168. func (d *driver) Type() string {
  169. return networkType
  170. }
  171. func (d *driver) IsBuiltIn() bool {
  172. return true
  173. }
  174. func validateSelf(node string) error {
  175. advIP := net.ParseIP(node)
  176. if advIP == nil {
  177. return fmt.Errorf("invalid self address (%s)", node)
  178. }
  179. addrs, err := net.InterfaceAddrs()
  180. if err != nil {
  181. return fmt.Errorf("Unable to get interface addresses %v", err)
  182. }
  183. for _, addr := range addrs {
  184. ip, _, err := net.ParseCIDR(addr.String())
  185. if err == nil && ip.Equal(advIP) {
  186. return nil
  187. }
  188. }
  189. return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String())
  190. }
  191. func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
  192. if self && !d.isSerfAlive() {
  193. d.Lock()
  194. d.advertiseAddress = advertiseAddress
  195. d.bindAddress = bindAddress
  196. d.Unlock()
  197. // If there is no cluster store there is no need to start serf.
  198. if d.store != nil {
  199. if err := validateSelf(advertiseAddress); err != nil {
  200. logrus.Warnf("%s", err.Error())
  201. }
  202. err := d.serfInit()
  203. if err != nil {
  204. logrus.Errorf("initializing serf instance failed: %v", err)
  205. d.Lock()
  206. d.advertiseAddress = ""
  207. d.bindAddress = ""
  208. d.Unlock()
  209. return
  210. }
  211. }
  212. }
  213. d.Lock()
  214. if !self {
  215. d.neighIP = advertiseAddress
  216. }
  217. neighIP := d.neighIP
  218. d.Unlock()
  219. if d.serfInstance != nil && neighIP != "" {
  220. var err error
  221. d.joinOnce.Do(func() {
  222. err = d.serfJoin(neighIP)
  223. if err == nil {
  224. d.pushLocalDb()
  225. }
  226. })
  227. if err != nil {
  228. logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err)
  229. d.Lock()
  230. d.joinOnce = sync.Once{}
  231. d.Unlock()
  232. return
  233. }
  234. }
  235. }
  236. func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
  237. n := d.network(nid)
  238. if n == nil {
  239. logrus.Debugf("Error pushing local endpoint event for network %s", nid)
  240. return
  241. }
  242. ep := n.endpoint(eid)
  243. if ep == nil {
  244. logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid)
  245. return
  246. }
  247. if !d.isSerfAlive() {
  248. return
  249. }
  250. d.notifyCh <- ovNotify{
  251. action: "join",
  252. nw: n,
  253. ep: ep,
  254. }
  255. }
  256. // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
  257. func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
  258. var err error
  259. switch dType {
  260. case discoverapi.NodeDiscovery:
  261. nodeData, ok := data.(discoverapi.NodeDiscoveryData)
  262. if !ok || nodeData.Address == "" {
  263. return fmt.Errorf("invalid discovery data")
  264. }
  265. d.nodeJoin(nodeData.Address, nodeData.BindAddress, nodeData.Self)
  266. case discoverapi.DatastoreConfig:
  267. if d.store != nil {
  268. return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already")
  269. }
  270. dsc, ok := data.(discoverapi.DatastoreConfigData)
  271. if !ok {
  272. return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
  273. }
  274. d.store, err = datastore.NewDataStoreFromConfig(dsc)
  275. if err != nil {
  276. return types.InternalErrorf("failed to initialize data store: %v", err)
  277. }
  278. case discoverapi.EncryptionKeysConfig:
  279. encrData, ok := data.(discoverapi.DriverEncryptionConfig)
  280. if !ok {
  281. return fmt.Errorf("invalid encryption key notification data")
  282. }
  283. keys := make([]*key, 0, len(encrData.Keys))
  284. for i := 0; i < len(encrData.Keys); i++ {
  285. k := &key{
  286. value: encrData.Keys[i],
  287. tag: uint32(encrData.Tags[i]),
  288. }
  289. keys = append(keys, k)
  290. }
  291. d.setKeys(keys)
  292. case discoverapi.EncryptionKeysUpdate:
  293. var newKey, delKey, priKey *key
  294. encrData, ok := data.(discoverapi.DriverEncryptionUpdate)
  295. if !ok {
  296. return fmt.Errorf("invalid encryption key notification data")
  297. }
  298. if encrData.Key != nil {
  299. newKey = &key{
  300. value: encrData.Key,
  301. tag: uint32(encrData.Tag),
  302. }
  303. }
  304. if encrData.Primary != nil {
  305. priKey = &key{
  306. value: encrData.Primary,
  307. tag: uint32(encrData.PrimaryTag),
  308. }
  309. }
  310. if encrData.Prune != nil {
  311. delKey = &key{
  312. value: encrData.Prune,
  313. tag: uint32(encrData.PruneTag),
  314. }
  315. }
  316. d.updateKeys(newKey, priKey, delKey)
  317. default:
  318. }
  319. return nil
  320. }
  321. // DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
  322. func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
  323. return nil
  324. }