logfile_race_test.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. //go:build race
  2. package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
  3. import (
  4. "context"
  5. "fmt"
  6. "io"
  7. "path/filepath"
  8. "testing"
  9. "time"
  10. "github.com/docker/docker/api/types/backend"
  11. "github.com/docker/docker/daemon/logger"
  12. "github.com/docker/docker/pkg/tailfile"
  13. "golang.org/x/sync/errgroup"
  14. "gotest.tools/v3/assert"
  15. )
  16. func TestConcurrentLogging(t *testing.T) {
  17. const (
  18. containers = 5
  19. loggers = 3 // loggers per container
  20. messages = 50 // messages per logger
  21. capacity = 256
  22. maxFiles = 3
  23. compress = true
  24. )
  25. getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
  26. return tailfile.NewTailReader(ctx, r, lines)
  27. }
  28. createDecoder := func(io.Reader) Decoder {
  29. return dummyDecoder{}
  30. }
  31. marshal := func(msg *logger.Message) []byte {
  32. return []byte(fmt.Sprintf(
  33. "Line=%q Source=%q Timestamp=%v Attrs=%v PLogMetaData=%#v Err=%v",
  34. msg.Line, msg.Source, msg.Timestamp, msg.Attrs, msg.PLogMetaData, msg.Err,
  35. ))
  36. }
  37. g, ctx := errgroup.WithContext(context.Background())
  38. for ct := 0; ct < containers; ct++ {
  39. ct := ct
  40. dir := t.TempDir()
  41. g.Go(func() (err error) {
  42. logfile, err := NewLogFile(filepath.Join(dir, "log.log"), capacity, maxFiles, compress, createDecoder, 0o644, getTailReader)
  43. if err != nil {
  44. return err
  45. }
  46. defer func() {
  47. if cErr := logfile.Close(); cErr != nil && err == nil {
  48. err = cErr
  49. }
  50. }()
  51. lg, ctx := errgroup.WithContext(ctx)
  52. for ln := 0; ln < loggers; ln++ {
  53. ln := ln
  54. lg.Go(func() error {
  55. for m := 0; m < messages; m++ {
  56. select {
  57. case <-ctx.Done():
  58. return ctx.Err()
  59. default:
  60. }
  61. timestamp := time.Now()
  62. msg := logger.NewMessage()
  63. msg.Line = append(msg.Line, fmt.Sprintf("container=%v logger=%v msg=%v", ct, ln, m)...)
  64. msg.Source = "stdout"
  65. msg.Timestamp = timestamp
  66. msg.Attrs = append(msg.Attrs, backend.LogAttr{Key: "foo", Value: "bar"})
  67. msg.PLogMetaData = &backend.PartialLogMetaData{ID: fmt.Sprintf("%v %v %v", ct, ln, m), Ordinal: 1, Last: true}
  68. marshalled := marshal(msg)
  69. logger.PutMessage(msg)
  70. if err := logfile.WriteLogEntry(timestamp, marshalled); err != nil {
  71. return err
  72. }
  73. }
  74. return nil
  75. })
  76. }
  77. return lg.Wait()
  78. })
  79. }
  80. assert.NilError(t, g.Wait())
  81. }