download_test.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package xfer
  2. import (
  3. "bytes"
  4. "errors"
  5. "io"
  6. "io/ioutil"
  7. "runtime"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/docker/distribution"
  12. "github.com/docker/distribution/digest"
  13. "github.com/docker/docker/image"
  14. "github.com/docker/docker/layer"
  15. "github.com/docker/docker/pkg/progress"
  16. "golang.org/x/net/context"
  17. )
  18. const maxDownloadConcurrency = 3
  19. type mockLayer struct {
  20. layerData bytes.Buffer
  21. diffID layer.DiffID
  22. chainID layer.ChainID
  23. parent layer.Layer
  24. }
  25. func (ml *mockLayer) TarStream() (io.ReadCloser, error) {
  26. return ioutil.NopCloser(bytes.NewBuffer(ml.layerData.Bytes())), nil
  27. }
  28. func (ml *mockLayer) ChainID() layer.ChainID {
  29. return ml.chainID
  30. }
  31. func (ml *mockLayer) DiffID() layer.DiffID {
  32. return ml.diffID
  33. }
  34. func (ml *mockLayer) Parent() layer.Layer {
  35. return ml.parent
  36. }
  37. func (ml *mockLayer) Size() (size int64, err error) {
  38. return 0, nil
  39. }
  40. func (ml *mockLayer) DiffSize() (size int64, err error) {
  41. return 0, nil
  42. }
  43. func (ml *mockLayer) Metadata() (map[string]string, error) {
  44. return make(map[string]string), nil
  45. }
  46. type mockLayerStore struct {
  47. layers map[layer.ChainID]*mockLayer
  48. }
  49. func createChainIDFromParent(parent layer.ChainID, dgsts ...layer.DiffID) layer.ChainID {
  50. if len(dgsts) == 0 {
  51. return parent
  52. }
  53. if parent == "" {
  54. return createChainIDFromParent(layer.ChainID(dgsts[0]), dgsts[1:]...)
  55. }
  56. // H = "H(n-1) SHA256(n)"
  57. dgst := digest.FromBytes([]byte(string(parent) + " " + string(dgsts[0])))
  58. return createChainIDFromParent(layer.ChainID(dgst), dgsts[1:]...)
  59. }
  60. func (ls *mockLayerStore) Register(reader io.Reader, parentID layer.ChainID) (layer.Layer, error) {
  61. return ls.RegisterWithDescriptor(reader, parentID, distribution.Descriptor{})
  62. }
  63. func (ls *mockLayerStore) RegisterWithDescriptor(reader io.Reader, parentID layer.ChainID, _ distribution.Descriptor) (layer.Layer, error) {
  64. var (
  65. parent layer.Layer
  66. err error
  67. )
  68. if parentID != "" {
  69. parent, err = ls.Get(parentID)
  70. if err != nil {
  71. return nil, err
  72. }
  73. }
  74. l := &mockLayer{parent: parent}
  75. _, err = l.layerData.ReadFrom(reader)
  76. if err != nil {
  77. return nil, err
  78. }
  79. l.diffID = layer.DiffID(digest.FromBytes(l.layerData.Bytes()))
  80. l.chainID = createChainIDFromParent(parentID, l.diffID)
  81. ls.layers[l.chainID] = l
  82. return l, nil
  83. }
  84. func (ls *mockLayerStore) Get(chainID layer.ChainID) (layer.Layer, error) {
  85. l, ok := ls.layers[chainID]
  86. if !ok {
  87. return nil, layer.ErrLayerDoesNotExist
  88. }
  89. return l, nil
  90. }
  91. func (ls *mockLayerStore) Release(l layer.Layer) ([]layer.Metadata, error) {
  92. return []layer.Metadata{}, nil
  93. }
  94. func (ls *mockLayerStore) CreateRWLayer(string, layer.ChainID, string, layer.MountInit, map[string]string) (layer.RWLayer, error) {
  95. return nil, errors.New("not implemented")
  96. }
  97. func (ls *mockLayerStore) GetRWLayer(string) (layer.RWLayer, error) {
  98. return nil, errors.New("not implemented")
  99. }
  100. func (ls *mockLayerStore) ReleaseRWLayer(layer.RWLayer) ([]layer.Metadata, error) {
  101. return nil, errors.New("not implemented")
  102. }
  103. func (ls *mockLayerStore) GetMountID(string) (string, error) {
  104. return "", errors.New("not implemented")
  105. }
  106. func (ls *mockLayerStore) Cleanup() error {
  107. return nil
  108. }
  109. func (ls *mockLayerStore) DriverStatus() [][2]string {
  110. return [][2]string{}
  111. }
  112. func (ls *mockLayerStore) DriverName() string {
  113. return "mock"
  114. }
  115. type mockDownloadDescriptor struct {
  116. currentDownloads *int32
  117. id string
  118. diffID layer.DiffID
  119. registeredDiffID layer.DiffID
  120. expectedDiffID layer.DiffID
  121. simulateRetries int
  122. }
  123. // Key returns the key used to deduplicate downloads.
  124. func (d *mockDownloadDescriptor) Key() string {
  125. return d.id
  126. }
  127. // ID returns the ID for display purposes.
  128. func (d *mockDownloadDescriptor) ID() string {
  129. return d.id
  130. }
  131. // DiffID should return the DiffID for this layer, or an error
  132. // if it is unknown (for example, if it has not been downloaded
  133. // before).
  134. func (d *mockDownloadDescriptor) DiffID() (layer.DiffID, error) {
  135. if d.diffID != "" {
  136. return d.diffID, nil
  137. }
  138. return "", errors.New("no diffID available")
  139. }
  140. func (d *mockDownloadDescriptor) Registered(diffID layer.DiffID) {
  141. d.registeredDiffID = diffID
  142. }
  143. func (d *mockDownloadDescriptor) mockTarStream() io.ReadCloser {
  144. // The mock implementation returns the ID repeated 5 times as a tar
  145. // stream instead of actual tar data. The data is ignored except for
  146. // computing IDs.
  147. return ioutil.NopCloser(bytes.NewBuffer([]byte(d.id + d.id + d.id + d.id + d.id)))
  148. }
  149. // Download is called to perform the download.
  150. func (d *mockDownloadDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
  151. if d.currentDownloads != nil {
  152. defer atomic.AddInt32(d.currentDownloads, -1)
  153. if atomic.AddInt32(d.currentDownloads, 1) > maxDownloadConcurrency {
  154. return nil, 0, errors.New("concurrency limit exceeded")
  155. }
  156. }
  157. // Sleep a bit to simulate a time-consuming download.
  158. for i := int64(0); i <= 10; i++ {
  159. select {
  160. case <-ctx.Done():
  161. return nil, 0, ctx.Err()
  162. case <-time.After(10 * time.Millisecond):
  163. progressOutput.WriteProgress(progress.Progress{ID: d.ID(), Action: "Downloading", Current: i, Total: 10})
  164. }
  165. }
  166. if d.simulateRetries != 0 {
  167. d.simulateRetries--
  168. return nil, 0, errors.New("simulating retry")
  169. }
  170. return d.mockTarStream(), 0, nil
  171. }
  172. func (d *mockDownloadDescriptor) Close() {
  173. }
  174. func downloadDescriptors(currentDownloads *int32) []DownloadDescriptor {
  175. return []DownloadDescriptor{
  176. &mockDownloadDescriptor{
  177. currentDownloads: currentDownloads,
  178. id: "id1",
  179. expectedDiffID: layer.DiffID("sha256:68e2c75dc5c78ea9240689c60d7599766c213ae210434c53af18470ae8c53ec1"),
  180. },
  181. &mockDownloadDescriptor{
  182. currentDownloads: currentDownloads,
  183. id: "id2",
  184. expectedDiffID: layer.DiffID("sha256:64a636223116aa837973a5d9c2bdd17d9b204e4f95ac423e20e65dfbb3655473"),
  185. },
  186. &mockDownloadDescriptor{
  187. currentDownloads: currentDownloads,
  188. id: "id3",
  189. expectedDiffID: layer.DiffID("sha256:58745a8bbd669c25213e9de578c4da5c8ee1c836b3581432c2b50e38a6753300"),
  190. },
  191. &mockDownloadDescriptor{
  192. currentDownloads: currentDownloads,
  193. id: "id2",
  194. expectedDiffID: layer.DiffID("sha256:64a636223116aa837973a5d9c2bdd17d9b204e4f95ac423e20e65dfbb3655473"),
  195. },
  196. &mockDownloadDescriptor{
  197. currentDownloads: currentDownloads,
  198. id: "id4",
  199. expectedDiffID: layer.DiffID("sha256:0dfb5b9577716cc173e95af7c10289322c29a6453a1718addc00c0c5b1330936"),
  200. simulateRetries: 1,
  201. },
  202. &mockDownloadDescriptor{
  203. currentDownloads: currentDownloads,
  204. id: "id5",
  205. expectedDiffID: layer.DiffID("sha256:0a5f25fa1acbc647f6112a6276735d0fa01e4ee2aa7ec33015e337350e1ea23d"),
  206. },
  207. }
  208. }
  209. func TestSuccessfulDownload(t *testing.T) {
  210. // TODO Windows: Fix this unit text
  211. if runtime.GOOS == "windows" {
  212. t.Skip("Needs fixing on Windows")
  213. }
  214. layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
  215. ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency)
  216. progressChan := make(chan progress.Progress)
  217. progressDone := make(chan struct{})
  218. receivedProgress := make(map[string]progress.Progress)
  219. go func() {
  220. for p := range progressChan {
  221. receivedProgress[p.ID] = p
  222. }
  223. close(progressDone)
  224. }()
  225. var currentDownloads int32
  226. descriptors := downloadDescriptors(&currentDownloads)
  227. firstDescriptor := descriptors[0].(*mockDownloadDescriptor)
  228. // Pre-register the first layer to simulate an already-existing layer
  229. l, err := layerStore.Register(firstDescriptor.mockTarStream(), "")
  230. if err != nil {
  231. t.Fatal(err)
  232. }
  233. firstDescriptor.diffID = l.DiffID()
  234. rootFS, releaseFunc, err := ldm.Download(context.Background(), *image.NewRootFS(), descriptors, progress.ChanOutput(progressChan))
  235. if err != nil {
  236. t.Fatalf("download error: %v", err)
  237. }
  238. releaseFunc()
  239. close(progressChan)
  240. <-progressDone
  241. if len(rootFS.DiffIDs) != len(descriptors) {
  242. t.Fatal("got wrong number of diffIDs in rootfs")
  243. }
  244. for i, d := range descriptors {
  245. descriptor := d.(*mockDownloadDescriptor)
  246. if descriptor.diffID != "" {
  247. if receivedProgress[d.ID()].Action != "Already exists" {
  248. t.Fatalf("did not get 'Already exists' message for %v", d.ID())
  249. }
  250. } else if receivedProgress[d.ID()].Action != "Pull complete" {
  251. t.Fatalf("did not get 'Pull complete' message for %v", d.ID())
  252. }
  253. if rootFS.DiffIDs[i] != descriptor.expectedDiffID {
  254. t.Fatalf("rootFS item %d has the wrong diffID (expected: %v got: %v)", i, descriptor.expectedDiffID, rootFS.DiffIDs[i])
  255. }
  256. if descriptor.diffID == "" && descriptor.registeredDiffID != rootFS.DiffIDs[i] {
  257. t.Fatal("diffID mismatch between rootFS and Registered callback")
  258. }
  259. }
  260. }
  261. func TestCancelledDownload(t *testing.T) {
  262. ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency)
  263. progressChan := make(chan progress.Progress)
  264. progressDone := make(chan struct{})
  265. go func() {
  266. for range progressChan {
  267. }
  268. close(progressDone)
  269. }()
  270. ctx, cancel := context.WithCancel(context.Background())
  271. go func() {
  272. <-time.After(time.Millisecond)
  273. cancel()
  274. }()
  275. descriptors := downloadDescriptors(nil)
  276. _, _, err := ldm.Download(ctx, *image.NewRootFS(), descriptors, progress.ChanOutput(progressChan))
  277. if err != context.Canceled {
  278. t.Fatal("expected download to be cancelled")
  279. }
  280. close(progressChan)
  281. <-progressDone
  282. }