pull_v2.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. package graph
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/distribution"
  9. "github.com/docker/distribution/digest"
  10. "github.com/docker/distribution/manifest"
  11. "github.com/docker/docker/image"
  12. "github.com/docker/docker/pkg/progressreader"
  13. "github.com/docker/docker/pkg/streamformatter"
  14. "github.com/docker/docker/pkg/stringid"
  15. "github.com/docker/docker/registry"
  16. "github.com/docker/docker/trust"
  17. "github.com/docker/docker/utils"
  18. "github.com/docker/libtrust"
  19. "golang.org/x/net/context"
  20. )
// v2Puller carries the state used to pull a repository through the v2
// registry API.
type v2Puller struct {
	// Embedded tag store. The methods in this file reach p.graph,
	// p.trustService, p.poolAdd, p.poolRemove, p.Get, p.Tag and p.SetDigest
	// through it (presumably all declared on TagStore -- confirm against its
	// definition).
	*TagStore
	// endpoint is handed to NewV2Repository when Pull builds the client.
	endpoint registry.APIEndpoint
	// config supplies MetaHeaders, AuthConfig and the OutStream that status
	// and progress messages are written to.
	config *ImagePullConfig
	// sf formats the status and progress messages written to OutStream.
	sf *streamformatter.StreamFormatter
	// repoInfo supplies the LocalName and CanonicalName of the repository.
	repoInfo *registry.RepositoryInfo
	// repo is assigned by Pull from the result of NewV2Repository.
	repo distribution.Repository
	// sessionID is a random ID generated by Pull and passed to the graph's
	// Retain and Release calls.
	sessionID string
}
  30. func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
  31. // TODO(tiborvass): was ReceiveTimeout
  32. p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig)
  33. if err != nil {
  34. logrus.Debugf("Error getting v2 registry: %v", err)
  35. return true, err
  36. }
  37. p.sessionID = stringid.GenerateRandomID()
  38. if err := p.pullV2Repository(tag); err != nil {
  39. if registry.ContinueOnError(err) {
  40. logrus.Debugf("Error trying v2 registry: %v", err)
  41. return true, err
  42. }
  43. return false, err
  44. }
  45. return false, nil
  46. }
  47. func (p *v2Puller) pullV2Repository(tag string) (err error) {
  48. var tags []string
  49. taggedName := p.repoInfo.LocalName
  50. if len(tag) > 0 {
  51. tags = []string{tag}
  52. taggedName = utils.ImageReference(p.repoInfo.LocalName, tag)
  53. } else {
  54. var err error
  55. manSvc, err := p.repo.Manifests(context.Background())
  56. if err != nil {
  57. return err
  58. }
  59. tags, err = manSvc.Tags()
  60. if err != nil {
  61. return err
  62. }
  63. }
  64. c, err := p.poolAdd("pull", taggedName)
  65. if err != nil {
  66. if c != nil {
  67. // Another pull of the same repository is already taking place; just wait for it to finish
  68. p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)
  69. <-c
  70. return nil
  71. }
  72. return err
  73. }
  74. defer p.poolRemove("pull", taggedName)
  75. var layersDownloaded bool
  76. for _, tag := range tags {
  77. // pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
  78. // TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
  79. pulledNew, err := p.pullV2Tag(tag, taggedName)
  80. if err != nil {
  81. return err
  82. }
  83. layersDownloaded = layersDownloaded || pulledNew
  84. }
  85. writeStatus(taggedName, p.config.OutStream, p.sf, layersDownloaded)
  86. return nil
  87. }
