file.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package file
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "strings"
  6. "time"
  7. "github.com/docker/docker/pkg/discovery"
  8. )
  9. // Discovery is exported
  10. type Discovery struct {
  11. heartbeat time.Duration
  12. path string
  13. }
  14. func init() {
  15. Init()
  16. }
  17. // Init is exported
  18. func Init() {
  19. discovery.Register("file", &Discovery{})
  20. }
  21. // Initialize is exported
  22. func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration, _ map[string]string) error {
  23. s.path = path
  24. s.heartbeat = heartbeat
  25. return nil
  26. }
  27. func parseFileContent(content []byte) []string {
  28. var result []string
  29. for _, line := range strings.Split(strings.TrimSpace(string(content)), "\n") {
  30. line = strings.TrimSpace(line)
  31. // Ignoring line starts with #
  32. if strings.HasPrefix(line, "#") {
  33. continue
  34. }
  35. // Inlined # comment also ignored.
  36. if strings.Contains(line, "#") {
  37. line = line[0:strings.Index(line, "#")]
  38. // Trim additional spaces caused by above stripping.
  39. line = strings.TrimSpace(line)
  40. }
  41. result = append(result, discovery.Generate(line)...)
  42. }
  43. return result
  44. }
  45. func (s *Discovery) fetch() (discovery.Entries, error) {
  46. fileContent, err := ioutil.ReadFile(s.path)
  47. if err != nil {
  48. return nil, fmt.Errorf("failed to read '%s': %v", s.path, err)
  49. }
  50. return discovery.CreateEntries(parseFileContent(fileContent))
  51. }
  52. // Watch is exported
  53. func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
  54. ch := make(chan discovery.Entries)
  55. errCh := make(chan error)
  56. ticker := time.NewTicker(s.heartbeat)
  57. go func() {
  58. defer close(errCh)
  59. defer close(ch)
  60. // Send the initial entries if available.
  61. currentEntries, err := s.fetch()
  62. if err != nil {
  63. errCh <- err
  64. } else {
  65. ch <- currentEntries
  66. }
  67. // Periodically send updates.
  68. for {
  69. select {
  70. case <-ticker.C:
  71. newEntries, err := s.fetch()
  72. if err != nil {
  73. errCh <- err
  74. continue
  75. }
  76. // Check if the file has really changed.
  77. if !newEntries.Equals(currentEntries) {
  78. ch <- newEntries
  79. }
  80. currentEntries = newEntries
  81. case <-stopCh:
  82. ticker.Stop()
  83. return
  84. }
  85. }
  86. }()
  87. return ch, errCh
  88. }
  89. // Register is exported
  90. func (s *Discovery) Register(addr string) error {
  91. return discovery.ErrNotImplemented
  92. }