blobstore.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package plugin
  2. import (
  3. "io"
  4. "io/ioutil"
  5. "os"
  6. "path/filepath"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/distribution/digest"
  9. "github.com/docker/docker/distribution/xfer"
  10. "github.com/docker/docker/image"
  11. "github.com/docker/docker/layer"
  12. "github.com/docker/docker/pkg/archive"
  13. "github.com/docker/docker/pkg/progress"
  14. "github.com/pkg/errors"
  15. "golang.org/x/net/context"
  16. )
  17. type blobstore interface {
  18. New() (WriteCommitCloser, error)
  19. Get(dgst digest.Digest) (io.ReadCloser, error)
  20. Size(dgst digest.Digest) (int64, error)
  21. }
  // basicBlobStore is a blobstore that keeps every blob as a file beneath a
  // single root directory.
  type basicBlobStore struct {
  	// path is the root directory. Committed blobs are read from
  	// <path>/<algorithm>/<hex>; in-progress writes are created in <path>/tmp.
  	path string
  }
  25. func newBasicBlobStore(p string) (*basicBlobStore, error) {
  26. tmpdir := filepath.Join(p, "tmp")
  27. if err := os.MkdirAll(tmpdir, 0700); err != nil {
  28. return nil, errors.Wrapf(err, "failed to mkdir %v", p)
  29. }
  30. return &basicBlobStore{path: p}, nil
  31. }
  32. func (b *basicBlobStore) New() (WriteCommitCloser, error) {
  33. f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion")
  34. if err != nil {
  35. return nil, errors.Wrap(err, "failed to create temp file")
  36. }
  37. return newInsertion(f), nil
  38. }
  39. func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) {
  40. return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
  41. }
  42. func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) {
  43. stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
  44. if err != nil {
  45. return 0, err
  46. }
  47. return stat.Size(), nil
  48. }
  49. func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) {
  50. for _, alg := range []string{string(digest.Canonical)} {
  51. items, err := ioutil.ReadDir(filepath.Join(b.path, alg))
  52. if err != nil {
  53. continue
  54. }
  55. for _, fi := range items {
  56. if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists {
  57. p := filepath.Join(b.path, alg, fi.Name())
  58. err := os.RemoveAll(p)
  59. logrus.Debugf("cleaned up blob %v: %v", p, err)
  60. }
  61. }
  62. }
  63. }
  64. // WriteCommitCloser defines object that can be committed to blobstore.
  65. type WriteCommitCloser interface {
  66. io.WriteCloser
  67. Commit() (digest.Digest, error)
  68. }
  69. type insertion struct {
  70. io.Writer
  71. f *os.File
  72. digester digest.Digester
  73. closed bool
  74. }
  75. func newInsertion(tempFile *os.File) *insertion {
  76. digester := digest.Canonical.New()
  77. return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())}
  78. }
  79. func (i *insertion) Commit() (digest.Digest, error) {
  80. p := i.f.Name()
  81. d := filepath.Join(filepath.Join(p, "../../"))
  82. i.f.Sync()
  83. defer os.RemoveAll(p)
  84. if err := i.f.Close(); err != nil {
  85. return "", err
  86. }
  87. i.closed = true
  88. dgst := i.digester.Digest()
  89. if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil {
  90. return "", errors.Wrapf(err, "failed to mkdir %v", d)
  91. }
  92. if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil {
  93. return "", errors.Wrapf(err, "failed to rename %v", p)
  94. }
  95. return dgst, nil
  96. }
  97. func (i *insertion) Close() error {
  98. if i.closed {
  99. return nil
  100. }
  101. defer os.RemoveAll(i.f.Name())
  102. return i.f.Close()
  103. }
  104. type downloadManager struct {
  105. blobStore blobstore
  106. tmpDir string
  107. blobs []digest.Digest
  108. configDigest digest.Digest
  109. }
  110. func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
  111. for _, l := range layers {
  112. b, err := dm.blobStore.New()
  113. if err != nil {
  114. return initialRootFS, nil, err
  115. }
  116. defer b.Close()
  117. rc, _, err := l.Download(ctx, progressOutput)
  118. if err != nil {
  119. return initialRootFS, nil, errors.Wrap(err, "failed to download")
  120. }
  121. defer rc.Close()
  122. r := io.TeeReader(rc, b)
  123. inflatedLayerData, err := archive.DecompressStream(r)
  124. if err != nil {
  125. return initialRootFS, nil, err
  126. }
  127. digester := digest.Canonical.New()
  128. if _, err := archive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil {
  129. return initialRootFS, nil, err
  130. }
  131. initialRootFS.Append(layer.DiffID(digester.Digest()))
  132. d, err := b.Commit()
  133. if err != nil {
  134. return initialRootFS, nil, err
  135. }
  136. dm.blobs = append(dm.blobs, d)
  137. }
  138. return initialRootFS, nil, nil
  139. }
  140. func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) {
  141. b, err := dm.blobStore.New()
  142. if err != nil {
  143. return "", err
  144. }
  145. defer b.Close()
  146. n, err := b.Write(dt)
  147. if err != nil {
  148. return "", err
  149. }
  150. if n != len(dt) {
  151. return "", io.ErrShortWrite
  152. }
  153. d, err := b.Commit()
  154. dm.configDigest = d
  155. return d, err
  156. }
  157. func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) {
  158. return nil, digest.ErrDigestNotFound
  159. }
  160. func (dm *downloadManager) RootFSFromConfig(c []byte) (*image.RootFS, error) {
  161. return configToRootFS(c)
  162. }