123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package contentutil
- import (
- "bytes"
- "context"
- "io/ioutil"
- "sync"
- "time"
- "github.com/containerd/containerd/content"
- "github.com/containerd/containerd/errdefs"
- digest "github.com/opencontainers/go-digest"
- ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
- "github.com/pkg/errors"
- )
- // Buffer is a content provider and ingester that keeps data in memory
- type Buffer interface {
- content.Provider
- content.Ingester
- }
- // NewBuffer returns a new buffer
- func NewBuffer() Buffer {
- return &buffer{
- buffers: map[digest.Digest][]byte{},
- refs: map[string]struct{}{},
- }
- }
- type buffer struct {
- mu sync.Mutex
- buffers map[digest.Digest][]byte
- refs map[string]struct{}
- }
- func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
- var wOpts content.WriterOpts
- for _, opt := range opts {
- if err := opt(&wOpts); err != nil {
- return nil, err
- }
- }
- b.mu.Lock()
- if _, ok := b.refs[wOpts.Ref]; ok {
- return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %s locked", wOpts.Ref)
- }
- b.mu.Unlock()
- return &bufferedWriter{
- main: b,
- digester: digest.Canonical.Digester(),
- buffer: bytes.NewBuffer(nil),
- expected: wOpts.Desc.Digest,
- releaseRef: func() {
- b.mu.Lock()
- delete(b.refs, wOpts.Ref)
- b.mu.Unlock()
- },
- }, nil
- }
- func (b *buffer) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
- r, err := b.getBytesReader(ctx, desc.Digest)
- if err != nil {
- return nil, err
- }
- return &readerAt{Reader: r, Closer: ioutil.NopCloser(r), size: int64(r.Len())}, nil
- }
- func (b *buffer) getBytesReader(ctx context.Context, dgst digest.Digest) (*bytes.Reader, error) {
- b.mu.Lock()
- defer b.mu.Unlock()
- if dt, ok := b.buffers[dgst]; ok {
- return bytes.NewReader(dt), nil
- }
- return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
- }
- func (b *buffer) addValue(k digest.Digest, dt []byte) {
- b.mu.Lock()
- defer b.mu.Unlock()
- b.buffers[k] = dt
- }
- type bufferedWriter struct {
- main *buffer
- ref string
- offset int64
- total int64
- startedAt time.Time
- updatedAt time.Time
- buffer *bytes.Buffer
- expected digest.Digest
- digester digest.Digester
- releaseRef func()
- }
- func (w *bufferedWriter) Write(p []byte) (n int, err error) {
- n, err = w.buffer.Write(p)
- w.digester.Hash().Write(p[:n])
- w.offset += int64(len(p))
- w.updatedAt = time.Now()
- return n, err
- }
- func (w *bufferedWriter) Close() error {
- if w.buffer != nil {
- w.releaseRef()
- w.buffer = nil
- }
- return nil
- }
- func (w *bufferedWriter) Status() (content.Status, error) {
- return content.Status{
- Ref: w.ref,
- Offset: w.offset,
- Total: w.total,
- StartedAt: w.startedAt,
- UpdatedAt: w.updatedAt,
- }, nil
- }
- func (w *bufferedWriter) Digest() digest.Digest {
- return w.digester.Digest()
- }
- func (w *bufferedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opt ...content.Opt) error {
- if w.buffer == nil {
- return errors.Errorf("can't commit already committed or closed")
- }
- if s := int64(w.buffer.Len()); size > 0 && size != s {
- return errors.Errorf("unexpected commit size %d, expected %d", s, size)
- }
- dgst := w.digester.Digest()
- if expected != "" && expected != dgst {
- return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
- }
- if w.expected != "" && w.expected != dgst {
- return errors.Errorf("unexpected digest: %v != %v", dgst, w.expected)
- }
- w.main.addValue(dgst, w.buffer.Bytes())
- return w.Close()
- }
- func (w *bufferedWriter) Truncate(size int64) error {
- if size != 0 {
- return errors.New("Truncate: unsupported size")
- }
- w.offset = 0
- w.digester.Hash().Reset()
- w.buffer.Reset()
- return nil
- }
|