push.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. package graph
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "strings"
  11. "sync"
  12. log "github.com/Sirupsen/logrus"
  13. "github.com/docker/docker/engine"
  14. "github.com/docker/docker/image"
  15. "github.com/docker/docker/registry"
  16. "github.com/docker/docker/utils"
  17. "github.com/docker/libtrust"
  18. )
  19. var ErrV2RegistryUnavailable = errors.New("error v2 registry unavailable")
  20. // Retrieve the all the images to be uploaded in the correct order
  21. func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
  22. var (
  23. imageList []string
  24. imagesSeen = make(map[string]bool)
  25. tagsByImage = make(map[string][]string)
  26. )
  27. for tag, id := range localRepo {
  28. if requestedTag != "" && requestedTag != tag {
  29. continue
  30. }
  31. var imageListForThisTag []string
  32. tagsByImage[id] = append(tagsByImage[id], tag)
  33. for img, err := s.graph.Get(id); img != nil; img, err = img.GetParent() {
  34. if err != nil {
  35. return nil, nil, err
  36. }
  37. if imagesSeen[img.ID] {
  38. // This image is already on the list, we can ignore it and all its parents
  39. break
  40. }
  41. imagesSeen[img.ID] = true
  42. imageListForThisTag = append(imageListForThisTag, img.ID)
  43. }
  44. // reverse the image list for this tag (so the "most"-parent image is first)
  45. for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
  46. imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
  47. }
  48. // append to main image list
  49. imageList = append(imageList, imageListForThisTag...)
  50. }
  51. if len(imageList) == 0 {
  52. return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  53. }
  54. log.Debugf("Image list: %v", imageList)
  55. log.Debugf("Tags by image: %v", tagsByImage)
  56. return imageList, tagsByImage, nil
  57. }
  58. func (s *TagStore) getImageTags(localName, askedTag string) ([]string, error) {
  59. localRepo, err := s.Get(localName)
  60. if err != nil {
  61. return nil, err
  62. }
  63. log.Debugf("Checking %s against %#v", askedTag, localRepo)
  64. if len(askedTag) > 0 {
  65. if _, ok := localRepo[askedTag]; !ok {
  66. return nil, fmt.Errorf("Tag does not exist for %s:%s", localName, askedTag)
  67. }
  68. return []string{askedTag}, nil
  69. }
  70. var tags []string
  71. for tag := range localRepo {
  72. tags = append(tags, tag)
  73. }
  74. return tags, nil
  75. }
  76. // createImageIndex returns an index of an image's layer IDs and tags.
  77. func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
  78. var imageIndex []*registry.ImgData
  79. for _, id := range images {
  80. if tags, hasTags := tags[id]; hasTags {
  81. // If an image has tags you must add an entry in the image index
  82. // for each tag
  83. for _, tag := range tags {
  84. imageIndex = append(imageIndex, &registry.ImgData{
  85. ID: id,
  86. Tag: tag,
  87. })
  88. }
  89. continue
  90. }
  91. // If the image does not have a tag it still needs to be sent to the
  92. // registry with an empty tag so that it is accociated with the repository
  93. imageIndex = append(imageIndex, &registry.ImgData{
  94. ID: id,
  95. Tag: "",
  96. })
  97. }
  98. return imageIndex
  99. }
  100. type imagePushData struct {
  101. id string
  102. endpoint string
  103. tokens []string
  104. }
  105. // lookupImageOnEndpoint checks the specified endpoint to see if an image exists
  106. // and if it is absent then it sends the image id to the channel to be pushed.
  107. func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *utils.StreamFormatter,
  108. images chan imagePushData, imagesToPush chan string) {
  109. defer wg.Done()
  110. for image := range images {
  111. if err := r.LookupRemoteImage(image.id, image.endpoint, image.tokens); err != nil {
  112. log.Errorf("Error in LookupRemoteImage: %s", err)
  113. imagesToPush <- image.id
  114. continue
  115. }
  116. out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(image.id)))
  117. }
  118. }
  119. func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string,
  120. tags map[string][]string, repo *registry.RepositoryData, sf *utils.StreamFormatter, r *registry.Session) error {
  121. workerCount := len(imageIDs)
  122. // start a maximum of 5 workers to check if images exist on the specified endpoint.
  123. if workerCount > 5 {
  124. workerCount = 5
  125. }
  126. var (
  127. wg = &sync.WaitGroup{}
  128. imageData = make(chan imagePushData, workerCount*2)
  129. imagesToPush = make(chan string, workerCount*2)
  130. pushes = make(chan map[string]struct{}, 1)
  131. )
  132. for i := 0; i < workerCount; i++ {
  133. wg.Add(1)
  134. go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush)
  135. }
  136. // start a go routine that consumes the images to push
  137. go func() {
  138. shouldPush := make(map[string]struct{})
  139. for id := range imagesToPush {
  140. shouldPush[id] = struct{}{}
  141. }
  142. pushes <- shouldPush
  143. }()
  144. for _, id := range imageIDs {
  145. imageData <- imagePushData{
  146. id: id,
  147. endpoint: endpoint,
  148. tokens: repo.Tokens,
  149. }
  150. }
  151. // close the channel to notify the workers that there will be no more images to check.
  152. close(imageData)
  153. wg.Wait()
  154. close(imagesToPush)
  155. // wait for all the images that require pushes to be collected into a consumable map.
  156. shouldPush := <-pushes
  157. // finish by pushing any images and tags to the endpoint. The order that the images are pushed
  158. // is very important that is why we are still iterating over the ordered list of imageIDs.
  159. for _, id := range imageIDs {
  160. if _, push := shouldPush[id]; push {
  161. if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil {
  162. // FIXME: Continue on error?
  163. return err
  164. }
  165. }
  166. for _, tag := range tags[id] {
  167. out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag))
  168. if err := r.PushRegistryTag(remoteName, id, tag, endpoint, repo.Tokens); err != nil {
  169. return err
  170. }
  171. }
  172. }
  173. return nil
  174. }
  175. // pushRepository pushes layers that do not already exist on the registry.
  176. func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
  177. repoInfo *registry.RepositoryInfo, localRepo map[string]string,
  178. tag string, sf *utils.StreamFormatter) error {
  179. log.Debugf("Local repo: %s", localRepo)
  180. out = utils.NewWriteFlusher(out)
  181. imgList, tags, err := s.getImageList(localRepo, tag)
  182. if err != nil {
  183. return err
  184. }
  185. out.Write(sf.FormatStatus("", "Sending image list"))
  186. imageIndex := s.createImageIndex(imgList, tags)
  187. log.Debugf("Preparing to push %s with the following images and tags", localRepo)
  188. for _, data := range imageIndex {
  189. log.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
  190. }
  191. // Register all the images in a repository with the registry
  192. // If an image is not in this list it will not be associated with the repository
  193. repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
  194. if err != nil {
  195. return err
  196. }
  197. nTag := 1
  198. if tag == "" {
  199. nTag = len(localRepo)
  200. }
  201. out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
  202. // push the repository to each of the endpoints only if it does not exist.
  203. for _, endpoint := range repoData.Endpoints {
  204. if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil {
  205. return err
  206. }
  207. }
  208. _, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
  209. return err
  210. }
  211. func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
  212. out = utils.NewWriteFlusher(out)
  213. jsonRaw, err := ioutil.ReadFile(path.Join(s.graph.Root, imgID, "json"))
  214. if err != nil {
  215. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  216. }
  217. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
  218. imgData := &registry.ImgData{
  219. ID: imgID,
  220. }
  221. // Send the json
  222. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
  223. if err == registry.ErrAlreadyExists {
  224. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  225. return "", nil
  226. }
  227. return "", err
  228. }
  229. layerData, err := s.graph.TempLayerArchive(imgID, sf, out)
  230. if err != nil {
  231. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  232. }
  233. defer os.RemoveAll(layerData.Name())
  234. // Send the layer
  235. log.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
  236. checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
  237. if err != nil {
  238. return "", err
  239. }
  240. imgData.Checksum = checksum
  241. imgData.ChecksumPayload = checksumPayload
  242. // Send the checksum
  243. if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
  244. return "", err
  245. }
  246. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
  247. return imgData.Checksum, nil
  248. }
  249. func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter) error {
  250. if repoInfo.Official {
  251. j := eng.Job("trust_update_base")
  252. if err := j.Run(); err != nil {
  253. log.Errorf("error updating trust base graph: %s", err)
  254. }
  255. }
  256. endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
  257. if err != nil {
  258. if repoInfo.Index.Official {
  259. log.Infof("Unable to push to V2 registry, falling back to v1: %s", err)
  260. return ErrV2RegistryUnavailable
  261. }
  262. return fmt.Errorf("error getting registry endpoint: %s", err)
  263. }
  264. tags, err := s.getImageTags(repoInfo.LocalName, tag)
  265. if err != nil {
  266. return err
  267. }
  268. if len(tags) == 0 {
  269. return fmt.Errorf("No tags to push for %s", repoInfo.LocalName)
  270. }
  271. auth, err := r.GetV2Authorization(endpoint, repoInfo.RemoteName, false)
  272. if err != nil {
  273. return fmt.Errorf("error getting authorization: %s", err)
  274. }
  275. for _, tag := range tags {
  276. log.Debugf("Pushing %s:%s to v2 repository", repoInfo.LocalName, tag)
  277. mBytes, err := s.newManifest(repoInfo.LocalName, repoInfo.RemoteName, tag)
  278. if err != nil {
  279. return err
  280. }
  281. js, err := libtrust.NewJSONSignature(mBytes)
  282. if err != nil {
  283. return err
  284. }
  285. if err = js.Sign(s.trustKey); err != nil {
  286. return err
  287. }
  288. signedBody, err := js.PrettySignature("signatures")
  289. if err != nil {
  290. return err
  291. }
  292. log.Infof("Signed manifest for %s:%s using daemon's key: %s", repoInfo.LocalName, tag, s.trustKey.KeyID())
  293. manifestBytes := string(signedBody)
  294. manifest, verified, err := s.loadManifest(eng, signedBody)
  295. if err != nil {
  296. return fmt.Errorf("error verifying manifest: %s", err)
  297. }
  298. if err := checkValidManifest(manifest); err != nil {
  299. return fmt.Errorf("invalid manifest: %s", err)
  300. }
  301. if verified {
  302. log.Infof("Pushing verified image, key %s is registered for %q", s.trustKey.KeyID(), repoInfo.RemoteName)
  303. }
  304. for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
  305. var (
  306. sumStr = manifest.FSLayers[i].BlobSum
  307. imgJSON = []byte(manifest.History[i].V1Compatibility)
  308. )
  309. sumParts := strings.SplitN(sumStr, ":", 2)
  310. if len(sumParts) < 2 {
  311. return fmt.Errorf("Invalid checksum: %s", sumStr)
  312. }
  313. manifestSum := sumParts[1]
  314. img, err := image.NewImgJSON(imgJSON)
  315. if err != nil {
  316. return fmt.Errorf("Failed to parse json: %s", err)
  317. }
  318. // Call mount blob
  319. exists, err := r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, auth)
  320. if err != nil {
  321. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
  322. return err
  323. }
  324. if !exists {
  325. if err := s.pushV2Image(r, img, endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, sf, out, auth); err != nil {
  326. return err
  327. }
  328. } else {
  329. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image already exists", nil))
  330. }
  331. }
  332. // push the manifest
  333. if err := r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth); err != nil {
  334. return err
  335. }
  336. }
  337. return nil
  338. }
  339. // PushV2Image pushes the image content to the v2 registry, first buffering the contents to disk
  340. func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint *registry.Endpoint, imageName, sumType, sumStr string, sf *utils.StreamFormatter, out io.Writer, auth *registry.RequestAuthorization) error {
  341. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Buffering to Disk", nil))
  342. image, err := s.graph.Get(img.ID)
  343. if err != nil {
  344. return err
  345. }
  346. arch, err := image.TarLayer()
  347. if err != nil {
  348. return err
  349. }
  350. tf, err := s.graph.newTempFile()
  351. if err != nil {
  352. return err
  353. }
  354. defer func() {
  355. tf.Close()
  356. os.Remove(tf.Name())
  357. }()
  358. size, err := bufferToFile(tf, arch)
  359. if err != nil {
  360. return err
  361. }
  362. // Send the layer
  363. log.Debugf("rendered layer for %s of [%d] size", img.ID, size)
  364. if err := r.PutV2ImageBlob(endpoint, imageName, sumType, sumStr, utils.ProgressReader(tf, int(size), out, sf, false, utils.TruncateID(img.ID), "Pushing"), auth); err != nil {
  365. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
  366. return err
  367. }
  368. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil))
  369. return nil
  370. }
  371. // FIXME: Allow to interrupt current push when new push of same image is done.
  372. func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
  373. if n := len(job.Args); n != 1 {
  374. return job.Errorf("Usage: %s IMAGE", job.Name)
  375. }
  376. var (
  377. localName = job.Args[0]
  378. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  379. authConfig = &registry.AuthConfig{}
  380. metaHeaders map[string][]string
  381. )
  382. // Resolve the Repository name from fqn to RepositoryInfo
  383. repoInfo, err := registry.ResolveRepositoryInfo(job, localName)
  384. if err != nil {
  385. return job.Error(err)
  386. }
  387. tag := job.Getenv("tag")
  388. job.GetenvJson("authConfig", authConfig)
  389. job.GetenvJson("metaHeaders", &metaHeaders)
  390. if _, err := s.poolAdd("push", repoInfo.LocalName); err != nil {
  391. return job.Error(err)
  392. }
  393. defer s.poolRemove("push", repoInfo.LocalName)
  394. endpoint, err := repoInfo.GetEndpoint()
  395. if err != nil {
  396. return job.Error(err)
  397. }
  398. img, err := s.graph.Get(repoInfo.LocalName)
  399. r, err2 := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false)
  400. if err2 != nil {
  401. return job.Error(err2)
  402. }
  403. if endpoint.Version == registry.APIVersion2 {
  404. err := s.pushV2Repository(r, job.Eng, job.Stdout, repoInfo, tag, sf)
  405. if err == nil {
  406. return engine.StatusOK
  407. }
  408. if err != ErrV2RegistryUnavailable {
  409. return job.Errorf("Error pushing to registry: %s", err)
  410. }
  411. }
  412. if err != nil {
  413. reposLen := 1
  414. if tag == "" {
  415. reposLen = len(s.Repositories[repoInfo.LocalName])
  416. }
  417. job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
  418. // If it fails, try to get the repository
  419. if localRepo, exists := s.Repositories[repoInfo.LocalName]; exists {
  420. if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil {
  421. return job.Error(err)
  422. }
  423. return engine.StatusOK
  424. }
  425. return job.Error(err)
  426. }
  427. var token []string
  428. job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", repoInfo.CanonicalName))
  429. if _, err := s.pushImage(r, job.Stdout, img.ID, endpoint.String(), token, sf); err != nil {
  430. return job.Error(err)
  431. }
  432. return engine.StatusOK
  433. }