hostdiscovery.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package hostdiscovery
  2. import (
  3. "net"
  4. "sync"
  5. "github.com/Sirupsen/logrus"
  6. mapset "github.com/deckarep/golang-set"
  7. "github.com/docker/docker/pkg/discovery"
  8. // Including KV
  9. _ "github.com/docker/docker/pkg/discovery/kv"
  10. "github.com/docker/libkv/store/consul"
  11. "github.com/docker/libkv/store/etcd"
  12. "github.com/docker/libkv/store/zookeeper"
  13. "github.com/docker/libnetwork/types"
  14. )
  15. type hostDiscovery struct {
  16. watcher discovery.Watcher
  17. nodes mapset.Set
  18. stopChan chan struct{}
  19. sync.Mutex
  20. }
  21. func init() {
  22. consul.Register()
  23. etcd.Register()
  24. zookeeper.Register()
  25. }
  26. // NewHostDiscovery function creates a host discovery object
  27. func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
  28. return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
  29. }
  30. func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
  31. h.Lock()
  32. d := h.watcher
  33. h.Unlock()
  34. if d == nil {
  35. return types.BadRequestErrorf("invalid discovery watcher")
  36. }
  37. discoveryCh, errCh := d.Watch(h.stopChan)
  38. go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback)
  39. return nil
  40. }
  41. func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
  42. activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
  43. for {
  44. select {
  45. case entries := <-ch:
  46. h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
  47. case err := <-errCh:
  48. if err != nil {
  49. logrus.Errorf("discovery error: %v", err)
  50. }
  51. case <-h.stopChan:
  52. return
  53. }
  54. }
  55. }
  56. func (h *hostDiscovery) StopDiscovery() error {
  57. h.Lock()
  58. stopChan := h.stopChan
  59. h.watcher = nil
  60. h.Unlock()
  61. close(stopChan)
  62. return nil
  63. }
  64. func (h *hostDiscovery) processCallback(entries discovery.Entries,
  65. activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
  66. updated := hosts(entries)
  67. h.Lock()
  68. existing := h.nodes
  69. added, removed := diff(existing, updated)
  70. h.nodes = updated
  71. h.Unlock()
  72. activeCallback()
  73. if len(added) > 0 {
  74. joinCallback(added)
  75. }
  76. if len(removed) > 0 {
  77. leaveCallback(removed)
  78. }
  79. }
  80. func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) {
  81. addSlice := updated.Difference(existing).ToSlice()
  82. removeSlice := existing.Difference(updated).ToSlice()
  83. for _, ip := range addSlice {
  84. added = append(added, net.ParseIP(ip.(string)))
  85. }
  86. for _, ip := range removeSlice {
  87. removed = append(removed, net.ParseIP(ip.(string)))
  88. }
  89. return
  90. }
  91. func (h *hostDiscovery) Fetch() []net.IP {
  92. h.Lock()
  93. defer h.Unlock()
  94. ips := []net.IP{}
  95. for _, ipstr := range h.nodes.ToSlice() {
  96. ips = append(ips, net.ParseIP(ipstr.(string)))
  97. }
  98. return ips
  99. }
  100. func hosts(entries discovery.Entries) mapset.Set {
  101. hosts := mapset.NewSet()
  102. for _, entry := range entries {
  103. hosts.Add(entry.Host)
  104. }
  105. return hosts
  106. }