buffer.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package contentutil
  2. import (
  3. "bytes"
  4. "context"
  5. "io/ioutil"
  6. "sync"
  7. "time"
  8. "github.com/containerd/containerd/content"
  9. "github.com/containerd/containerd/errdefs"
  10. digest "github.com/opencontainers/go-digest"
  11. ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
  12. "github.com/pkg/errors"
  13. )
  14. // Buffer is a content provider and ingester that keeps data in memory
  15. type Buffer interface {
  16. content.Provider
  17. content.Ingester
  18. }
  19. // NewBuffer returns a new buffer
  20. func NewBuffer() Buffer {
  21. return &buffer{
  22. buffers: map[digest.Digest][]byte{},
  23. refs: map[string]struct{}{},
  24. }
  25. }
  26. type buffer struct {
  27. mu sync.Mutex
  28. buffers map[digest.Digest][]byte
  29. refs map[string]struct{}
  30. }
  31. func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
  32. var wOpts content.WriterOpts
  33. for _, opt := range opts {
  34. if err := opt(&wOpts); err != nil {
  35. return nil, err
  36. }
  37. }
  38. b.mu.Lock()
  39. if _, ok := b.refs[wOpts.Ref]; ok {
  40. return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %s locked", wOpts.Ref)
  41. }
  42. b.mu.Unlock()
  43. return &bufferedWriter{
  44. main: b,
  45. digester: digest.Canonical.Digester(),
  46. buffer: bytes.NewBuffer(nil),
  47. expected: wOpts.Desc.Digest,
  48. releaseRef: func() {
  49. b.mu.Lock()
  50. delete(b.refs, wOpts.Ref)
  51. b.mu.Unlock()
  52. },
  53. }, nil
  54. }
  55. func (b *buffer) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
  56. r, err := b.getBytesReader(ctx, desc.Digest)
  57. if err != nil {
  58. return nil, err
  59. }
  60. return &readerAt{Reader: r, Closer: ioutil.NopCloser(r), size: int64(r.Len())}, nil
  61. }
  62. func (b *buffer) getBytesReader(ctx context.Context, dgst digest.Digest) (*bytes.Reader, error) {
  63. b.mu.Lock()
  64. defer b.mu.Unlock()
  65. if dt, ok := b.buffers[dgst]; ok {
  66. return bytes.NewReader(dt), nil
  67. }
  68. return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
  69. }
  70. func (b *buffer) addValue(k digest.Digest, dt []byte) {
  71. b.mu.Lock()
  72. defer b.mu.Unlock()
  73. b.buffers[k] = dt
  74. }
  75. type bufferedWriter struct {
  76. main *buffer
  77. ref string
  78. offset int64
  79. total int64
  80. startedAt time.Time
  81. updatedAt time.Time
  82. buffer *bytes.Buffer
  83. expected digest.Digest
  84. digester digest.Digester
  85. releaseRef func()
  86. }
  87. func (w *bufferedWriter) Write(p []byte) (n int, err error) {
  88. n, err = w.buffer.Write(p)
  89. w.digester.Hash().Write(p[:n])
  90. w.offset += int64(len(p))
  91. w.updatedAt = time.Now()
  92. return n, err
  93. }
  94. func (w *bufferedWriter) Close() error {
  95. if w.buffer != nil {
  96. w.releaseRef()
  97. w.buffer = nil
  98. }
  99. return nil
  100. }
  101. func (w *bufferedWriter) Status() (content.Status, error) {
  102. return content.Status{
  103. Ref: w.ref,
  104. Offset: w.offset,
  105. Total: w.total,
  106. StartedAt: w.startedAt,
  107. UpdatedAt: w.updatedAt,
  108. }, nil
  109. }
  110. func (w *bufferedWriter) Digest() digest.Digest {
  111. return w.digester.Digest()
  112. }
  113. func (w *bufferedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opt ...content.Opt) error {
  114. if w.buffer == nil {
  115. return errors.Errorf("can't commit already committed or closed")
  116. }
  117. if s := int64(w.buffer.Len()); size > 0 && size != s {
  118. return errors.Errorf("unexpected commit size %d, expected %d", s, size)
  119. }
  120. dgst := w.digester.Digest()
  121. if expected != "" && expected != dgst {
  122. return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
  123. }
  124. if w.expected != "" && w.expected != dgst {
  125. return errors.Errorf("unexpected digest: %v != %v", dgst, w.expected)
  126. }
  127. w.main.addValue(dgst, w.buffer.Bytes())
  128. return w.Close()
  129. }
  130. func (w *bufferedWriter) Truncate(size int64) error {
  131. if size != 0 {
  132. return errors.New("Truncate: unsupported size")
  133. }
  134. w.offset = 0
  135. w.digester.Hash().Reset()
  136. w.buffer.Reset()
  137. return nil
  138. }