// pull_v2.go — extraction artifacts (file-size banner and concatenated line-number run) removed.
  1. package distribution // import "github.com/docker/docker/distribution"
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "runtime"
  9. "time"
  10. "github.com/containerd/containerd/log"
  11. "github.com/containerd/containerd/platforms"
  12. "github.com/docker/distribution"
  13. "github.com/docker/distribution/manifest/manifestlist"
  14. "github.com/docker/distribution/manifest/ocischema"
  15. "github.com/docker/distribution/manifest/schema1"
  16. "github.com/docker/distribution/manifest/schema2"
  17. "github.com/docker/distribution/reference"
  18. "github.com/docker/distribution/registry/client/transport"
  19. "github.com/docker/docker/distribution/metadata"
  20. "github.com/docker/docker/distribution/xfer"
  21. "github.com/docker/docker/image"
  22. v1 "github.com/docker/docker/image/v1"
  23. "github.com/docker/docker/layer"
  24. "github.com/docker/docker/pkg/ioutils"
  25. "github.com/docker/docker/pkg/progress"
  26. "github.com/docker/docker/pkg/stringid"
  27. "github.com/docker/docker/pkg/system"
  28. refstore "github.com/docker/docker/reference"
  29. "github.com/docker/docker/registry"
  30. "github.com/opencontainers/go-digest"
  31. specs "github.com/opencontainers/image-spec/specs-go/v1"
  32. "github.com/pkg/errors"
  33. "github.com/sirupsen/logrus"
  34. archvariant "github.com/tonistiigi/go-archvariant"
  35. )
// Sentinel errors for reconciling the layers listed in a manifest with the
// rootfs section of the downloaded image configuration.
var (
	// errRootFSMismatch is returned when the layer diff IDs from the
	// manifest download do not match the diff IDs in the image config.
	errRootFSMismatch = errors.New("layers from manifest don't match image configuration")
	// errRootFSInvalid is returned when the image config has no usable
	// rootfs section.
	errRootFSInvalid = errors.New("invalid rootfs in image configuration")
)
  40. // ImageConfigPullError is an error pulling the image config blob
  41. // (only applies to schema2).
  42. type ImageConfigPullError struct {
  43. Err error
  44. }
  45. // Error returns the error string for ImageConfigPullError.
  46. func (e ImageConfigPullError) Error() string {
  47. return "error pulling image configuration: " + e.Err.Error()
  48. }
// v2Puller pulls images from a registry v2 endpoint.
type v2Puller struct {
	// V2MetadataService maps layer diff IDs to registry blob digests.
	V2MetadataService metadata.V2MetadataService
	endpoint          registry.APIEndpoint
	config            *ImagePullConfig
	repoInfo          *registry.RepositoryInfo
	// repo is the remote repository client; set by Pull.
	repo distribution.Repository
	// manifestStore caches manifests; its remote is set by Pull.
	manifestStore *manifestStore
}
  57. func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) {
  58. // TODO(tiborvass): was ReceiveTimeout
  59. p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
  60. if err != nil {
  61. logrus.Warnf("Error getting v2 registry: %v", err)
  62. return err
  63. }
  64. p.manifestStore.remote, err = p.repo.Manifests(ctx)
  65. if err != nil {
  66. return err
  67. }
  68. if err = p.pullV2Repository(ctx, ref); err != nil {
  69. if _, ok := err.(fallbackError); ok {
  70. return err
  71. }
  72. if continueOnError(err, p.endpoint.Mirror) {
  73. return fallbackError{
  74. err: err,
  75. transportOK: true,
  76. }
  77. }
  78. }
  79. return err
  80. }
  81. func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (err error) {
  82. var layersDownloaded bool
  83. if !reference.IsNameOnly(ref) {
  84. layersDownloaded, err = p.pullV2Tag(ctx, ref, p.config.Platform)
  85. if err != nil {
  86. return err
  87. }
  88. } else {
  89. tags, err := p.repo.Tags(ctx).All(ctx)
  90. if err != nil {
  91. return err
  92. }
  93. for _, tag := range tags {
  94. tagRef, err := reference.WithTag(ref, tag)
  95. if err != nil {
  96. return err
  97. }
  98. pulledNew, err := p.pullV2Tag(ctx, tagRef, p.config.Platform)
  99. if err != nil {
  100. // Since this is the pull-all-tags case, don't
  101. // allow an error pulling a particular tag to
  102. // make the whole pull fall back to v1.
  103. if fallbackErr, ok := err.(fallbackError); ok {
  104. return fallbackErr.err
  105. }
  106. return err
  107. }
  108. // pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
  109. // TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
  110. layersDownloaded = layersDownloaded || pulledNew
  111. }
  112. }
  113. writeStatus(reference.FamiliarString(ref), p.config.ProgressOutput, layersDownloaded)
  114. return nil
  115. }
