push.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package graph
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "github.com/docker/docker/archive"
  9. "github.com/docker/docker/engine"
  10. "github.com/docker/docker/registry"
  11. "github.com/docker/docker/utils"
  12. )
  13. // Retrieve the all the images to be uploaded in the correct order
  14. func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
  15. var (
  16. imageList []string
  17. imagesSeen map[string]bool = make(map[string]bool)
  18. tagsByImage map[string][]string = make(map[string][]string)
  19. )
  20. for tag, id := range localRepo {
  21. if requestedTag != "" && requestedTag != tag {
  22. continue
  23. }
  24. var imageListForThisTag []string
  25. tagsByImage[id] = append(tagsByImage[id], tag)
  26. for img, err := s.graph.Get(id); img != nil; img, err = img.GetParent() {
  27. if err != nil {
  28. return nil, nil, err
  29. }
  30. if imagesSeen[img.ID] {
  31. // This image is already on the list, we can ignore it and all its parents
  32. break
  33. }
  34. imagesSeen[img.ID] = true
  35. imageListForThisTag = append(imageListForThisTag, img.ID)
  36. }
  37. // reverse the image list for this tag (so the "most"-parent image is first)
  38. for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
  39. imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
  40. }
  41. // append to main image list
  42. imageList = append(imageList, imageListForThisTag...)
  43. }
  44. if len(imageList) == 0 {
  45. return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  46. }
  47. utils.Debugf("Image list: %v", imageList)
  48. utils.Debugf("Tags by image: %v", tagsByImage)
  49. return imageList, tagsByImage, nil
  50. }
  51. func (s *TagStore) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, tag string, sf *utils.StreamFormatter) error {
  52. out = utils.NewWriteFlusher(out)
  53. utils.Debugf("Local repo: %s", localRepo)
  54. imgList, tagsByImage, err := s.getImageList(localRepo, tag)
  55. if err != nil {
  56. return err
  57. }
  58. out.Write(sf.FormatStatus("", "Sending image list"))
  59. var (
  60. repoData *registry.RepositoryData
  61. imageIndex []*registry.ImgData
  62. )
  63. for _, imgId := range imgList {
  64. if tags, exists := tagsByImage[imgId]; exists {
  65. // If an image has tags you must add an entry in the image index
  66. // for each tag
  67. for _, tag := range tags {
  68. imageIndex = append(imageIndex, &registry.ImgData{
  69. ID: imgId,
  70. Tag: tag,
  71. })
  72. }
  73. } else {
  74. // If the image does not have a tag it still needs to be sent to the
  75. // registry with an empty tag so that it is accociated with the repository
  76. imageIndex = append(imageIndex, &registry.ImgData{
  77. ID: imgId,
  78. Tag: "",
  79. })
  80. }
  81. }
  82. utils.Debugf("Preparing to push %s with the following images and tags\n", localRepo)
  83. for _, data := range imageIndex {
  84. utils.Debugf("Pushing ID: %s with Tag: %s\n", data.ID, data.Tag)
  85. }
  86. // Register all the images in a repository with the registry
  87. // If an image is not in this list it will not be associated with the repository
  88. repoData, err = r.PushImageJSONIndex(remoteName, imageIndex, false, nil)
  89. if err != nil {
  90. return err
  91. }
  92. nTag := 1
  93. if tag == "" {
  94. nTag = len(localRepo)
  95. }
  96. for _, ep := range repoData.Endpoints {
  97. out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, nTag))
  98. for _, imgId := range imgList {
  99. if r.LookupRemoteImage(imgId, ep, repoData.Tokens) {
  100. out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(imgId)))
  101. } else {
  102. if _, err := s.pushImage(r, out, remoteName, imgId, ep, repoData.Tokens, sf); err != nil {
  103. // FIXME: Continue on error?
  104. return err
  105. }
  106. }
  107. for _, tag := range tagsByImage[imgId] {
  108. out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(imgId), ep+"repositories/"+remoteName+"/tags/"+tag))
  109. if err := r.PushRegistryTag(remoteName, imgId, tag, ep, repoData.Tokens); err != nil {
  110. return err
  111. }
  112. }
  113. }
  114. }
  115. if _, err := r.PushImageJSONIndex(remoteName, imageIndex, true, repoData.Endpoints); err != nil {
  116. return err
  117. }
  118. return nil
  119. }
  120. func (s *TagStore) pushImage(r *registry.Registry, out io.Writer, remote, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
  121. out = utils.NewWriteFlusher(out)
  122. jsonRaw, err := ioutil.ReadFile(path.Join(s.graph.Root, imgID, "json"))
  123. if err != nil {
  124. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  125. }
  126. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
  127. imgData := &registry.ImgData{
  128. ID: imgID,
  129. }
  130. // Send the json
  131. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
  132. if err == registry.ErrAlreadyExists {
  133. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  134. return "", nil
  135. }
  136. return "", err
  137. }
  138. layerData, err := s.graph.TempLayerArchive(imgID, archive.Uncompressed, sf, out)
  139. if err != nil {
  140. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  141. }
  142. defer os.RemoveAll(layerData.Name())
  143. // Send the layer
  144. utils.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
  145. checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
  146. if err != nil {
  147. return "", err
  148. }
  149. imgData.Checksum = checksum
  150. imgData.ChecksumPayload = checksumPayload
  151. // Send the checksum
  152. if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
  153. return "", err
  154. }
  155. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
  156. return imgData.Checksum, nil
  157. }
  158. // FIXME: Allow to interrupt current push when new push of same image is done.
  159. func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
  160. if n := len(job.Args); n != 1 {
  161. return job.Errorf("Usage: %s IMAGE", job.Name)
  162. }
  163. var (
  164. localName = job.Args[0]
  165. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  166. authConfig = &registry.AuthConfig{}
  167. metaHeaders map[string][]string
  168. )
  169. tag := job.Getenv("tag")
  170. job.GetenvJson("authConfig", authConfig)
  171. job.GetenvJson("metaHeaders", &metaHeaders)
  172. if _, err := s.poolAdd("push", localName); err != nil {
  173. return job.Error(err)
  174. }
  175. defer s.poolRemove("push", localName)
  176. // Resolve the Repository name from fqn to endpoint + name
  177. hostname, remoteName, err := registry.ResolveRepositoryName(localName)
  178. if err != nil {
  179. return job.Error(err)
  180. }
  181. endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
  182. if err != nil {
  183. return job.Error(err)
  184. }
  185. img, err := s.graph.Get(localName)
  186. r, err2 := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false)
  187. if err2 != nil {
  188. return job.Error(err2)
  189. }
  190. if err != nil {
  191. reposLen := 1
  192. if tag == "" {
  193. reposLen = len(s.Repositories[localName])
  194. }
  195. job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen))
  196. // If it fails, try to get the repository
  197. if localRepo, exists := s.Repositories[localName]; exists {
  198. if err := s.pushRepository(r, job.Stdout, localName, remoteName, localRepo, tag, sf); err != nil {
  199. return job.Error(err)
  200. }
  201. return engine.StatusOK
  202. }
  203. return job.Error(err)
  204. }
  205. var token []string
  206. job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName))
  207. if _, err := s.pushImage(r, job.Stdout, remoteName, img.ID, endpoint, token, sf); err != nil {
  208. return job.Error(err)
  209. }
  210. return engine.StatusOK
  211. }