events.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package client // import "github.com/docker/docker/client"
  2. import (
  3. "context"
  4. "encoding/json"
  5. "net/url"
  6. "time"
  7. "github.com/docker/docker/api/types"
  8. "github.com/docker/docker/api/types/events"
  9. "github.com/docker/docker/api/types/filters"
  10. timetypes "github.com/docker/docker/api/types/time"
  11. )
  12. // Events returns a stream of events in the daemon. It's up to the caller to close the stream
  13. // by cancelling the context. Once the stream has been completely read an io.EOF error will
  14. // be sent over the error channel. If an error is sent all processing will be stopped. It's up
  15. // to the caller to reopen the stream in the event of an error by reinvoking this method.
  16. func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
  17. messages := make(chan events.Message)
  18. errs := make(chan error, 1)
  19. started := make(chan struct{})
  20. go func() {
  21. defer close(errs)
  22. query, err := buildEventsQueryParams(cli.version, options)
  23. if err != nil {
  24. close(started)
  25. errs <- err
  26. return
  27. }
  28. resp, err := cli.get(ctx, "/events", query, nil)
  29. if err != nil {
  30. close(started)
  31. errs <- err
  32. return
  33. }
  34. defer resp.body.Close()
  35. decoder := json.NewDecoder(resp.body)
  36. close(started)
  37. for {
  38. select {
  39. case <-ctx.Done():
  40. errs <- ctx.Err()
  41. return
  42. default:
  43. var event events.Message
  44. if err := decoder.Decode(&event); err != nil {
  45. errs <- err
  46. return
  47. }
  48. select {
  49. case messages <- event:
  50. case <-ctx.Done():
  51. errs <- ctx.Err()
  52. return
  53. }
  54. }
  55. }
  56. }()
  57. <-started
  58. return messages, errs
  59. }
  60. func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) {
  61. query := url.Values{}
  62. ref := time.Now()
  63. if options.Since != "" {
  64. ts, err := timetypes.GetTimestamp(options.Since, ref)
  65. if err != nil {
  66. return nil, err
  67. }
  68. query.Set("since", ts)
  69. }
  70. if options.Until != "" {
  71. ts, err := timetypes.GetTimestamp(options.Until, ref)
  72. if err != nil {
  73. return nil, err
  74. }
  75. query.Set("until", ts)
  76. }
  77. if options.Filters.Len() > 0 {
  78. //nolint:staticcheck // ignore SA1019 for old code
  79. filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters)
  80. if err != nil {
  81. return nil, err
  82. }
  83. query.Set("filters", filterJSON)
  84. }
  85. return query, nil
  86. }