// downloadInfo is used to pass information from download to extractor.
// pullV2Tag fills in img, digest and err before starting download; download
// fills in the remaining fields.
type downloadInfo struct {
	// img is the image parsed from the manifest history entry for this layer.
	img *image.Image
	// tmpFile holds the downloaded blob; download sets it only on success.
	tmpFile *os.File
	// digest is the layer's blob sum taken from the manifest.
	digest digest.Digest
	// layer is set by download on success. download closes it before
	// returning, so pullV2Tag only uses it as a "was downloaded" marker.
	layer distribution.ReadSeekCloser
	// size is the blob size reported by the blob store's Stat call.
	size int64
	// err receives exactly one value from download: nil on success or skip,
	// the failure otherwise. It stays nil when no download was started.
	err chan error
	// verified records whether the downloaded bytes matched digest.
	verified bool
}
  98. type errVerification struct{}
  99. func (errVerification) Error() string { return "verification failed" }
  100. func (p *v2Puller) download(di *downloadInfo) {
  101. logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID)
  102. out := p.config.OutStream
  103. if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil {
  104. if c != nil {
  105. out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil))
  106. <-c
  107. out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
  108. } else {
  109. logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", di.img.ID, err)
  110. }
  111. di.err <- nil
  112. return
  113. }
  114. defer p.poolRemove("pull", "img:"+di.img.ID)
  115. tmpFile, err := ioutil.TempFile("", "GetImageBlob")
  116. if err != nil {
  117. di.err <- err
  118. return
  119. }
  120. blobs := p.repo.Blobs(nil)
  121. desc, err := blobs.Stat(nil, di.digest)
  122. if err != nil {
  123. logrus.Debugf("Error statting layer: %v", err)
  124. di.err <- err
  125. return
  126. }
  127. di.size = desc.Size
  128. layerDownload, err := blobs.Open(nil, di.digest)
  129. if err != nil {
  130. logrus.Debugf("Error fetching layer: %v", err)
  131. di.err <- err
  132. return
  133. }
  134. defer layerDownload.Close()
  135. verifier, err := digest.NewDigestVerifier(di.digest)
  136. if err != nil {
  137. di.err <- err
  138. return
  139. }
  140. reader := progressreader.New(progressreader.Config{
  141. In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)),
  142. Out: out,
  143. Formatter: p.sf,
  144. Size: int(di.size),
  145. NewLines: false,
  146. ID: stringid.TruncateID(di.img.ID),
  147. Action: "Downloading",
  148. })
  149. io.Copy(tmpFile, reader)
  150. out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil))
  151. di.verified = verifier.Verified()
  152. if !di.verified {
  153. logrus.Infof("Image verification failed for layer %s", di.digest)
  154. }
  155. out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
  156. logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name())
  157. di.tmpFile = tmpFile
  158. di.layer = layerDownload
  159. di.err <- nil
  160. }
  161. func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
  162. logrus.Debugf("Pulling tag from V2 registry: %q", tag)
  163. out := p.config.OutStream
  164. manSvc, err := p.repo.Manifests(context.Background())
  165. if err != nil {
  166. return false, err
  167. }
  168. manifest, err := manSvc.GetByTag(tag)
  169. if err != nil {
  170. return false, err
  171. }
  172. verified, err := p.validateManifest(manifest, tag)
  173. if err != nil {
  174. return false, err
  175. }
  176. if verified {
  177. logrus.Printf("Image manifest for %s has been verified", taggedName)
  178. }
  179. out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
  180. downloads := make([]downloadInfo, len(manifest.FSLayers))
  181. layerIDs := []string{}
  182. defer func() {
  183. p.graph.Release(p.sessionID, layerIDs...)
  184. }()
  185. for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
  186. img, err := image.NewImgJSON([]byte(manifest.History[i].V1Compatibility))
  187. if err != nil {
  188. logrus.Debugf("error getting image v1 json: %v", err)
  189. return false, err
  190. }
  191. downloads[i].img = img
  192. downloads[i].digest = manifest.FSLayers[i].BlobSum
  193. p.graph.Retain(p.sessionID, img.ID)
  194. layerIDs = append(layerIDs, img.ID)
  195. // Check if exists
  196. if p.graph.Exists(img.ID) {
  197. logrus.Debugf("Image already exists: %s", img.ID)
  198. continue
  199. }
  200. out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil))
  201. downloads[i].err = make(chan error)
  202. go p.download(&downloads[i])
  203. }
  204. var tagUpdated bool
  205. for i := len(downloads) - 1; i >= 0; i-- {
  206. d := &downloads[i]
  207. if d.err != nil {
  208. if err := <-d.err; err != nil {
  209. return false, err
  210. }
  211. }
  212. verified = verified && d.verified
  213. if d.layer != nil {
  214. // if tmpFile is empty assume download and extracted elsewhere
  215. defer os.Remove(d.tmpFile.Name())
  216. defer d.tmpFile.Close()
  217. d.tmpFile.Seek(0, 0)
  218. if d.tmpFile != nil {
  219. reader := progressreader.New(progressreader.Config{
  220. In: d.tmpFile,
  221. Out: out,
  222. Formatter: p.sf,
  223. Size: int(d.size),
  224. NewLines: false,
  225. ID: stringid.TruncateID(d.img.ID),
  226. Action: "Extracting",
  227. })
  228. err = p.graph.Register(d.img, reader)
  229. if err != nil {
  230. return false, err
  231. }
  232. if err := p.graph.SetDigest(d.img.ID, d.digest); err != nil {
  233. return false, err
  234. }
  235. // FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
  236. }
  237. out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Pull complete", nil))
  238. tagUpdated = true
  239. } else {
  240. out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Already exists", nil))
  241. }
  242. }
  243. manifestDigest, _, err := digestFromManifest(manifest, p.repoInfo.LocalName)
  244. if err != nil {
  245. return false, err
  246. }
  247. // Check for new tag if no layers downloaded
  248. if !tagUpdated {
  249. repo, err := p.Get(p.repoInfo.LocalName)
  250. if err != nil {
  251. return false, err
  252. }
  253. if repo != nil {
  254. if _, exists := repo[tag]; !exists {
  255. tagUpdated = true
  256. }
  257. } else {
  258. tagUpdated = true
  259. }
  260. }
  261. if verified && tagUpdated {
  262. out.Write(p.sf.FormatStatus(p.repo.Name()+":"+tag, "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security."))
  263. }
  264. if utils.DigestReference(tag) {
  265. // TODO(stevvooe): Ideally, we should always set the digest so we can
  266. // use the digest whether we pull by it or not. Unfortunately, the tag
  267. // store treats the digest as a separate tag, meaning there may be an
  268. // untagged digest image that would seem to be dangling by a user.
  269. if err = p.SetDigest(p.repoInfo.LocalName, tag, downloads[0].img.ID); err != nil {
  270. return false, err
  271. }
  272. } else {
  273. // only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
  274. if err = p.Tag(p.repoInfo.LocalName, tag, downloads[0].img.ID, true); err != nil {
  275. return false, err
  276. }
  277. }
  278. if manifestDigest != "" {
  279. out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
  280. }
  281. return tagUpdated, nil
  282. }
  283. // verifyTrustedKeys checks the keys provided against the trust store,
  284. // ensuring that the provided keys are trusted for the namespace. The keys
  285. // provided from this method must come from the signatures provided as part of
  286. // the manifest JWS package, obtained from unpackSignedManifest or libtrust.
  287. func (p *v2Puller) verifyTrustedKeys(namespace string, keys []libtrust.PublicKey) (verified bool, err error) {
  288. if namespace[0] != '/' {
  289. namespace = "/" + namespace
  290. }
  291. for _, key := range keys {
  292. b, err := key.MarshalJSON()
  293. if err != nil {
  294. return false, fmt.Errorf("error marshalling public key: %s", err)
  295. }
  296. // Check key has read/write permission (0x03)
  297. v, err := p.trustService.CheckKey(namespace, b, 0x03)
  298. if err != nil {
  299. vErr, ok := err.(trust.NotVerifiedError)
  300. if !ok {
  301. return false, fmt.Errorf("error running key check: %s", err)
  302. }
  303. logrus.Debugf("Key check result: %v", vErr)
  304. }
  305. verified = v
  306. }
  307. if verified {
  308. logrus.Debug("Key check result: verified")
  309. }
  310. return
  311. }
  312. func (p *v2Puller) validateManifest(m *manifest.SignedManifest, tag string) (verified bool, err error) {
  313. // TODO(tiborvass): what's the usecase for having manifest == nil and err == nil ? Shouldn't be the error be "DoesNotExist" ?
  314. if m == nil {
  315. return false, fmt.Errorf("image manifest does not exist for tag %q", tag)
  316. }
  317. if m.SchemaVersion != 1 {
  318. return false, fmt.Errorf("unsupported schema version %d for tag %q", m.SchemaVersion, tag)
  319. }
  320. if len(m.FSLayers) != len(m.History) {
  321. return false, fmt.Errorf("length of history not equal to number of layers for tag %q", tag)
  322. }
  323. if len(m.FSLayers) == 0 {
  324. return false, fmt.Errorf("no FSLayers in manifest for tag %q", tag)
  325. }
  326. keys, err := manifest.Verify(m)
  327. if err != nil {
  328. return false, fmt.Errorf("error verifying manifest for tag %q: %v", tag, err)
  329. }
  330. verified, err = p.verifyTrustedKeys(m.Name, keys)
  331. if err != nil {
  332. return false, fmt.Errorf("error verifying manifest keys: %v", err)
  333. }
  334. localDigest, err := digest.ParseDigest(tag)
  335. // if pull by digest, then verify
  336. if err == nil {
  337. verifier, err := digest.NewDigestVerifier(localDigest)
  338. if err != nil {
  339. return false, err
  340. }
  341. payload, err := m.Payload()
  342. if err != nil {
  343. return false, err
  344. }
  345. if _, err := verifier.Write(payload); err != nil {
  346. return false, err
  347. }
  348. verified = verified && verifier.Verified()
  349. }
  350. return verified, nil
  351. }