push.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package graph
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "os"
  8. "path"
  9. "strings"
  10. "sync"
  11. log "github.com/Sirupsen/logrus"
  12. "github.com/docker/docker/engine"
  13. "github.com/docker/docker/image"
  14. "github.com/docker/docker/registry"
  15. "github.com/docker/docker/utils"
  16. "github.com/docker/libtrust"
  17. )
  18. // Retrieve the all the images to be uploaded in the correct order
  19. func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
  20. var (
  21. imageList []string
  22. imagesSeen = make(map[string]bool)
  23. tagsByImage = make(map[string][]string)
  24. )
  25. for tag, id := range localRepo {
  26. if requestedTag != "" && requestedTag != tag {
  27. continue
  28. }
  29. var imageListForThisTag []string
  30. tagsByImage[id] = append(tagsByImage[id], tag)
  31. for img, err := s.graph.Get(id); img != nil; img, err = img.GetParent() {
  32. if err != nil {
  33. return nil, nil, err
  34. }
  35. if imagesSeen[img.ID] {
  36. // This image is already on the list, we can ignore it and all its parents
  37. break
  38. }
  39. imagesSeen[img.ID] = true
  40. imageListForThisTag = append(imageListForThisTag, img.ID)
  41. }
  42. // reverse the image list for this tag (so the "most"-parent image is first)
  43. for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
  44. imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
  45. }
  46. // append to main image list
  47. imageList = append(imageList, imageListForThisTag...)
  48. }
  49. if len(imageList) == 0 {
  50. return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  51. }
  52. log.Debugf("Image list: %v", imageList)
  53. log.Debugf("Tags by image: %v", tagsByImage)
  54. return imageList, tagsByImage, nil
  55. }
  56. // createImageIndex returns an index of an image's layer IDs and tags.
  57. func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
  58. var imageIndex []*registry.ImgData
  59. for _, id := range images {
  60. if tags, hasTags := tags[id]; hasTags {
  61. // If an image has tags you must add an entry in the image index
  62. // for each tag
  63. for _, tag := range tags {
  64. imageIndex = append(imageIndex, &registry.ImgData{
  65. ID: id,
  66. Tag: tag,
  67. })
  68. }
  69. continue
  70. }
  71. // If the image does not have a tag it still needs to be sent to the
  72. // registry with an empty tag so that it is accociated with the repository
  73. imageIndex = append(imageIndex, &registry.ImgData{
  74. ID: id,
  75. Tag: "",
  76. })
  77. }
  78. return imageIndex
  79. }
  80. type imagePushData struct {
  81. id string
  82. endpoint string
  83. tokens []string
  84. }
  85. // lookupImageOnEndpoint checks the specified endpoint to see if an image exists
  86. // and if it is absent then it sends the image id to the channel to be pushed.
  87. func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *utils.StreamFormatter,
  88. images chan imagePushData, imagesToPush chan string) {
  89. defer wg.Done()
  90. for image := range images {
  91. if err := r.LookupRemoteImage(image.id, image.endpoint, image.tokens); err != nil {
  92. log.Errorf("Error in LookupRemoteImage: %s", err)
  93. imagesToPush <- image.id
  94. continue
  95. }
  96. out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(image.id)))
  97. }
  98. }
  99. func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string,
  100. tags map[string][]string, repo *registry.RepositoryData, sf *utils.StreamFormatter, r *registry.Session) error {
  101. workerCount := len(imageIDs)
  102. // start a maximum of 5 workers to check if images exist on the specified endpoint.
  103. if workerCount > 5 {
  104. workerCount = 5
  105. }
  106. var (
  107. wg = &sync.WaitGroup{}
  108. imageData = make(chan imagePushData, workerCount*2)
  109. imagesToPush = make(chan string, workerCount*2)
  110. pushes = make(chan map[string]struct{}, 1)
  111. )
  112. for i := 0; i < workerCount; i++ {
  113. wg.Add(1)
  114. go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush)
  115. }
  116. // start a go routine that consumes the images to push
  117. go func() {
  118. shouldPush := make(map[string]struct{})
  119. for id := range imagesToPush {
  120. shouldPush[id] = struct{}{}
  121. }
  122. pushes <- shouldPush
  123. }()
  124. for _, id := range imageIDs {
  125. imageData <- imagePushData{
  126. id: id,
  127. endpoint: endpoint,
  128. tokens: repo.Tokens,
  129. }
  130. }
  131. // close the channel to notify the workers that there will be no more images to check.
  132. close(imageData)
  133. wg.Wait()
  134. close(imagesToPush)
  135. // wait for all the images that require pushes to be collected into a consumable map.
  136. shouldPush := <-pushes
  137. // finish by pushing any images and tags to the endpoint. The order that the images are pushed
  138. // is very important that is why we are still itterating over the ordered list of imageIDs.
  139. for _, id := range imageIDs {
  140. if _, push := shouldPush[id]; push {
  141. if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil {
  142. // FIXME: Continue on error?
  143. return err
  144. }
  145. }
  146. for _, tag := range tags[id] {
  147. out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag))
  148. if err := r.PushRegistryTag(remoteName, id, tag, endpoint, repo.Tokens); err != nil {
  149. return err
  150. }
  151. }
  152. }
  153. return nil
  154. }
  155. // pushRepository pushes layers that do not already exist on the registry.
  156. func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
  157. repoInfo *registry.RepositoryInfo, localRepo map[string]string,
  158. tag string, sf *utils.StreamFormatter) error {
  159. log.Debugf("Local repo: %s", localRepo)
  160. out = utils.NewWriteFlusher(out)
  161. imgList, tags, err := s.getImageList(localRepo, tag)
  162. if err != nil {
  163. return err
  164. }
  165. out.Write(sf.FormatStatus("", "Sending image list"))
  166. imageIndex := s.createImageIndex(imgList, tags)
  167. log.Debugf("Preparing to push %s with the following images and tags", localRepo)
  168. for _, data := range imageIndex {
  169. log.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
  170. }
  171. // Register all the images in a repository with the registry
  172. // If an image is not in this list it will not be associated with the repository
  173. repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
  174. if err != nil {
  175. return err
  176. }
  177. nTag := 1
  178. if tag == "" {
  179. nTag = len(localRepo)
  180. }
  181. out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
  182. // push the repository to each of the endpoints only if it does not exist.
  183. for _, endpoint := range repoData.Endpoints {
  184. if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil {
  185. return err
  186. }
  187. }
  188. _, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
  189. return err
  190. }
  191. func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
  192. out = utils.NewWriteFlusher(out)
  193. jsonRaw, err := ioutil.ReadFile(path.Join(s.graph.Root, imgID, "json"))
  194. if err != nil {
  195. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  196. }
  197. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
  198. imgData := &registry.ImgData{
  199. ID: imgID,
  200. }
  201. // Send the json
  202. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
  203. if err == registry.ErrAlreadyExists {
  204. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  205. return "", nil
  206. }
  207. return "", err
  208. }
  209. layerData, err := s.graph.TempLayerArchive(imgID, sf, out)
  210. if err != nil {
  211. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  212. }
  213. defer os.RemoveAll(layerData.Name())
  214. // Send the layer
  215. log.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
  216. prgRd := utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing")
  217. defer prgRd.Close()
  218. checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, prgRd, ep, token, jsonRaw)
  219. if err != nil {
  220. return "", err
  221. }
  222. imgData.Checksum = checksum
  223. imgData.ChecksumPayload = checksumPayload
  224. // Send the checksum
  225. if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
  226. return "", err
  227. }
  228. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
  229. return imgData.Checksum, nil
  230. }
  231. func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out io.Writer, repoInfo *registry.RepositoryInfo, manifestBytes, tag string, sf *utils.StreamFormatter) error {
  232. if repoInfo.Official {
  233. j := eng.Job("trust_update_base")
  234. if err := j.Run(); err != nil {
  235. log.Errorf("error updating trust base graph: %s", err)
  236. }
  237. }
  238. endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
  239. if err != nil {
  240. return fmt.Errorf("error getting registry endpoint: %s", err)
  241. }
  242. auth, err := r.GetV2Authorization(endpoint, repoInfo.RemoteName, false)
  243. if err != nil {
  244. return fmt.Errorf("error getting authorization: %s", err)
  245. }
  246. // if no manifest is given, generate and sign with the key associated with the local tag store
  247. if len(manifestBytes) == 0 {
  248. mBytes, err := s.newManifest(repoInfo.LocalName, repoInfo.RemoteName, tag)
  249. if err != nil {
  250. return err
  251. }
  252. js, err := libtrust.NewJSONSignature(mBytes)
  253. if err != nil {
  254. return err
  255. }
  256. if err = js.Sign(s.trustKey); err != nil {
  257. return err
  258. }
  259. signedBody, err := js.PrettySignature("signatures")
  260. if err != nil {
  261. return err
  262. }
  263. log.Infof("Signed manifest using daemon's key: %s", s.trustKey.KeyID())
  264. manifestBytes = string(signedBody)
  265. }
  266. manifest, verified, err := s.verifyManifest(eng, []byte(manifestBytes))
  267. if err != nil {
  268. return fmt.Errorf("error verifying manifest: %s", err)
  269. }
  270. if err := checkValidManifest(manifest); err != nil {
  271. return fmt.Errorf("invalid manifest: %s", err)
  272. }
  273. if !verified {
  274. log.Debugf("Pushing unverified image")
  275. }
  276. for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
  277. var (
  278. sumStr = manifest.FSLayers[i].BlobSum
  279. imgJSON = []byte(manifest.History[i].V1Compatibility)
  280. )
  281. sumParts := strings.SplitN(sumStr, ":", 2)
  282. if len(sumParts) < 2 {
  283. return fmt.Errorf("Invalid checksum: %s", sumStr)
  284. }
  285. manifestSum := sumParts[1]
  286. img, err := image.NewImgJSON(imgJSON)
  287. if err != nil {
  288. return fmt.Errorf("Failed to parse json: %s", err)
  289. }
  290. img, err = s.graph.Get(img.ID)
  291. if err != nil {
  292. return err
  293. }
  294. arch, err := img.TarLayer()
  295. if err != nil {
  296. return fmt.Errorf("Could not get tar layer: %s", err)
  297. }
  298. // Call mount blob
  299. exists, err := r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, auth)
  300. if err != nil {
  301. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
  302. return err
  303. }
  304. if !exists {
  305. prgRd := utils.ProgressReader(arch, int(img.Size), out, sf, false, utils.TruncateID(img.ID), "Pushing")
  306. defer prgRd.Close()
  307. err = r.PutV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, prgRd, auth)
  308. if err != nil {
  309. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
  310. return err
  311. }
  312. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil))
  313. } else {
  314. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image already exists", nil))
  315. }
  316. }
  317. // push the manifest
  318. return r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth)
  319. }
  320. // FIXME: Allow to interrupt current push when new push of same image is done.
  321. func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
  322. if n := len(job.Args); n != 1 {
  323. return job.Errorf("Usage: %s IMAGE", job.Name)
  324. }
  325. var (
  326. localName = job.Args[0]
  327. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  328. authConfig = &registry.AuthConfig{}
  329. metaHeaders map[string][]string
  330. )
  331. // Resolve the Repository name from fqn to RepositoryInfo
  332. repoInfo, err := registry.ResolveRepositoryInfo(job, localName)
  333. if err != nil {
  334. return job.Error(err)
  335. }
  336. tag := job.Getenv("tag")
  337. manifestBytes := job.Getenv("manifest")
  338. job.GetenvJson("authConfig", authConfig)
  339. job.GetenvJson("metaHeaders", &metaHeaders)
  340. if _, err := s.poolAdd("push", repoInfo.LocalName); err != nil {
  341. return job.Error(err)
  342. }
  343. defer s.poolRemove("push", repoInfo.LocalName)
  344. endpoint, err := repoInfo.GetEndpoint()
  345. if err != nil {
  346. return job.Error(err)
  347. }
  348. img, err := s.graph.Get(repoInfo.LocalName)
  349. r, err2 := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false)
  350. if err2 != nil {
  351. return job.Error(err2)
  352. }
  353. if len(tag) == 0 {
  354. tag = DEFAULTTAG
  355. }
  356. if repoInfo.Index.Official || endpoint.Version == registry.APIVersion2 {
  357. err := s.pushV2Repository(r, job.Eng, job.Stdout, repoInfo, manifestBytes, tag, sf)
  358. if err == nil {
  359. return engine.StatusOK
  360. }
  361. // error out, no fallback to V1
  362. return job.Errorf("Error pushing to registry: %s", err)
  363. }
  364. if err != nil {
  365. reposLen := 1
  366. if tag == "" {
  367. reposLen = len(s.Repositories[repoInfo.LocalName])
  368. }
  369. job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
  370. // If it fails, try to get the repository
  371. if localRepo, exists := s.Repositories[repoInfo.LocalName]; exists {
  372. if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil {
  373. return job.Error(err)
  374. }
  375. return engine.StatusOK
  376. }
  377. return job.Error(err)
  378. }
  379. var token []string
  380. job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", repoInfo.CanonicalName))
  381. if _, err := s.pushImage(r, job.Stdout, img.ID, endpoint.String(), token, sf); err != nil {
  382. return job.Error(err)
  383. }
  384. return engine.StatusOK
  385. }