pull.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package graph
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "net/url"
  7. "strings"
  8. "time"
  9. "github.com/docker/docker/engine"
  10. "github.com/docker/docker/image"
  11. "github.com/docker/docker/registry"
  12. "github.com/docker/docker/utils"
  13. )
  14. func (s *TagStore) CmdPull(job *engine.Job) engine.Status {
  15. if n := len(job.Args); n != 1 && n != 2 {
  16. return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
  17. }
  18. var (
  19. localName = job.Args[0]
  20. tag string
  21. sf = utils.NewStreamFormatter(job.GetenvBool("json"))
  22. authConfig = &registry.AuthConfig{}
  23. metaHeaders map[string][]string
  24. )
  25. if len(job.Args) > 1 {
  26. tag = job.Args[1]
  27. }
  28. job.GetenvJson("authConfig", authConfig)
  29. job.GetenvJson("metaHeaders", &metaHeaders)
  30. c, err := s.poolAdd("pull", localName+":"+tag)
  31. if err != nil {
  32. if c != nil {
  33. // Another pull of the same repository is already taking place; just wait for it to finish
  34. job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
  35. <-c
  36. return engine.StatusOK
  37. }
  38. return job.Error(err)
  39. }
  40. defer s.poolRemove("pull", localName+":"+tag)
  41. // Resolve the Repository name from fqn to endpoint + name
  42. hostname, remoteName, err := registry.ResolveRepositoryName(localName)
  43. if err != nil {
  44. return job.Error(err)
  45. }
  46. endpoint, err := registry.ExpandAndVerifyRegistryUrl(hostname)
  47. if err != nil {
  48. return job.Error(err)
  49. }
  50. r, err := registry.NewRegistry(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
  51. if err != nil {
  52. return job.Error(err)
  53. }
  54. if endpoint == registry.IndexServerAddress() {
  55. // If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar"
  56. localName = remoteName
  57. }
  58. if err = s.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil {
  59. return job.Error(err)
  60. }
  61. return engine.StatusOK
  62. }
  63. func (s *TagStore) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
  64. out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
  65. repoData, err := r.GetRepositoryData(remoteName)
  66. if err != nil {
  67. if strings.Contains(err.Error(), "HTTP code: 404") {
  68. return fmt.Errorf("Error: image %s not found", remoteName)
  69. } else {
  70. // Unexpected HTTP error
  71. return err
  72. }
  73. }
  74. utils.Debugf("Retrieving the tag list")
  75. tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
  76. if err != nil {
  77. utils.Errorf("%v", err)
  78. return err
  79. }
  80. for tag, id := range tagsList {
  81. repoData.ImgList[id] = &registry.ImgData{
  82. ID: id,
  83. Tag: tag,
  84. Checksum: "",
  85. }
  86. }
  87. utils.Debugf("Registering tags")
  88. // If no tag has been specified, pull them all
  89. if askedTag == "" {
  90. for tag, id := range tagsList {
  91. repoData.ImgList[id].Tag = tag
  92. }
  93. } else {
  94. // Otherwise, check that the tag exists and use only that one
  95. id, exists := tagsList[askedTag]
  96. if !exists {
  97. return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
  98. }
  99. repoData.ImgList[id].Tag = askedTag
  100. }
  101. errors := make(chan error)
  102. for _, image := range repoData.ImgList {
  103. downloadImage := func(img *registry.ImgData) {
  104. if askedTag != "" && img.Tag != askedTag {
  105. utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
  106. if parallel {
  107. errors <- nil
  108. }
  109. return
  110. }
  111. if img.Tag == "" {
  112. utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
  113. if parallel {
  114. errors <- nil
  115. }
  116. return
  117. }
  118. // ensure no two downloads of the same image happen at the same time
  119. if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
  120. if c != nil {
  121. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
  122. <-c
  123. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
  124. } else {
  125. utils.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
  126. }
  127. if parallel {
  128. errors <- nil
  129. }
  130. return
  131. }
  132. defer s.poolRemove("pull", "img:"+img.ID)
  133. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
  134. success := false
  135. var lastErr error
  136. for _, ep := range repoData.Endpoints {
  137. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
  138. if err := s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
  139. // It's not ideal that only the last error is returned, it would be better to concatenate the errors.
  140. // As the error is also given to the output stream the user will see the error.
  141. lastErr = err
  142. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
  143. continue
  144. }
  145. success = true
  146. break
  147. }
  148. if !success {
  149. err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr)
  150. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
  151. if parallel {
  152. errors <- err
  153. return
  154. }
  155. }
  156. out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
  157. if parallel {
  158. errors <- nil
  159. }
  160. }
  161. if parallel {
  162. go downloadImage(image)
  163. } else {
  164. downloadImage(image)
  165. }
  166. }
  167. if parallel {
  168. var lastError error
  169. for i := 0; i < len(repoData.ImgList); i++ {
  170. if err := <-errors; err != nil {
  171. lastError = err
  172. }
  173. }
  174. if lastError != nil {
  175. return lastError
  176. }
  177. }
  178. for tag, id := range tagsList {
  179. if askedTag != "" && tag != askedTag {
  180. continue
  181. }
  182. if err := s.Set(localName, tag, id, true); err != nil {
  183. return err
  184. }
  185. }
  186. return nil
  187. }
  188. func (s *TagStore) pullImage(r *registry.Registry, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) error {
  189. history, err := r.GetRemoteHistory(imgID, endpoint, token)
  190. if err != nil {
  191. return err
  192. }
  193. out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
  194. // FIXME: Try to stream the images?
  195. // FIXME: Launch the getRemoteImage() in goroutines
  196. for i := len(history) - 1; i >= 0; i-- {
  197. id := history[i]
  198. // ensure no two downloads of the same layer happen at the same time
  199. if c, err := s.poolAdd("pull", "layer:"+id); err != nil {
  200. utils.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
  201. <-c
  202. }
  203. defer s.poolRemove("pull", "layer:"+id)
  204. if !s.graph.Exists(id) {
  205. out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
  206. var (
  207. imgJSON []byte
  208. imgSize int
  209. err error
  210. img *image.Image
  211. )
  212. retries := 5
  213. for j := 1; j <= retries; j++ {
  214. imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
  215. if err != nil && j == retries {
  216. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  217. return err
  218. } else if err != nil {
  219. time.Sleep(time.Duration(j) * 500 * time.Millisecond)
  220. continue
  221. }
  222. img, err = image.NewImgJSON(imgJSON)
  223. if err != nil && j == retries {
  224. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  225. return fmt.Errorf("Failed to parse json: %s", err)
  226. } else if err != nil {
  227. time.Sleep(time.Duration(j) * 500 * time.Millisecond)
  228. continue
  229. } else {
  230. break
  231. }
  232. }
  233. for j := 1; j <= retries; j++ {
  234. // Get the layer
  235. status := "Pulling fs layer"
  236. if j > 1 {
  237. status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
  238. }
  239. out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
  240. layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
  241. if uerr, ok := err.(*url.Error); ok {
  242. err = uerr.Err
  243. }
  244. if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
  245. time.Sleep(time.Duration(j) * 500 * time.Millisecond)
  246. continue
  247. } else if err != nil {
  248. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
  249. return err
  250. }
  251. defer layer.Close()
  252. err = s.graph.Register(imgJSON,
  253. utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"),
  254. img)
  255. if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
  256. time.Sleep(time.Duration(j) * 500 * time.Millisecond)
  257. continue
  258. } else if err != nil {
  259. out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
  260. return err
  261. } else {
  262. break
  263. }
  264. }
  265. }
  266. out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
  267. }
  268. return nil
  269. }