push_v2.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734
  1. package distribution // import "github.com/docker/docker/distribution"
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "runtime"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "github.com/containerd/log"
  12. "github.com/distribution/reference"
  13. "github.com/docker/distribution"
  14. "github.com/docker/distribution/manifest/schema1"
  15. "github.com/docker/distribution/manifest/schema2"
  16. "github.com/docker/distribution/registry/api/errcode"
  17. "github.com/docker/distribution/registry/client"
  18. apitypes "github.com/docker/docker/api/types"
  19. "github.com/docker/docker/distribution/metadata"
  20. "github.com/docker/docker/distribution/xfer"
  21. "github.com/docker/docker/layer"
  22. "github.com/docker/docker/pkg/ioutils"
  23. "github.com/docker/docker/pkg/progress"
  24. "github.com/docker/docker/pkg/stringid"
  25. "github.com/docker/docker/registry"
  26. "github.com/docker/libtrust"
  27. "github.com/opencontainers/go-digest"
  28. "github.com/pkg/errors"
  29. )
  30. const (
  31. smallLayerMaximumSize = 100 * (1 << 10) // 100KB
  32. middleLayerMaximumSize = 10 * (1 << 20) // 10MB
  33. )
  34. // newPusher creates a new pusher for pushing to a v2 registry.
  35. // The parameters are passed through to the underlying pusher implementation for
  36. // use during the actual push operation.
  37. func newPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePushConfig) *pusher {
  38. return &pusher{
  39. metadataService: metadata.NewV2MetadataService(config.MetadataStore),
  40. ref: ref,
  41. endpoint: endpoint,
  42. repoInfo: repoInfo,
  43. config: config,
  44. }
  45. }
