storage.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package blob
  2. import (
  3. "context"
  4. "encoding/json"
  5. "io"
  6. "sync"
  7. "time"
  8. "github.com/pkg/errors"
  9. )
  // Bytes represents a sequence of bytes that may be spread across several
  // non-contiguous buffers. The contents can be streamed sequentially into an
  // io.Writer (through the embedded io.WriterTo) or consumed as an io.Reader.
  type Bytes interface {
  	io.WriterTo

  	// Length reports the size of the sequence (presumably in bytes).
  	Length() int

  	// Reader returns an io.Reader over the sequence.
  	Reader() io.Reader
  }
  17. // Storage encapsulates API for connecting to blob storage.
  18. //
  19. // The underlying storage system must provide:
  20. //
  21. // * high durability, availability and bit-rot protection
  22. // * read-after-write - blob written using PubBlob() must be immediately readable using GetBlob() and ListBlobs()
  23. // * atomicity - it mustn't be possible to observe partial results of PubBlob() via either GetBlob() or ListBlobs()
  24. // * timestamps that don't go back in time (small clock skew up to minutes is allowed)
  25. // * reasonably low latency for retrievals
  26. //
  27. // The required semantics are provided by existing commercial cloud storage products (Google Cloud, AWS, Azure).
  28. type Storage interface {
  29. // PutBlob uploads the blob with given data to the repository or replaces existing blob with the provided
  30. // id with contents gathered from the specified list of slices.
  31. PutBlob(ctx context.Context, blobID ID, data Bytes) error
  32. // DeleteBlob removes the blob from storage. Future Get() operations will fail with ErrNotFound.
  33. DeleteBlob(ctx context.Context, blobID ID) error
  34. // GetBlob returns full or partial contents of a blob with given ID.
  35. // If length>0, the the function retrieves a range of bytes [offset,offset+length)
  36. // If length<0, the entire blob must be fetched.
  37. GetBlob(ctx context.Context, blobID ID, offset, length int64) ([]byte, error)
  38. // GetMetadata returns Metadata about single blob.
  39. GetMetadata(ctx context.Context, blobID ID) (Metadata, error)
  40. // ListBlobs invokes the provided callback for each blob in the storage.
  41. // Iteration continues until the callback returns an error or until all matching blobs have been reported.
  42. ListBlobs(ctx context.Context, blobIDPrefix ID, cb func(bm Metadata) error) error
  43. // ConnectionInfo returns JSON-serializable data structure containing information required to
  44. // connect to storage.
  45. ConnectionInfo() ConnectionInfo
  46. // Close releases all resources associated with storage.
  47. Close(ctx context.Context) error
  48. }
  // ID is a string that identifies a blob.
  type ID string

  // Metadata describes a single BLOB in a storage.
  type Metadata struct {
  	BlobID    ID        `json:"id"`        // identifier of the blob
  	Length    int64     `json:"length"`    // blob length (presumably in bytes)
  	Timestamp time.Time `json:"timestamp"` // timestamp reported for the blob
  }

  // String returns the JSON encoding of m, using the field names from the struct
  // tags ("id", "length", "timestamp"). A nil receiver yields "null".
  //
  // JSON encoding can fail, for example when Timestamp has a year outside
  // [0,9999]. Rather than silently returning an empty string in that case,
  // String returns a non-JSON placeholder that names the blob and carries the
  // encoding error, so the failure stays visible in logs.
  func (m *Metadata) String() string {
  	b, err := json.Marshal(m)
  	if err != nil {
  		// m cannot be nil here: a nil *Metadata marshals to "null" without error.
  		return "<invalid metadata for blob \"" + string(m.BlobID) + "\": " + err.Error() + ">"
  	}

  	return string(b)
  }
  // ErrBlobNotFound is the sentinel error returned when a requested BLOB cannot
  // be found in storage.
  var ErrBlobNotFound = errors.New("BLOB not found")
  63. // ListAllBlobs returns Metadata for all blobs in a given storage that have the provided name prefix.
  64. func ListAllBlobs(ctx context.Context, st Storage, prefix ID) ([]Metadata, error) {
  65. var result []Metadata
  66. err := st.ListBlobs(ctx, prefix, func(bm Metadata) error {
  67. result = append(result, bm)
  68. return nil
  69. })
  70. return result, err
  71. }
  72. // IterateAllPrefixesInParallel invokes the provided callback and returns the first error returned by the callback or nil.
  73. func IterateAllPrefixesInParallel(ctx context.Context, parallelism int, st Storage, prefixes []ID, callback func(Metadata) error) error {
  74. if len(prefixes) == 1 {
  75. return st.ListBlobs(ctx, prefixes[0], callback)
  76. }
  77. if parallelism <= 0 {
  78. parallelism = 1
  79. }
  80. var wg sync.WaitGroup
  81. semaphore := make(chan struct{}, parallelism)
  82. errch := make(chan error, len(prefixes))
  83. for _, prefix := range prefixes {
  84. wg.Add(1)
  85. prefix := prefix
  86. // acquire semaphore
  87. semaphore <- struct{}{}
  88. go func() {
  89. defer wg.Done()
  90. defer func() {
  91. <-semaphore // release semaphore
  92. }()
  93. if err := st.ListBlobs(ctx, prefix, callback); err != nil {
  94. errch <- err
  95. }
  96. }()
  97. }
  98. wg.Wait()
  99. close(errch)
  100. // return first error or nil
  101. return <-errch
  102. }