// events_utils.go

  1. package system
  2. import (
  3. "encoding/json"
  4. "io"
  5. "sync"
  6. "github.com/Sirupsen/logrus"
  7. eventtypes "github.com/docker/engine-api/types/events"
  8. )
  9. // EventHandler is abstract interface for user to customize
  10. // own handle functions of each type of events
  11. type EventHandler interface {
  12. Handle(action string, h func(eventtypes.Message))
  13. Watch(c <-chan eventtypes.Message)
  14. }
  15. // InitEventHandler initializes and returns an EventHandler
  16. func InitEventHandler() EventHandler {
  17. return &eventHandler{handlers: make(map[string]func(eventtypes.Message))}
  18. }
  19. type eventHandler struct {
  20. handlers map[string]func(eventtypes.Message)
  21. mu sync.Mutex
  22. }
  23. func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) {
  24. w.mu.Lock()
  25. w.handlers[action] = h
  26. w.mu.Unlock()
  27. }
  28. // Watch ranges over the passed in event chan and processes the events based on the
  29. // handlers created for a given action.
  30. // To stop watching, close the event chan.
  31. func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
  32. for e := range c {
  33. w.mu.Lock()
  34. h, exists := w.handlers[e.Action]
  35. w.mu.Unlock()
  36. if !exists {
  37. continue
  38. }
  39. logrus.Debugf("event handler: received event: %v", e)
  40. go h(e)
  41. }
  42. }
  43. // DecodeEvents decodes event from input stream
  44. func DecodeEvents(input io.Reader, ep eventProcessor) error {
  45. dec := json.NewDecoder(input)
  46. for {
  47. var event eventtypes.Message
  48. err := dec.Decode(&event)
  49. if err != nil && err == io.EOF {
  50. break
  51. }
  52. if procErr := ep(event, err); procErr != nil {
  53. return procErr
  54. }
  55. }
  56. return nil
  57. }