ov_serf.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package overlay
  2. import (
  3. "fmt"
  4. "net"
  5. "strings"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/hashicorp/serf/serf"
  9. )
// ovNotify is the message type carried on the driver's notify channel. The
// serf loop passes each received value to notifyEvent, which turns it into a
// serf user event.
type ovNotify struct {
	// action is written verbatim as the first field of the event payload;
	// processEvent acts on the values "join" and "leave".
	action string
	// ep is the endpoint whose address, mask, MAC and id are advertised.
	ep *endpoint
	// nw is the network whose id is embedded in the event name.
	nw *network
}
  15. type logWriter struct{}
  16. func (l *logWriter) Write(p []byte) (int, error) {
  17. str := string(p)
  18. switch {
  19. case strings.Contains(str, "[WARN]"):
  20. logrus.Warn(str)
  21. case strings.Contains(str, "[DEBUG]"):
  22. logrus.Debug(str)
  23. case strings.Contains(str, "[INFO]"):
  24. logrus.Info(str)
  25. case strings.Contains(str, "[ERR]"):
  26. logrus.Error(str)
  27. }
  28. return len(p), nil
  29. }
  30. func (d *driver) serfInit() error {
  31. var err error
  32. config := serf.DefaultConfig()
  33. config.Init()
  34. config.MemberlistConfig.BindAddr = d.advertiseAddress
  35. d.eventCh = make(chan serf.Event, 4)
  36. config.EventCh = d.eventCh
  37. config.UserCoalescePeriod = 1 * time.Second
  38. config.UserQuiescentPeriod = 50 * time.Millisecond
  39. config.LogOutput = &logWriter{}
  40. config.MemberlistConfig.LogOutput = config.LogOutput
  41. s, err := serf.Create(config)
  42. if err != nil {
  43. return fmt.Errorf("failed to create cluster node: %v", err)
  44. }
  45. defer func() {
  46. if err != nil {
  47. s.Shutdown()
  48. }
  49. }()
  50. d.serfInstance = s
  51. d.notifyCh = make(chan ovNotify)
  52. d.exitCh = make(chan chan struct{})
  53. go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh)
  54. return nil
  55. }
  56. func (d *driver) serfJoin(neighIP string) error {
  57. if neighIP == "" {
  58. return fmt.Errorf("no neighbor to join")
  59. }
  60. if _, err := d.serfInstance.Join([]string{neighIP}, true); err != nil {
  61. return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
  62. neighIP, err)
  63. }
  64. return nil
  65. }
  66. func (d *driver) notifyEvent(event ovNotify) {
  67. ep := event.ep
  68. ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(),
  69. net.IP(ep.addr.Mask).String(), ep.mac.String())
  70. eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(),
  71. event.nw.id, ep.id)
  72. if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil {
  73. logrus.Errorf("Sending user event failed: %v\n", err)
  74. }
  75. }
  76. func (d *driver) processEvent(u serf.UserEvent) {
  77. logrus.Debugf("Received user event name:%s, payload:%s LTime:%d \n", u.Name,
  78. string(u.Payload), uint64(u.LTime))
  79. var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string
  80. if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
  81. fmt.Printf("Failed to scan name string: %v\n", err)
  82. }
  83. if _, err := fmt.Sscan(string(u.Payload), &action,
  84. &ipStr, &maskStr, &macStr); err != nil {
  85. fmt.Printf("Failed to scan value string: %v\n", err)
  86. }
  87. logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr)
  88. mac, err := net.ParseMAC(macStr)
  89. if err != nil {
  90. logrus.Errorf("Failed to parse mac: %v\n", err)
  91. }
  92. if d.serfInstance.LocalMember().Addr.String() == vtepStr {
  93. return
  94. }
  95. switch action {
  96. case "join":
  97. if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
  98. net.ParseIP(vtepStr), true, false, false); err != nil {
  99. logrus.Errorf("Peer add failed in the driver: %v\n", err)
  100. }
  101. case "leave":
  102. if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
  103. net.ParseIP(vtepStr), true); err != nil {
  104. logrus.Errorf("Peer delete failed in the driver: %v\n", err)
  105. }
  106. }
  107. }
  108. func (d *driver) processQuery(q *serf.Query) {
  109. logrus.Debugf("Received query name:%s, payload:%s\n", q.Name,
  110. string(q.Payload))
  111. var nid, ipStr string
  112. if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil {
  113. fmt.Printf("Failed to scan query payload string: %v\n", err)
  114. }
  115. peerMac, peerIPMask, vtep, err := d.peerDbSearch(nid, net.ParseIP(ipStr))
  116. if err != nil {
  117. return
  118. }
  119. logrus.Debugf("Sending peer query resp mac %s, mask %s, vtep %s", peerMac, net.IP(peerIPMask), vtep)
  120. q.Respond([]byte(fmt.Sprintf("%s %s %s", peerMac.String(), net.IP(peerIPMask).String(), vtep.String())))
  121. }
  122. func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) {
  123. if d.serfInstance == nil {
  124. return nil, nil, nil, fmt.Errorf("could not resolve peer: serf instance not initialized")
  125. }
  126. qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String())
  127. resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil)
  128. if err != nil {
  129. return nil, nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err)
  130. }
  131. respCh := resp.ResponseCh()
  132. select {
  133. case r := <-respCh:
  134. var macStr, maskStr, vtepStr string
  135. if _, err := fmt.Sscan(string(r.Payload), &macStr, &maskStr, &vtepStr); err != nil {
  136. return nil, nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err)
  137. }
  138. mac, err := net.ParseMAC(macStr)
  139. if err != nil {
  140. return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err)
  141. }
  142. logrus.Debugf("Received peer query response, mac %s, vtep %s, mask %s", macStr, vtepStr, maskStr)
  143. return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil
  144. case <-time.After(time.Second):
  145. return nil, nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster")
  146. }
  147. }
  148. func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify,
  149. exitCh chan chan struct{}) {
  150. for {
  151. select {
  152. case notify, ok := <-notifyCh:
  153. if !ok {
  154. break
  155. }
  156. d.notifyEvent(notify)
  157. case ch, ok := <-exitCh:
  158. if !ok {
  159. break
  160. }
  161. if err := d.serfInstance.Leave(); err != nil {
  162. logrus.Errorf("failed leaving the cluster: %v\n", err)
  163. }
  164. d.serfInstance.Shutdown()
  165. close(ch)
  166. return
  167. case e, ok := <-eventCh:
  168. if !ok {
  169. break
  170. }
  171. if e.EventType() == serf.EventQuery {
  172. d.processQuery(e.(*serf.Query))
  173. break
  174. }
  175. u, ok := e.(serf.UserEvent)
  176. if !ok {
  177. break
  178. }
  179. d.processEvent(u)
  180. }
  181. }
  182. }
  183. func (d *driver) isSerfAlive() bool {
  184. d.Lock()
  185. serfInstance := d.serfInstance
  186. d.Unlock()
  187. if serfInstance == nil || serfInstance.State() != serf.SerfAlive {
  188. return false
  189. }
  190. return true
  191. }