123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package blobtesting
- import (
- cryptorand "crypto/rand"
- "encoding/hex"
- "fmt"
- "math/rand"
- "strings"
- "testing"
- "github.com/pkg/errors"
- "golang.org/x/sync/errgroup"
- "github.com/kopia/kopia/internal/gather"
- "github.com/kopia/kopia/internal/testlogging"
- "github.com/kopia/kopia/repo/blob"
- )
// ConcurrentAccessOptions holds the tuning knobs for VerifyConcurrentAccess:
// the size of the shared blob pool, how many goroutines of each kind are
// started, and how much work each of them performs.
type ConcurrentAccessOptions struct {
	// NumBlobs is the number of shared blobs in the pool.
	NumBlobs int

	// Getters is the number of goroutines reading blobs from the pool.
	Getters int

	// Putters is the number of goroutines writing blobs from the pool.
	Putters int

	// Deleters is the number of goroutines deleting blobs from the pool.
	Deleters int

	// Listers is the number of goroutines listing blobs by prefix.
	Listers int

	// Iterations is the number of operations performed by every goroutine.
	Iterations int

	// RangeGetPercentage is the probability (0..100) that a getter issues a
	// range get instead of reading the whole blob.
	RangeGetPercentage int

	// NonExistentListPrefixPercentage is the probability (0..100) that a
	// lister uses a non-matching list prefix.
	NonExistentListPrefixPercentage int
}
- // VerifyConcurrentAccess tests data races on a repository to ensure only clean errors are returned.
- //
- //nolint:gocognit,gocyclo,funlen,cyclop
- func VerifyConcurrentAccess(t *testing.T, st blob.Storage, options ConcurrentAccessOptions) {
- t.Helper()
- // generate random blob IDs for the pool
- var blobs []blob.ID
- for range options.NumBlobs {
- blobIDBytes := make([]byte, 32)
- cryptorand.Read(blobIDBytes)
- blobs = append(blobs, blob.ID(hex.EncodeToString(blobIDBytes)))
- }
- randomBlobID := func() blob.ID {
- return blobs[rand.Intn(len(blobs))]
- }
- eg, ctx := errgroup.WithContext(testlogging.Context(t))
- // start readers that will be reading random blob out of the pool
- for range options.Getters {
- eg.Go(func() error {
- var data gather.WriteBuffer
- defer data.Close()
- for range options.Iterations {
- blobID := randomBlobID()
- offset := int64(0)
- length := int64(-1)
- if rand.Intn(100) < options.RangeGetPercentage {
- offset = 10
- length = 3
- }
- err := st.GetBlob(ctx, blobID, offset, length, &data)
- switch {
- case err == nil:
- if got, want := string(data.ToByteSlice()), string(blobID); !strings.HasPrefix(got, want) {
- return errors.Wrapf(err, "GetBlob returned invalid data for %v: %v, want prefix of %v", blobID, got, want)
- }
- case errors.Is(err, blob.ErrBlobNotFound):
- // clean error
- default:
- return errors.Wrapf(err, "GetBlob %v returned unexpected error", blobID)
- }
- }
- return nil
- })
- }
- // start putters that will be writing random blob out of the pool
- for range options.Putters {
- eg.Go(func() error {
- for range options.Iterations {
- blobID := randomBlobID()
- data := fmt.Sprintf("%v-%v", blobID, rand.Int63())
- err := st.PutBlob(ctx, blobID, gather.FromSlice([]byte(data)), blob.PutOptions{})
- if err != nil {
- return errors.Wrapf(err, "PutBlob %v returned unexpected error", blobID)
- }
- }
- return nil
- })
- }
- // start deleters that will be deleting random blob out of the pool
- for range options.Deleters {
- eg.Go(func() error {
- for range options.Iterations {
- blobID := randomBlobID()
- err := st.DeleteBlob(ctx, blobID)
- switch {
- case err == nil:
- // clean success
- case errors.Is(err, blob.ErrBlobNotFound):
- // clean error
- default:
- return errors.Wrapf(err, "DeleteBlob %v returned unexpected error", blobID)
- }
- }
- return nil
- })
- }
- // start listers that will be listing blobs by random prefixes of existing objects.
- for range options.Listers {
- eg.Go(func() error {
- for range options.Iterations {
- blobID := randomBlobID()
- prefix := blobID[0:rand.Intn(len(blobID))]
- if rand.Intn(100) < options.NonExistentListPrefixPercentage {
- prefix = "zzz"
- }
- if err := st.ListBlobs(ctx, prefix, func(blob.Metadata) error {
- return nil
- }); err != nil {
- return errors.Wrapf(err, "ListBlobs(%v) returned unexpected error", prefix)
- }
- }
- return nil
- })
- }
- if err := eg.Wait(); err != nil {
- t.Errorf("unexpected error: %v", err)
- }
- }
|