// pusher pushes an image reference to a v2 registry endpoint. It is created
// by newPusher; push performs the actual work.
type pusher struct {
	// metadataService looks up and records DiffID -> blob digest associations.
	metadataService metadata.V2MetadataService
	ref             reference.Named
	endpoint        registry.APIEndpoint
	repoInfo        *registry.RepositoryInfo
	config          *ImagePushConfig
	// repo is the remote repository handle; it is assigned by push.
	repo distribution.Repository
	// pushState is state built by the Upload functions.
	pushState pushState
}
// pushState is per-push state shared between a pusher and every
// pushDescriptor it creates (they hold a pointer to it). Accesses to
// remoteLayers are made while holding the embedded mutex.
type pushState struct {
	sync.Mutex
	// remoteLayers is the set of layers known to exist on the remote side.
	// This avoids redundant queries when pushing multiple tags that
	// involve the same layers. It is also used to fill in digest and size
	// information when building the manifest.
	remoteLayers map[layer.DiffID]distribution.Descriptor
	// hasAuthInfo reports whether the auth config carries a registry token
	// or a username/password pair; it is set by push before any upload.
	hasAuthInfo bool
}
  65. // TODO(tiborvass): have push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
  66. func (p *pusher) push(ctx context.Context) (err error) {
  67. p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
  68. p.repo, err = newRepository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
  69. p.pushState.hasAuthInfo = p.config.AuthConfig.RegistryToken != "" || (p.config.AuthConfig.Username != "" && p.config.AuthConfig.Password != "")
  70. if err != nil {
  71. log.G(ctx).Debugf("Error getting v2 registry: %v", err)
  72. return err
  73. }
  74. if err = p.pushRepository(ctx); err != nil {
  75. if continueOnError(err, p.endpoint.Mirror) {
  76. return fallbackError{
  77. err: err,
  78. transportOK: true,
  79. }
  80. }
  81. }
  82. return err
  83. }
  84. func (p *pusher) pushRepository(ctx context.Context) (err error) {
  85. if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
  86. imageID, err := p.config.ReferenceStore.Get(p.ref)
  87. if err != nil {
  88. return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref))
  89. }
  90. return p.pushTag(ctx, namedTagged, imageID)
  91. }
  92. if !reference.IsNameOnly(p.ref) {
  93. return errors.New("cannot push a digest reference")
  94. }
  95. // Push all tags
  96. pushed := 0
  97. for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
  98. if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
  99. pushed++
  100. if err := p.pushTag(ctx, namedTagged, association.ID); err != nil {
  101. return err
  102. }
  103. }
  104. }
  105. if pushed == 0 {
  106. return fmt.Errorf("no tags to push for %s", reference.FamiliarName(p.repoInfo.Name))
  107. }
  108. return nil
  109. }
  110. func (p *pusher) pushTag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
  111. log.G(ctx).Debugf("Pushing repository: %s", reference.FamiliarString(ref))
  112. imgConfig, err := p.config.ImageStore.Get(ctx, id)
  113. if err != nil {
  114. return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
  115. }
  116. rootfs, err := rootFSFromConfig(imgConfig)
  117. if err != nil {
  118. return fmt.Errorf("unable to get rootfs for image %s: %s", reference.FamiliarString(ref), err)
  119. }
  120. l, err := p.config.LayerStores.Get(rootfs.ChainID())
  121. if err != nil {
  122. return fmt.Errorf("failed to get top layer from image: %v", err)
  123. }
  124. defer l.Release()
  125. hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
  126. if err != nil {
  127. return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
  128. }
  129. var descriptors []xfer.UploadDescriptor
  130. descriptorTemplate := pushDescriptor{
  131. metadataService: p.metadataService,
  132. hmacKey: hmacKey,
  133. repoInfo: p.repoInfo.Name,
  134. ref: p.ref,
  135. endpoint: p.endpoint,
  136. repo: p.repo,
  137. pushState: &p.pushState,
  138. }
  139. // Loop bounds condition is to avoid pushing the base layer on Windows.
  140. for range rootfs.DiffIDs {
  141. descriptor := descriptorTemplate
  142. descriptor.layer = l
  143. descriptor.checkedDigests = make(map[digest.Digest]struct{})
  144. descriptors = append(descriptors, &descriptor)
  145. l = l.Parent()
  146. }
  147. if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
  148. return err
  149. }
  150. // Try schema2 first
  151. builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig)
  152. manifest, err := manifestFromBuilder(ctx, builder, descriptors)
  153. if err != nil {
  154. return err
  155. }
  156. manSvc, err := p.repo.Manifests(ctx)
  157. if err != nil {
  158. return err
  159. }
  160. putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
  161. if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
  162. if runtime.GOOS == "windows" {
  163. log.G(ctx).Warnf("failed to upload schema2 manifest: %v", err)
  164. return err
  165. }
  166. // This is a temporary environment variables used in CI to allow pushing
  167. // manifest v2 schema 1 images to test-registries used for testing *pulling*
  168. // these images.
  169. if os.Getenv("DOCKER_ALLOW_SCHEMA1_PUSH_DONOTUSE") == "" {
  170. if err.Error() == "tag invalid" {
  171. msg := "[DEPRECATED] support for pushing manifest v2 schema1 images has been removed. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/"
  172. log.G(ctx).WithError(err).Error(msg)
  173. return errors.Wrap(err, msg)
  174. }
  175. return err
  176. }
  177. log.G(ctx).Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
  178. // Note: this fallback is deprecated, see log messages below
  179. manifestRef, err := reference.WithTag(p.repo.Named(), ref.Tag())
  180. if err != nil {
  181. return err
  182. }
  183. pk, err := libtrust.GenerateECP256PrivateKey()
  184. if err != nil {
  185. return errors.Wrap(err, "unexpected error generating private key")
  186. }
  187. builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), pk, manifestRef, imgConfig)
  188. manifest, err = manifestFromBuilder(ctx, builder, descriptors)
  189. if err != nil {
  190. return err
  191. }
  192. if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
  193. return err
  194. }
  195. // schema2 failed but schema1 succeeded
  196. msg := fmt.Sprintf("[DEPRECATION NOTICE] support for pushing manifest v2 schema1 images will be removed in an upcoming release. Please contact admins of the %s registry NOW to avoid future disruption. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/", reference.Domain(ref))
  197. log.G(ctx).Warn(msg)
  198. progress.Message(p.config.ProgressOutput, "", msg)
  199. }
  200. var canonicalManifest []byte
  201. switch v := manifest.(type) {
  202. case *schema1.SignedManifest:
  203. canonicalManifest = v.Canonical
  204. case *schema2.DeserializedManifest:
  205. _, canonicalManifest, err = v.Payload()
  206. if err != nil {
  207. return err
  208. }
  209. }
  210. manifestDigest := digest.FromBytes(canonicalManifest)
  211. progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
  212. if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
  213. return err
  214. }
  215. // Signal digest to the trust client so it can sign the
  216. // push, if appropriate.
  217. progress.Aux(p.config.ProgressOutput, apitypes.PushResult{Tag: ref.Tag(), Digest: manifestDigest.String(), Size: len(canonicalManifest)})
  218. return nil
  219. }
  220. func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
  221. // descriptors is in reverse order; iterate backwards to get references
  222. // appended in the right order.
  223. for i := len(descriptors) - 1; i >= 0; i-- {
  224. if err := builder.AppendReference(descriptors[i].(*pushDescriptor)); err != nil {
  225. return nil, err
  226. }
  227. }
  228. return builder.Build(ctx)
  229. }
