watch.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package networkdb
  2. import (
  3. "net"
  4. "github.com/docker/go-events"
  5. )
  6. type opType uint8
  7. const (
  8. opCreate opType = 1 + iota
  9. opUpdate
  10. opDelete
  11. )
  12. type event struct {
  13. Table string
  14. NetworkID string
  15. Key string
  16. Value []byte
  17. }
  18. // NodeTable represents table event for node join and leave
  19. const NodeTable = "NodeTable"
  20. // NodeAddr represents the value carried for node event in NodeTable
  21. type NodeAddr struct {
  22. Addr net.IP
  23. }
  24. // CreateEvent generates a table entry create event to the watchers
  25. type CreateEvent event
  26. // UpdateEvent generates a table entry update event to the watchers
  27. type UpdateEvent event
  28. // DeleteEvent generates a table entry delete event to the watchers
  29. type DeleteEvent event
  30. // Watch creates a watcher with filters for a particular table or
  31. // network or any combination of the tuple. If any of the
  32. // filter is an empty string it acts as a wildcard for that
  33. // field. Watch returns a channel of events, where the events will be
  34. // sent.
  35. func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) {
  36. var matcher events.Matcher
  37. if tname != "" || nid != "" {
  38. matcher = events.MatcherFunc(func(ev events.Event) bool {
  39. var evt event
  40. switch ev := ev.(type) {
  41. case CreateEvent:
  42. evt = event(ev)
  43. case UpdateEvent:
  44. evt = event(ev)
  45. case DeleteEvent:
  46. evt = event(ev)
  47. }
  48. if tname != "" && evt.Table != tname {
  49. return false
  50. }
  51. if nid != "" && evt.NetworkID != nid {
  52. return false
  53. }
  54. return true
  55. })
  56. }
  57. ch := events.NewChannel(0)
  58. sink := events.Sink(events.NewQueue(ch))
  59. if matcher != nil {
  60. sink = events.NewFilter(sink, matcher)
  61. }
  62. nDB.broadcaster.Add(sink)
  63. return ch, func() {
  64. nDB.broadcaster.Remove(sink)
  65. ch.Close()
  66. sink.Close()
  67. }
  68. }
  69. func makeEvent(op opType, tname, nid, key string, value []byte) events.Event {
  70. ev := event{
  71. Table: tname,
  72. NetworkID: nid,
  73. Key: key,
  74. Value: value,
  75. }
  76. switch op {
  77. case opCreate:
  78. return CreateEvent(ev)
  79. case opUpdate:
  80. return UpdateEvent(ev)
  81. case opDelete:
  82. return DeleteEvent(ev)
  83. }
  84. return nil
  85. }