memory.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package memory
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/docker/docker/pkg/discovery"
  6. )
  7. // Discovery implements a discovery backend that keeps
  8. // data in memory.
  9. type Discovery struct {
  10. heartbeat time.Duration
  11. values []string
  12. mu sync.Mutex
  13. }
  14. func init() {
  15. Init()
  16. }
  17. // Init registers the memory backend on demand.
  18. func Init() {
  19. discovery.Register("memory", &Discovery{})
  20. }
  21. // Initialize sets the heartbeat for the memory backend.
  22. func (s *Discovery) Initialize(_ string, heartbeat time.Duration, _ time.Duration, _ map[string]string) error {
  23. s.heartbeat = heartbeat
  24. s.values = make([]string, 0)
  25. return nil
  26. }
  27. // Watch sends periodic discovery updates to a channel.
  28. func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
  29. ch := make(chan discovery.Entries)
  30. errCh := make(chan error)
  31. ticker := time.NewTicker(s.heartbeat)
  32. go func() {
  33. defer close(errCh)
  34. defer close(ch)
  35. // Send the initial entries if available.
  36. var currentEntries discovery.Entries
  37. var err error
  38. s.mu.Lock()
  39. if len(s.values) > 0 {
  40. currentEntries, err = discovery.CreateEntries(s.values)
  41. }
  42. s.mu.Unlock()
  43. if err != nil {
  44. errCh <- err
  45. } else if currentEntries != nil {
  46. ch <- currentEntries
  47. }
  48. // Periodically send updates.
  49. for {
  50. select {
  51. case <-ticker.C:
  52. s.mu.Lock()
  53. newEntries, err := discovery.CreateEntries(s.values)
  54. s.mu.Unlock()
  55. if err != nil {
  56. errCh <- err
  57. continue
  58. }
  59. // Check if the file has really changed.
  60. if !newEntries.Equals(currentEntries) {
  61. ch <- newEntries
  62. }
  63. currentEntries = newEntries
  64. case <-stopCh:
  65. ticker.Stop()
  66. return
  67. }
  68. }
  69. }()
  70. return ch, errCh
  71. }
  72. // Register adds a new address to the discovery.
  73. func (s *Discovery) Register(addr string) error {
  74. s.mu.Lock()
  75. s.values = append(s.values, addr)
  76. s.mu.Unlock()
  77. return nil
  78. }