pull.go

package graph

import (
	"fmt"
	"io"
	"net"
	"net/url"
	"strings"
	"time"

	"github.com/docker/docker/engine"
	"github.com/docker/docker/image"
	"github.com/docker/docker/pkg/log"
	"github.com/docker/docker/registry"
	"github.com/docker/docker/utils"
)

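// CmdPull is the job handler for "pull": it fetches an image (and optionally a
// single tag) from a remote registry and registers it in the local TagStore.
// A pool entry keyed on "repository:tag" makes concurrent pulls of the same
// repository wait for the one already in progress instead of starting again.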
func (s *TagStore) CmdPull(job *engine.Job) engine.Status {
	if n := len(job.Args); n != 1 && n != 2 {
		return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
	}
	var (
		localName   = job.Args[0]
		tag         string
		sf          = utils.NewStreamFormatter(job.GetenvBool("json"))
		authConfig  = &registry.AuthConfig{}
		metaHeaders map[string][]string
	)
	if len(job.Args) > 1 {
		tag = job.Args[1]
	}

	job.GetenvJson("authConfig", authConfig)
	job.GetenvJson("metaHeaders", &metaHeaders)

	c, err := s.poolAdd("pull", localName+":"+tag)
	if err != nil {
		if c != nil {
			// Another pull of the same repository is already taking place; just wait for it to finish
			job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
			<-c
			return engine.StatusOK
		}
		return job.Error(err)
	}
	defer s.poolRemove("pull", localName+":"+tag)

	// Resolve the Repository name from fqn to endpoint + name
	hostname, remoteName, err := registry.ResolveRepositoryName(localName)
	if err != nil {
		return job.Error(err)
	}

	endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
	if err != nil {
		return job.Error(err)
	}

	r, err := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
	if err != nil {
		return job.Error(err)
	}

	if endpoint == registry.IndexServerAddress() {
		// If pulling "index.docker.io/foo/bar", store it locally under "foo/bar"
		localName = remoteName
	}

	if err = s.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil {
		return job.Error(err)
	}

	return engine.StatusOK
}

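// pullRepository fetches the repository metadata from the registry, resolves
// the requested tag (or all tags when none is given) and downloads each
// matching image, either sequentially or with one goroutine per image.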
func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
	out.Write(sf.FormatStatus("", "Pulling repository %s", localName))

	repoData, err := r.GetRepositoryData(remoteName)
	if err != nil {
		if strings.Contains(err.Error(), "HTTP code: 404") {
			return fmt.Errorf("Error: image %s not found", remoteName)
		} else {
			// Unexpected HTTP error
			return err
		}
	}

	log.Debugf("Retrieving the tag list")
	tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
	if err != nil {
		log.Errorf("%v", err)
		return err
	}

	for tag, id := range tagsList {
		repoData.ImgList[id] = &registry.ImgData{
			ID:       id,
			Tag:      tag,
			Checksum: "",
		}
	}

	log.Debugf("Registering tags")
	// If no tag has been specified, pull them all
	if askedTag == "" {
		for tag, id := range tagsList {
			repoData.ImgList[id].Tag = tag
		}
	} else {
		// Otherwise, check that the tag exists and use only that one
		id, exists := tagsList[askedTag]
		if !exists {
			return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
		}
		repoData.ImgList[id].Tag = askedTag
	}

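	// Download every image in the repository. In parallel mode each download
	// runs in its own goroutine and reports its result on the errors channel;
	// otherwise the images are fetched one after another.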
	errors := make(chan error)
	for _, image := range repoData.ImgList {
		downloadImage := func(img *registry.ImgData) {
			if askedTag != "" && img.Tag != askedTag {
				log.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
				if parallel {
					errors <- nil
				}
				return
			}

			if img.Tag == "" {
				log.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
				if parallel {
					errors <- nil
				}
				return
			}

			// ensure no two downloads of the same image happen at the same time
			if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
				if c != nil {
					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
					<-c
					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
				} else {
					log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
				}
				if parallel {
					errors <- nil
				}
				return
			}
			defer s.poolRemove("pull", "img:"+img.ID)

			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
			success := false
			var lastErr error
			for _, ep := range repoData.Endpoints {
				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
				if err := s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
					// Not ideal: only the last error is returned; it would be better to
					// concatenate all of the errors. Since each error is also written to
					// the output stream, the user will still see it.
					lastErr = err
					out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
					continue
				}
				success = true
				break
			}
			if !success {
				err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr)
				out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
				if parallel {
					errors <- err
					return
				}
			}
			out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))

			if parallel {
				errors <- nil
			}
		}

		if parallel {
			go downloadImage(image)
		} else {
			downloadImage(image)
		}
	}

	if parallel {
		var lastError error
		for i := 0; i < len(repoData.ImgList); i++ {
			if err := <-errors; err != nil {
				lastError = err
			}
		}
		if lastError != nil {
			return lastError
		}
	}

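	// Record the downloaded tags in the local tag store.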
	for tag, id := range tagsList {
		if askedTag != "" && tag != askedTag {
			continue
		}
		if err := s.Set(localName, tag, id, true); err != nil {
			return err
		}
	}

	return nil
}

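// pullImage downloads a single image and every layer in its ancestry from the
// given endpoint, registering each layer in the local graph as it is received.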
func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error {
	history, err := r.GetRemoteHistory(imgID, endpoint, token)
	if err != nil {
		return err
	}

	out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
	// FIXME: Try to stream the images?
	// FIXME: Launch the getRemoteImage() in goroutines
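	// Walk the history from the base layer up so that every parent is
	// registered in the graph before the layers that build on it.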
	for i := len(history) - 1; i >= 0; i-- {
		id := history[i]

		// ensure no two downloads of the same layer happen at the same time
		if c, err := s.poolAdd("pull", "layer:"+id); err != nil {
			log.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
			<-c
		}
		defer s.poolRemove("pull", "layer:"+id)

		if !s.graph.Exists(id) {
			out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
			var (
				imgJSON []byte
				imgSize int
				err     error
				img     *image.Image
			)
			retries := 5
			for j := 1; j <= retries; j++ {
				imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
				if err != nil && j == retries {
					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
					return err
				} else if err != nil {
					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
					continue
				}
				img, err = image.NewImgJSON(imgJSON)
				if err != nil && j == retries {
					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
					return fmt.Errorf("Failed to parse json: %s", err)
				} else if err != nil {
					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
					continue
				} else {
					break
				}
			}

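			// Fetch the layer data itself, retrying on network timeouts with a
			// linearly increasing back-off between attempts.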
			for j := 1; j <= retries; j++ {
				// Get the layer
				status := "Pulling fs layer"
				if j > 1 {
					status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
				}
				out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
				layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
				if uerr, ok := err.(*url.Error); ok {
					err = uerr.Err
				}
				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
					continue
				} else if err != nil {
					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
					return err
				}
				defer layer.Close()

				err = s.graph.Register(img, imgJSON,
					utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"))
				if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
					time.Sleep(time.Duration(j) * 500 * time.Millisecond)
					continue
				} else if err != nil {
					out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
					return err
				} else {
					break
				}
			}
		}
		out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
	}
	return nil
}