// v2LayerDescriptor describes a single layer blob to download from a v2
// registry; values of this type are handed to the download manager as
// xfer.DownloadDescriptor entries.
type v2LayerDescriptor struct {
	digest   digest.Digest
	diffID   layer.DiffID // when pre-populated, DiffID() skips the metadata-service lookup
	repoInfo *registry.RepositoryInfo
	repo     distribution.Repository
	V2MetadataService metadata.V2MetadataService
	tmpFile  *os.File        // partial/complete blob on disk; kept to resume interrupted downloads
	verifier digest.Verifier // accumulates downloaded bytes for digest verification
	src      distribution.Descriptor
}
  126. func (ld *v2LayerDescriptor) Key() string {
  127. return "v2:" + ld.digest.String()
  128. }
  129. func (ld *v2LayerDescriptor) ID() string {
  130. return stringid.TruncateID(ld.digest.String())
  131. }
  132. func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
  133. if ld.diffID != "" {
  134. return ld.diffID, nil
  135. }
  136. return ld.V2MetadataService.GetDiffID(ld.digest)
  137. }
// Download fetches the layer blob into a local temporary file, resuming a
// previously interrupted download when possible, and verifies the bytes
// against ld.digest. It returns a ReadCloser over the downloaded file
// (closing it removes the file) and the blob size reported by the remote
// (0 when unknown). Errors wrapped in xfer.DoNotRetry abort the transfer.
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
	logrus.Debugf("pulling blob %q", ld.digest)
	var (
		err    error
		offset int64
	)
	if ld.tmpFile == nil {
		ld.tmpFile, err = createDownloadFile()
		if err != nil {
			return nil, 0, xfer.DoNotRetry{Err: err}
		}
	} else {
		// A temp file from a previous attempt exists; resume from its
		// current end. If we cannot even seek, discard it and start fresh.
		offset, err = ld.tmpFile.Seek(0, io.SeekEnd)
		if err != nil {
			logrus.Debugf("error seeking to end of download file: %v", err)
			offset = 0
			ld.tmpFile.Close()
			if err := os.Remove(ld.tmpFile.Name()); err != nil {
				logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
			}
			ld.tmpFile, err = createDownloadFile()
			if err != nil {
				return nil, 0, xfer.DoNotRetry{Err: err}
			}
		} else if offset != 0 {
			logrus.Debugf("attempting to resume download of %q from %d bytes", ld.digest, offset)
		}
	}
	tmpFile := ld.tmpFile
	layerDownload, err := ld.open(ctx)
	if err != nil {
		logrus.Errorf("Error initiating layer download: %v", err)
		return nil, 0, retryOnError(err)
	}
	if offset != 0 {
		// Position the remote stream at the resume point; if the remote
		// can't seek, drop the partial file so the next attempt restarts.
		_, err := layerDownload.Seek(offset, io.SeekStart)
		if err != nil {
			if err := ld.truncateDownloadFile(); err != nil {
				return nil, 0, xfer.DoNotRetry{Err: err}
			}
			return nil, 0, err
		}
	}
	// Seek to the end to learn the total blob size (for the progress bar).
	size, err := layerDownload.Seek(0, io.SeekEnd)
	if err != nil {
		// Seek failed, perhaps because there was no Content-Length
		// header. This shouldn't fail the download, because we can
		// still continue without a progress bar.
		size = 0
	} else {
		if size != 0 && offset > size {
			logrus.Debug("Partial download is larger than full blob. Starting over")
			offset = 0
			if err := ld.truncateDownloadFile(); err != nil {
				return nil, 0, xfer.DoNotRetry{Err: err}
			}
		}
		// Restore the seek offset either at the beginning of the
		// stream, or just after the last byte we have from previous
		// attempts.
		_, err = layerDownload.Seek(offset, io.SeekStart)
		if err != nil {
			return nil, 0, err
		}
	}
	reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size-offset, ld.ID(), "Downloading")
	defer reader.Close()
	if ld.verifier == nil {
		ld.verifier = ld.digest.Verifier()
	}
	// Tee the download into the verifier while writing it to the temp file.
	_, err = io.Copy(tmpFile, io.TeeReader(reader, ld.verifier))
	if err != nil {
		if err == transport.ErrWrongCodeForByteRange {
			// The registry rejected the range request; drop the partial
			// data so the retry starts from scratch.
			if err := ld.truncateDownloadFile(); err != nil {
				return nil, 0, xfer.DoNotRetry{Err: err}
			}
			return nil, 0, err
		}
		return nil, 0, retryOnError(err)
	}
	progress.Update(progressOutput, ld.ID(), "Verifying Checksum")
	if !ld.verifier.Verified() {
		err = fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest)
		logrus.Error(err)
		// Allow a retry if this digest verification error happened
		// after a resumed download.
		if offset != 0 {
			if err := ld.truncateDownloadFile(); err != nil {
				return nil, 0, xfer.DoNotRetry{Err: err}
			}
			return nil, 0, err
		}
		return nil, 0, xfer.DoNotRetry{Err: err}
	}
	progress.Update(progressOutput, ld.ID(), "Download complete")
	logrus.Debugf("Downloaded %s to tempfile %s", ld.ID(), tmpFile.Name())
	// Rewind so the returned reader starts at the beginning of the blob.
	_, err = tmpFile.Seek(0, io.SeekStart)
	if err != nil {
		tmpFile.Close()
		if err := os.Remove(tmpFile.Name()); err != nil {
			logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
		}
		ld.tmpFile = nil
		ld.verifier = nil
		return nil, 0, xfer.DoNotRetry{Err: err}
	}
	// hand off the temporary file to the download manager, so it will only
	// be closed once
	ld.tmpFile = nil
	return ioutils.NewReadCloserWrapper(tmpFile, func() error {
		tmpFile.Close()
		err := os.RemoveAll(tmpFile.Name())
		if err != nil {
			logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
		}
		return err
	}), size, nil
}
  256. func (ld *v2LayerDescriptor) Close() {
  257. if ld.tmpFile != nil {
  258. ld.tmpFile.Close()
  259. if err := os.RemoveAll(ld.tmpFile.Name()); err != nil {
  260. logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
  261. }
  262. }
  263. }
  264. func (ld *v2LayerDescriptor) truncateDownloadFile() error {
  265. // Need a new hash context since we will be redoing the download
  266. ld.verifier = nil
  267. if _, err := ld.tmpFile.Seek(0, io.SeekStart); err != nil {
  268. logrus.Errorf("error seeking to beginning of download file: %v", err)
  269. return err
  270. }
  271. if err := ld.tmpFile.Truncate(0); err != nil {
  272. logrus.Errorf("error truncating download file: %v", err)
  273. return err
  274. }
  275. return nil
  276. }
  277. func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
  278. // Cache mapping from this layer's DiffID to the blobsum
  279. ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()})
  280. }
