download.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. package xfer
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/distribution"
  9. "github.com/docker/docker/image"
  10. "github.com/docker/docker/layer"
  11. "github.com/docker/docker/pkg/archive"
  12. "github.com/docker/docker/pkg/ioutils"
  13. "github.com/docker/docker/pkg/progress"
  14. "golang.org/x/net/context"
  15. )
  16. const maxDownloadAttempts = 5
  17. // LayerDownloadManager figures out which layers need to be downloaded, then
  18. // registers and downloads those, taking into account dependencies between
  19. // layers.
  20. type LayerDownloadManager struct {
  21. layerStore layer.Store
  22. tm TransferManager
  23. waitDuration time.Duration
  24. }
  25. // SetConcurrency sets the max concurrent downloads for each pull
  26. func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
  27. ldm.tm.SetConcurrency(concurrency)
  28. }
  29. // NewLayerDownloadManager returns a new LayerDownloadManager.
  30. func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
  31. manager := LayerDownloadManager{
  32. layerStore: layerStore,
  33. tm: NewTransferManager(concurrencyLimit),
  34. waitDuration: time.Second,
  35. }
  36. for _, option := range options {
  37. option(&manager)
  38. }
  39. return &manager
  40. }
  41. type downloadTransfer struct {
  42. Transfer
  43. layerStore layer.Store
  44. layer layer.Layer
  45. err error
  46. }
  47. // result returns the layer resulting from the download, if the download
  48. // and registration were successful.
  49. func (d *downloadTransfer) result() (layer.Layer, error) {
  50. return d.layer, d.err
  51. }
  52. // A DownloadDescriptor references a layer that may need to be downloaded.
  53. type DownloadDescriptor interface {
  54. // Key returns the key used to deduplicate downloads.
  55. Key() string
  56. // ID returns the ID for display purposes.
  57. ID() string
  58. // DiffID should return the DiffID for this layer, or an error
  59. // if it is unknown (for example, if it has not been downloaded
  60. // before).
  61. DiffID() (layer.DiffID, error)
  62. // Download is called to perform the download.
  63. Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error)
  64. // Close is called when the download manager is finished with this
  65. // descriptor and will not call Download again or read from the reader
  66. // that Download returned.
  67. Close()
  68. }
  69. // DownloadDescriptorWithRegistered is a DownloadDescriptor that has an
  70. // additional Registered method which gets called after a downloaded layer is
  71. // registered. This allows the user of the download manager to know the DiffID
  72. // of each registered layer. This method is called if a cast to
  73. // DownloadDescriptorWithRegistered is successful.
  74. type DownloadDescriptorWithRegistered interface {
  75. DownloadDescriptor
  76. Registered(diffID layer.DiffID)
  77. }
  78. // Download is a blocking function which ensures the requested layers are
  79. // present in the layer store. It uses the string returned by the Key method to
  80. // deduplicate downloads. If a given layer is not already known to present in
  81. // the layer store, and the key is not used by an in-progress download, the
  82. // Download method is called to get the layer tar data. Layers are then
  83. // registered in the appropriate order. The caller must call the returned
  84. // release function once it is done with the returned RootFS object.
  85. func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS image.RootFS, layers []DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
  86. var (
  87. topLayer layer.Layer
  88. topDownload *downloadTransfer
  89. watcher *Watcher
  90. missingLayer bool
  91. transferKey = ""
  92. downloadsByKey = make(map[string]*downloadTransfer)
  93. )
  94. rootFS := initialRootFS
  95. for _, descriptor := range layers {
  96. key := descriptor.Key()
  97. transferKey += key
  98. if !missingLayer {
  99. missingLayer = true
  100. diffID, err := descriptor.DiffID()
  101. if err == nil {
  102. getRootFS := rootFS
  103. getRootFS.Append(diffID)
  104. l, err := ldm.layerStore.Get(getRootFS.ChainID())
  105. if err == nil {
  106. // Layer already exists.
  107. logrus.Debugf("Layer already exists: %s", descriptor.ID())
  108. progress.Update(progressOutput, descriptor.ID(), "Already exists")
  109. if topLayer != nil {
  110. layer.ReleaseAndLog(ldm.layerStore, topLayer)
  111. }
  112. topLayer = l
  113. missingLayer = false
  114. rootFS.Append(diffID)
  115. // Register this repository as a source of this layer.
  116. withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
  117. if hasRegistered {
  118. withRegistered.Registered(diffID)
  119. }
  120. continue
  121. }
  122. }
  123. }
  124. // Does this layer have the same data as a previous layer in
  125. // the stack? If so, avoid downloading it more than once.
  126. var topDownloadUncasted Transfer
  127. if existingDownload, ok := downloadsByKey[key]; ok {
  128. xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload)
  129. defer topDownload.Transfer.Release(watcher)
  130. topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
  131. topDownload = topDownloadUncasted.(*downloadTransfer)
  132. continue
  133. }
  134. // Layer is not known to exist - download and register it.
  135. progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer")
  136. var xferFunc DoFunc
  137. if topDownload != nil {
  138. xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload)
  139. defer topDownload.Transfer.Release(watcher)
  140. } else {
  141. xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil)
  142. }
  143. topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
  144. topDownload = topDownloadUncasted.(*downloadTransfer)
  145. downloadsByKey[key] = topDownload
  146. }
  147. if topDownload == nil {
  148. return rootFS, func() {
  149. if topLayer != nil {
  150. layer.ReleaseAndLog(ldm.layerStore, topLayer)
  151. }
  152. }, nil
  153. }
  154. // Won't be using the list built up so far - will generate it
  155. // from downloaded layers instead.
  156. rootFS.DiffIDs = []layer.DiffID{}
  157. defer func() {
  158. if topLayer != nil {
  159. layer.ReleaseAndLog(ldm.layerStore, topLayer)
  160. }
  161. }()
  162. select {
  163. case <-ctx.Done():
  164. topDownload.Transfer.Release(watcher)
  165. return rootFS, func() {}, ctx.Err()
  166. case <-topDownload.Done():
  167. break
  168. }
  169. l, err := topDownload.result()
  170. if err != nil {
  171. topDownload.Transfer.Release(watcher)
  172. return rootFS, func() {}, err
  173. }
  174. // Must do this exactly len(layers) times, so we don't include the
  175. // base layer on Windows.
  176. for range layers {
  177. if l == nil {
  178. topDownload.Transfer.Release(watcher)
  179. return rootFS, func() {}, errors.New("internal error: too few parent layers")
  180. }
  181. rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
  182. l = l.Parent()
  183. }
  184. return rootFS, func() { topDownload.Transfer.Release(watcher) }, err
  185. }
  186. // makeDownloadFunc returns a function that performs the layer download and
  187. // registration. If parentDownload is non-nil, it waits for that download to
  188. // complete before the registration step, and registers the downloaded data
  189. // on top of parentDownload's resulting layer. Otherwise, it registers the
  190. // layer on top of the ChainID given by parentLayer.
  191. func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) DoFunc {
  192. return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
  193. d := &downloadTransfer{
  194. Transfer: NewTransfer(),
  195. layerStore: ldm.layerStore,
  196. }
  197. go func() {
  198. defer func() {
  199. close(progressChan)
  200. }()
  201. progressOutput := progress.ChanOutput(progressChan)
  202. select {
  203. case <-start:
  204. default:
  205. progress.Update(progressOutput, descriptor.ID(), "Waiting")
  206. <-start
  207. }
  208. if parentDownload != nil {
  209. // Did the parent download already fail or get
  210. // cancelled?
  211. select {
  212. case <-parentDownload.Done():
  213. _, err := parentDownload.result()
  214. if err != nil {
  215. d.err = err
  216. return
  217. }
  218. default:
  219. }
  220. }
  221. var (
  222. downloadReader io.ReadCloser
  223. size int64
  224. err error
  225. retries int
  226. )
  227. defer descriptor.Close()
  228. for {
  229. downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput)
  230. if err == nil {
  231. break
  232. }
  233. // If an error was returned because the context
  234. // was cancelled, we shouldn't retry.
  235. select {
  236. case <-d.Transfer.Context().Done():
  237. d.err = err
  238. return
  239. default:
  240. }
  241. retries++
  242. if _, isDNR := err.(DoNotRetry); isDNR || retries == maxDownloadAttempts {
  243. logrus.Errorf("Download failed: %v", err)
  244. d.err = err
  245. return
  246. }
  247. logrus.Errorf("Download failed, retrying: %v", err)
  248. delay := retries * 5
  249. ticker := time.NewTicker(ldm.waitDuration)
  250. selectLoop:
  251. for {
  252. progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, (map[bool]string{true: "s"})[delay != 1])
  253. select {
  254. case <-ticker.C:
  255. delay--
  256. if delay == 0 {
  257. ticker.Stop()
  258. break selectLoop
  259. }
  260. case <-d.Transfer.Context().Done():
  261. ticker.Stop()
  262. d.err = errors.New("download cancelled during retry delay")
  263. return
  264. }
  265. }
  266. }
  267. close(inactive)
  268. if parentDownload != nil {
  269. select {
  270. case <-d.Transfer.Context().Done():
  271. d.err = errors.New("layer registration cancelled")
  272. downloadReader.Close()
  273. return
  274. case <-parentDownload.Done():
  275. }
  276. l, err := parentDownload.result()
  277. if err != nil {
  278. d.err = err
  279. downloadReader.Close()
  280. return
  281. }
  282. parentLayer = l.ChainID()
  283. }
  284. reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
  285. defer reader.Close()
  286. inflatedLayerData, err := archive.DecompressStream(reader)
  287. if err != nil {
  288. d.err = fmt.Errorf("could not get decompression stream: %v", err)
  289. return
  290. }
  291. var src distribution.Descriptor
  292. if fs, ok := descriptor.(distribution.Describable); ok {
  293. src = fs.Descriptor()
  294. }
  295. if ds, ok := d.layerStore.(layer.DescribableStore); ok {
  296. d.layer, err = ds.RegisterWithDescriptor(inflatedLayerData, parentLayer, src)
  297. } else {
  298. d.layer, err = d.layerStore.Register(inflatedLayerData, parentLayer)
  299. }
  300. if err != nil {
  301. select {
  302. case <-d.Transfer.Context().Done():
  303. d.err = errors.New("layer registration cancelled")
  304. default:
  305. d.err = fmt.Errorf("failed to register layer: %v", err)
  306. }
  307. return
  308. }
  309. progress.Update(progressOutput, descriptor.ID(), "Pull complete")
  310. withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
  311. if hasRegistered {
  312. withRegistered.Registered(d.layer.DiffID())
  313. }
  314. // Doesn't actually need to be its own goroutine, but
  315. // done like this so we can defer close(c).
  316. go func() {
  317. <-d.Transfer.Released()
  318. if d.layer != nil {
  319. layer.ReleaseAndLog(d.layerStore, d.layer)
  320. }
  321. }()
  322. }()
  323. return d
  324. }
  325. }
  326. // makeDownloadFuncFromDownload returns a function that performs the layer
  327. // registration when the layer data is coming from an existing download. It
  328. // waits for sourceDownload and parentDownload to complete, and then
  329. // reregisters the data from sourceDownload's top layer on top of
  330. // parentDownload. This function does not log progress output because it would
  331. // interfere with the progress reporting for sourceDownload, which has the same
  332. // Key.
  333. func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) DoFunc {
  334. return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
  335. d := &downloadTransfer{
  336. Transfer: NewTransfer(),
  337. layerStore: ldm.layerStore,
  338. }
  339. go func() {
  340. defer func() {
  341. close(progressChan)
  342. }()
  343. <-start
  344. close(inactive)
  345. select {
  346. case <-d.Transfer.Context().Done():
  347. d.err = errors.New("layer registration cancelled")
  348. return
  349. case <-parentDownload.Done():
  350. }
  351. l, err := parentDownload.result()
  352. if err != nil {
  353. d.err = err
  354. return
  355. }
  356. parentLayer := l.ChainID()
  357. // sourceDownload should have already finished if
  358. // parentDownload finished, but wait for it explicitly
  359. // to be sure.
  360. select {
  361. case <-d.Transfer.Context().Done():
  362. d.err = errors.New("layer registration cancelled")
  363. return
  364. case <-sourceDownload.Done():
  365. }
  366. l, err = sourceDownload.result()
  367. if err != nil {
  368. d.err = err
  369. return
  370. }
  371. layerReader, err := l.TarStream()
  372. if err != nil {
  373. d.err = err
  374. return
  375. }
  376. defer layerReader.Close()
  377. var src distribution.Descriptor
  378. if fs, ok := l.(distribution.Describable); ok {
  379. src = fs.Descriptor()
  380. }
  381. if ds, ok := d.layerStore.(layer.DescribableStore); ok {
  382. d.layer, err = ds.RegisterWithDescriptor(layerReader, parentLayer, src)
  383. } else {
  384. d.layer, err = d.layerStore.Register(layerReader, parentLayer)
  385. }
  386. if err != nil {
  387. d.err = fmt.Errorf("failed to register layer: %v", err)
  388. return
  389. }
  390. withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
  391. if hasRegistered {
  392. withRegistered.Registered(d.layer.DiffID())
  393. }
  394. // Doesn't actually need to be its own goroutine, but
  395. // done like this so we can defer close(c).
  396. go func() {
  397. <-d.Transfer.Released()
  398. if d.layer != nil {
  399. layer.ReleaseAndLog(d.layerStore, d.layer)
  400. }
  401. }()
  402. }()
  403. return d
  404. }
  405. }