blobstore.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package plugin
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/distribution/xfer"
  10. "github.com/docker/docker/image"
  11. "github.com/docker/docker/layer"
  12. "github.com/docker/docker/pkg/archive"
  13. "github.com/docker/docker/pkg/chrootarchive"
  14. "github.com/docker/docker/pkg/progress"
  15. "github.com/opencontainers/go-digest"
  16. "github.com/pkg/errors"
  17. "golang.org/x/net/context"
  18. )
  19. type blobstore interface {
  20. New() (WriteCommitCloser, error)
  21. Get(dgst digest.Digest) (io.ReadCloser, error)
  22. Size(dgst digest.Digest) (int64, error)
  23. }
  // basicBlobStore is a filesystem-backed blobstore. Blobs are stored at
  // <path>/<algorithm>/<hex> and in-flight insertions live under <path>/tmp.
  type basicBlobStore struct {
  	// path is the root directory of the store.
  	path string
  }
  27. func newBasicBlobStore(p string) (*basicBlobStore, error) {
  28. tmpdir := filepath.Join(p, "tmp")
  29. if err := os.MkdirAll(tmpdir, 0700); err != nil {
  30. return nil, errors.Wrapf(err, "failed to mkdir %v", p)
  31. }
  32. return &basicBlobStore{path: p}, nil
  33. }
  34. func (b *basicBlobStore) New() (WriteCommitCloser, error) {
  35. f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion")
  36. if err != nil {
  37. return nil, errors.Wrap(err, "failed to create temp file")
  38. }
  39. return newInsertion(f), nil
  40. }
  41. func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) {
  42. return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
  43. }
  44. func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) {
  45. stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
  46. if err != nil {
  47. return 0, err
  48. }
  49. return stat.Size(), nil
  50. }
  51. func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) {
  52. for _, alg := range []string{string(digest.Canonical)} {
  53. items, err := ioutil.ReadDir(filepath.Join(b.path, alg))
  54. if err != nil {
  55. continue
  56. }
  57. for _, fi := range items {
  58. if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists {
  59. p := filepath.Join(b.path, alg, fi.Name())
  60. err := os.RemoveAll(p)
  61. logrus.Debugf("cleaned up blob %v: %v", p, err)
  62. }
  63. }
  64. }
  65. }
  66. // WriteCommitCloser defines object that can be committed to blobstore.
  67. type WriteCommitCloser interface {
  68. io.WriteCloser
  69. Commit() (digest.Digest, error)
  70. }
  71. type insertion struct {
  72. io.Writer
  73. f *os.File
  74. digester digest.Digester
  75. closed bool
  76. }
  77. func newInsertion(tempFile *os.File) *insertion {
  78. digester := digest.Canonical.Digester()
  79. return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())}
  80. }
  81. func (i *insertion) Commit() (digest.Digest, error) {
  82. p := i.f.Name()
  83. d := filepath.Join(filepath.Join(p, "../../"))
  84. i.f.Sync()
  85. defer os.RemoveAll(p)
  86. if err := i.f.Close(); err != nil {
  87. return "", err
  88. }
  89. i.closed = true
  90. dgst := i.digester.Digest()
  91. if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil {
  92. return "", errors.Wrapf(err, "failed to mkdir %v", d)
  93. }
  94. if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil {
  95. return "", errors.Wrapf(err, "failed to rename %v", p)
  96. }
  97. return dgst, nil
  98. }
  99. func (i *insertion) Close() error {
  100. if i.closed {
  101. return nil
  102. }
  103. defer os.RemoveAll(i.f.Name())
  104. return i.f.Close()
  105. }
  106. type downloadManager struct {
  107. blobStore blobstore
  108. tmpDir string
  109. blobs []digest.Digest
  110. configDigest digest.Digest
  111. }
  112. func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
  113. for _, l := range layers {
  114. b, err := dm.blobStore.New()
  115. if err != nil {
  116. return initialRootFS, nil, err
  117. }
  118. defer b.Close()
  119. rc, _, err := l.Download(ctx, progressOutput)
  120. if err != nil {
  121. return initialRootFS, nil, errors.Wrap(err, "failed to download")
  122. }
  123. defer rc.Close()
  124. r := io.TeeReader(rc, b)
  125. inflatedLayerData, err := archive.DecompressStream(r)
  126. if err != nil {
  127. return initialRootFS, nil, err
  128. }
  129. digester := digest.Canonical.Digester()
  130. if _, err := chrootarchive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil {
  131. return initialRootFS, nil, err
  132. }
  133. initialRootFS.Append(layer.DiffID(digester.Digest()))
  134. d, err := b.Commit()
  135. if err != nil {
  136. return initialRootFS, nil, err
  137. }
  138. dm.blobs = append(dm.blobs, d)
  139. }
  140. return initialRootFS, nil, nil
  141. }
  142. func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) {
  143. b, err := dm.blobStore.New()
  144. if err != nil {
  145. return "", err
  146. }
  147. defer b.Close()
  148. n, err := b.Write(dt)
  149. if err != nil {
  150. return "", err
  151. }
  152. if n != len(dt) {
  153. return "", io.ErrShortWrite
  154. }
  155. d, err := b.Commit()
  156. dm.configDigest = d
  157. return d, err
  158. }
  159. func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) {
  160. return nil, fmt.Errorf("digest not found")
  161. }
  162. func (dm *downloadManager) RootFSFromConfig(c []byte) (*image.RootFS, error) {
  163. return configToRootFS(c)
  164. }