123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- package events
- import (
- "sync"
- "time"
- eventtypes "github.com/docker/docker/api/types/events"
- "github.com/docker/docker/pkg/pubsub"
- )
// Sizing parameters for the event hub.
const (
	// eventsLimit is the capacity of the in-memory backlog of past events.
	eventsLimit = 256

	// bufferSize is the buffer size handed to the pubsub publisher.
	bufferSize = 1024
)
- // Events is pubsub channel for events generated by the engine.
- type Events struct {
- mu sync.Mutex
- events []eventtypes.Message
- pub *pubsub.Publisher
- }
- // New returns new *Events instance
- func New() *Events {
- return &Events{
- events: make([]eventtypes.Message, 0, eventsLimit),
- pub: pubsub.NewPublisher(100*time.Millisecond, bufferSize),
- }
- }
- // Subscribe adds new listener to events, returns slice of 256 stored
- // last events, a channel in which you can expect new events (in form
- // of interface{}, so you need type assertion), and a function to call
- // to stop the stream of events.
- func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
- eventSubscribers.Inc()
- e.mu.Lock()
- current := make([]eventtypes.Message, len(e.events))
- copy(current, e.events)
- l := e.pub.Subscribe()
- e.mu.Unlock()
- cancel := func() {
- e.Evict(l)
- }
- return current, l, cancel
- }
- // SubscribeTopic adds new listener to events, returns slice of 256 stored
- // last events, a channel in which you can expect new events (in form
- // of interface{}, so you need type assertion).
- func (e *Events) SubscribeTopic(since, until time.Time, ef *Filter) ([]eventtypes.Message, chan interface{}) {
- eventSubscribers.Inc()
- e.mu.Lock()
- var topic func(m interface{}) bool
- if ef != nil && ef.filter.Len() > 0 {
- topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
- }
- buffered := e.loadBufferedEvents(since, until, topic)
- var ch chan interface{}
- if topic != nil {
- ch = e.pub.SubscribeTopic(topic)
- } else {
- // Subscribe to all events if there are no filters
- ch = e.pub.Subscribe()
- }
- e.mu.Unlock()
- return buffered, ch
- }
- // Evict evicts listener from pubsub
- func (e *Events) Evict(l chan interface{}) {
- eventSubscribers.Dec()
- e.pub.Evict(l)
- }
- // Log creates a local scope message and publishes it
- func (e *Events) Log(action, eventType string, actor eventtypes.Actor) {
- now := time.Now().UTC()
- jm := eventtypes.Message{
- Action: action,
- Type: eventType,
- Actor: actor,
- Scope: "local",
- Time: now.Unix(),
- TimeNano: now.UnixNano(),
- }
- // fill deprecated fields for container and images
- switch eventType {
- case eventtypes.ContainerEventType:
- jm.ID = actor.ID
- jm.Status = action
- jm.From = actor.Attributes["image"]
- case eventtypes.ImageEventType:
- jm.ID = actor.ID
- jm.Status = action
- }
- e.PublishMessage(jm)
- }
- // PublishMessage broadcasts event to listeners. Each listener has 100 milliseconds to
- // receive the event or it will be skipped.
- func (e *Events) PublishMessage(jm eventtypes.Message) {
- eventsCounter.Inc()
- e.mu.Lock()
- if len(e.events) == cap(e.events) {
- // discard oldest event
- copy(e.events, e.events[1:])
- e.events[len(e.events)-1] = jm
- } else {
- e.events = append(e.events, jm)
- }
- e.mu.Unlock()
- e.pub.Publish(jm)
- }
- // SubscribersCount returns number of event listeners
- func (e *Events) SubscribersCount() int {
- return e.pub.Len()
- }
- // loadBufferedEvents iterates over the cached events in the buffer
- // and returns those that were emitted between two specific dates.
- // It uses `time.Unix(seconds, nanoseconds)` to generate valid dates with those arguments.
- // It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages.
- func (e *Events) loadBufferedEvents(since, until time.Time, topic func(interface{}) bool) []eventtypes.Message {
- var buffered []eventtypes.Message
- if since.IsZero() && until.IsZero() {
- return buffered
- }
- var sinceNanoUnix int64
- if !since.IsZero() {
- sinceNanoUnix = since.UnixNano()
- }
- var untilNanoUnix int64
- if !until.IsZero() {
- untilNanoUnix = until.UnixNano()
- }
- for i := len(e.events) - 1; i >= 0; i-- {
- ev := e.events[i]
- if ev.TimeNano < sinceNanoUnix {
- break
- }
- if untilNanoUnix > 0 && ev.TimeNano > untilNanoUnix {
- continue
- }
- if topic == nil || topic(ev) {
- buffered = append([]eventtypes.Message{ev}, buffered...)
- }
- }
- return buffered
- }
|