concurrent.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package blobtesting
  2. import (
  3. cryptorand "crypto/rand"
  4. "encoding/hex"
  5. "fmt"
  6. "math/rand"
  7. "strings"
  8. "testing"
  9. "github.com/pkg/errors"
  10. "golang.org/x/sync/errgroup"
  11. "github.com/kopia/kopia/internal/gather"
  12. "github.com/kopia/kopia/internal/testlogging"
  13. "github.com/kopia/kopia/repo/blob"
  14. )
  15. // ConcurrentAccessOptions encapsulates parameters for VerifyConcurrentAccess.
  16. type ConcurrentAccessOptions struct {
  17. NumBlobs int // number of shared blobs in the pool
  18. Getters int
  19. Putters int
  20. Deleters int
  21. Listers int
  22. Iterations int
  23. RangeGetPercentage int // 0..100 - probability of issuing range get
  24. NonExistentListPrefixPercentage int // probability of issuing non-matching list prefix
  25. }
  26. // VerifyConcurrentAccess tests data races on a repository to ensure only clean errors are returned.
  27. //
  28. //nolint:gocognit,gocyclo,funlen,cyclop
  29. func VerifyConcurrentAccess(t *testing.T, st blob.Storage, options ConcurrentAccessOptions) {
  30. t.Helper()
  31. // generate random blob IDs for the pool
  32. var blobs []blob.ID
  33. for range options.NumBlobs {
  34. blobIDBytes := make([]byte, 32)
  35. cryptorand.Read(blobIDBytes)
  36. blobs = append(blobs, blob.ID(hex.EncodeToString(blobIDBytes)))
  37. }
  38. randomBlobID := func() blob.ID {
  39. return blobs[rand.Intn(len(blobs))]
  40. }
  41. eg, ctx := errgroup.WithContext(testlogging.Context(t))
  42. // start readers that will be reading random blob out of the pool
  43. for range options.Getters {
  44. eg.Go(func() error {
  45. var data gather.WriteBuffer
  46. defer data.Close()
  47. for range options.Iterations {
  48. blobID := randomBlobID()
  49. offset := int64(0)
  50. length := int64(-1)
  51. if rand.Intn(100) < options.RangeGetPercentage {
  52. offset = 10
  53. length = 3
  54. }
  55. err := st.GetBlob(ctx, blobID, offset, length, &data)
  56. switch {
  57. case err == nil:
  58. if got, want := string(data.ToByteSlice()), string(blobID); !strings.HasPrefix(got, want) {
  59. return errors.Wrapf(err, "GetBlob returned invalid data for %v: %v, want prefix of %v", blobID, got, want)
  60. }
  61. case errors.Is(err, blob.ErrBlobNotFound):
  62. // clean error
  63. default:
  64. return errors.Wrapf(err, "GetBlob %v returned unexpected error", blobID)
  65. }
  66. }
  67. return nil
  68. })
  69. }
  70. // start putters that will be writing random blob out of the pool
  71. for range options.Putters {
  72. eg.Go(func() error {
  73. for range options.Iterations {
  74. blobID := randomBlobID()
  75. data := fmt.Sprintf("%v-%v", blobID, rand.Int63())
  76. err := st.PutBlob(ctx, blobID, gather.FromSlice([]byte(data)), blob.PutOptions{})
  77. if err != nil {
  78. return errors.Wrapf(err, "PutBlob %v returned unexpected error", blobID)
  79. }
  80. }
  81. return nil
  82. })
  83. }
  84. // start deleters that will be deleting random blob out of the pool
  85. for range options.Deleters {
  86. eg.Go(func() error {
  87. for range options.Iterations {
  88. blobID := randomBlobID()
  89. err := st.DeleteBlob(ctx, blobID)
  90. switch {
  91. case err == nil:
  92. // clean success
  93. case errors.Is(err, blob.ErrBlobNotFound):
  94. // clean error
  95. default:
  96. return errors.Wrapf(err, "DeleteBlob %v returned unexpected error", blobID)
  97. }
  98. }
  99. return nil
  100. })
  101. }
  102. // start listers that will be listing blobs by random prefixes of existing objects.
  103. for range options.Listers {
  104. eg.Go(func() error {
  105. for range options.Iterations {
  106. blobID := randomBlobID()
  107. prefix := blobID[0:rand.Intn(len(blobID))]
  108. if rand.Intn(100) < options.NonExistentListPrefixPercentage {
  109. prefix = "zzz"
  110. }
  111. if err := st.ListBlobs(ctx, prefix, func(blob.Metadata) error {
  112. return nil
  113. }); err != nil {
  114. return errors.Wrapf(err, "ListBlobs(%v) returned unexpected error", prefix)
  115. }
  116. }
  117. return nil
  118. })
  119. }
  120. if err := eg.Wait(); err != nil {
  121. t.Errorf("unexpected error: %v", err)
  122. }
  123. }