blobstore.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package plugin
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/distribution/xfer"
  10. "github.com/docker/docker/image"
  11. "github.com/docker/docker/layer"
  12. "github.com/docker/docker/pkg/archive"
  13. "github.com/docker/docker/pkg/progress"
  14. "github.com/opencontainers/go-digest"
  15. "github.com/pkg/errors"
  16. "golang.org/x/net/context"
  17. )
  18. type blobstore interface {
  19. New() (WriteCommitCloser, error)
  20. Get(dgst digest.Digest) (io.ReadCloser, error)
  21. Size(dgst digest.Digest) (int64, error)
  22. }
  // basicBlobStore keeps blobs as plain files under a root directory, laid out
  // as <path>/<algorithm>/<hex>; blobs still being written are staged in
  // <path>/tmp.
  type basicBlobStore struct {
  	// path is the root directory of the store.
  	path string
  }
  26. func newBasicBlobStore(p string) (*basicBlobStore, error) {
  27. tmpdir := filepath.Join(p, "tmp")
  28. if err := os.MkdirAll(tmpdir, 0700); err != nil {
  29. return nil, errors.Wrapf(err, "failed to mkdir %v", p)
  30. }
  31. return &basicBlobStore{path: p}, nil
  32. }
  33. func (b *basicBlobStore) New() (WriteCommitCloser, error) {
  34. f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion")
  35. if err != nil {
  36. return nil, errors.Wrap(err, "failed to create temp file")
  37. }
  38. return newInsertion(f), nil
  39. }
  40. func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) {
  41. return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
  42. }
  43. func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) {
  44. stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
  45. if err != nil {
  46. return 0, err
  47. }
  48. return stat.Size(), nil
  49. }
  50. func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) {
  51. for _, alg := range []string{string(digest.Canonical)} {
  52. items, err := ioutil.ReadDir(filepath.Join(b.path, alg))
  53. if err != nil {
  54. continue
  55. }
  56. for _, fi := range items {
  57. if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists {
  58. p := filepath.Join(b.path, alg, fi.Name())
  59. err := os.RemoveAll(p)
  60. logrus.Debugf("cleaned up blob %v: %v", p, err)
  61. }
  62. }
  63. }
  64. }
  65. // WriteCommitCloser defines object that can be committed to blobstore.
  66. type WriteCommitCloser interface {
  67. io.WriteCloser
  68. Commit() (digest.Digest, error)
  69. }
  70. type insertion struct {
  71. io.Writer
  72. f *os.File
  73. digester digest.Digester
  74. closed bool
  75. }
  76. func newInsertion(tempFile *os.File) *insertion {
  77. digester := digest.Canonical.Digester()
  78. return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())}
  79. }
  80. func (i *insertion) Commit() (digest.Digest, error) {
  81. p := i.f.Name()
  82. d := filepath.Join(filepath.Join(p, "../../"))
  83. i.f.Sync()
  84. defer os.RemoveAll(p)
  85. if err := i.f.Close(); err != nil {
  86. return "", err
  87. }
  88. i.closed = true
  89. dgst := i.digester.Digest()
  90. if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil {
  91. return "", errors.Wrapf(err, "failed to mkdir %v", d)
  92. }
  93. if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil {
  94. return "", errors.Wrapf(err, "failed to rename %v", p)
  95. }
  96. return dgst, nil
  97. }
  98. func (i *insertion) Close() error {
  99. if i.closed {
  100. return nil
  101. }
  102. defer os.RemoveAll(i.f.Name())
  103. return i.f.Close()
  104. }
  105. type downloadManager struct {
  106. blobStore blobstore
  107. tmpDir string
  108. blobs []digest.Digest
  109. configDigest digest.Digest
  110. }
  111. func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
  112. for _, l := range layers {
  113. b, err := dm.blobStore.New()
  114. if err != nil {
  115. return initialRootFS, nil, err
  116. }
  117. defer b.Close()
  118. rc, _, err := l.Download(ctx, progressOutput)
  119. if err != nil {
  120. return initialRootFS, nil, errors.Wrap(err, "failed to download")
  121. }
  122. defer rc.Close()
  123. r := io.TeeReader(rc, b)
  124. inflatedLayerData, err := archive.DecompressStream(r)
  125. if err != nil {
  126. return initialRootFS, nil, err
  127. }
  128. digester := digest.Canonical.Digester()
  129. if _, err := archive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil {
  130. return initialRootFS, nil, err
  131. }
  132. initialRootFS.Append(layer.DiffID(digester.Digest()))
  133. d, err := b.Commit()
  134. if err != nil {
  135. return initialRootFS, nil, err
  136. }
  137. dm.blobs = append(dm.blobs, d)
  138. }
  139. return initialRootFS, nil, nil
  140. }
  141. func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) {
  142. b, err := dm.blobStore.New()
  143. if err != nil {
  144. return "", err
  145. }
  146. defer b.Close()
  147. n, err := b.Write(dt)
  148. if err != nil {
  149. return "", err
  150. }
  151. if n != len(dt) {
  152. return "", io.ErrShortWrite
  153. }
  154. d, err := b.Commit()
  155. dm.configDigest = d
  156. return d, err
  157. }
  158. func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) {
  159. return nil, fmt.Errorf("digest not found")
  160. }
  161. func (dm *downloadManager) RootFSFromConfig(c []byte) (*image.RootFS, error) {
  162. return configToRootFS(c)
  163. }