diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go index 7f323c1df5..6852225488 100644 --- a/distribution/xfer/transfer.go +++ b/distribution/xfer/transfer.go @@ -1,6 +1,7 @@ package xfer import ( + "runtime" "sync" "github.com/docker/docker/pkg/progress" @@ -38,7 +39,7 @@ type Transfer interface { Watch(progressOutput progress.Output) *Watcher Release(*Watcher) Context() context.Context - Cancel() + Close() Done() <-chan struct{} Released() <-chan struct{} Broadcast(masterProgressChan <-chan progress.Progress) @@ -61,11 +62,14 @@ type transfer struct { // running remains open as long as the transfer is in progress. running chan struct{} - // hasWatchers stays open until all watchers release the transfer. - hasWatchers chan struct{} + // released stays open until all watchers release the transfer and + // the transfer is no longer tracked by the transfer manager. + released chan struct{} // broadcastDone is true if the master progress channel has closed. broadcastDone bool + // closed is true if Close has been called + closed bool // broadcastSyncChan allows watchers to "ping" the broadcasting // goroutine to wait for it for deplete its input channel. This ensures // a detaching watcher won't miss an event that was sent before it @@ -78,7 +82,7 @@ func NewTransfer() Transfer { t := &transfer{ watchers: make(map[chan struct{}]*Watcher), running: make(chan struct{}), - hasWatchers: make(chan struct{}), + released: make(chan struct{}), broadcastSyncChan: make(chan struct{}), } @@ -144,13 +148,13 @@ func (t *transfer) Watch(progressOutput progress.Output) *Watcher { running: make(chan struct{}), } + t.watchers[w.releaseChan] = w + if t.broadcastDone { close(w.running) return w } - t.watchers[w.releaseChan] = w - go func() { defer func() { close(w.running) @@ -202,8 +206,19 @@ func (t *transfer) Release(watcher *Watcher) { delete(t.watchers, watcher.releaseChan) if len(t.watchers) == 0 { - close(t.hasWatchers) - t.cancel() + if t.closed { + // released may have been closed already if all + // watchers were released, then another one was added + // while waiting for a previous watcher goroutine to + // finish. + select { + case <-t.released: + default: + close(t.released) + } + } else { + t.cancel() + } } t.mu.Unlock() @@ -223,9 +238,9 @@ func (t *transfer) Done() <-chan struct{} { } // Released returns a channel which is closed once all watchers release the -// transfer. +// transfer AND the transfer is no longer tracked by the transfer manager. func (t *transfer) Released() <-chan struct{} { - return t.hasWatchers + return t.released } // Context returns the context associated with the transfer. @@ -233,9 +248,15 @@ func (t *transfer) Context() context.Context { return t.ctx } -// Cancel cancels the context associated with the transfer. -func (t *transfer) Cancel() { - t.cancel() +// Close is called by the transfer manager when the transfer is no longer +// being tracked. +func (t *transfer) Close() { + t.mu.Lock() + t.closed = true + if len(t.watchers) == 0 { + close(t.released) + } + t.mu.Unlock() } // DoFunc is a function called by the transfer manager to actually perform @@ -280,10 +301,33 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput tm.mu.Lock() defer tm.mu.Unlock() - if xfer, present := tm.transfers[key]; present { + for { + xfer, present := tm.transfers[key] + if !present { + break + } // Transfer is already in progress. watcher := xfer.Watch(progressOutput) - return xfer, watcher + + select { + case <-xfer.Context().Done(): + // We don't want to watch a transfer that has been cancelled. + // Wait for it to be removed from the map and try again. + xfer.Release(watcher) + tm.mu.Unlock() + // The goroutine that removes this transfer from the + // map is also waiting for xfer.Done(), so yield to it. + // This could be avoided by adding a Closed method + // to Transfer to allow explicitly waiting for it to be + // removed the map, but forcing a scheduling round in + // this very rare case seems better than bloating the + // interface definition. + runtime.Gosched() + <-xfer.Done() + tm.mu.Lock() + default: + return xfer, watcher + } } start := make(chan struct{}) @@ -318,6 +362,7 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput } delete(tm.transfers, key) tm.mu.Unlock() + xfer.Close() return } } diff --git a/distribution/xfer/transfer_test.go b/distribution/xfer/transfer_test.go index 7eeb304033..8fe24661bc 100644 --- a/distribution/xfer/transfer_test.go +++ b/distribution/xfer/transfer_test.go @@ -291,6 +291,44 @@ func TestWatchRelease(t *testing.T) { } } +func TestWatchFinishedTransfer(t *testing.T) { + makeXferFunc := func(id string) DoFunc { + return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { + xfer := NewTransfer() + go func() { + // Finish immediately + close(progressChan) + }() + return xfer + } + } + + tm := NewTransferManager(5) + + // Start a transfer + watchers := make([]*Watcher, 3) + var xfer Transfer + xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress))) + + // Give it a watcher immediately + watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress))) + + // Wait for the transfer to complete + <-xfer.Done() + + // Set up another watcher + watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress))) + + // Release the watchers + for _, w := range watchers { + xfer.Release(w) + } + + // Now that all watchers have been released, Released() should + // return a closed channel. + <-xfer.Released() +} + func TestDuplicateTransfer(t *testing.T) { ready := make(chan struct{})