123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- package xfer
- import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "runtime"
- "sync/atomic"
- "testing"
- "time"
- "github.com/docker/distribution"
- "github.com/docker/docker/image"
- "github.com/docker/docker/layer"
- "github.com/docker/docker/pkg/progress"
- "github.com/opencontainers/go-digest"
- "golang.org/x/net/context"
- )
- const maxDownloadConcurrency = 3
- type mockLayer struct {
- layerData bytes.Buffer
- diffID layer.DiffID
- chainID layer.ChainID
- parent layer.Layer
- }
- func (ml *mockLayer) TarStream() (io.ReadCloser, error) {
- return ioutil.NopCloser(bytes.NewBuffer(ml.layerData.Bytes())), nil
- }
- func (ml *mockLayer) TarStreamFrom(layer.ChainID) (io.ReadCloser, error) {
- return nil, fmt.Errorf("not implemented")
- }
- func (ml *mockLayer) ChainID() layer.ChainID {
- return ml.chainID
- }
- func (ml *mockLayer) DiffID() layer.DiffID {
- return ml.diffID
- }
- func (ml *mockLayer) Parent() layer.Layer {
- return ml.parent
- }
- func (ml *mockLayer) Size() (size int64, err error) {
- return 0, nil
- }
- func (ml *mockLayer) DiffSize() (size int64, err error) {
- return 0, nil
- }
- func (ml *mockLayer) Metadata() (map[string]string, error) {
- return make(map[string]string), nil
- }
- type mockLayerStore struct {
- layers map[layer.ChainID]*mockLayer
- }
- func createChainIDFromParent(parent layer.ChainID, dgsts ...layer.DiffID) layer.ChainID {
- if len(dgsts) == 0 {
- return parent
- }
- if parent == "" {
- return createChainIDFromParent(layer.ChainID(dgsts[0]), dgsts[1:]...)
- }
- // H = "H(n-1) SHA256(n)"
- dgst := digest.FromBytes([]byte(string(parent) + " " + string(dgsts[0])))
- return createChainIDFromParent(layer.ChainID(dgst), dgsts[1:]...)
- }
- func (ls *mockLayerStore) Map() map[layer.ChainID]layer.Layer {
- layers := map[layer.ChainID]layer.Layer{}
- for k, v := range ls.layers {
- layers[k] = v
- }
- return layers
- }
- func (ls *mockLayerStore) Register(reader io.Reader, parentID layer.ChainID) (layer.Layer, error) {
- return ls.RegisterWithDescriptor(reader, parentID, distribution.Descriptor{})
- }
- func (ls *mockLayerStore) RegisterWithDescriptor(reader io.Reader, parentID layer.ChainID, _ distribution.Descriptor) (layer.Layer, error) {
- var (
- parent layer.Layer
- err error
- )
- if parentID != "" {
- parent, err = ls.Get(parentID)
- if err != nil {
- return nil, err
- }
- }
- l := &mockLayer{parent: parent}
- _, err = l.layerData.ReadFrom(reader)
- if err != nil {
- return nil, err
- }
- l.diffID = layer.DiffID(digest.FromBytes(l.layerData.Bytes()))
- l.chainID = createChainIDFromParent(parentID, l.diffID)
- ls.layers[l.chainID] = l
- return l, nil
- }
- func (ls *mockLayerStore) Get(chainID layer.ChainID) (layer.Layer, error) {
- l, ok := ls.layers[chainID]
- if !ok {
- return nil, layer.ErrLayerDoesNotExist
- }
- return l, nil
- }
- func (ls *mockLayerStore) Release(l layer.Layer) ([]layer.Metadata, error) {
- return []layer.Metadata{}, nil
- }
- func (ls *mockLayerStore) CreateRWLayer(string, layer.ChainID, *layer.CreateRWLayerOpts) (layer.RWLayer, error) {
- return nil, errors.New("not implemented")
- }
- func (ls *mockLayerStore) GetRWLayer(string) (layer.RWLayer, error) {
- return nil, errors.New("not implemented")
- }
- func (ls *mockLayerStore) ReleaseRWLayer(layer.RWLayer) ([]layer.Metadata, error) {
- return nil, errors.New("not implemented")
- }
- func (ls *mockLayerStore) GetMountID(string) (string, error) {
- return "", errors.New("not implemented")
- }
- func (ls *mockLayerStore) Cleanup() error {
- return nil
- }
- func (ls *mockLayerStore) DriverStatus() [][2]string {
- return [][2]string{}
- }
- func (ls *mockLayerStore) DriverName() string {
- return "mock"
- }
- type mockDownloadDescriptor struct {
- currentDownloads *int32
- id string
- diffID layer.DiffID
- registeredDiffID layer.DiffID
- expectedDiffID layer.DiffID
- simulateRetries int
- }
- // Key returns the key used to deduplicate downloads.
- func (d *mockDownloadDescriptor) Key() string {
- return d.id
- }
- // ID returns the ID for display purposes.
- func (d *mockDownloadDescriptor) ID() string {
- return d.id
- }
- // DiffID should return the DiffID for this layer, or an error
- // if it is unknown (for example, if it has not been downloaded
- // before).
- func (d *mockDownloadDescriptor) DiffID() (layer.DiffID, error) {
- if d.diffID != "" {
- return d.diffID, nil
- }
- return "", errors.New("no diffID available")
- }
- func (d *mockDownloadDescriptor) Registered(diffID layer.DiffID) {
- d.registeredDiffID = diffID
- }
- func (d *mockDownloadDescriptor) mockTarStream() io.ReadCloser {
- // The mock implementation returns the ID repeated 5 times as a tar
- // stream instead of actual tar data. The data is ignored except for
- // computing IDs.
- return ioutil.NopCloser(bytes.NewBuffer([]byte(d.id + d.id + d.id + d.id + d.id)))
- }
- // Download is called to perform the download.
- func (d *mockDownloadDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
- if d.currentDownloads != nil {
- defer atomic.AddInt32(d.currentDownloads, -1)
- if atomic.AddInt32(d.currentDownloads, 1) > maxDownloadConcurrency {
- return nil, 0, errors.New("concurrency limit exceeded")
- }
- }
- // Sleep a bit to simulate a time-consuming download.
- for i := int64(0); i <= 10; i++ {
- select {
- case <-ctx.Done():
- return nil, 0, ctx.Err()
- case <-time.After(10 * time.Millisecond):
- progressOutput.WriteProgress(progress.Progress{ID: d.ID(), Action: "Downloading", Current: i, Total: 10})
- }
- }
- if d.simulateRetries != 0 {
- d.simulateRetries--
- return nil, 0, errors.New("simulating retry")
- }
- return d.mockTarStream(), 0, nil
- }
- func (d *mockDownloadDescriptor) Close() {
- }
- func downloadDescriptors(currentDownloads *int32) []DownloadDescriptor {
- return []DownloadDescriptor{
- &mockDownloadDescriptor{
- currentDownloads: currentDownloads,
- id: "id1",
- expectedDiffID: layer.DiffID("sha256:68e2c75dc5c78ea9240689c60d7599766c213ae210434c53af18470ae8c53ec1"),
- },
- &mockDownloadDescriptor{
- currentDownloads: currentDownloads,
- id: "id2",
- expectedDiffID: layer.DiffID("sha256:64a636223116aa837973a5d9c2bdd17d9b204e4f95ac423e20e65dfbb3655473"),
- },
- &mockDownloadDescriptor{
- currentDownloads: currentDownloads,
- id: "id3",
- expectedDiffID: layer.DiffID("sha256:58745a8bbd669c25213e9de578c4da5c8ee1c836b3581432c2b50e38a6753300"),
- },
- &mockDownloadDescriptor{
- currentDownloads: currentDownloads,
- id: "id2",
- expectedDiffID: layer.DiffID("sha256:64a636223116aa837973a5d9c2bdd17d9b204e4f95ac423e20e65dfbb3655473"),
- },
- &mockDownloadDescriptor{
- currentDownloads: currentDownloads,
- id: "id4",
- expectedDiffID: layer.DiffID("sha256:0dfb5b9577716cc173e95af7c10289322c29a6453a1718addc00c0c5b1330936"),
- simulateRetries: 1,
- },
- &mockDownloadDescriptor{
- currentDownloads: currentDownloads,
- id: "id5",
- expectedDiffID: layer.DiffID("sha256:0a5f25fa1acbc647f6112a6276735d0fa01e4ee2aa7ec33015e337350e1ea23d"),
- },
- }
- }
- func TestSuccessfulDownload(t *testing.T) {
- // TODO Windows: Fix this unit text
- if runtime.GOOS == "windows" {
- t.Skip("Needs fixing on Windows")
- }
- layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
- ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
- progressChan := make(chan progress.Progress)
- progressDone := make(chan struct{})
- receivedProgress := make(map[string]progress.Progress)
- go func() {
- for p := range progressChan {
- receivedProgress[p.ID] = p
- }
- close(progressDone)
- }()
- var currentDownloads int32
- descriptors := downloadDescriptors(¤tDownloads)
- firstDescriptor := descriptors[0].(*mockDownloadDescriptor)
- // Pre-register the first layer to simulate an already-existing layer
- l, err := layerStore.Register(firstDescriptor.mockTarStream(), "")
- if err != nil {
- t.Fatal(err)
- }
- firstDescriptor.diffID = l.DiffID()
- rootFS, releaseFunc, err := ldm.Download(context.Background(), *image.NewRootFS(), descriptors, progress.ChanOutput(progressChan))
- if err != nil {
- t.Fatalf("download error: %v", err)
- }
- releaseFunc()
- close(progressChan)
- <-progressDone
- if len(rootFS.DiffIDs) != len(descriptors) {
- t.Fatal("got wrong number of diffIDs in rootfs")
- }
- for i, d := range descriptors {
- descriptor := d.(*mockDownloadDescriptor)
- if descriptor.diffID != "" {
- if receivedProgress[d.ID()].Action != "Already exists" {
- t.Fatalf("did not get 'Already exists' message for %v", d.ID())
- }
- } else if receivedProgress[d.ID()].Action != "Pull complete" {
- t.Fatalf("did not get 'Pull complete' message for %v", d.ID())
- }
- if rootFS.DiffIDs[i] != descriptor.expectedDiffID {
- t.Fatalf("rootFS item %d has the wrong diffID (expected: %v got: %v)", i, descriptor.expectedDiffID, rootFS.DiffIDs[i])
- }
- if descriptor.diffID == "" && descriptor.registeredDiffID != rootFS.DiffIDs[i] {
- t.Fatal("diffID mismatch between rootFS and Registered callback")
- }
- }
- }
- func TestCancelledDownload(t *testing.T) {
- ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
- progressChan := make(chan progress.Progress)
- progressDone := make(chan struct{})
- go func() {
- for range progressChan {
- }
- close(progressDone)
- }()
- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- <-time.After(time.Millisecond)
- cancel()
- }()
- descriptors := downloadDescriptors(nil)
- _, _, err := ldm.Download(ctx, *image.NewRootFS(), descriptors, progress.ChanOutput(progressChan))
- if err != context.Canceled {
- t.Fatal("expected download to be cancelled")
- }
- close(progressChan)
- <-progressDone
- }
|