events.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package client
  2. import (
  3. "encoding/json"
  4. "net/url"
  5. "time"
  6. "golang.org/x/net/context"
  7. "github.com/docker/docker/api/types"
  8. "github.com/docker/docker/api/types/events"
  9. "github.com/docker/docker/api/types/filters"
  10. timetypes "github.com/docker/docker/api/types/time"
  11. )
  12. // Events returns a stream of events in the daemon. It's up to the caller to close the stream
  13. // by cancelling the context. Once the stream has been completely read an io.EOF error will
  14. // be sent over the error channel. If an error is sent all processing will be stopped. It's up
  15. // to the caller to reopen the stream in the event of an error by reinvoking this method.
  16. func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
  17. messages := make(chan events.Message)
  18. errs := make(chan error, 1)
  19. go func() {
  20. defer close(errs)
  21. query, err := buildEventsQueryParams(cli.version, options)
  22. if err != nil {
  23. errs <- err
  24. return
  25. }
  26. resp, err := cli.get(ctx, "/events", query, nil)
  27. if err != nil {
  28. errs <- err
  29. return
  30. }
  31. defer resp.body.Close()
  32. decoder := json.NewDecoder(resp.body)
  33. for {
  34. select {
  35. case <-ctx.Done():
  36. errs <- ctx.Err()
  37. return
  38. default:
  39. var event events.Message
  40. if err := decoder.Decode(&event); err != nil {
  41. errs <- err
  42. return
  43. }
  44. select {
  45. case messages <- event:
  46. case <-ctx.Done():
  47. errs <- ctx.Err()
  48. return
  49. }
  50. }
  51. }
  52. }()
  53. return messages, errs
  54. }
  55. func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) {
  56. query := url.Values{}
  57. ref := time.Now()
  58. if options.Since != "" {
  59. ts, err := timetypes.GetTimestamp(options.Since, ref)
  60. if err != nil {
  61. return nil, err
  62. }
  63. query.Set("since", ts)
  64. }
  65. if options.Until != "" {
  66. ts, err := timetypes.GetTimestamp(options.Until, ref)
  67. if err != nil {
  68. return nil, err
  69. }
  70. query.Set("until", ts)
  71. }
  72. if options.Filters.Len() > 0 {
  73. filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters)
  74. if err != nil {
  75. return nil, err
  76. }
  77. query.Set("filters", filterJSON)
  78. }
  79. return query, nil
  80. }