push_v2.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694
  1. package distribution
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "runtime"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "golang.org/x/net/context"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/distribution"
  13. "github.com/docker/distribution/digest"
  14. "github.com/docker/distribution/manifest/schema1"
  15. "github.com/docker/distribution/manifest/schema2"
  16. distreference "github.com/docker/distribution/reference"
  17. "github.com/docker/distribution/registry/client"
  18. "github.com/docker/docker/distribution/metadata"
  19. "github.com/docker/docker/distribution/xfer"
  20. "github.com/docker/docker/image"
  21. "github.com/docker/docker/layer"
  22. "github.com/docker/docker/pkg/ioutils"
  23. "github.com/docker/docker/pkg/progress"
  24. "github.com/docker/docker/pkg/stringid"
  25. "github.com/docker/docker/reference"
  26. "github.com/docker/docker/registry"
  27. )
  28. const (
  29. smallLayerMaximumSize = 100 * (1 << 10) // 100KB
  30. middleLayerMaximumSize = 10 * (1 << 20) // 10MB
  31. )
  32. // PushResult contains the tag, manifest digest, and manifest size from the
  33. // push. It's used to signal this information to the trust code in the client
  34. // so it can sign the manifest if necessary.
  35. type PushResult struct {
  36. Tag string
  37. Digest digest.Digest
  38. Size int
  39. }
  40. type v2Pusher struct {
  41. v2MetadataService metadata.V2MetadataService
  42. ref reference.Named
  43. endpoint registry.APIEndpoint
  44. repoInfo *registry.RepositoryInfo
  45. config *ImagePushConfig
  46. repo distribution.Repository
  47. // pushState is state built by the Upload functions.
  48. pushState pushState
  49. }
  50. type pushState struct {
  51. sync.Mutex
  52. // remoteLayers is the set of layers known to exist on the remote side.
  53. // This avoids redundant queries when pushing multiple tags that
  54. // involve the same layers. It is also used to fill in digest and size
  55. // information when building the manifest.
  56. remoteLayers map[layer.DiffID]distribution.Descriptor
  57. // confirmedV2 is set to true if we confirm we're talking to a v2
  58. // registry. This is used to limit fallbacks to the v1 protocol.
  59. confirmedV2 bool
  60. }
  61. func (p *v2Pusher) Push(ctx context.Context) (err error) {
  62. p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
  63. p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
  64. if err != nil {
  65. logrus.Debugf("Error getting v2 registry: %v", err)
  66. return err
  67. }
  68. if err = p.pushV2Repository(ctx); err != nil {
  69. if continueOnError(err) {
  70. return fallbackError{
  71. err: err,
  72. confirmedV2: p.pushState.confirmedV2,
  73. transportOK: true,
  74. }
  75. }
  76. }
  77. return err
  78. }
  79. func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
  80. if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
  81. imageID, err := p.config.ReferenceStore.Get(p.ref)
  82. if err != nil {
  83. return fmt.Errorf("tag does not exist: %s", p.ref.String())
  84. }
  85. return p.pushV2Tag(ctx, namedTagged, imageID)
  86. }
  87. if !reference.IsNameOnly(p.ref) {
  88. return errors.New("cannot push a digest reference")
  89. }
  90. // Pull all tags
  91. pushed := 0
  92. for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
  93. if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
  94. pushed++
  95. if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil {
  96. return err
  97. }
  98. }
  99. }
  100. if pushed == 0 {
  101. return fmt.Errorf("no tags to push for %s", p.repoInfo.Name())
  102. }
  103. return nil
  104. }
  105. func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
  106. logrus.Debugf("Pushing repository: %s", ref.String())
  107. img, err := p.config.ImageStore.Get(image.IDFromDigest(id))
  108. if err != nil {
  109. return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
  110. }
  111. var l layer.Layer
  112. topLayerID := img.RootFS.ChainID()
  113. if topLayerID == "" {
  114. l = layer.EmptyLayer
  115. } else {
  116. l, err = p.config.LayerStore.Get(topLayerID)
  117. if err != nil {
  118. return fmt.Errorf("failed to get top layer from image: %v", err)
  119. }
  120. defer layer.ReleaseAndLog(p.config.LayerStore, l)
  121. }
  122. hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
  123. if err != nil {
  124. return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
  125. }
  126. var descriptors []xfer.UploadDescriptor
  127. descriptorTemplate := v2PushDescriptor{
  128. v2MetadataService: p.v2MetadataService,
  129. hmacKey: hmacKey,
  130. repoInfo: p.repoInfo,
  131. ref: p.ref,
  132. repo: p.repo,
  133. pushState: &p.pushState,
  134. }
  135. // Loop bounds condition is to avoid pushing the base layer on Windows.
  136. for i := 0; i < len(img.RootFS.DiffIDs); i++ {
  137. descriptor := descriptorTemplate
  138. descriptor.layer = l
  139. descriptor.checkedDigests = make(map[digest.Digest]struct{})
  140. descriptors = append(descriptors, &descriptor)
  141. l = l.Parent()
  142. }
  143. if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
  144. return err
  145. }
  146. // Try schema2 first
  147. builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON())
  148. manifest, err := manifestFromBuilder(ctx, builder, descriptors)
  149. if err != nil {
  150. return err
  151. }
  152. manSvc, err := p.repo.Manifests(ctx)
  153. if err != nil {
  154. return err
  155. }
  156. putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
  157. if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
  158. if runtime.GOOS == "windows" {
  159. logrus.Warnf("failed to upload schema2 manifest: %v", err)
  160. return err
  161. }
  162. logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
  163. manifestRef, err := distreference.WithTag(p.repo.Named(), ref.Tag())
  164. if err != nil {
  165. return err
  166. }
  167. builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON())
  168. manifest, err = manifestFromBuilder(ctx, builder, descriptors)
  169. if err != nil {
  170. return err
  171. }
  172. if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
  173. return err
  174. }
  175. }
  176. var canonicalManifest []byte
  177. switch v := manifest.(type) {
  178. case *schema1.SignedManifest:
  179. canonicalManifest = v.Canonical
  180. case *schema2.DeserializedManifest:
  181. _, canonicalManifest, err = v.Payload()
  182. if err != nil {
  183. return err
  184. }
  185. }
  186. manifestDigest := digest.FromBytes(canonicalManifest)
  187. progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
  188. if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
  189. return err
  190. }
  191. // Signal digest to the trust client so it can sign the
  192. // push, if appropriate.
  193. progress.Aux(p.config.ProgressOutput, PushResult{Tag: ref.Tag(), Digest: manifestDigest, Size: len(canonicalManifest)})
  194. return nil
  195. }
  196. func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
  197. // descriptors is in reverse order; iterate backwards to get references
  198. // appended in the right order.
  199. for i := len(descriptors) - 1; i >= 0; i-- {
  200. if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
  201. return nil, err
  202. }
  203. }
  204. return builder.Build(ctx)
  205. }
  206. type v2PushDescriptor struct {
  207. layer layer.Layer
  208. v2MetadataService metadata.V2MetadataService
  209. hmacKey []byte
  210. repoInfo reference.Named
  211. ref reference.Named
  212. repo distribution.Repository
  213. pushState *pushState
  214. remoteDescriptor distribution.Descriptor
  215. // a set of digests whose presence has been checked in a target repository
  216. checkedDigests map[digest.Digest]struct{}
  217. }
  218. func (pd *v2PushDescriptor) Key() string {
  219. return "v2push:" + pd.ref.FullName() + " " + pd.layer.DiffID().String()
  220. }
  221. func (pd *v2PushDescriptor) ID() string {
  222. return stringid.TruncateID(pd.layer.DiffID().String())
  223. }
  224. func (pd *v2PushDescriptor) DiffID() layer.DiffID {
  225. return pd.layer.DiffID()
  226. }
  227. func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
  228. if fs, ok := pd.layer.(distribution.Describable); ok {
  229. if d := fs.Descriptor(); len(d.URLs) > 0 {
  230. progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
  231. return d, nil
  232. }
  233. }
  234. diffID := pd.DiffID()
  235. pd.pushState.Lock()
  236. if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
  237. // it is already known that the push is not needed and
  238. // therefore doing a stat is unnecessary
  239. pd.pushState.Unlock()
  240. progress.Update(progressOutput, pd.ID(), "Layer already exists")
  241. return descriptor, nil
  242. }
  243. pd.pushState.Unlock()
  244. maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
  245. // Do we have any metadata associated with this layer's DiffID?
  246. v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
  247. if err == nil {
  248. // check for blob existence in the target repository if we have a mapping with it
  249. descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, false, 1, v2Metadata)
  250. if exists || err != nil {
  251. return descriptor, err
  252. }
  253. }
  254. // if digest was empty or not saved, or if blob does not exist on the remote repository,
  255. // then push the blob.
  256. bs := pd.repo.Blobs(ctx)
  257. var layerUpload distribution.BlobWriter
  258. // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
  259. candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata)
  260. for _, mountCandidate := range candidates {
  261. logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
  262. createOpts := []distribution.BlobCreateOption{}
  263. if len(mountCandidate.SourceRepository) > 0 {
  264. namedRef, err := reference.WithName(mountCandidate.SourceRepository)
  265. if err != nil {
  266. logrus.Errorf("failed to parse source repository reference %v: %v", namedRef.String(), err)
  267. pd.v2MetadataService.Remove(mountCandidate)
  268. continue
  269. }
  270. // TODO (brianbland): We need to construct a reference where the Name is
  271. // only the full remote name, so clean this up when distribution has a
  272. // richer reference package
  273. remoteRef, err := distreference.WithName(namedRef.RemoteName())
  274. if err != nil {
  275. logrus.Errorf("failed to make remote reference out of %q: %v", namedRef.RemoteName(), namedRef.RemoteName())
  276. continue
  277. }
  278. canonicalRef, err := distreference.WithDigest(distreference.TrimNamed(remoteRef), mountCandidate.Digest)
  279. if err != nil {
  280. logrus.Errorf("failed to make canonical reference: %v", err)
  281. continue
  282. }
  283. createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
  284. }
  285. // send the layer
  286. lu, err := bs.Create(ctx, createOpts...)
  287. switch err := err.(type) {
  288. case nil:
  289. // noop
  290. case distribution.ErrBlobMounted:
  291. progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
  292. err.Descriptor.MediaType = schema2.MediaTypeLayer
  293. pd.pushState.Lock()
  294. pd.pushState.confirmedV2 = true
  295. pd.pushState.remoteLayers[diffID] = err.Descriptor
  296. pd.pushState.Unlock()
  297. // Cache mapping from this layer's DiffID to the blobsum
  298. if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
  299. Digest: err.Descriptor.Digest,
  300. SourceRepository: pd.repoInfo.FullName(),
  301. }); err != nil {
  302. return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
  303. }
  304. return err.Descriptor, nil
  305. default:
  306. logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)
  307. }
  308. if len(mountCandidate.SourceRepository) > 0 &&
  309. (metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
  310. len(mountCandidate.HMAC) == 0) {
  311. cause := "blob mount failure"
  312. if err != nil {
  313. cause = fmt.Sprintf("an error: %v", err.Error())
  314. }
  315. logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
  316. pd.v2MetadataService.Remove(mountCandidate)
  317. }
  318. if lu != nil {
  319. // cancel previous upload
  320. cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload)
  321. layerUpload = lu
  322. }
  323. }
  324. if maxExistenceChecks-len(pd.checkedDigests) > 0 {
  325. // do additional layer existence checks with other known digests if any
  326. descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
  327. if exists || err != nil {
  328. return descriptor, err
  329. }
  330. }
  331. logrus.Debugf("Pushing layer: %s", diffID)
  332. if layerUpload == nil {
  333. layerUpload, err = bs.Create(ctx)
  334. if err != nil {
  335. return distribution.Descriptor{}, retryOnError(err)
  336. }
  337. }
  338. defer layerUpload.Close()
  339. // upload the blob
  340. desc, err := pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
  341. if err != nil {
  342. return desc, err
  343. }
  344. return desc, nil
  345. }
  346. func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
  347. pd.remoteDescriptor = descriptor
  348. }
  349. func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
  350. return pd.remoteDescriptor
  351. }
  352. func (pd *v2PushDescriptor) uploadUsingSession(
  353. ctx context.Context,
  354. progressOutput progress.Output,
  355. diffID layer.DiffID,
  356. layerUpload distribution.BlobWriter,
  357. ) (distribution.Descriptor, error) {
  358. arch, err := pd.layer.TarStream()
  359. if err != nil {
  360. return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
  361. }
  362. // don't care if this fails; best effort
  363. size, _ := pd.layer.DiffSize()
  364. reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
  365. compressedReader, compressionDone := compress(reader)
  366. defer func() {
  367. reader.Close()
  368. <-compressionDone
  369. }()
  370. digester := digest.Canonical.New()
  371. tee := io.TeeReader(compressedReader, digester.Hash())
  372. nn, err := layerUpload.ReadFrom(tee)
  373. compressedReader.Close()
  374. if err != nil {
  375. return distribution.Descriptor{}, retryOnError(err)
  376. }
  377. pushDigest := digester.Digest()
  378. if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
  379. return distribution.Descriptor{}, retryOnError(err)
  380. }
  381. logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
  382. progress.Update(progressOutput, pd.ID(), "Pushed")
  383. // Cache mapping from this layer's DiffID to the blobsum
  384. if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
  385. Digest: pushDigest,
  386. SourceRepository: pd.repoInfo.FullName(),
  387. }); err != nil {
  388. return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
  389. }
  390. desc := distribution.Descriptor{
  391. Digest: pushDigest,
  392. MediaType: schema2.MediaTypeLayer,
  393. Size: nn,
  394. }
  395. pd.pushState.Lock()
  396. // If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
  397. pd.pushState.confirmedV2 = true
  398. pd.pushState.remoteLayers[diffID] = desc
  399. pd.pushState.Unlock()
  400. return desc, nil
  401. }
  402. // layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
  403. // slice. If it finds one that the registry knows about, it returns the known digest and "true". If
  404. // "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
  405. // (not just the target one).
  406. func (pd *v2PushDescriptor) layerAlreadyExists(
  407. ctx context.Context,
  408. progressOutput progress.Output,
  409. diffID layer.DiffID,
  410. checkOtherRepositories bool,
  411. maxExistenceCheckAttempts int,
  412. v2Metadata []metadata.V2Metadata,
  413. ) (desc distribution.Descriptor, exists bool, err error) {
  414. // filter the metadata
  415. candidates := []metadata.V2Metadata{}
  416. for _, meta := range v2Metadata {
  417. if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.FullName() {
  418. continue
  419. }
  420. candidates = append(candidates, meta)
  421. }
  422. // sort the candidates by similarity
  423. sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)
  424. digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
  425. // an array of unique blob digests ordered from the best mount candidates to worst
  426. layerDigests := []digest.Digest{}
  427. for i := 0; i < len(candidates); i++ {
  428. if len(layerDigests) >= maxExistenceCheckAttempts {
  429. break
  430. }
  431. meta := &candidates[i]
  432. if _, exists := digestToMetadata[meta.Digest]; exists {
  433. // keep reference just to the first mapping (the best mount candidate)
  434. continue
  435. }
  436. if _, exists := pd.checkedDigests[meta.Digest]; exists {
  437. // existence of this digest has already been tested
  438. continue
  439. }
  440. digestToMetadata[meta.Digest] = meta
  441. layerDigests = append(layerDigests, meta.Digest)
  442. }
  443. attempts:
  444. for _, dgst := range layerDigests {
  445. meta := digestToMetadata[dgst]
  446. logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.FullName())
  447. desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
  448. pd.checkedDigests[meta.Digest] = struct{}{}
  449. switch err {
  450. case nil:
  451. if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.FullName() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
  452. // cache mapping from this layer's DiffID to the blobsum
  453. if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
  454. Digest: desc.Digest,
  455. SourceRepository: pd.repoInfo.FullName(),
  456. }); err != nil {
  457. return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
  458. }
  459. }
  460. desc.MediaType = schema2.MediaTypeLayer
  461. exists = true
  462. break attempts
  463. case distribution.ErrBlobUnknown:
  464. if meta.SourceRepository == pd.repoInfo.FullName() {
  465. // remove the mapping to the target repository
  466. pd.v2MetadataService.Remove(*meta)
  467. }
  468. default:
  469. logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.FullName())
  470. }
  471. }
  472. if exists {
  473. progress.Update(progressOutput, pd.ID(), "Layer already exists")
  474. pd.pushState.Lock()
  475. pd.pushState.remoteLayers[diffID] = desc
  476. pd.pushState.Unlock()
  477. }
  478. return desc, exists, nil
  479. }
  480. // getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
  481. // source repositories of target registry, maximum number of layer existence checks performed on the target
  482. // repository and whether the check shall be done also with digests mapped to different repositories. The
  483. // decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
  484. // of upload does not outweigh a latency.
  485. func getMaxMountAndExistenceCheckAttempts(layer layer.Layer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
  486. size, err := layer.DiffSize()
  487. switch {
  488. // big blob
  489. case size > middleLayerMaximumSize:
  490. // 1st attempt to mount the blob few times
  491. // 2nd few existence checks with digests associated to any repository
  492. // then fallback to upload
  493. return 4, 3, true
  494. // middle sized blobs; if we could not get the size, assume we deal with middle sized blob
  495. case size > smallLayerMaximumSize, err != nil:
  496. // 1st attempt to mount blobs of average size few times
  497. // 2nd try at most 1 existence check if there's an existing mapping to the target repository
  498. // then fallback to upload
  499. return 3, 1, false
  500. // small blobs, do a minimum number of checks
  501. default:
  502. return 1, 1, false
  503. }
  504. }
  505. // getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The
  506. // array is sorted from youngest to oldest. If requireReigstryMatch is true, the resulting array will contain
  507. // only metadata entries having registry part of SourceRepository matching the part of repoInfo.
  508. func getRepositoryMountCandidates(
  509. repoInfo reference.Named,
  510. hmacKey []byte,
  511. max int,
  512. v2Metadata []metadata.V2Metadata,
  513. ) []metadata.V2Metadata {
  514. candidates := []metadata.V2Metadata{}
  515. for _, meta := range v2Metadata {
  516. sourceRepo, err := reference.ParseNamed(meta.SourceRepository)
  517. if err != nil || repoInfo.Hostname() != sourceRepo.Hostname() {
  518. continue
  519. }
  520. // target repository is not a viable candidate
  521. if meta.SourceRepository == repoInfo.FullName() {
  522. continue
  523. }
  524. candidates = append(candidates, meta)
  525. }
  526. sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
  527. if max >= 0 && len(candidates) > max {
  528. // select the youngest metadata
  529. candidates = candidates[:max]
  530. }
  531. return candidates
  532. }
  533. // byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
  534. // candidate "a" is preferred over "b":
  535. //
  536. // 1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
  537. // "b" was not
  538. // 2. if a number of its repository path components exactly matching path components of target repository is higher
  539. type byLikeness struct {
  540. arr []metadata.V2Metadata
  541. hmacKey []byte
  542. pathComponents []string
  543. }
  544. func (bla byLikeness) Less(i, j int) bool {
  545. aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey)
  546. bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey)
  547. if aMacMatch != bMacMatch {
  548. return aMacMatch
  549. }
  550. aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents)
  551. bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents)
  552. return aMatch > bMatch
  553. }
  554. func (bla byLikeness) Swap(i, j int) {
  555. bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i]
  556. }
  557. func (bla byLikeness) Len() int { return len(bla.arr) }
  558. func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
  559. // reverse the metadata array to shift the newest entries to the beginning
  560. for i := 0; i < len(marr)/2; i++ {
  561. marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
  562. }
  563. // keep equal entries ordered from the youngest to the oldest
  564. sort.Stable(byLikeness{
  565. arr: marr,
  566. hmacKey: hmacKey,
  567. pathComponents: getPathComponents(repoInfo.FullName()),
  568. })
  569. }
  570. // numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents".
  571. func numOfMatchingPathComponents(pth string, matchComponents []string) int {
  572. pthComponents := getPathComponents(pth)
  573. i := 0
  574. for ; i < len(pthComponents) && i < len(matchComponents); i++ {
  575. if matchComponents[i] != pthComponents[i] {
  576. return i
  577. }
  578. }
  579. return i
  580. }
  581. func getPathComponents(path string) []string {
  582. // make sure to add docker.io/ prefix to the path
  583. named, err := reference.ParseNamed(path)
  584. if err == nil {
  585. path = named.FullName()
  586. }
  587. return strings.Split(path, "/")
  588. }
  589. func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) {
  590. if layerUpload != nil {
  591. logrus.Debugf("cancelling upload of blob %s", dgst)
  592. err := layerUpload.Cancel(ctx)
  593. if err != nil {
  594. logrus.Warnf("failed to cancel upload: %v", err)
  595. }
  596. }
  597. }