123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- package xfer
- import (
- "sync/atomic"
- "testing"
- "time"
- "github.com/docker/docker/pkg/progress"
- )
- func TestTransfer(t *testing.T) {
- makeXferFunc := func(id string) DoFunc {
- return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
- select {
- case <-start:
- default:
- t.Fatalf("transfer function not started even though concurrency limit not reached")
- }
- xfer := NewTransfer()
- go func() {
- for i := 0; i <= 10; i++ {
- progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
- time.Sleep(10 * time.Millisecond)
- }
- close(progressChan)
- }()
- return xfer
- }
- }
- tm := NewTransferManager(5)
- progressChan := make(chan progress.Progress)
- progressDone := make(chan struct{})
- receivedProgress := make(map[string]int64)
- go func() {
- for p := range progressChan {
- val, present := receivedProgress[p.ID]
- if present && p.Current <= val {
- t.Fatalf("got unexpected progress value: %d (expected %d)", p.Current, val+1)
- }
- receivedProgress[p.ID] = p.Current
- }
- close(progressDone)
- }()
- // Start a few transfers
- ids := []string{"id1", "id2", "id3"}
- xfers := make([]Transfer, len(ids))
- watchers := make([]*Watcher, len(ids))
- for i, id := range ids {
- xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
- }
- for i, xfer := range xfers {
- <-xfer.Done()
- xfer.Release(watchers[i])
- }
- close(progressChan)
- <-progressDone
- for _, id := range ids {
- if receivedProgress[id] != 10 {
- t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
- }
- }
- }
- func TestConcurrencyLimit(t *testing.T) {
- concurrencyLimit := 3
- var runningJobs int32
- makeXferFunc := func(id string) DoFunc {
- return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
- xfer := NewTransfer()
- go func() {
- <-start
- totalJobs := atomic.AddInt32(&runningJobs, 1)
- if int(totalJobs) > concurrencyLimit {
- t.Fatalf("too many jobs running")
- }
- for i := 0; i <= 10; i++ {
- progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
- time.Sleep(10 * time.Millisecond)
- }
- atomic.AddInt32(&runningJobs, -1)
- close(progressChan)
- }()
- return xfer
- }
- }
- tm := NewTransferManager(concurrencyLimit)
- progressChan := make(chan progress.Progress)
- progressDone := make(chan struct{})
- receivedProgress := make(map[string]int64)
- go func() {
- for p := range progressChan {
- receivedProgress[p.ID] = p.Current
- }
- close(progressDone)
- }()
- // Start more transfers than the concurrency limit
- ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
- xfers := make([]Transfer, len(ids))
- watchers := make([]*Watcher, len(ids))
- for i, id := range ids {
- xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
- }
- for i, xfer := range xfers {
- <-xfer.Done()
- xfer.Release(watchers[i])
- }
- close(progressChan)
- <-progressDone
- for _, id := range ids {
- if receivedProgress[id] != 10 {
- t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
- }
- }
- }
- func TestInactiveJobs(t *testing.T) {
- concurrencyLimit := 3
- var runningJobs int32
- testDone := make(chan struct{})
- makeXferFunc := func(id string) DoFunc {
- return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
- xfer := NewTransfer()
- go func() {
- <-start
- totalJobs := atomic.AddInt32(&runningJobs, 1)
- if int(totalJobs) > concurrencyLimit {
- t.Fatalf("too many jobs running")
- }
- for i := 0; i <= 10; i++ {
- progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
- time.Sleep(10 * time.Millisecond)
- }
- atomic.AddInt32(&runningJobs, -1)
- close(inactive)
- <-testDone
- close(progressChan)
- }()
- return xfer
- }
- }
- tm := NewTransferManager(concurrencyLimit)
- progressChan := make(chan progress.Progress)
- progressDone := make(chan struct{})
- receivedProgress := make(map[string]int64)
- go func() {
- for p := range progressChan {
- receivedProgress[p.ID] = p.Current
- }
- close(progressDone)
- }()
- // Start more transfers than the concurrency limit
- ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
- xfers := make([]Transfer, len(ids))
- watchers := make([]*Watcher, len(ids))
- for i, id := range ids {
- xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
- }
- close(testDone)
- for i, xfer := range xfers {
- <-xfer.Done()
- xfer.Release(watchers[i])
- }
- close(progressChan)
- <-progressDone
- for _, id := range ids {
- if receivedProgress[id] != 10 {
- t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
- }
- }
- }
- func TestWatchRelease(t *testing.T) {
- ready := make(chan struct{})
- makeXferFunc := func(id string) DoFunc {
- return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
- xfer := NewTransfer()
- go func() {
- defer func() {
- close(progressChan)
- }()
- <-ready
- for i := int64(0); ; i++ {
- select {
- case <-time.After(10 * time.Millisecond):
- case <-xfer.Context().Done():
- return
- }
- progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
- }
- }()
- return xfer
- }
- }
- tm := NewTransferManager(5)
- type watcherInfo struct {
- watcher *Watcher
- progressChan chan progress.Progress
- progressDone chan struct{}
- receivedFirstProgress chan struct{}
- }
- progressConsumer := func(w watcherInfo) {
- first := true
- for range w.progressChan {
- if first {
- close(w.receivedFirstProgress)
- }
- first = false
- }
- close(w.progressDone)
- }
- // Start a transfer
- watchers := make([]watcherInfo, 5)
- var xfer Transfer
- watchers[0].progressChan = make(chan progress.Progress)
- watchers[0].progressDone = make(chan struct{})
- watchers[0].receivedFirstProgress = make(chan struct{})
- xfer, watchers[0].watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
- go progressConsumer(watchers[0])
- // Give it multiple watchers
- for i := 1; i != len(watchers); i++ {
- watchers[i].progressChan = make(chan progress.Progress)
- watchers[i].progressDone = make(chan struct{})
- watchers[i].receivedFirstProgress = make(chan struct{})
- watchers[i].watcher = xfer.Watch(progress.ChanOutput(watchers[i].progressChan))
- go progressConsumer(watchers[i])
- }
- // Now that the watchers are set up, allow the transfer goroutine to
- // proceed.
- close(ready)
- // Confirm that each watcher gets progress output.
- for _, w := range watchers {
- <-w.receivedFirstProgress
- }
- // Release one watcher every 5ms
- for _, w := range watchers {
- xfer.Release(w.watcher)
- <-time.After(5 * time.Millisecond)
- }
- // Now that all watchers have been released, Released() should
- // return a closed channel.
- <-xfer.Released()
- // Done() should return a closed channel because the xfer func returned
- // due to cancellation.
- <-xfer.Done()
- for _, w := range watchers {
- close(w.progressChan)
- <-w.progressDone
- }
- }
- func TestWatchFinishedTransfer(t *testing.T) {
- makeXferFunc := func(id string) DoFunc {
- return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
- xfer := NewTransfer()
- go func() {
- // Finish immediately
- close(progressChan)
- }()
- return xfer
- }
- }
- tm := NewTransferManager(5)
- // Start a transfer
- watchers := make([]*Watcher, 3)
- var xfer Transfer
- xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
- // Give it a watcher immediately
- watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
- // Wait for the transfer to complete
- <-xfer.Done()
- // Set up another watcher
- watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
- // Release the watchers
- for _, w := range watchers {
- xfer.Release(w)
- }
- // Now that all watchers have been released, Released() should
- // return a closed channel.
- <-xfer.Released()
- }
- func TestDuplicateTransfer(t *testing.T) {
- ready := make(chan struct{})
- var xferFuncCalls int32
- makeXferFunc := func(id string) DoFunc {
- return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
- atomic.AddInt32(&xferFuncCalls, 1)
- xfer := NewTransfer()
- go func() {
- defer func() {
- close(progressChan)
- }()
- <-ready
- for i := int64(0); ; i++ {
- select {
- case <-time.After(10 * time.Millisecond):
- case <-xfer.Context().Done():
- return
- }
- progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
- }
- }()
- return xfer
- }
- }
- tm := NewTransferManager(5)
- type transferInfo struct {
- xfer Transfer
- watcher *Watcher
- progressChan chan progress.Progress
- progressDone chan struct{}
- receivedFirstProgress chan struct{}
- }
- progressConsumer := func(t transferInfo) {
- first := true
- for range t.progressChan {
- if first {
- close(t.receivedFirstProgress)
- }
- first = false
- }
- close(t.progressDone)
- }
- // Try to start multiple transfers with the same ID
- transfers := make([]transferInfo, 5)
- for i := range transfers {
- t := &transfers[i]
- t.progressChan = make(chan progress.Progress)
- t.progressDone = make(chan struct{})
- t.receivedFirstProgress = make(chan struct{})
- t.xfer, t.watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan))
- go progressConsumer(*t)
- }
- // Allow the transfer goroutine to proceed.
- close(ready)
- // Confirm that each watcher gets progress output.
- for _, t := range transfers {
- <-t.receivedFirstProgress
- }
- // Confirm that the transfer function was called exactly once.
- if xferFuncCalls != 1 {
- t.Fatal("transfer function wasn't called exactly once")
- }
- // Release one watcher every 5ms
- for _, t := range transfers {
- t.xfer.Release(t.watcher)
- <-time.After(5 * time.Millisecond)
- }
- for _, t := range transfers {
- // Now that all watchers have been released, Released() should
- // return a closed channel.
- <-t.xfer.Released()
- // Done() should return a closed channel because the xfer func returned
- // due to cancellation.
- <-t.xfer.Done()
- }
- for _, t := range transfers {
- close(t.progressChan)
- <-t.progressDone
- }
- }
|