download_test.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package xfer
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "runtime"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/docker/distribution"
  13. "github.com/docker/docker/image"
  14. "github.com/docker/docker/layer"
  15. "github.com/docker/docker/pkg/progress"
  16. "github.com/opencontainers/go-digest"
  17. "golang.org/x/net/context"
  18. )
  19. const maxDownloadConcurrency = 3
  20. type mockLayer struct {
  21. layerData bytes.Buffer
  22. diffID layer.DiffID
  23. chainID layer.ChainID
  24. parent layer.Layer
  25. }
  26. func (ml *mockLayer) TarStream() (io.ReadCloser, error) {
  27. return ioutil.NopCloser(bytes.NewBuffer(ml.layerData.Bytes())), nil
  28. }
  29. func (ml *mockLayer) TarStreamFrom(layer.ChainID) (io.ReadCloser, error) {
  30. return nil, fmt.Errorf("not implemented")
  31. }
  32. func (ml *mockLayer) ChainID() layer.ChainID {
  33. return ml.chainID
  34. }
  35. func (ml *mockLayer) DiffID() layer.DiffID {
  36. return ml.diffID
  37. }
  38. func (ml *mockLayer) Parent() layer.Layer {
  39. return ml.parent
  40. }
  41. func (ml *mockLayer) Size() (size int64, err error) {
  42. return 0, nil
  43. }
  44. func (ml *mockLayer) DiffSize() (size int64, err error) {
  45. return 0, nil
  46. }
  47. func (ml *mockLayer) Metadata() (map[string]string, error) {
  48. return make(map[string]string), nil
  49. }
  50. type mockLayerStore struct {
  51. layers map[layer.ChainID]*mockLayer
  52. }
  53. func createChainIDFromParent(parent layer.ChainID, dgsts ...layer.DiffID) layer.ChainID {
  54. if len(dgsts) == 0 {
  55. return parent
  56. }
  57. if parent == "" {
  58. return createChainIDFromParent(layer.ChainID(dgsts[0]), dgsts[1:]...)
  59. }
  60. // H = "H(n-1) SHA256(n)"
  61. dgst := digest.FromBytes([]byte(string(parent) + " " + string(dgsts[0])))
  62. return createChainIDFromParent(layer.ChainID(dgst), dgsts[1:]...)
  63. }
  64. func (ls *mockLayerStore) Map() map[layer.ChainID]layer.Layer {
  65. layers := map[layer.ChainID]layer.Layer{}
  66. for k, v := range ls.layers {
  67. layers[k] = v
  68. }
  69. return layers
  70. }
  71. func (ls *mockLayerStore) Register(reader io.Reader, parentID layer.ChainID) (layer.Layer, error) {
  72. return ls.RegisterWithDescriptor(reader, parentID, distribution.Descriptor{})
  73. }
  74. func (ls *mockLayerStore) RegisterWithDescriptor(reader io.Reader, parentID layer.ChainID, _ distribution.Descriptor) (layer.Layer, error) {
  75. var (
  76. parent layer.Layer
  77. err error
  78. )
  79. if parentID != "" {
  80. parent, err = ls.Get(parentID)
  81. if err != nil {
  82. return nil, err
  83. }
  84. }
  85. l := &mockLayer{parent: parent}
  86. _, err = l.layerData.ReadFrom(reader)
  87. if err != nil {
  88. return nil, err
  89. }
  90. l.diffID = layer.DiffID(digest.FromBytes(l.layerData.Bytes()))
  91. l.chainID = createChainIDFromParent(parentID, l.diffID)
  92. ls.layers[l.chainID] = l
  93. return l, nil
  94. }
  95. func (ls *mockLayerStore) Get(chainID layer.ChainID) (layer.Layer, error) {
  96. l, ok := ls.layers[chainID]
  97. if !ok {
  98. return nil, layer.ErrLayerDoesNotExist
  99. }
  100. return l, nil
  101. }
  102. func (ls *mockLayerStore) Release(l layer.Layer) ([]layer.Metadata, error) {
  103. return []layer.Metadata{}, nil
  104. }
  105. func (ls *mockLayerStore) CreateRWLayer(string, layer.ChainID, *layer.CreateRWLayerOpts) (layer.RWLayer, error) {
  106. return nil, errors.New("not implemented")
  107. }
  108. func (ls *mockLayerStore) GetRWLayer(string) (layer.RWLayer, error) {
  109. return nil, errors.New("not implemented")
  110. }
  111. func (ls *mockLayerStore) ReleaseRWLayer(layer.RWLayer) ([]layer.Metadata, error) {
  112. return nil, errors.New("not implemented")
  113. }
  114. func (ls *mockLayerStore) GetMountID(string) (string, error) {
  115. return "", errors.New("not implemented")
  116. }
  117. func (ls *mockLayerStore) Cleanup() error {
  118. return nil
  119. }
  120. func (ls *mockLayerStore) DriverStatus() [][2]string {
  121. return [][2]string{}
  122. }
  123. func (ls *mockLayerStore) DriverName() string {
  124. return "mock"
  125. }
  126. type mockDownloadDescriptor struct {
  127. currentDownloads *int32
  128. id string
  129. diffID layer.DiffID
  130. registeredDiffID layer.DiffID
  131. expectedDiffID layer.DiffID
  132. simulateRetries int
  133. }
  134. // Key returns the key used to deduplicate downloads.
  135. func (d *mockDownloadDescriptor) Key() string {
  136. return d.id
  137. }
  138. // ID returns the ID for display purposes.
  139. func (d *mockDownloadDescriptor) ID() string {
  140. return d.id
  141. }
  142. // DiffID should return the DiffID for this layer, or an error
  143. // if it is unknown (for example, if it has not been downloaded
  144. // before).
  145. func (d *mockDownloadDescriptor) DiffID() (layer.DiffID, error) {
  146. if d.diffID != "" {
  147. return d.diffID, nil
  148. }
  149. return "", errors.New("no diffID available")
  150. }
  151. func (d *mockDownloadDescriptor) Registered(diffID layer.DiffID) {
  152. d.registeredDiffID = diffID
  153. }
  154. func (d *mockDownloadDescriptor) mockTarStream() io.ReadCloser {
  155. // The mock implementation returns the ID repeated 5 times as a tar
  156. // stream instead of actual tar data. The data is ignored except for
  157. // computing IDs.
  158. return ioutil.NopCloser(bytes.NewBuffer([]byte(d.id + d.id + d.id + d.id + d.id)))
  159. }
  160. // Download is called to perform the download.
  161. func (d *mockDownloadDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
  162. if d.currentDownloads != nil {
  163. defer atomic.AddInt32(d.currentDownloads, -1)
  164. if atomic.AddInt32(d.currentDownloads, 1) > maxDownloadConcurrency {
  165. return nil, 0, errors.New("concurrency limit exceeded")
  166. }
  167. }
  168. // Sleep a bit to simulate a time-consuming download.
  169. for i := int64(0); i <= 10; i++ {
  170. select {
  171. case <-ctx.Done():
  172. return nil, 0, ctx.Err()
  173. case <-time.After(10 * time.Millisecond):
  174. progressOutput.WriteProgress(progress.Progress{ID: d.ID(), Action: "Downloading", Current: i, Total: 10})
  175. }
  176. }
  177. if d.simulateRetries != 0 {
  178. d.simulateRetries--
  179. return nil, 0, errors.New("simulating retry")
  180. }
  181. return d.mockTarStream(), 0, nil
  182. }
  183. func (d *mockDownloadDescriptor) Close() {
  184. }
  185. func downloadDescriptors(currentDownloads *int32) []DownloadDescriptor {
  186. return []DownloadDescriptor{
  187. &mockDownloadDescriptor{
  188. currentDownloads: currentDownloads,
  189. id: "id1",
  190. expectedDiffID: layer.DiffID("sha256:68e2c75dc5c78ea9240689c60d7599766c213ae210434c53af18470ae8c53ec1"),
  191. },
  192. &mockDownloadDescriptor{
  193. currentDownloads: currentDownloads,
  194. id: "id2",
  195. expectedDiffID: layer.DiffID("sha256:64a636223116aa837973a5d9c2bdd17d9b204e4f95ac423e20e65dfbb3655473"),
  196. },
  197. &mockDownloadDescriptor{
  198. currentDownloads: currentDownloads,
  199. id: "id3",
  200. expectedDiffID: layer.DiffID("sha256:58745a8bbd669c25213e9de578c4da5c8ee1c836b3581432c2b50e38a6753300"),
  201. },
  202. &mockDownloadDescriptor{
  203. currentDownloads: currentDownloads,
  204. id: "id2",
  205. expectedDiffID: layer.DiffID("sha256:64a636223116aa837973a5d9c2bdd17d9b204e4f95ac423e20e65dfbb3655473"),
  206. },
  207. &mockDownloadDescriptor{
  208. currentDownloads: currentDownloads,
  209. id: "id4",
  210. expectedDiffID: layer.DiffID("sha256:0dfb5b9577716cc173e95af7c10289322c29a6453a1718addc00c0c5b1330936"),
  211. simulateRetries: 1,
  212. },
  213. &mockDownloadDescriptor{
  214. currentDownloads: currentDownloads,
  215. id: "id5",
  216. expectedDiffID: layer.DiffID("sha256:0a5f25fa1acbc647f6112a6276735d0fa01e4ee2aa7ec33015e337350e1ea23d"),
  217. },
  218. }
  219. }
  220. func TestSuccessfulDownload(t *testing.T) {
  221. // TODO Windows: Fix this unit text
  222. if runtime.GOOS == "windows" {
  223. t.Skip("Needs fixing on Windows")
  224. }
  225. layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
  226. ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
  227. progressChan := make(chan progress.Progress)
  228. progressDone := make(chan struct{})
  229. receivedProgress := make(map[string]progress.Progress)
  230. go func() {
  231. for p := range progressChan {
  232. receivedProgress[p.ID] = p
  233. }
  234. close(progressDone)
  235. }()
  236. var currentDownloads int32
  237. descriptors := downloadDescriptors(&currentDownloads)
  238. firstDescriptor := descriptors[0].(*mockDownloadDescriptor)
  239. // Pre-register the first layer to simulate an already-existing layer
  240. l, err := layerStore.Register(firstDescriptor.mockTarStream(), "")
  241. if err != nil {
  242. t.Fatal(err)
  243. }
  244. firstDescriptor.diffID = l.DiffID()
  245. rootFS, releaseFunc, err := ldm.Download(context.Background(), *image.NewRootFS(), descriptors, progress.ChanOutput(progressChan))
  246. if err != nil {
  247. t.Fatalf("download error: %v", err)
  248. }
  249. releaseFunc()
  250. close(progressChan)
  251. <-progressDone
  252. if len(rootFS.DiffIDs) != len(descriptors) {
  253. t.Fatal("got wrong number of diffIDs in rootfs")
  254. }
  255. for i, d := range descriptors {
  256. descriptor := d.(*mockDownloadDescriptor)
  257. if descriptor.diffID != "" {
  258. if receivedProgress[d.ID()].Action != "Already exists" {
  259. t.Fatalf("did not get 'Already exists' message for %v", d.ID())
  260. }
  261. } else if receivedProgress[d.ID()].Action != "Pull complete" {
  262. t.Fatalf("did not get 'Pull complete' message for %v", d.ID())
  263. }
  264. if rootFS.DiffIDs[i] != descriptor.expectedDiffID {
  265. t.Fatalf("rootFS item %d has the wrong diffID (expected: %v got: %v)", i, descriptor.expectedDiffID, rootFS.DiffIDs[i])
  266. }
  267. if descriptor.diffID == "" && descriptor.registeredDiffID != rootFS.DiffIDs[i] {
  268. t.Fatal("diffID mismatch between rootFS and Registered callback")
  269. }
  270. }
  271. }
  272. func TestCancelledDownload(t *testing.T) {
  273. ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
  274. progressChan := make(chan progress.Progress)
  275. progressDone := make(chan struct{})
  276. go func() {
  277. for range progressChan {
  278. }
  279. close(progressDone)
  280. }()
  281. ctx, cancel := context.WithCancel(context.Background())
  282. go func() {
  283. <-time.After(time.Millisecond)
  284. cancel()
  285. }()
  286. descriptors := downloadDescriptors(nil)
  287. _, _, err := ldm.Download(ctx, *image.NewRootFS(), descriptors, progress.ChanOutput(progressChan))
  288. if err != context.Canceled {
  289. t.Fatal("expected download to be cancelled")
  290. }
  291. close(progressChan)
  292. <-progressDone
  293. }