// pullV2Tag resolves ref (tag or digest) to a manifest, dispatches to the
// schema-specific pull path (schema1, schema2, OCI, or manifest list), and
// records the resulting image in the reference store. tagUpdated reports
// whether layers were downloaded or an existing image was newly tagged.
func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) {
	var (
		tagOrDigest string // Used for logging/progress only
		dgst        digest.Digest
		mt          string
		size        int64
		tagged      reference.NamedTagged
		isTagged    bool
	)
	// Resolve the reference to a digest: either directly (canonical ref) or
	// via the tag service (tagged ref).
	if digested, isDigested := ref.(reference.Canonical); isDigested {
		dgst = digested.Digest()
		tagOrDigest = digested.String()
	} else if tagged, isTagged = ref.(reference.NamedTagged); isTagged {
		tagService := p.repo.Tags(ctx)
		desc, err := tagService.Get(ctx, tagged.Tag())
		if err != nil {
			return false, err
		}
		dgst = desc.Digest
		tagOrDigest = tagged.Tag()
		mt = desc.MediaType
		size = desc.Size
	} else {
		return false, fmt.Errorf("internal error: reference has neither a tag nor a digest: %s", reference.FamiliarString(ref))
	}
	ctx = log.WithLogger(ctx, logrus.WithFields(
		logrus.Fields{
			"digest": dgst,
			"remote": ref,
		}))
	desc := specs.Descriptor{
		MediaType: mt,
		Digest:    dgst,
		Size:      size,
	}
	manifest, err := p.manifestStore.Get(ctx, desc)
	if err != nil {
		// Deprecated fallback: some registries resolve a tag to a digest but
		// then 404 on a by-digest manifest fetch; retry by tag in that case.
		if isTagged && isNotFound(errors.Cause(err)) {
			logrus.WithField("ref", ref).WithError(err).Debug("Falling back to pull manifest by tag")
			msg := `%s Failed to pull manifest by the resolved digest. This registry does not
	appear to conform to the distribution registry specification; falling back to
	pull by tag. This fallback is DEPRECATED, and will be removed in a future
	release. Please contact admins of %s. %s
`
			warnEmoji := "\U000026A0\U0000FE0F"
			progress.Messagef(p.config.ProgressOutput, "WARNING", msg, warnEmoji, p.endpoint.URL, warnEmoji)
			// Fetch by tag worked, but fetch by digest didn't.
			// This is a broken registry implementation.
			// We'll fallback to the old behavior and get the manifest by tag.
			var ms distribution.ManifestService
			ms, err = p.repo.Manifests(ctx)
			if err != nil {
				return false, err
			}
			manifest, err = ms.Get(ctx, "", distribution.WithTag(tagged.Tag()))
			err = errors.Wrap(err, "error after falling back to get manifest by tag")
		}
		if err != nil {
			return false, err
		}
	}
	if manifest == nil {
		return false, fmt.Errorf("image manifest does not exist for tag or digest %q", tagOrDigest)
	}
	// For schema2, only allow config media types that this daemon accepts.
	if m, ok := manifest.(*schema2.DeserializedManifest); ok {
		var allowedMediatype bool
		for _, t := range p.config.Schema2Types {
			if m.Manifest.Config.MediaType == t {
				allowedMediatype = true
				break
			}
		}
		if !allowedMediatype {
			configClass := mediaTypeClasses[m.Manifest.Config.MediaType]
			if configClass == "" {
				configClass = "unknown"
			}
			return false, invalidManifestClassError{m.Manifest.Config.MediaType, configClass}
		}
	}
	logrus.Debugf("Pulling ref from V2 registry: %s", reference.FamiliarString(ref))
	progress.Message(p.config.ProgressOutput, tagOrDigest, "Pulling from "+reference.FamiliarName(p.repo.Named()))
	var (
		id             digest.Digest
		manifestDigest digest.Digest
	)
	// Dispatch on the concrete manifest type.
	switch v := manifest.(type) {
	case *schema1.SignedManifest:
		if p.config.RequireSchema2 {
			return false, fmt.Errorf("invalid manifest: not schema2")
		}
		// give registries time to upgrade to schema2 and only warn if we know a registry has been upgraded long time ago
		// TODO: condition to be removed
		if reference.Domain(ref) == "docker.io" {
			msg := fmt.Sprintf("Image %s uses outdated schema1 manifest format. Please upgrade to a schema2 image for better future compatibility. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/", ref)
			logrus.Warn(msg)
			progress.Message(p.config.ProgressOutput, "", msg)
		}
		id, manifestDigest, err = p.pullSchema1(ctx, ref, v, platform)
		if err != nil {
			return false, err
		}
	case *schema2.DeserializedManifest:
		id, manifestDigest, err = p.pullSchema2(ctx, ref, v, platform)
		if err != nil {
			return false, err
		}
	case *ocischema.DeserializedManifest:
		id, manifestDigest, err = p.pullOCI(ctx, ref, v, platform)
		if err != nil {
			return false, err
		}
	case *manifestlist.DeserializedManifestList:
		id, manifestDigest, err = p.pullManifestList(ctx, ref, v, platform)
		if err != nil {
			return false, err
		}
	default:
		return false, invalidManifestFormatError{}
	}
	progress.Message(p.config.ProgressOutput, "", "Digest: "+manifestDigest.String())
	// Record the image under its tag and/or digest; tagUpdated is false when
	// the tag already pointed at this image ID.
	if p.config.ReferenceStore != nil {
		oldTagID, err := p.config.ReferenceStore.Get(ref)
		if err == nil {
			if oldTagID == id {
				return false, addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id)
			}
		} else if err != refstore.ErrDoesNotExist {
			return false, err
		}
		if canonical, ok := ref.(reference.Canonical); ok {
			if err = p.config.ReferenceStore.AddDigest(canonical, id, true); err != nil {
				return false, err
			}
		} else {
			if err = addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
				return false, err
			}
			if err = p.config.ReferenceStore.AddTag(ref, id, true); err != nil {
				return false, err
			}
		}
	}
	return true, nil
}
// pullSchema1 downloads all layers referenced by a (deprecated) schema1
// manifest, converts its v1-compatibility history into the modern image
// config format, and stores the resulting image. It returns the image ID and
// the digest of the manifest's canonical payload.
func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unverifiedManifest *schema1.SignedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
	if platform != nil {
		// Early bath if the requested OS doesn't match that of the configuration.
		// This avoids doing the download, only to potentially fail later.
		if !system.IsOSSupported(platform.OS) {
			return "", "", fmt.Errorf("cannot download image with operating system %q when requesting %q", runtime.GOOS, platform.OS)
		}
	}
	var verifiedManifest *schema1.Manifest
	verifiedManifest, err = verifySchema1Manifest(unverifiedManifest, ref)
	if err != nil {
		return "", "", err
	}
	rootFS := image.NewRootFS()
	// remove duplicate layers and check parent chain validity
	err = fixManifestLayers(verifiedManifest)
	if err != nil {
		return "", "", err
	}
	var descriptors []xfer.DownloadDescriptor
	// Image history converted to the new format
	var history []image.History
	// Note that the order of this loop is in the direction of bottom-most
	// to top-most, so that the downloads slice gets ordered correctly.
	for i := len(verifiedManifest.FSLayers) - 1; i >= 0; i-- {
		blobSum := verifiedManifest.FSLayers[i].BlobSum
		if err = blobSum.Validate(); err != nil {
			return "", "", errors.Wrapf(err, "could not validate layer digest %q", blobSum)
		}
		var throwAway struct {
			ThrowAway bool `json:"throwaway,omitempty"`
		}
		if err := json.Unmarshal([]byte(verifiedManifest.History[i].V1Compatibility), &throwAway); err != nil {
			return "", "", err
		}
		h, err := v1.HistoryFromConfig([]byte(verifiedManifest.History[i].V1Compatibility), throwAway.ThrowAway)
		if err != nil {
			return "", "", err
		}
		history = append(history, h)
		// Throwaway entries contribute history but no layer to download.
		if throwAway.ThrowAway {
			continue
		}
		layerDescriptor := &v2LayerDescriptor{
			digest:            blobSum,
			repoInfo:          p.repoInfo,
			repo:              p.repo,
			V2MetadataService: p.V2MetadataService,
		}
		descriptors = append(descriptors, layerDescriptor)
	}
	resultRootFS, release, err := p.config.DownloadManager.Download(ctx, *rootFS, descriptors, p.config.ProgressOutput)
	if err != nil {
		return "", "", err
	}
	defer release()
	// Build a modern image config from the top-most layer's v1 config plus
	// the downloaded rootfs and converted history.
	config, err := v1.MakeConfigFromV1Config([]byte(verifiedManifest.History[0].V1Compatibility), &resultRootFS, history)
	if err != nil {
		return "", "", err
	}
	imageID, err := p.config.ImageStore.Put(ctx, config)
	if err != nil {
		return "", "", err
	}
	manifestDigest = digest.FromBytes(unverifiedManifest.Canonical)
	return imageID, manifestDigest, nil
}
// pullSchema2Layers downloads the image config blob and the layer blobs of a
// schema2/OCI manifest concurrently, cross-checks the downloaded layer diff
// IDs against the rootfs recorded in the config, and stores the image.
// target is the config descriptor; layers are the manifest's layer
// descriptors. Returns the stored image ID (which equals target.Digest).
func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) {
	if _, err := p.config.ImageStore.Get(ctx, target.Digest); err == nil {
		// If the image already exists locally, no need to pull
		// anything.
		return target.Digest, nil
	}
	var descriptors []xfer.DownloadDescriptor
	// Note that the order of this loop is in the direction of bottom-most
	// to top-most, so that the downloads slice gets ordered correctly.
	for _, d := range layers {
		if err := d.Digest.Validate(); err != nil {
			return "", errors.Wrapf(err, "could not validate layer digest %q", d.Digest)
		}
		layerDescriptor := &v2LayerDescriptor{
			digest:            d.Digest,
			repo:              p.repo,
			repoInfo:          p.repoInfo,
			V2MetadataService: p.V2MetadataService,
			src:               d,
		}
		descriptors = append(descriptors, layerDescriptor)
	}
	// Buffered channels so the producing goroutines never block on send.
	configChan := make(chan []byte, 1)
	configErrChan := make(chan error, 1)
	layerErrChan := make(chan error, 1)
	downloadsDone := make(chan struct{})
	var cancel func()
	ctx, cancel = context.WithCancel(ctx)
	defer cancel()
	// Pull the image config
	go func() {
		configJSON, err := p.pullSchema2Config(ctx, target.Digest)
		if err != nil {
			configErrChan <- ImageConfigPullError{Err: err}
			// A failed config pull cancels the layer downloads too.
			cancel()
			return
		}
		configChan <- configJSON
	}()
	var (
		configJSON       []byte          // raw serialized image config
		downloadedRootFS *image.RootFS   // rootFS from registered layers
		configRootFS     *image.RootFS   // rootFS from configuration
		release          func()          // release resources from rootFS download
		configPlatform   *specs.Platform // for LCOW when registering downloaded layers
	)
	layerStoreOS := runtime.GOOS
	if platform != nil {
		layerStoreOS = platform.OS
	}
	// https://github.com/docker/docker/issues/24766 - Err on the side of caution,
	// explicitly blocking images intended for linux from the Windows daemon. On
	// Windows, we do this before the attempt to download, effectively serialising
	// the download slightly slowing it down. We have to do it this way, as
	// chances are the download of layers itself would fail due to file names
	// which aren't suitable for NTFS. At some point in the future, if a similar
	// check to block Windows images being pulled on Linux is implemented, it
	// may be necessary to perform the same type of serialisation.
	if runtime.GOOS == "windows" {
		configJSON, configRootFS, configPlatform, err = receiveConfig(p.config.ImageStore, configChan, configErrChan)
		if err != nil {
			return "", err
		}
		if configRootFS == nil {
			return "", errRootFSInvalid
		}
		if err := checkImageCompatibility(configPlatform.OS, configPlatform.OSVersion); err != nil {
			return "", err
		}
		if len(descriptors) != len(configRootFS.DiffIDs) {
			return "", errRootFSMismatch
		}
		if platform == nil {
			// Early bath if the requested OS doesn't match that of the configuration.
			// This avoids doing the download, only to potentially fail later.
			if !system.IsOSSupported(configPlatform.OS) {
				return "", fmt.Errorf("cannot download image with operating system %q when requesting %q", configPlatform.OS, layerStoreOS)
			}
			layerStoreOS = configPlatform.OS
		}
		// Populate diff ids in descriptors to avoid downloading foreign layers
		// which have been side loaded
		for i := range descriptors {
			descriptors[i].(*v2LayerDescriptor).diffID = configRootFS.DiffIDs[i]
		}
	}
	// Assume that the operating system is the host OS if blank, and validate it
	// to ensure we don't cause a panic by an invalid index into the layerstores.
	if layerStoreOS != "" && !system.IsOSSupported(layerStoreOS) {
		return "", system.ErrNotSupportedOperatingSystem
	}
	if p.config.DownloadManager != nil {
		go func() {
			var (
				err    error
				rootFS image.RootFS
			)
			downloadRootFS := *image.NewRootFS()
			rootFS, release, err = p.config.DownloadManager.Download(ctx, downloadRootFS, descriptors, p.config.ProgressOutput)
			if err != nil {
				// Intentionally do not cancel the config download here
				// as the error from config download (if there is one)
				// is more interesting than the layer download error
				layerErrChan <- err
				return
			}
			downloadedRootFS = &rootFS
			close(downloadsDone)
		}()
	} else {
		// We have nothing to download
		close(downloadsDone)
	}
	if configJSON == nil {
		configJSON, configRootFS, _, err = receiveConfig(p.config.ImageStore, configChan, configErrChan)
		if err == nil && configRootFS == nil {
			err = errRootFSInvalid
		}
		if err != nil {
			// Cancel the layer download and wait for it to settle before
			// returning, so no goroutine outlives this call.
			cancel()
			select {
			case <-downloadsDone:
			case <-layerErrChan:
			}
			return "", err
		}
	}
	select {
	case <-downloadsDone:
	case err = <-layerErrChan:
		return "", err
	}
	if release != nil {
		defer release()
	}
	if downloadedRootFS != nil {
		// The DiffIDs returned in rootFS MUST match those in the config.
		// Otherwise the image config could be referencing layers that aren't
		// included in the manifest.
		if len(downloadedRootFS.DiffIDs) != len(configRootFS.DiffIDs) {
			return "", errRootFSMismatch
		}
		for i := range downloadedRootFS.DiffIDs {
			if downloadedRootFS.DiffIDs[i] != configRootFS.DiffIDs[i] {
				return "", errRootFSMismatch
			}
		}
	}
	imageID, err := p.config.ImageStore.Put(ctx, configJSON)
	if err != nil {
		return "", err
	}
	return imageID, nil
}
  647. func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
  648. manifestDigest, err = schema2ManifestDigest(ref, mfst)
  649. if err != nil {
  650. return "", "", err
  651. }
  652. id, err = p.pullSchema2Layers(ctx, mfst.Target(), mfst.Layers, platform)
  653. return id, manifestDigest, err
  654. }
  655. func (p *v2Puller) pullOCI(ctx context.Context, ref reference.Named, mfst *ocischema.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) {
  656. manifestDigest, err = schema2ManifestDigest(ref, mfst)
  657. if err != nil {
  658. return "", "", err
  659. }
  660. id, err = p.pullSchema2Layers(ctx, mfst.Target(), mfst.Layers, platform)
  661. return id, manifestDigest, err
  662. }
  663. func receiveConfig(s ImageConfigStore, configChan <-chan []byte, errChan <-chan error) ([]byte, *image.RootFS, *specs.Platform, error) {
  664. select {
  665. case configJSON := <-configChan:
  666. rootfs, err := s.RootFSFromConfig(configJSON)
  667. if err != nil {
  668. return nil, nil, nil, err
  669. }
  670. platform, err := s.PlatformFromConfig(configJSON)
  671. if err != nil {
  672. return nil, nil, nil, err
  673. }
  674. return configJSON, rootfs, platform, nil
  675. case err := <-errChan:
  676. return nil, nil, nil, err
  677. // Don't need a case for ctx.Done in the select because cancellation
  678. // will trigger an error in p.pullSchema2ImageConfig.
  679. }
  680. }
  681. // pullManifestList handles "manifest lists" which point to various
  682. // platform-specific manifests.
  683. func (p *v2Puller) pullManifestList(ctx context.Context, ref reference.Named, mfstList *manifestlist.DeserializedManifestList, pp *specs.Platform) (id digest.Digest, manifestListDigest digest.Digest, err error) {
  684. manifestListDigest, err = schema2ManifestDigest(ref, mfstList)
  685. if err != nil {
  686. return "", "", err
  687. }
  688. var platform specs.Platform
  689. if pp != nil {
  690. platform = *pp
  691. }
  692. logrus.Debugf("%s resolved to a manifestList object with %d entries; looking for a %s/%s match", ref, len(mfstList.Manifests), platforms.Format(platform), runtime.GOARCH)
  693. manifestMatches := filterManifests(mfstList.Manifests, platform)
  694. if len(manifestMatches) == 0 {
  695. errMsg := fmt.Sprintf("no matching manifest for %s in the manifest list entries", formatPlatform(platform))
  696. logrus.Debugf(errMsg)
  697. return "", "", errors.New(errMsg)
  698. }
  699. if len(manifestMatches) > 1 {
  700. logrus.Debugf("found multiple matches in manifest list, choosing best match %s", manifestMatches[0].Digest.String())
  701. }
  702. match := manifestMatches[0]
  703. if err := checkImageCompatibility(match.Platform.OS, match.Platform.OSVersion); err != nil {
  704. return "", "", err
  705. }
  706. desc := specs.Descriptor{
  707. Digest: match.Digest,
  708. Size: match.Size,
  709. MediaType: match.MediaType,
  710. }
  711. manifest, err := p.manifestStore.Get(ctx, desc)
  712. if err != nil {
  713. return "", "", err
  714. }
  715. manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), match.Digest)
  716. if err != nil {
  717. return "", "", err
  718. }
  719. switch v := manifest.(type) {
  720. case *schema1.SignedManifest:
  721. msg := fmt.Sprintf("[DEPRECATION NOTICE] v2 schema1 manifests in manifest lists are not supported and will break in a future release. Suggest author of %s to upgrade to v2 schema2. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/", ref)
  722. logrus.Warn(msg)
  723. progress.Message(p.config.ProgressOutput, "", msg)
  724. platform := toOCIPlatform(manifestMatches[0].Platform)
  725. id, _, err = p.pullSchema1(ctx, manifestRef, v, &platform)
  726. if err != nil {
  727. return "", "", err
  728. }
  729. case *schema2.DeserializedManifest:
  730. platform := toOCIPlatform(manifestMatches[0].Platform)
  731. id, _, err = p.pullSchema2(ctx, manifestRef, v, &platform)
  732. if err != nil {
  733. return "", "", err
  734. }
  735. case *ocischema.DeserializedManifest:
  736. platform := toOCIPlatform(manifestMatches[0].Platform)
  737. id, _, err = p.pullOCI(ctx, manifestRef, v, &platform)
  738. if err != nil {
  739. return "", "", err
  740. }
  741. default:
  742. return "", "", errors.New("unsupported manifest format")
  743. }
  744. return id, manifestListDigest, err
  745. }