// pushDescriptor describes the upload of a single layer. pushTag creates one
// per layer from a shared template; it is passed to the upload manager as an
// xfer.UploadDescriptor and later appended to a manifest builder.
type pushDescriptor struct {
	// layer is the local layer being pushed.
	layer           PushLayer
	metadataService metadata.V2MetadataService
	// hmacKey is derived from the push's auth config and is used to tag and
	// verify v2 metadata entries.
	hmacKey []byte
	// repoInfo is the name of the target repository.
	repoInfo reference.Named
	ref      reference.Named
	endpoint registry.APIEndpoint
	repo     distribution.Repository
	// pushState is shared with the owning pusher and all sibling descriptors.
	pushState *pushState
	// remoteDescriptor is set by SetRemoteDescriptor and returned by Descriptor.
	remoteDescriptor distribution.Descriptor
	// a set of digests whose presence has been checked in a target repository
	checkedDigests map[digest.Digest]struct{}
}
  243. func (pd *pushDescriptor) Key() string {
  244. return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String()
  245. }
  246. func (pd *pushDescriptor) ID() string {
  247. return stringid.TruncateID(pd.layer.DiffID().String())
  248. }
  249. func (pd *pushDescriptor) DiffID() layer.DiffID {
  250. return pd.layer.DiffID()
  251. }
  252. func (pd *pushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
  253. // Skip foreign layers unless this registry allows nondistributable artifacts.
  254. if !pd.endpoint.AllowNondistributableArtifacts {
  255. if fs, ok := pd.layer.(distribution.Describable); ok {
  256. if d := fs.Descriptor(); len(d.URLs) > 0 {
  257. progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
  258. return d, nil
  259. }
  260. }
  261. }
  262. diffID := pd.DiffID()
  263. pd.pushState.Lock()
  264. if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
  265. // it is already known that the push is not needed and
  266. // therefore doing a stat is unnecessary
  267. pd.pushState.Unlock()
  268. progress.Update(progressOutput, pd.ID(), "Layer already exists")
  269. return descriptor, nil
  270. }
  271. pd.pushState.Unlock()
  272. maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
  273. // Do we have any metadata associated with this layer's DiffID?
  274. metaData, err := pd.metadataService.GetMetadata(diffID)
  275. if err == nil {
  276. // check for blob existence in the target repository
  277. descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, metaData)
  278. if exists || err != nil {
  279. return descriptor, err
  280. }
  281. }
  282. // if digest was empty or not saved, or if blob does not exist on the remote repository,
  283. // then push the blob.
  284. bs := pd.repo.Blobs(ctx)
  285. var layerUpload distribution.BlobWriter
  286. // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
  287. candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, metaData)
  288. isUnauthorizedError := false
  289. for _, mc := range candidates {
  290. mountCandidate := mc
  291. log.G(ctx).Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
  292. createOpts := []distribution.BlobCreateOption{}
  293. if len(mountCandidate.SourceRepository) > 0 {
  294. namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository)
  295. if err != nil {
  296. log.G(ctx).WithError(err).Errorf("failed to parse source repository reference %v", reference.FamiliarString(namedRef))
  297. _ = pd.metadataService.Remove(mountCandidate)
  298. continue
  299. }
  300. // Candidates are always under same domain, create remote reference
  301. // with only path to set mount from with
  302. remoteRef, err := reference.WithName(reference.Path(namedRef))
  303. if err != nil {
  304. log.G(ctx).WithError(err).Errorf("failed to make remote reference out of %q", reference.Path(namedRef))
  305. continue
  306. }
  307. canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest)
  308. if err != nil {
  309. log.G(ctx).WithError(err).Error("failed to make canonical reference")
  310. continue
  311. }
  312. createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
  313. }
  314. // send the layer
  315. lu, err := bs.Create(ctx, createOpts...)
  316. switch err := err.(type) {
  317. case nil:
  318. // noop
  319. case distribution.ErrBlobMounted:
  320. progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
  321. err.Descriptor.MediaType = schema2.MediaTypeLayer
  322. pd.pushState.Lock()
  323. pd.pushState.remoteLayers[diffID] = err.Descriptor
  324. pd.pushState.Unlock()
  325. // Cache mapping from this layer's DiffID to the blobsum
  326. if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
  327. Digest: err.Descriptor.Digest,
  328. SourceRepository: pd.repoInfo.Name(),
  329. }); err != nil {
  330. return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
  331. }
  332. return err.Descriptor, nil
  333. case errcode.Errors:
  334. for _, e := range err {
  335. switch e := e.(type) {
  336. case errcode.Error:
  337. if e.Code == errcode.ErrorCodeUnauthorized {
  338. // when unauthorized error that indicate user don't has right to push layer to register
  339. log.G(ctx).Debugln("failed to push layer to registry because unauthorized error")
  340. isUnauthorizedError = true
  341. }
  342. default:
  343. }
  344. }
  345. default:
  346. log.G(ctx).Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)
  347. }
  348. // when error is unauthorizedError and user don't hasAuthInfo that's the case user don't has right to push layer to register
  349. // and he hasn't login either, in this case candidate cache should be removed
  350. if len(mountCandidate.SourceRepository) > 0 &&
  351. !(isUnauthorizedError && !pd.pushState.hasAuthInfo) &&
  352. (metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
  353. len(mountCandidate.HMAC) == 0) {
  354. cause := "blob mount failure"
  355. if err != nil {
  356. cause = fmt.Sprintf("an error: %v", err.Error())
  357. }
  358. log.G(ctx).Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
  359. _ = pd.metadataService.Remove(mountCandidate)
  360. }
  361. if lu != nil {
  362. // cancel previous upload
  363. cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload)
  364. layerUpload = lu
  365. }
  366. }
  367. if maxExistenceChecks-len(pd.checkedDigests) > 0 {
  368. // do additional layer existence checks with other known digests if any
  369. descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), metaData)
  370. if exists || err != nil {
  371. return descriptor, err
  372. }
  373. }
  374. log.G(ctx).Debugf("Pushing layer: %s", diffID)
  375. if layerUpload == nil {
  376. layerUpload, err = bs.Create(ctx)
  377. if err != nil {
  378. return distribution.Descriptor{}, retryOnError(err)
  379. }
  380. }
  381. defer layerUpload.Close()
  382. // upload the blob
  383. return pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
  384. }
  385. func (pd *pushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
  386. pd.remoteDescriptor = descriptor
  387. }
  388. func (pd *pushDescriptor) Descriptor() distribution.Descriptor {
  389. return pd.remoteDescriptor
  390. }
