blobstore.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package plugin
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "github.com/docker/docker/distribution/xfer"
  9. "github.com/docker/docker/image"
  10. "github.com/docker/docker/layer"
  11. "github.com/docker/docker/pkg/archive"
  12. "github.com/docker/docker/pkg/chrootarchive"
  13. "github.com/docker/docker/pkg/progress"
  14. "github.com/opencontainers/go-digest"
  15. "github.com/pkg/errors"
  16. "github.com/sirupsen/logrus"
  17. "golang.org/x/net/context"
  18. )
  19. type blobstore interface {
  20. New() (WriteCommitCloser, error)
  21. Get(dgst digest.Digest) (io.ReadCloser, error)
  22. Size(dgst digest.Digest) (int64, error)
  23. }
  24. type basicBlobStore struct {
  25. path string
  26. }
  27. func newBasicBlobStore(p string) (*basicBlobStore, error) {
  28. tmpdir := filepath.Join(p, "tmp")
  29. if err := os.MkdirAll(tmpdir, 0700); err != nil {
  30. return nil, errors.Wrapf(err, "failed to mkdir %v", p)
  31. }
  32. return &basicBlobStore{path: p}, nil
  33. }
  34. func (b *basicBlobStore) New() (WriteCommitCloser, error) {
  35. f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion")
  36. if err != nil {
  37. return nil, errors.Wrap(err, "failed to create temp file")
  38. }
  39. return newInsertion(f), nil
  40. }
  41. func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) {
  42. return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
  43. }
  44. func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) {
  45. stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
  46. if err != nil {
  47. return 0, err
  48. }
  49. return stat.Size(), nil
  50. }
  51. func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) {
  52. for _, alg := range []string{string(digest.Canonical)} {
  53. items, err := ioutil.ReadDir(filepath.Join(b.path, alg))
  54. if err != nil {
  55. continue
  56. }
  57. for _, fi := range items {
  58. if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists {
  59. p := filepath.Join(b.path, alg, fi.Name())
  60. err := os.RemoveAll(p)
  61. logrus.Debugf("cleaned up blob %v: %v", p, err)
  62. }
  63. }
  64. }
  65. }
  66. // WriteCommitCloser defines object that can be committed to blobstore.
  67. type WriteCommitCloser interface {
  68. io.WriteCloser
  69. Commit() (digest.Digest, error)
  70. }
  71. type insertion struct {
  72. io.Writer
  73. f *os.File
  74. digester digest.Digester
  75. closed bool
  76. }
  77. func newInsertion(tempFile *os.File) *insertion {
  78. digester := digest.Canonical.Digester()
  79. return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())}
  80. }
  81. func (i *insertion) Commit() (digest.Digest, error) {
  82. p := i.f.Name()
  83. d := filepath.Join(filepath.Join(p, "../../"))
  84. i.f.Sync()
  85. defer os.RemoveAll(p)
  86. if err := i.f.Close(); err != nil {
  87. return "", err
  88. }
  89. i.closed = true
  90. dgst := i.digester.Digest()
  91. if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil {
  92. return "", errors.Wrapf(err, "failed to mkdir %v", d)
  93. }
  94. if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil {
  95. return "", errors.Wrapf(err, "failed to rename %v", p)
  96. }
  97. return dgst, nil
  98. }
  99. func (i *insertion) Close() error {
  100. if i.closed {
  101. return nil
  102. }
  103. defer os.RemoveAll(i.f.Name())
  104. return i.f.Close()
  105. }
  106. type downloadManager struct {
  107. blobStore blobstore
  108. tmpDir string
  109. blobs []digest.Digest
  110. configDigest digest.Digest
  111. }
  112. func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, platform layer.Platform, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
  113. // TODO @jhowardmsft LCOW: May need revisiting.
  114. for _, l := range layers {
  115. b, err := dm.blobStore.New()
  116. if err != nil {
  117. return initialRootFS, nil, err
  118. }
  119. defer b.Close()
  120. rc, _, err := l.Download(ctx, progressOutput)
  121. if err != nil {
  122. return initialRootFS, nil, errors.Wrap(err, "failed to download")
  123. }
  124. defer rc.Close()
  125. r := io.TeeReader(rc, b)
  126. inflatedLayerData, err := archive.DecompressStream(r)
  127. if err != nil {
  128. return initialRootFS, nil, err
  129. }
  130. digester := digest.Canonical.Digester()
  131. if _, err := chrootarchive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil {
  132. return initialRootFS, nil, err
  133. }
  134. initialRootFS.Append(layer.DiffID(digester.Digest()))
  135. d, err := b.Commit()
  136. if err != nil {
  137. return initialRootFS, nil, err
  138. }
  139. dm.blobs = append(dm.blobs, d)
  140. }
  141. return initialRootFS, nil, nil
  142. }
  143. func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) {
  144. b, err := dm.blobStore.New()
  145. if err != nil {
  146. return "", err
  147. }
  148. defer b.Close()
  149. n, err := b.Write(dt)
  150. if err != nil {
  151. return "", err
  152. }
  153. if n != len(dt) {
  154. return "", io.ErrShortWrite
  155. }
  156. d, err := b.Commit()
  157. dm.configDigest = d
  158. return d, err
  159. }
  160. func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) {
  161. return nil, fmt.Errorf("digest not found")
  162. }
  163. func (dm *downloadManager) RootFSAndPlatformFromConfig(c []byte) (*image.RootFS, layer.Platform, error) {
  164. return configToRootFS(c)
  165. }