push.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package graph
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "sync"
  9. log "github.com/Sirupsen/logrus"
  10. "github.com/docker/docker/engine"
  11. "github.com/docker/docker/pkg/archive"
  12. "github.com/docker/docker/registry"
  13. "github.com/docker/docker/utils"
  14. )
  15. // Retrieve the all the images to be uploaded in the correct order
  16. func (s *TagStore) getImageList(localRepo map[string]string, requestedTag string) ([]string, map[string][]string, error) {
  17. var (
  18. imageList []string
  19. imagesSeen = make(map[string]bool)
  20. tagsByImage = make(map[string][]string)
  21. )
  22. for tag, id := range localRepo {
  23. if requestedTag != "" && requestedTag != tag {
  24. continue
  25. }
  26. var imageListForThisTag []string
  27. tagsByImage[id] = append(tagsByImage[id], tag)
  28. for img, err := s.graph.Get(id); img != nil; img, err = img.GetParent() {
  29. if err != nil {
  30. return nil, nil, err
  31. }
  32. if imagesSeen[img.ID] {
  33. // This image is already on the list, we can ignore it and all its parents
  34. break
  35. }
  36. imagesSeen[img.ID] = true
  37. imageListForThisTag = append(imageListForThisTag, img.ID)
  38. }
  39. // reverse the image list for this tag (so the "most"-parent image is first)
  40. for i, j := 0, len(imageListForThisTag)-1; i < j; i, j = i+1, j-1 {
  41. imageListForThisTag[i], imageListForThisTag[j] = imageListForThisTag[j], imageListForThisTag[i]
  42. }
  43. // append to main image list
  44. imageList = append(imageList, imageListForThisTag...)
  45. }
  46. if len(imageList) == 0 {
  47. return nil, nil, fmt.Errorf("No images found for the requested repository / tag")
  48. }
  49. log.Debugf("Image list: %v", imageList)
  50. log.Debugf("Tags by image: %v", tagsByImage)
  51. return imageList, tagsByImage, nil
  52. }
  53. // createImageIndex returns an index of an image's layer IDs and tags.
  54. func (s *TagStore) createImageIndex(images []string, tags map[string][]string) []*registry.ImgData {
  55. var imageIndex []*registry.ImgData
  56. for _, id := range images {
  57. if tags, hasTags := tags[id]; hasTags {
  58. // If an image has tags you must add an entry in the image index
  59. // for each tag
  60. for _, tag := range tags {
  61. imageIndex = append(imageIndex, &registry.ImgData{
  62. ID: id,
  63. Tag: tag,
  64. })
  65. }
  66. continue
  67. }
  68. // If the image does not have a tag it still needs to be sent to the
  69. // registry with an empty tag so that it is accociated with the repository
  70. imageIndex = append(imageIndex, &registry.ImgData{
  71. ID: id,
  72. Tag: "",
  73. })
  74. }
  75. return imageIndex
  76. }
  77. type imagePushData struct {
  78. id string
  79. endpoint string
  80. tokens []string
  81. }
  82. // lookupImageOnEndpoint checks the specified endpoint to see if an image exists
  83. // and if it is absent then it sends the image id to the channel to be pushed.
  84. func lookupImageOnEndpoint(wg *sync.WaitGroup, r *registry.Session, out io.Writer, sf *utils.StreamFormatter,
  85. images chan imagePushData, imagesToPush chan string) {
  86. defer wg.Done()
  87. for image := range images {
  88. if err := r.LookupRemoteImage(image.id, image.endpoint, image.tokens); err != nil {
  89. log.Errorf("Error in LookupRemoteImage: %s", err)
  90. imagesToPush <- image.id
  91. continue
  92. }
  93. out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", utils.TruncateID(image.id)))
  94. }
  95. }
  96. func (s *TagStore) pushImageToEndpoint(endpoint string, out io.Writer, remoteName string, imageIDs []string,
  97. tags map[string][]string, repo *registry.RepositoryData, sf *utils.StreamFormatter, r *registry.Session) error {
  98. workerCount := len(imageIDs)
  99. // start a maximum of 5 workers to check if images exist on the specified endpoint.
  100. if workerCount > 5 {
  101. workerCount = 5
  102. }
  103. var (
  104. wg = &sync.WaitGroup{}
  105. imageData = make(chan imagePushData, workerCount*2)
  106. imagesToPush = make(chan string, workerCount*2)
  107. pushes = make(chan map[string]struct{}, 1)
  108. )
  109. for i := 0; i < workerCount; i++ {
  110. wg.Add(1)
  111. go lookupImageOnEndpoint(wg, r, out, sf, imageData, imagesToPush)
  112. }
  113. // start a go routine that consumes the images to push
  114. go func() {
  115. shouldPush := make(map[string]struct{})
  116. for id := range imagesToPush {
  117. shouldPush[id] = struct{}{}
  118. }
  119. pushes <- shouldPush
  120. }()
  121. for _, id := range imageIDs {
  122. imageData <- imagePushData{
  123. id: id,
  124. endpoint: endpoint,
  125. tokens: repo.Tokens,
  126. }
  127. }
  128. // close the channel to notify the workers that there will be no more images to check.
  129. close(imageData)
  130. wg.Wait()
  131. close(imagesToPush)
  132. // wait for all the images that require pushes to be collected into a consumable map.
  133. shouldPush := <-pushes
  134. // finish by pushing any images and tags to the endpoint. The order that the images are pushed
  135. // is very important that is why we are still itterating over the ordered list of imageIDs.
  136. for _, id := range imageIDs {
  137. if _, push := shouldPush[id]; push {
  138. if _, err := s.pushImage(r, out, id, endpoint, repo.Tokens, sf); err != nil {
  139. // FIXME: Continue on error?
  140. return err
  141. }
  142. }
  143. for _, tag := range tags[id] {
  144. out.Write(sf.FormatStatus("", "Pushing tag for rev [%s] on {%s}", utils.TruncateID(id), endpoint+"repositories/"+remoteName+"/tags/"+tag))
  145. if err := r.PushRegistryTag(remoteName, id, tag, endpoint, repo.Tokens); err != nil {
  146. return err
  147. }
  148. }
  149. }
  150. return nil
  151. }
  152. // pushRepository pushes layers that do not already exist on the registry.
  153. func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
  154. repoInfo *registry.RepositoryInfo, localRepo map[string]string,
  155. tag string, sf *utils.StreamFormatter) error {
  156. log.Debugf("Local repo: %s", localRepo)
  157. out = utils.NewWriteFlusher(out)
  158. imgList, tags, err := s.getImageList(localRepo, tag)
  159. if err != nil {
  160. return err
  161. }
  162. out.Write(sf.FormatStatus("", "Sending image list"))
  163. imageIndex := s.createImageIndex(imgList, tags)
  164. log.Debugf("Preparing to push %s with the following images and tags", localRepo)
  165. for _, data := range imageIndex {
  166. log.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
  167. }
  168. // Register all the images in a repository with the registry
  169. // If an image is not in this list it will not be associated with the repository
  170. repoData, err := r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, false, nil)
  171. if err != nil {
  172. return err
  173. }
  174. nTag := 1
  175. if tag == "" {
  176. nTag = len(localRepo)
  177. }
  178. out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", repoInfo.CanonicalName, nTag))
  179. // push the repository to each of the endpoints only if it does not exist.
  180. for _, endpoint := range repoData.Endpoints {
  181. if err := s.pushImageToEndpoint(endpoint, out, repoInfo.RemoteName, imgList, tags, repoData, sf, r); err != nil {
  182. return err
  183. }
  184. }
  185. _, err = r.PushImageJSONIndex(repoInfo.RemoteName, imageIndex, true, repoData.Endpoints)
  186. return err
  187. }
  188. func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *utils.StreamFormatter) (checksum string, err error) {
  189. out = utils.NewWriteFlusher(out)
  190. jsonRaw, err := ioutil.ReadFile(path.Join(s.graph.Root, imgID, "json"))
  191. if err != nil {
  192. return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)
  193. }
  194. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pushing", nil))
  195. imgData := &registry.ImgData{
  196. ID: imgID,
  197. }
  198. // Send the json
  199. if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
  200. if err == registry.ErrAlreadyExists {
  201. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image already pushed, skipping", nil))
  202. return "", nil
  203. }
  204. return "", err
  205. }
  206. layerData, err := s.graph.TempLayerArchive(imgID, archive.Uncompressed, sf, out)
  207. if err != nil {
  208. return "", fmt.Errorf("Failed to generate layer archive: %s", err)
  209. }
  210. defer os.RemoveAll(layerData.Name())
  211. // Send the layer
  212. log.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)
  213. checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, utils.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
  214. if err != nil {
  215. return "", err
  216. }
  217. imgData.Checksum = checksum
  218. imgData.ChecksumPayload = checksumPayload
  219. // Send the checksum
  220. if err := r.PushImageChecksumRegistry(imgData, ep, token); err != nil {
  221. return "", err
  222. }
  223. out.Write(sf.FormatProgress(utils.TruncateID(imgData.ID), "Image successfully pushed", nil))
  224. return imgData.Checksum, nil
  225. }
  226. // FIXME: Allow to interrupt current push when new push of same image is done.
  227. func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
  228. if n := len(job.Args); n != 1 {
  229. return job.Errorf("Usage: %s IMAGE", job.Name)
  230. }
  231. var (
  232. localName = job.Args[0]
  233. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  234. authConfig = &registry.AuthConfig{}
  235. metaHeaders map[string][]string
  236. )
  237. // Resolve the Repository name from fqn to RepositoryInfo
  238. repoInfo, err := registry.ResolveRepositoryInfo(job, localName)
  239. if err != nil {
  240. return job.Error(err)
  241. }
  242. tag := job.Getenv("tag")
  243. job.GetenvJson("authConfig", authConfig)
  244. job.GetenvJson("metaHeaders", &metaHeaders)
  245. if _, err := s.poolAdd("push", repoInfo.LocalName); err != nil {
  246. return job.Error(err)
  247. }
  248. defer s.poolRemove("push", repoInfo.LocalName)
  249. endpoint, err := repoInfo.GetEndpoint()
  250. if err != nil {
  251. return job.Error(err)
  252. }
  253. img, err := s.graph.Get(repoInfo.LocalName)
  254. r, err2 := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, false)
  255. if err2 != nil {
  256. return job.Error(err2)
  257. }
  258. if err != nil {
  259. reposLen := 1
  260. if tag == "" {
  261. reposLen = len(s.Repositories[repoInfo.LocalName])
  262. }
  263. job.Stdout.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.CanonicalName, reposLen))
  264. // If it fails, try to get the repository
  265. if localRepo, exists := s.Repositories[repoInfo.LocalName]; exists {
  266. if err := s.pushRepository(r, job.Stdout, repoInfo, localRepo, tag, sf); err != nil {
  267. return job.Error(err)
  268. }
  269. return engine.StatusOK
  270. }
  271. return job.Error(err)
  272. }
  273. var token []string
  274. job.Stdout.Write(sf.FormatStatus("", "The push refers to an image: [%s]", repoInfo.CanonicalName))
  275. if _, err := s.pushImage(r, job.Stdout, img.ID, endpoint.String(), token, sf); err != nil {
  276. return job.Error(err)
  277. }
  278. return engine.StatusOK
  279. }