events.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package events
  2. import (
  3. "sync"
  4. "time"
  5. eventtypes "github.com/docker/docker/api/types/events"
  6. "github.com/docker/docker/pkg/pubsub"
  7. )
  8. const (
  9. eventsLimit = 256
  10. bufferSize = 1024
  11. )
  12. // Events is pubsub channel for events generated by the engine.
  13. type Events struct {
  14. mu sync.Mutex
  15. events []eventtypes.Message
  16. pub *pubsub.Publisher
  17. }
  18. // New returns new *Events instance
  19. func New() *Events {
  20. return &Events{
  21. events: make([]eventtypes.Message, 0, eventsLimit),
  22. pub: pubsub.NewPublisher(100*time.Millisecond, bufferSize),
  23. }
  24. }
  25. // Subscribe adds new listener to events, returns slice of 256 stored
  26. // last events, a channel in which you can expect new events (in form
  27. // of interface{}, so you need type assertion), and a function to call
  28. // to stop the stream of events.
  29. func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
  30. eventSubscribers.Inc()
  31. e.mu.Lock()
  32. current := make([]eventtypes.Message, len(e.events))
  33. copy(current, e.events)
  34. l := e.pub.Subscribe()
  35. e.mu.Unlock()
  36. cancel := func() {
  37. e.Evict(l)
  38. }
  39. return current, l, cancel
  40. }
  41. // SubscribeTopic adds new listener to events, returns slice of 256 stored
  42. // last events, a channel in which you can expect new events (in form
  43. // of interface{}, so you need type assertion).
  44. func (e *Events) SubscribeTopic(since, until time.Time, ef *Filter) ([]eventtypes.Message, chan interface{}) {
  45. eventSubscribers.Inc()
  46. e.mu.Lock()
  47. var topic func(m interface{}) bool
  48. if ef != nil && ef.filter.Len() > 0 {
  49. topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
  50. }
  51. buffered := e.loadBufferedEvents(since, until, topic)
  52. var ch chan interface{}
  53. if topic != nil {
  54. ch = e.pub.SubscribeTopic(topic)
  55. } else {
  56. // Subscribe to all events if there are no filters
  57. ch = e.pub.Subscribe()
  58. }
  59. e.mu.Unlock()
  60. return buffered, ch
  61. }
  62. // Evict evicts listener from pubsub
  63. func (e *Events) Evict(l chan interface{}) {
  64. eventSubscribers.Dec()
  65. e.pub.Evict(l)
  66. }
  67. // Log creates a local scope message and publishes it
  68. func (e *Events) Log(action, eventType string, actor eventtypes.Actor) {
  69. now := time.Now().UTC()
  70. jm := eventtypes.Message{
  71. Action: action,
  72. Type: eventType,
  73. Actor: actor,
  74. Scope: "local",
  75. Time: now.Unix(),
  76. TimeNano: now.UnixNano(),
  77. }
  78. // fill deprecated fields for container and images
  79. switch eventType {
  80. case eventtypes.ContainerEventType:
  81. jm.ID = actor.ID
  82. jm.Status = action
  83. jm.From = actor.Attributes["image"]
  84. case eventtypes.ImageEventType:
  85. jm.ID = actor.ID
  86. jm.Status = action
  87. }
  88. e.PublishMessage(jm)
  89. }
  90. // PublishMessage broadcasts event to listeners. Each listener has 100 milliseconds to
  91. // receive the event or it will be skipped.
  92. func (e *Events) PublishMessage(jm eventtypes.Message) {
  93. eventsCounter.Inc()
  94. e.mu.Lock()
  95. if len(e.events) == cap(e.events) {
  96. // discard oldest event
  97. copy(e.events, e.events[1:])
  98. e.events[len(e.events)-1] = jm
  99. } else {
  100. e.events = append(e.events, jm)
  101. }
  102. e.mu.Unlock()
  103. e.pub.Publish(jm)
  104. }
  105. // SubscribersCount returns number of event listeners
  106. func (e *Events) SubscribersCount() int {
  107. return e.pub.Len()
  108. }
  109. // loadBufferedEvents iterates over the cached events in the buffer
  110. // and returns those that were emitted between two specific dates.
  111. // It uses `time.Unix(seconds, nanoseconds)` to generate valid dates with those arguments.
  112. // It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages.
  113. func (e *Events) loadBufferedEvents(since, until time.Time, topic func(interface{}) bool) []eventtypes.Message {
  114. var buffered []eventtypes.Message
  115. if since.IsZero() && until.IsZero() {
  116. return buffered
  117. }
  118. var sinceNanoUnix int64
  119. if !since.IsZero() {
  120. sinceNanoUnix = since.UnixNano()
  121. }
  122. var untilNanoUnix int64
  123. if !until.IsZero() {
  124. untilNanoUnix = until.UnixNano()
  125. }
  126. for i := len(e.events) - 1; i >= 0; i-- {
  127. ev := e.events[i]
  128. if ev.TimeNano < sinceNanoUnix {
  129. break
  130. }
  131. if untilNanoUnix > 0 && ev.TimeNano > untilNanoUnix {
  132. continue
  133. }
  134. if topic == nil || topic(ev) {
  135. buffered = append([]eventtypes.Message{ev}, buffered...)
  136. }
  137. }
  138. return buffered
  139. }