const (
	// defaultSchemaPullBackoff is the initial delay between attempts when
	// retrying an image config pull; retry() doubles it after each failure.
	defaultSchemaPullBackoff = 250 * time.Millisecond
	// defaultMaxSchemaPullAttempts is the maximum number of times an image
	// config pull is attempted before giving up.
	defaultMaxSchemaPullAttempts = 5
)
  750. func (p *v2Puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) {
  751. blobs := p.repo.Blobs(ctx)
  752. err = retry(ctx, defaultMaxSchemaPullAttempts, defaultSchemaPullBackoff, func(ctx context.Context) (err error) {
  753. configJSON, err = blobs.Get(ctx, dgst)
  754. return err
  755. })
  756. if err != nil {
  757. return nil, err
  758. }
  759. // Verify image config digest
  760. verifier := dgst.Verifier()
  761. if _, err := verifier.Write(configJSON); err != nil {
  762. return nil, err
  763. }
  764. if !verifier.Verified() {
  765. err := fmt.Errorf("image config verification failed for digest %s", dgst)
  766. logrus.Error(err)
  767. return nil, err
  768. }
  769. return configJSON, nil
  770. }
  771. func retry(ctx context.Context, maxAttempts int, sleep time.Duration, f func(ctx context.Context) error) (err error) {
  772. attempt := 0
  773. for ; attempt < maxAttempts; attempt++ {
  774. err = retryOnError(f(ctx))
  775. if err == nil {
  776. return nil
  777. }
  778. if xfer.IsDoNotRetryError(err) {
  779. break
  780. }
  781. if attempt+1 < maxAttempts {
  782. timer := time.NewTimer(sleep)
  783. select {
  784. case <-ctx.Done():
  785. timer.Stop()
  786. return ctx.Err()
  787. case <-timer.C:
  788. logrus.WithError(err).WithField("attempts", attempt+1).Debug("retrying after error")
  789. sleep *= 2
  790. }
  791. }
  792. }
  793. return errors.Wrapf(err, "download failed after attempts=%d", attempt+1)
  794. }
  795. // schema2ManifestDigest computes the manifest digest, and, if pulling by
  796. // digest, ensures that it matches the requested digest.
  797. func schema2ManifestDigest(ref reference.Named, mfst distribution.Manifest) (digest.Digest, error) {
  798. _, canonical, err := mfst.Payload()
  799. if err != nil {
  800. return "", err
  801. }
  802. // If pull by digest, then verify the manifest digest.
  803. if digested, isDigested := ref.(reference.Canonical); isDigested {
  804. verifier := digested.Digest().Verifier()
  805. if _, err := verifier.Write(canonical); err != nil {
  806. return "", err
  807. }
  808. if !verifier.Verified() {
  809. err := fmt.Errorf("manifest verification failed for digest %s", digested.Digest())
  810. logrus.Error(err)
  811. return "", err
  812. }
  813. return digested.Digest(), nil
  814. }
  815. return digest.FromBytes(canonical), nil
  816. }
  817. func verifySchema1Manifest(signedManifest *schema1.SignedManifest, ref reference.Reference) (m *schema1.Manifest, err error) {
  818. // If pull by digest, then verify the manifest digest. NOTE: It is
  819. // important to do this first, before any other content validation. If the
  820. // digest cannot be verified, don't even bother with those other things.
  821. if digested, isCanonical := ref.(reference.Canonical); isCanonical {
  822. verifier := digested.Digest().Verifier()
  823. if _, err := verifier.Write(signedManifest.Canonical); err != nil {
  824. return nil, err
  825. }
  826. if !verifier.Verified() {
  827. err := fmt.Errorf("image verification failed for digest %s", digested.Digest())
  828. logrus.Error(err)
  829. return nil, err
  830. }
  831. }
  832. m = &signedManifest.Manifest
  833. if m.SchemaVersion != 1 {
  834. return nil, fmt.Errorf("unsupported schema version %d for %q", m.SchemaVersion, reference.FamiliarString(ref))
  835. }
  836. if len(m.FSLayers) != len(m.History) {
  837. return nil, fmt.Errorf("length of history not equal to number of layers for %q", reference.FamiliarString(ref))
  838. }
  839. if len(m.FSLayers) == 0 {
  840. return nil, fmt.Errorf("no FSLayers in manifest for %q", reference.FamiliarString(ref))
  841. }
  842. return m, nil
  843. }
  844. // fixManifestLayers removes repeated layers from the manifest and checks the
  845. // correctness of the parent chain.
  846. func fixManifestLayers(m *schema1.Manifest) error {
  847. imgs := make([]*image.V1Image, len(m.FSLayers))
  848. for i := range m.FSLayers {
  849. img := &image.V1Image{}
  850. if err := json.Unmarshal([]byte(m.History[i].V1Compatibility), img); err != nil {
  851. return err
  852. }
  853. imgs[i] = img
  854. if err := v1.ValidateID(img.ID); err != nil {
  855. return err
  856. }
  857. }
  858. if imgs[len(imgs)-1].Parent != "" && runtime.GOOS != "windows" {
  859. // Windows base layer can point to a base layer parent that is not in manifest.
  860. return errors.New("invalid parent ID in the base layer of the image")
  861. }
  862. // check general duplicates to error instead of a deadlock
  863. idmap := make(map[string]struct{})
  864. var lastID string
  865. for _, img := range imgs {
  866. // skip IDs that appear after each other, we handle those later
  867. if _, exists := idmap[img.ID]; img.ID != lastID && exists {
  868. return fmt.Errorf("ID %+v appears multiple times in manifest", img.ID)
  869. }
  870. lastID = img.ID
  871. idmap[lastID] = struct{}{}
  872. }
  873. // backwards loop so that we keep the remaining indexes after removing items
  874. for i := len(imgs) - 2; i >= 0; i-- {
  875. if imgs[i].ID == imgs[i+1].ID { // repeated ID. remove and continue
  876. m.FSLayers = append(m.FSLayers[:i], m.FSLayers[i+1:]...)
  877. m.History = append(m.History[:i], m.History[i+1:]...)
  878. } else if imgs[i].Parent != imgs[i+1].ID {
  879. return fmt.Errorf("invalid parent ID. Expected %v, got %v", imgs[i+1].ID, imgs[i].Parent)
  880. }
  881. }
  882. return nil
  883. }
  884. func createDownloadFile() (*os.File, error) {
  885. return os.CreateTemp("", "GetImageBlob")
  886. }
  887. func toOCIPlatform(p manifestlist.PlatformSpec) specs.Platform {
  888. return specs.Platform{
  889. OS: p.OS,
  890. Architecture: p.Architecture,
  891. Variant: p.Variant,
  892. OSFeatures: p.OSFeatures,
  893. OSVersion: p.OSVersion,
  894. }
  895. }
  896. // maximumSpec returns the distribution platform with maximum compatibility for the current node.
  897. func maximumSpec() specs.Platform {
  898. p := platforms.DefaultSpec()
  899. if p.Architecture == "amd64" {
  900. p.Variant = archvariant.AMD64Variant()
  901. }
  902. return p
  903. }