buffer.go 4.9 KB


  1. package contentutil
  2. import (
  3. "bytes"
  4. "context"
  5. "io/ioutil"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/containerd/containerd/content"
  10. "github.com/containerd/containerd/errdefs"
  11. digest "github.com/opencontainers/go-digest"
  12. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  13. "github.com/pkg/errors"
  14. )
  15. // Buffer is a content provider and ingester that keeps data in memory
  16. type Buffer interface {
  17. content.Provider
  18. content.Ingester
  19. content.Manager
  20. }
  21. // NewBuffer returns a new buffer
  22. func NewBuffer() Buffer {
  23. return &buffer{
  24. buffers: map[digest.Digest][]byte{},
  25. infos: map[digest.Digest]content.Info{},
  26. refs: map[string]struct{}{},
  27. }
  28. }
  29. type buffer struct {
  30. mu sync.Mutex
  31. buffers map[digest.Digest][]byte
  32. infos map[digest.Digest]content.Info
  33. refs map[string]struct{}
  34. }
  35. func (b *buffer) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
  36. b.mu.Lock()
  37. v, ok := b.infos[dgst]
  38. b.mu.Unlock()
  39. if !ok {
  40. return content.Info{}, errdefs.ErrNotFound
  41. }
  42. return v, nil
  43. }
  44. func (b *buffer) Update(ctx context.Context, new content.Info, fieldpaths ...string) (content.Info, error) {
  45. b.mu.Lock()
  46. defer b.mu.Unlock()
  47. updated, ok := b.infos[new.Digest]
  48. if !ok {
  49. return content.Info{}, errdefs.ErrNotFound
  50. }
  51. if len(fieldpaths) == 0 {
  52. fieldpaths = []string{"labels"}
  53. }
  54. for _, path := range fieldpaths {
  55. if strings.HasPrefix(path, "labels.") {
  56. if updated.Labels == nil {
  57. updated.Labels = map[string]string{}
  58. }
  59. key := strings.TrimPrefix(path, "labels.")
  60. updated.Labels[key] = new.Labels[key]
  61. continue
  62. }
  63. if path == "labels" {
  64. updated.Labels = new.Labels
  65. }
  66. }
  67. b.infos[new.Digest] = updated
  68. return updated, nil
  69. }
  70. func (b *buffer) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
  71. return nil // not implemented
  72. }
  73. func (b *buffer) Delete(ctx context.Context, dgst digest.Digest) error {
  74. return nil // not implemented
  75. }
  76. func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
  77. var wOpts content.WriterOpts
  78. for _, opt := range opts {
  79. if err := opt(&wOpts); err != nil {
  80. return nil, err
  81. }
  82. }
  83. b.mu.Lock()
  84. if _, ok := b.refs[wOpts.Ref]; ok {
  85. return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %s locked", wOpts.Ref)
  86. }
  87. b.mu.Unlock()
  88. return &bufferedWriter{
  89. main: b,
  90. digester: digest.Canonical.Digester(),
  91. buffer: bytes.NewBuffer(nil),
  92. expected: wOpts.Desc.Digest,
  93. releaseRef: func() {
  94. b.mu.Lock()
  95. delete(b.refs, wOpts.Ref)
  96. b.mu.Unlock()
  97. },
  98. }, nil
  99. }
  100. func (b *buffer) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
  101. r, err := b.getBytesReader(ctx, desc.Digest)
  102. if err != nil {
  103. return nil, err
  104. }
  105. return &readerAt{Reader: r, Closer: ioutil.NopCloser(r), size: int64(r.Len())}, nil
  106. }
  107. func (b *buffer) getBytesReader(ctx context.Context, dgst digest.Digest) (*bytes.Reader, error) {
  108. b.mu.Lock()
  109. defer b.mu.Unlock()
  110. if dt, ok := b.buffers[dgst]; ok {
  111. return bytes.NewReader(dt), nil
  112. }
  113. return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
  114. }
  115. func (b *buffer) addValue(k digest.Digest, dt []byte) {
  116. b.mu.Lock()
  117. defer b.mu.Unlock()
  118. b.buffers[k] = dt
  119. b.infos[k] = content.Info{Digest: k, Size: int64(len(dt))}
  120. }
  121. type bufferedWriter struct {
  122. main *buffer
  123. ref string
  124. offset int64
  125. total int64
  126. startedAt time.Time
  127. updatedAt time.Time
  128. buffer *bytes.Buffer
  129. expected digest.Digest
  130. digester digest.Digester
  131. releaseRef func()
  132. }
  133. func (w *bufferedWriter) Write(p []byte) (n int, err error) {
  134. n, err = w.buffer.Write(p)
  135. w.digester.Hash().Write(p[:n])
  136. w.offset += int64(len(p))
  137. w.updatedAt = time.Now()
  138. return n, err
  139. }
  140. func (w *bufferedWriter) Close() error {
  141. if w.buffer != nil {
  142. w.releaseRef()
  143. w.buffer = nil
  144. }
  145. return nil
  146. }
  147. func (w *bufferedWriter) Status() (content.Status, error) {
  148. return content.Status{
  149. Ref: w.ref,
  150. Offset: w.offset,
  151. Total: w.total,
  152. StartedAt: w.startedAt,
  153. UpdatedAt: w.updatedAt,
  154. }, nil
  155. }
  156. func (w *bufferedWriter) Digest() digest.Digest {
  157. return w.digester.Digest()
  158. }
  159. func (w *bufferedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opt ...content.Opt) error {
  160. if w.buffer == nil {
  161. return errors.Errorf("can't commit already committed or closed")
  162. }
  163. if s := int64(w.buffer.Len()); size > 0 && size != s {
  164. return errors.Errorf("unexpected commit size %d, expected %d", s, size)
  165. }
  166. dgst := w.digester.Digest()
  167. if expected != "" && expected != dgst {
  168. return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
  169. }
  170. if w.expected != "" && w.expected != dgst {
  171. return errors.Errorf("unexpected digest: %v != %v", dgst, w.expected)
  172. }
  173. w.main.addValue(dgst, w.buffer.Bytes())
  174. return w.Close()
  175. }
  176. func (w *bufferedWriter) Truncate(size int64) error {
  177. if size != 0 {
  178. return errors.New("Truncate: unsupported size")
  179. }
  180. w.offset = 0
  181. w.digester.Hash().Reset()
  182. w.buffer.Reset()
  183. return nil
  184. }