download.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. package xfer
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/distribution"
  9. "github.com/docker/docker/image"
  10. "github.com/docker/docker/layer"
  11. "github.com/docker/docker/pkg/archive"
  12. "github.com/docker/docker/pkg/ioutils"
  13. "github.com/docker/docker/pkg/progress"
  14. "golang.org/x/net/context"
  15. )
  // maxDownloadAttempts is the total number of times a layer download is
  // attempted before the last error is recorded as the transfer's result.
  const maxDownloadAttempts = 5
  // LayerDownloadManager figures out which layers need to be downloaded, then
  // registers and downloads those, taking into account dependencies between
  // layers.
  type LayerDownloadManager struct {
  	// layerStore is consulted for layers that already exist and is where
  	// newly downloaded layers are registered.
  	layerStore layer.Store
  	// tm runs the download transfers; the concurrency limit is configured
  	// on it.
  	tm TransferManager
  }
  // SetConcurrency sets the maximum number of concurrent downloads for each
  // pull by forwarding the limit to the transfer manager.
  func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
  	ldm.tm.SetConcurrency(concurrency)
  }
  28. // NewLayerDownloadManager returns a new LayerDownloadManager.
  29. func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int) *LayerDownloadManager {
  30. return &LayerDownloadManager{
  31. layerStore: layerStore,
  32. tm: NewTransferManager(concurrencyLimit),
  33. }
  34. }
  // downloadTransfer is a Transfer that additionally carries the outcome of
  // downloading and registering one layer.
  type downloadTransfer struct {
  	Transfer

  	// layerStore is the store the layer is registered in and, once the
  	// transfer is released, released from.
  	layerStore layer.Store
  	// layer is the layer produced by the registration step.
  	layer layer.Layer
  	// err records why the download or registration failed, if it did.
  	err error
  }
  // result returns the layer resulting from the download, if the download
  // and registration were successful, along with the recorded error. It only
  // reads the stored fields; callers in this file wait on Done() before
  // calling it.
  func (d *downloadTransfer) result() (layer.Layer, error) {
  	return d.layer, d.err
  }
  // A DownloadDescriptor references a layer that may need to be downloaded.
  type DownloadDescriptor interface {
  	// Key returns the key used to deduplicate downloads.
  	Key() string
  	// ID returns the ID for display purposes.
  	ID() string
  	// DiffID should return the DiffID for this layer, or an error
  	// if it is unknown (for example, if it has not been downloaded
  	// before).
  	DiffID() (layer.DiffID, error)
  	// Download is called to perform the download. It returns a reader for
  	// the layer data and an int64 that the download manager hands to the
  	// progress reader as the size of that data. After a failure the
  	// download manager may call Download again, unless the error
  	// satisfies DoNotRetry, the transfer's context was cancelled, or
  	// maxDownloadAttempts has been reached.
  	Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error)
  	// Close is called when the download manager is finished with this
  	// descriptor and will not call Download again or read from the reader
  	// that Download returned.
  	Close()
  }
  // DownloadDescriptorWithRegistered is a DownloadDescriptor that has an
  // additional Registered method which gets called after a downloaded layer is
  // registered. This allows the user of the download manager to know the DiffID
  // of each registered layer. The download manager calls Registered only when
  // a type assertion of the descriptor to DownloadDescriptorWithRegistered
  // succeeds.
  type DownloadDescriptorWithRegistered interface {
  	DownloadDescriptor
  	// Registered receives the DiffID of the layer that was just
  	// registered for this descriptor.
  	Registered(diffID layer.DiffID)
  }
  72. // Download is a blocking function which ensures the requested layers are
  73. // present in the layer store. It uses the string returned by the Key method to
  74. // deduplicate downloads. If a given layer is not already known to present in
  75. // the layer store, and the key is not used by an in-progress download, the
  76. // Download method is called to get the layer tar data. Layers are then
  77. // registered in the appropriate order. The caller must call the returned
  78. // release function once it is is done with the returned RootFS object.
  79. func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS image.RootFS, layers []DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
  80. var (
  81. topLayer layer.Layer
  82. topDownload *downloadTransfer
  83. watcher *Watcher
  84. missingLayer bool
  85. transferKey = ""
  86. downloadsByKey = make(map[string]*downloadTransfer)
  87. )
  88. rootFS := initialRootFS
  89. for _, descriptor := range layers {
  90. key := descriptor.Key()
  91. transferKey += key
  92. if !missingLayer {
  93. missingLayer = true
  94. diffID, err := descriptor.DiffID()
  95. if err == nil {
  96. getRootFS := rootFS
  97. getRootFS.Append(diffID)
  98. l, err := ldm.layerStore.Get(getRootFS.ChainID())
  99. if err == nil {
  100. // Layer already exists.
  101. logrus.Debugf("Layer already exists: %s", descriptor.ID())
  102. progress.Update(progressOutput, descriptor.ID(), "Already exists")
  103. if topLayer != nil {
  104. layer.ReleaseAndLog(ldm.layerStore, topLayer)
  105. }
  106. topLayer = l
  107. missingLayer = false
  108. rootFS.Append(diffID)
  109. continue
  110. }
  111. }
  112. }
  113. // Does this layer have the same data as a previous layer in
  114. // the stack? If so, avoid downloading it more than once.
  115. var topDownloadUncasted Transfer
  116. if existingDownload, ok := downloadsByKey[key]; ok {
  117. xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload)
  118. defer topDownload.Transfer.Release(watcher)
  119. topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
  120. topDownload = topDownloadUncasted.(*downloadTransfer)
  121. continue
  122. }
  123. // Layer is not known to exist - download and register it.
  124. progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer")
  125. var xferFunc DoFunc
  126. if topDownload != nil {
  127. xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload)
  128. defer topDownload.Transfer.Release(watcher)
  129. } else {
  130. xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil)
  131. }
  132. topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
  133. topDownload = topDownloadUncasted.(*downloadTransfer)
  134. downloadsByKey[key] = topDownload
  135. }
  136. if topDownload == nil {
  137. return rootFS, func() {
  138. if topLayer != nil {
  139. layer.ReleaseAndLog(ldm.layerStore, topLayer)
  140. }
  141. }, nil
  142. }
  143. // Won't be using the list built up so far - will generate it
  144. // from downloaded layers instead.
  145. rootFS.DiffIDs = []layer.DiffID{}
  146. defer func() {
  147. if topLayer != nil {
  148. layer.ReleaseAndLog(ldm.layerStore, topLayer)
  149. }
  150. }()
  151. select {
  152. case <-ctx.Done():
  153. topDownload.Transfer.Release(watcher)
  154. return rootFS, func() {}, ctx.Err()
  155. case <-topDownload.Done():
  156. break
  157. }
  158. l, err := topDownload.result()
  159. if err != nil {
  160. topDownload.Transfer.Release(watcher)
  161. return rootFS, func() {}, err
  162. }
  163. // Must do this exactly len(layers) times, so we don't include the
  164. // base layer on Windows.
  165. for range layers {
  166. if l == nil {
  167. topDownload.Transfer.Release(watcher)
  168. return rootFS, func() {}, errors.New("internal error: too few parent layers")
  169. }
  170. rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
  171. l = l.Parent()
  172. }
  173. return rootFS, func() { topDownload.Transfer.Release(watcher) }, err
  174. }
  175. // makeDownloadFunc returns a function that performs the layer download and
  176. // registration. If parentDownload is non-nil, it waits for that download to
  177. // complete before the registration step, and registers the downloaded data
  178. // on top of parentDownload's resulting layer. Otherwise, it registers the
  179. // layer on top of the ChainID given by parentLayer.
  180. func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) DoFunc {
  181. return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
  182. d := &downloadTransfer{
  183. Transfer: NewTransfer(),
  184. layerStore: ldm.layerStore,
  185. }
  186. go func() {
  187. defer func() {
  188. close(progressChan)
  189. }()
  190. progressOutput := progress.ChanOutput(progressChan)
  191. select {
  192. case <-start:
  193. default:
  194. progress.Update(progressOutput, descriptor.ID(), "Waiting")
  195. <-start
  196. }
  197. if parentDownload != nil {
  198. // Did the parent download already fail or get
  199. // cancelled?
  200. select {
  201. case <-parentDownload.Done():
  202. _, err := parentDownload.result()
  203. if err != nil {
  204. d.err = err
  205. return
  206. }
  207. default:
  208. }
  209. }
  210. var (
  211. downloadReader io.ReadCloser
  212. size int64
  213. err error
  214. retries int
  215. )
  216. defer descriptor.Close()
  217. for {
  218. downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput)
  219. if err == nil {
  220. break
  221. }
  222. // If an error was returned because the context
  223. // was cancelled, we shouldn't retry.
  224. select {
  225. case <-d.Transfer.Context().Done():
  226. d.err = err
  227. return
  228. default:
  229. }
  230. retries++
  231. if _, isDNR := err.(DoNotRetry); isDNR || retries == maxDownloadAttempts {
  232. logrus.Errorf("Download failed: %v", err)
  233. d.err = err
  234. return
  235. }
  236. logrus.Errorf("Download failed, retrying: %v", err)
  237. delay := retries * 5
  238. ticker := time.NewTicker(time.Second)
  239. selectLoop:
  240. for {
  241. progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, (map[bool]string{true: "s"})[delay != 1])
  242. select {
  243. case <-ticker.C:
  244. delay--
  245. if delay == 0 {
  246. ticker.Stop()
  247. break selectLoop
  248. }
  249. case <-d.Transfer.Context().Done():
  250. ticker.Stop()
  251. d.err = errors.New("download cancelled during retry delay")
  252. return
  253. }
  254. }
  255. }
  256. close(inactive)
  257. if parentDownload != nil {
  258. select {
  259. case <-d.Transfer.Context().Done():
  260. d.err = errors.New("layer registration cancelled")
  261. downloadReader.Close()
  262. return
  263. case <-parentDownload.Done():
  264. }
  265. l, err := parentDownload.result()
  266. if err != nil {
  267. d.err = err
  268. downloadReader.Close()
  269. return
  270. }
  271. parentLayer = l.ChainID()
  272. }
  273. reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
  274. defer reader.Close()
  275. inflatedLayerData, err := archive.DecompressStream(reader)
  276. if err != nil {
  277. d.err = fmt.Errorf("could not get decompression stream: %v", err)
  278. return
  279. }
  280. var src distribution.Descriptor
  281. if fs, ok := descriptor.(distribution.Describable); ok {
  282. src = fs.Descriptor()
  283. }
  284. if ds, ok := d.layerStore.(layer.DescribableStore); ok {
  285. d.layer, err = ds.RegisterWithDescriptor(inflatedLayerData, parentLayer, src)
  286. } else {
  287. d.layer, err = d.layerStore.Register(inflatedLayerData, parentLayer)
  288. }
  289. if err != nil {
  290. select {
  291. case <-d.Transfer.Context().Done():
  292. d.err = errors.New("layer registration cancelled")
  293. default:
  294. d.err = fmt.Errorf("failed to register layer: %v", err)
  295. }
  296. return
  297. }
  298. progress.Update(progressOutput, descriptor.ID(), "Pull complete")
  299. withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
  300. if hasRegistered {
  301. withRegistered.Registered(d.layer.DiffID())
  302. }
  303. // Doesn't actually need to be its own goroutine, but
  304. // done like this so we can defer close(c).
  305. go func() {
  306. <-d.Transfer.Released()
  307. if d.layer != nil {
  308. layer.ReleaseAndLog(d.layerStore, d.layer)
  309. }
  310. }()
  311. }()
  312. return d
  313. }
  314. }
  315. // makeDownloadFuncFromDownload returns a function that performs the layer
  316. // registration when the layer data is coming from an existing download. It
  317. // waits for sourceDownload and parentDownload to complete, and then
  318. // reregisters the data from sourceDownload's top layer on top of
  319. // parentDownload. This function does not log progress output because it would
  320. // interfere with the progress reporting for sourceDownload, which has the same
  321. // Key.
  322. func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) DoFunc {
  323. return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
  324. d := &downloadTransfer{
  325. Transfer: NewTransfer(),
  326. layerStore: ldm.layerStore,
  327. }
  328. go func() {
  329. defer func() {
  330. close(progressChan)
  331. }()
  332. <-start
  333. close(inactive)
  334. select {
  335. case <-d.Transfer.Context().Done():
  336. d.err = errors.New("layer registration cancelled")
  337. return
  338. case <-parentDownload.Done():
  339. }
  340. l, err := parentDownload.result()
  341. if err != nil {
  342. d.err = err
  343. return
  344. }
  345. parentLayer := l.ChainID()
  346. // sourceDownload should have already finished if
  347. // parentDownload finished, but wait for it explicitly
  348. // to be sure.
  349. select {
  350. case <-d.Transfer.Context().Done():
  351. d.err = errors.New("layer registration cancelled")
  352. return
  353. case <-sourceDownload.Done():
  354. }
  355. l, err = sourceDownload.result()
  356. if err != nil {
  357. d.err = err
  358. return
  359. }
  360. layerReader, err := l.TarStream()
  361. if err != nil {
  362. d.err = err
  363. return
  364. }
  365. defer layerReader.Close()
  366. var src distribution.Descriptor
  367. if fs, ok := l.(distribution.Describable); ok {
  368. src = fs.Descriptor()
  369. }
  370. if ds, ok := d.layerStore.(layer.DescribableStore); ok {
  371. d.layer, err = ds.RegisterWithDescriptor(layerReader, parentLayer, src)
  372. } else {
  373. d.layer, err = d.layerStore.Register(layerReader, parentLayer)
  374. }
  375. if err != nil {
  376. d.err = fmt.Errorf("failed to register layer: %v", err)
  377. return
  378. }
  379. withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
  380. if hasRegistered {
  381. withRegistered.Registered(d.layer.DiffID())
  382. }
  383. // Doesn't actually need to be its own goroutine, but
  384. // done like this so we can defer close(c).
  385. go func() {
  386. <-d.Transfer.Released()
  387. if d.layer != nil {
  388. layer.ReleaseAndLog(d.layerStore, d.layer)
  389. }
  390. }()
  391. }()
  392. return d
  393. }
  394. }