overlay.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package overlay
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/libkv/store"
  7. "github.com/docker/libnetwork/config"
  8. "github.com/docker/libnetwork/datastore"
  9. "github.com/docker/libnetwork/driverapi"
  10. "github.com/docker/libnetwork/idm"
  11. "github.com/docker/libnetwork/netlabel"
  12. "github.com/hashicorp/serf/serf"
  13. )
  14. const (
  15. networkType = "overlay"
  16. vethPrefix = "veth"
  17. vethLen = 7
  18. vxlanIDStart = 256
  19. vxlanIDEnd = 1000
  20. vxlanPort = 4789
  21. vxlanVethMTU = 1450
  22. )
  23. type driver struct {
  24. eventCh chan serf.Event
  25. notifyCh chan ovNotify
  26. exitCh chan chan struct{}
  27. bindAddress string
  28. neighIP string
  29. config map[string]interface{}
  30. peerDb peerNetworkMap
  31. serfInstance *serf.Serf
  32. networks networkTable
  33. store datastore.DataStore
  34. ipAllocator *idm.Idm
  35. vxlanIdm *idm.Idm
  36. once sync.Once
  37. joinOnce sync.Once
  38. sync.Mutex
  39. }
  40. // Init registers a new instance of overlay driver
  41. func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
  42. c := driverapi.Capability{
  43. DataScope: datastore.GlobalScope,
  44. }
  45. d := &driver{
  46. networks: networkTable{},
  47. peerDb: peerNetworkMap{
  48. mp: map[string]peerMap{},
  49. },
  50. config: config,
  51. }
  52. return dc.RegisterDriver(networkType, d, c)
  53. }
  54. // Fini cleans up the driver resources
  55. func Fini(drv driverapi.Driver) {
  56. d := drv.(*driver)
  57. if d.exitCh != nil {
  58. waitCh := make(chan struct{})
  59. d.exitCh <- waitCh
  60. <-waitCh
  61. }
  62. }
  63. func (d *driver) configure() error {
  64. var err error
  65. if len(d.config) == 0 {
  66. return nil
  67. }
  68. d.once.Do(func() {
  69. provider, provOk := d.config[netlabel.KVProvider]
  70. provURL, urlOk := d.config[netlabel.KVProviderURL]
  71. if provOk && urlOk {
  72. cfg := &config.DatastoreCfg{
  73. Client: config.DatastoreClientCfg{
  74. Provider: provider.(string),
  75. Address: provURL.(string),
  76. },
  77. }
  78. provConfig, confOk := d.config[netlabel.KVProviderConfig]
  79. if confOk {
  80. cfg.Client.Config = provConfig.(*store.Config)
  81. }
  82. d.store, err = datastore.NewDataStore(cfg)
  83. if err != nil {
  84. err = fmt.Errorf("failed to initialize data store: %v", err)
  85. return
  86. }
  87. }
  88. d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
  89. if err != nil {
  90. err = fmt.Errorf("failed to initialize vxlan id manager: %v", err)
  91. return
  92. }
  93. d.ipAllocator, err = idm.New(d.store, "ipam-id", 1, 0xFFFF-2)
  94. if err != nil {
  95. err = fmt.Errorf("failed to initalize ipam id manager: %v", err)
  96. return
  97. }
  98. })
  99. return err
  100. }
  101. func (d *driver) Type() string {
  102. return networkType
  103. }
  104. func (d *driver) nodeJoin(node string, self bool) {
  105. if self && !d.isSerfAlive() {
  106. d.Lock()
  107. d.bindAddress = node
  108. d.Unlock()
  109. err := d.serfInit()
  110. if err != nil {
  111. logrus.Errorf("initializing serf instance failed: %v", err)
  112. return
  113. }
  114. }
  115. d.Lock()
  116. if !self {
  117. d.neighIP = node
  118. }
  119. neighIP := d.neighIP
  120. d.Unlock()
  121. if d.serfInstance != nil && neighIP != "" {
  122. var err error
  123. d.joinOnce.Do(func() {
  124. err = d.serfJoin(neighIP)
  125. if err == nil {
  126. d.pushLocalDb()
  127. }
  128. })
  129. if err != nil {
  130. logrus.Errorf("joining serf neighbor %s failed: %v", node, err)
  131. d.Lock()
  132. d.joinOnce = sync.Once{}
  133. d.Unlock()
  134. return
  135. }
  136. }
  137. }
  138. func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
  139. if !d.isSerfAlive() {
  140. return
  141. }
  142. d.notifyCh <- ovNotify{
  143. action: "join",
  144. nid: nid,
  145. eid: eid,
  146. }
  147. }
  148. // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
  149. func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error {
  150. if dType == driverapi.NodeDiscovery {
  151. nodeData, ok := data.(driverapi.NodeDiscoveryData)
  152. if !ok || nodeData.Address == "" {
  153. return fmt.Errorf("invalid discovery data")
  154. }
  155. d.nodeJoin(nodeData.Address, nodeData.Self)
  156. }
  157. return nil
  158. }
  159. // DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
  160. func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error {
  161. return nil
  162. }