watch.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package state
  2. import (
  3. "github.com/docker/go-events"
  4. "github.com/docker/swarmkit/api"
  5. "github.com/docker/swarmkit/watch"
  6. )
  7. // EventCommit delineates a transaction boundary.
  8. type EventCommit struct {
  9. Version *api.Version
  10. }
  11. // Matches returns true if this event is a commit event.
  12. func (e EventCommit) Matches(watchEvent events.Event) bool {
  13. _, ok := watchEvent.(EventCommit)
  14. return ok
  15. }
  16. // TaskCheckStateGreaterThan is a TaskCheckFunc for checking task state.
  17. func TaskCheckStateGreaterThan(t1, t2 *api.Task) bool {
  18. return t2.Status.State > t1.Status.State
  19. }
  20. // NodeCheckState is a NodeCheckFunc for matching node state.
  21. func NodeCheckState(n1, n2 *api.Node) bool {
  22. return n1.Status.State == n2.Status.State
  23. }
  24. // Watch takes a variable number of events to match against. The subscriber
  25. // will receive events that match any of the arguments passed to Watch.
  26. //
  27. // Examples:
  28. //
  29. // // subscribe to all events
  30. // Watch(q)
  31. //
  32. // // subscribe to all UpdateTask events
  33. // Watch(q, EventUpdateTask{})
  34. //
  35. // // subscribe to all task-related events
  36. // Watch(q, EventUpdateTask{}, EventCreateTask{}, EventDeleteTask{})
  37. //
  38. // // subscribe to UpdateTask for node 123
  39. // Watch(q, EventUpdateTask{Task: &api.Task{NodeID: 123},
  40. // Checks: []TaskCheckFunc{TaskCheckNodeID}})
  41. //
  42. // // subscribe to UpdateTask for node 123, as well as CreateTask
  43. // // for node 123 that also has ServiceID set to "abc"
  44. // Watch(q, EventUpdateTask{Task: &api.Task{NodeID: 123},
  45. // Checks: []TaskCheckFunc{TaskCheckNodeID}},
  46. // EventCreateTask{Task: &api.Task{NodeID: 123, ServiceID: "abc"},
  47. // Checks: []TaskCheckFunc{TaskCheckNodeID,
  48. // func(t1, t2 *api.Task) bool {
  49. // return t1.ServiceID == t2.ServiceID
  50. // }}})
  51. func Watch(queue *watch.Queue, specifiers ...api.Event) (eventq chan events.Event, cancel func()) {
  52. if len(specifiers) == 0 {
  53. return queue.Watch()
  54. }
  55. return queue.CallbackWatch(Matcher(specifiers...))
  56. }
  57. // Matcher returns an events.Matcher that Matches the specifiers with OR logic.
  58. func Matcher(specifiers ...api.Event) events.MatcherFunc {
  59. return events.MatcherFunc(func(event events.Event) bool {
  60. for _, s := range specifiers {
  61. if s.Matches(event) {
  62. return true
  63. }
  64. }
  65. return false
  66. })
  67. }