store.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package images
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/containerd/containerd/content"
  6. cerrdefs "github.com/containerd/containerd/errdefs"
  7. "github.com/containerd/containerd/leases"
  8. "github.com/containerd/containerd/namespaces"
  9. "github.com/containerd/log"
  10. "github.com/docker/docker/distribution"
  11. "github.com/docker/docker/image"
  12. "github.com/docker/docker/layer"
  13. "github.com/opencontainers/go-digest"
  14. "github.com/pkg/errors"
  15. )
  16. const imageKeyPrefix = "moby-image-"
  17. func imageKey(dgst string) string {
  18. return imageKeyPrefix + dgst
  19. }
  20. // imageStoreWithLease wraps the configured image store with one that deletes the lease
  21. // reigstered for a given image ID, if one exists
  22. //
  23. // This is used by the main image service to wrap delete calls to the real image store.
  24. type imageStoreWithLease struct {
  25. image.Store
  26. leases leases.Manager
  27. // Normally we'd pass namespace down through a context.Context, however...
  28. // The interface for image store doesn't allow this, so we store it here.
  29. ns string
  30. }
  31. func (s *imageStoreWithLease) Delete(id image.ID) ([]layer.Metadata, error) {
  32. ctx := namespaces.WithNamespace(context.TODO(), s.ns)
  33. if err := s.leases.Delete(ctx, leases.Lease{ID: imageKey(id.String())}); err != nil && !cerrdefs.IsNotFound(err) {
  34. return nil, errors.Wrap(err, "error deleting lease")
  35. }
  36. return s.Store.Delete(id)
  37. }
  38. // iamgeStoreForPull is created for each pull It wraps an underlying image store
  39. // to handle registering leases for content fetched in a single image pull.
  40. type imageStoreForPull struct {
  41. distribution.ImageConfigStore
  42. leases leases.Manager
  43. ingested *contentStoreForPull
  44. }
  45. func (s *imageStoreForPull) Put(ctx context.Context, config []byte) (digest.Digest, error) {
  46. id, err := s.ImageConfigStore.Put(ctx, config)
  47. if err != nil {
  48. return "", err
  49. }
  50. return id, s.updateLease(ctx, id)
  51. }
  52. func (s *imageStoreForPull) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
  53. id, err := s.ImageConfigStore.Get(ctx, dgst)
  54. if err != nil {
  55. return nil, err
  56. }
  57. return id, s.updateLease(ctx, dgst)
  58. }
  59. func (s *imageStoreForPull) updateLease(ctx context.Context, dgst digest.Digest) error {
  60. leaseID := imageKey(dgst.String())
  61. lease, err := s.leases.Create(ctx, leases.WithID(leaseID))
  62. if err != nil {
  63. if !cerrdefs.IsAlreadyExists(err) {
  64. return errors.Wrap(err, "error creating lease")
  65. }
  66. lease = leases.Lease{ID: leaseID}
  67. }
  68. digested := s.ingested.getDigested()
  69. resource := leases.Resource{
  70. Type: "content",
  71. }
  72. for _, dgst := range digested {
  73. log.G(ctx).WithFields(log.Fields{
  74. "digest": dgst,
  75. "lease": lease.ID,
  76. }).Debug("Adding content digest to lease")
  77. resource.ID = dgst.String()
  78. if err := s.leases.AddResource(ctx, lease, resource); err != nil {
  79. return errors.Wrapf(err, "error adding content digest to lease: %s", dgst)
  80. }
  81. }
  82. return nil
  83. }
  84. // contentStoreForPull is used to wrap the configured content store to
  85. // add lease management for a single `pull`
  86. // It stores all committed digests so that `imageStoreForPull` can add
  87. // the digsted resources to the lease for an image.
  88. type contentStoreForPull struct {
  89. distribution.ContentStore
  90. leases leases.Manager
  91. mu sync.Mutex
  92. digested []digest.Digest
  93. }
  94. func (c *contentStoreForPull) addDigested(dgst digest.Digest) {
  95. c.mu.Lock()
  96. c.digested = append(c.digested, dgst)
  97. c.mu.Unlock()
  98. }
  99. func (c *contentStoreForPull) getDigested() []digest.Digest {
  100. c.mu.Lock()
  101. digested := make([]digest.Digest, len(c.digested))
  102. copy(digested, c.digested)
  103. c.mu.Unlock()
  104. return digested
  105. }
  106. func (c *contentStoreForPull) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
  107. w, err := c.ContentStore.Writer(ctx, opts...)
  108. if err != nil {
  109. if cerrdefs.IsAlreadyExists(err) {
  110. var cfg content.WriterOpts
  111. for _, o := range opts {
  112. if err := o(&cfg); err != nil {
  113. return nil, err
  114. }
  115. }
  116. c.addDigested(cfg.Desc.Digest)
  117. }
  118. return nil, err
  119. }
  120. return &contentWriter{
  121. cs: c,
  122. Writer: w,
  123. }, nil
  124. }
  125. type contentWriter struct {
  126. cs *contentStoreForPull
  127. content.Writer
  128. }
  129. func (w *contentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
  130. err := w.Writer.Commit(ctx, size, expected, opts...)
  131. if err == nil || cerrdefs.IsAlreadyExists(err) {
  132. w.cs.addDigested(expected)
  133. }
  134. return err
  135. }