// uploadUsingSession streams the layer's content through layerUpload and
// commits it under the canonical digest of the bytes actually sent.
//
// Layers of media type schema2.MediaTypeUncompressedLayer are passed through
// compress before sending; schema2.MediaTypeLayer content is sent as-is; any
// other media type is rejected with a non-retryable error. On success the
// DiffID -> digest association is recorded in the metadata service and in
// pushState.remoteLayers, and the returned descriptor carries the pushed
// digest, schema2.MediaTypeLayer, and the number of bytes sent.
func (pd *pushDescriptor) uploadUsingSession(
	ctx context.Context,
	progressOutput progress.Output,
	diffID layer.DiffID,
	layerUpload distribution.BlobWriter,
) (distribution.Descriptor, error) {
	var reader io.ReadCloser

	contentReader, err := pd.layer.Open()
	if err != nil {
		return distribution.Descriptor{}, retryOnError(err)
	}

	// Wrap the layer content so reads report "Pushing" progress against
	// pd.layer.Size() and are tied to ctx (via ioutils.NewCancelReadCloser).
	reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, pd.layer.Size(), pd.ID(), "Pushing")

	switch m := pd.layer.MediaType(); m {
	case schema2.MediaTypeUncompressedLayer:
		compressedReader, compressionDone := compress(reader)
		// On return, close the uncompressed source reader and then wait on
		// compressionDone so compression has finished before this returns.
		defer func(closer io.Closer) {
			closer.Close()
			<-compressionDone
		}(reader)
		reader = compressedReader
	case schema2.MediaTypeLayer:
		// Already in the pushed format; send as-is.
	default:
		reader.Close()
		return distribution.Descriptor{}, xfer.DoNotRetry{Err: fmt.Errorf("unsupported layer media type %s", m)}
	}

	// Hash exactly the bytes that are sent, so the committed digest (and the
	// returned size nn) describe the blob as stored by the registry.
	digester := digest.Canonical.Digester()
	tee := io.TeeReader(reader, digester.Hash())

	nn, err := layerUpload.ReadFrom(tee)
	reader.Close()
	if err != nil {
		return distribution.Descriptor{}, retryOnError(err)
	}

	pushDigest := digester.Digest()
	if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
		return distribution.Descriptor{}, retryOnError(err)
	}

	log.G(ctx).Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
	progress.Update(progressOutput, pd.ID(), "Pushed")

	// Cache mapping from this layer's DiffID to the blobsum
	if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
		Digest:           pushDigest,
		SourceRepository: pd.repoInfo.Name(),
	}); err != nil {
		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
	}

	desc := distribution.Descriptor{
		Digest:    pushDigest,
		MediaType: schema2.MediaTypeLayer,
		Size:      nn,
	}

	// Remember the layer as present remotely for the rest of this push.
	pd.pushState.Lock()
	pd.pushState.remoteLayers[diffID] = desc
	pd.pushState.Unlock()

	return desc, nil
}
  446. // layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
  447. // slice. If it finds one that the registry knows about, it returns the known digest and "true". If
  448. // "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
  449. // (not just the target one).
  450. func (pd *pushDescriptor) layerAlreadyExists(
  451. ctx context.Context,
  452. progressOutput progress.Output,
  453. diffID layer.DiffID,
  454. checkOtherRepositories bool,
  455. maxExistenceCheckAttempts int,
  456. v2Metadata []metadata.V2Metadata,
  457. ) (desc distribution.Descriptor, exists bool, err error) {
  458. // filter the metadata
  459. candidates := []metadata.V2Metadata{}
  460. for _, meta := range v2Metadata {
  461. if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.Name() {
  462. continue
  463. }
  464. candidates = append(candidates, meta)
  465. }
  466. // sort the candidates by similarity
  467. sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)
  468. digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
  469. // an array of unique blob digests ordered from the best mount candidates to worst
  470. layerDigests := []digest.Digest{}
  471. for i := 0; i < len(candidates); i++ {
  472. if len(layerDigests) >= maxExistenceCheckAttempts {
  473. break
  474. }
  475. meta := &candidates[i]
  476. if _, exists := digestToMetadata[meta.Digest]; exists {
  477. // keep reference just to the first mapping (the best mount candidate)
  478. continue
  479. }
  480. if _, exists := pd.checkedDigests[meta.Digest]; exists {
  481. // existence of this digest has already been tested
  482. continue
  483. }
  484. digestToMetadata[meta.Digest] = meta
  485. layerDigests = append(layerDigests, meta.Digest)
  486. }
  487. attempts:
  488. for _, dgst := range layerDigests {
  489. meta := digestToMetadata[dgst]
  490. log.G(ctx).Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
  491. desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
  492. pd.checkedDigests[meta.Digest] = struct{}{}
  493. switch err {
  494. case nil:
  495. if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
  496. // cache mapping from this layer's DiffID to the blobsum
  497. if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
  498. Digest: desc.Digest,
  499. SourceRepository: pd.repoInfo.Name(),
  500. }); err != nil {
  501. return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
  502. }
  503. }
  504. desc.MediaType = schema2.MediaTypeLayer
  505. exists = true
  506. break attempts
  507. case distribution.ErrBlobUnknown:
  508. if meta.SourceRepository == pd.repoInfo.Name() {
  509. // remove the mapping to the target repository
  510. pd.metadataService.Remove(*meta)
  511. }
  512. default:
  513. log.G(ctx).WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
  514. }
  515. }
  516. if exists {
  517. progress.Update(progressOutput, pd.ID(), "Layer already exists")
  518. pd.pushState.Lock()
  519. pd.pushState.remoteLayers[diffID] = desc
  520. pd.pushState.Unlock()
  521. }
  522. return desc, exists, nil
  523. }
  524. // getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
  525. // source repositories of target registry, maximum number of layer existence checks performed on the target
  526. // repository and whether the check shall be done also with digests mapped to different repositories. The
  527. // decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
  528. // of upload does not outweigh a latency.
  529. func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
  530. size := layer.Size()
  531. switch {
  532. // big blob
  533. case size > middleLayerMaximumSize:
  534. // 1st attempt to mount the blob few times
  535. // 2nd few existence checks with digests associated to any repository
  536. // then fallback to upload
  537. return 4, 3, true
  538. // middle sized blobs; if we could not get the size, assume we deal with middle sized blob
  539. case size > smallLayerMaximumSize:
  540. // 1st attempt to mount blobs of average size few times
  541. // 2nd try at most 1 existence check if there's an existing mapping to the target repository
  542. // then fallback to upload
  543. return 3, 1, false
  544. // small blobs, do a minimum number of checks
  545. default:
  546. return 1, 1, false
  547. }
  548. }
  549. // getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The
  550. // array is sorted from youngest to oldest. The resulting array will contain only metadata entries having
  551. // registry part of SourceRepository matching the part of repoInfo.
  552. func getRepositoryMountCandidates(
  553. repoInfo reference.Named,
  554. hmacKey []byte,
  555. max int,
  556. v2Metadata []metadata.V2Metadata,
  557. ) []metadata.V2Metadata {
  558. candidates := []metadata.V2Metadata{}
  559. for _, meta := range v2Metadata {
  560. sourceRepo, err := reference.ParseNamed(meta.SourceRepository)
  561. if err != nil || reference.Domain(repoInfo) != reference.Domain(sourceRepo) {
  562. continue
  563. }
  564. // target repository is not a viable candidate
  565. if meta.SourceRepository == repoInfo.Name() {
  566. continue
  567. }
  568. candidates = append(candidates, meta)
  569. }
  570. sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
  571. if max >= 0 && len(candidates) > max {
  572. // select the youngest metadata
  573. candidates = candidates[:max]
  574. }
  575. return candidates
  576. }
  577. // byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
  578. // candidate "a" is preferred over "b":
  579. //
  580. // 1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
  581. // "b" was not
  582. // 2. if a number of its repository path components exactly matching path components of target repository is higher
  583. type byLikeness struct {
  584. arr []metadata.V2Metadata
  585. hmacKey []byte
  586. pathComponents []string
  587. }
  588. func (bla byLikeness) Less(i, j int) bool {
  589. aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey)
  590. bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey)
  591. if aMacMatch != bMacMatch {
  592. return aMacMatch
  593. }
  594. aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents)
  595. bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents)
  596. return aMatch > bMatch
  597. }
  598. func (bla byLikeness) Swap(i, j int) {
  599. bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i]
  600. }
  601. func (bla byLikeness) Len() int { return len(bla.arr) }
  602. func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
  603. // reverse the metadata array to shift the newest entries to the beginning
  604. for i := 0; i < len(marr)/2; i++ {
  605. marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
  606. }
  607. // keep equal entries ordered from the youngest to the oldest
  608. sort.Stable(byLikeness{
  609. arr: marr,
  610. hmacKey: hmacKey,
  611. pathComponents: getPathComponents(repoInfo.Name()),
  612. })
  613. }
  614. // numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents".
  615. func numOfMatchingPathComponents(pth string, matchComponents []string) int {
  616. pthComponents := getPathComponents(pth)
  617. i := 0
  618. for ; i < len(pthComponents) && i < len(matchComponents); i++ {
  619. if matchComponents[i] != pthComponents[i] {
  620. return i
  621. }
  622. }
  623. return i
  624. }
  625. func getPathComponents(path string) []string {
  626. return strings.Split(path, "/")
  627. }
  628. func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) {
  629. if layerUpload != nil {
  630. log.G(ctx).Debugf("cancelling upload of blob %s", dgst)
  631. err := layerUpload.Cancel(ctx)
  632. if err != nil {
  633. log.G(ctx).Warnf("failed to cancel upload: %v", err)
  634. }
  635. }
  636. }