Add method CleanContext to broadcaster.Unbuffered

Method .Clean() of broadcaster.Unbuffered may block idefinitely
waiting for it's blocked writes to complete. Add new method
.CleanContext() that allows to forcefully close and remove writers
when context is done.
This is hard to implement with a single global lock for all
operations: there is no way to limit the time for which Write
will hold it and it is impossible for CleanContext() to use
.writers without holding a lock because .Write() may both read and
change .writers.
That's why:
 - introduce two locks instead of one. One of them
   (writersUpdateMu) should be held when updating writers slice -
   and .Write will release it while calling Write in writers. The
   second one (writersWriteMu) will prevent several Writes and
   Adds from happening simultaneously and from interfering with
   each other.
 - make writers a pointer-to-slice instead of slice so that
   CleanContext can assign writers to nil without interfering with
   Write (which will use it's own copy of that pointer).

Signed-off-by: Daniil Sigalov <asterite@seclab.cs.msu.ru>
This commit is contained in:
Daniil Sigalov 2020-12-20 02:46:50 +03:00
parent 89b542b421
commit 0eb5aa645d

View file

@ -1,49 +1,141 @@
package broadcaster // import "github.com/docker/docker/pkg/broadcaster"
import (
"context"
"io"
"sync"
)
// Unbuffered accumulates multiple io.WriteCloser by stream.
type Unbuffered struct {
mu sync.Mutex
writers []io.WriteCloser
writersUpdateMu sync.Mutex
writersWriteMu chan struct{}
initWritersWriteMuOnce sync.Once
writers *[]io.WriteCloser
}
// Add adds new io.WriteCloser.
func (w *Unbuffered) Add(writer io.WriteCloser) {
w.mu.Lock()
w.writers = append(w.writers, writer)
w.mu.Unlock()
w.lockWritersWriteMu()
defer w.unlockWritersWriteMu()
w.writersUpdateMu.Lock()
defer w.writersUpdateMu.Unlock()
if w.writers == nil {
w.writers = &[]io.WriteCloser{}
}
*w.writers = append(*w.writers, writer)
}
// Write writes bytes to all writers. Failed writers will be evicted during
// this call.
func (w *Unbuffered) Write(p []byte) (n int, err error) {
w.mu.Lock()
w.lockWritersWriteMu()
defer w.unlockWritersWriteMu()
w.writersUpdateMu.Lock()
// make a copy of w.writers. Even if .CleanContext sets w.writers, this
// Write call will still use it's valid copy
writers := w.writers
// release w.writersUpdateMu. This allows CleanContext to set w.writers
// to nil - but this Write call won't be affected by this, as it uses a copy
// of that pointer. We will also be able to safely iterate over *writers:
// clean methods never resize the slice (they only may set pointer to nil),
// and Add and Write require w.writersWriteMu, which we are still holding
w.writersUpdateMu.Unlock()
if writers == nil {
return
}
var evict []int
for i, sw := range w.writers {
for i, sw := range *writers {
if n, err := sw.Write(p); err != nil || n != len(p) {
// On error, evict the writer
evict = append(evict, i)
}
}
w.writersUpdateMu.Lock()
// at this point w.writers might have already been set to nil, but we're
// not affected by this as we are using a copy
for n, i := range evict {
w.writers = append(w.writers[:i-n], w.writers[i-n+1:]...)
*writers = append((*writers)[:i-n], (*writers)[i-n+1:]...)
}
w.mu.Unlock()
w.writersUpdateMu.Unlock()
return len(p), nil
}
func (w *Unbuffered) cleanUnlocked() {
if w.writers == nil {
return
}
for _, sw := range *w.writers {
sw.Close()
}
w.writers = nil
}
func (w *Unbuffered) cleanWithWriteLock() {
w.writersUpdateMu.Lock()
defer w.writersUpdateMu.Unlock()
w.cleanUnlocked()
}
// Clean closes and removes all writers. Last non-eol-terminated part of data
// will be saved.
func (w *Unbuffered) Clean() error {
w.mu.Lock()
for _, sw := range w.writers {
sw.Close()
}
w.writers = nil
w.mu.Unlock()
w.lockWritersWriteMu()
defer w.unlockWritersWriteMu()
w.cleanWithWriteLock()
return nil
}
// CleanContext closes and removes all writers.
// CleanContext supports timeouts via the context to unblock and forcefully
// close the io streams. This function should only be used if all writers
// added to Unbuffered support concurrent calls to Close and Write: it will
// call Close while Write may be in progress in order to forcefully close
// writers
func (w *Unbuffered) CleanContext(ctx context.Context) error {
writersWriteMu := w.getWritersWriteMu()
select {
case writersWriteMu <- struct{}{}:
defer w.unlockWritersWriteMu()
case <-ctx.Done():
// forceful cleanup - we will call w.cleanWithWriteLock() without
// actually holding w.writersWriteMu. This may call .Close() on a
// WriteCloser which is being blocked insite Write call
}
w.cleanWithWriteLock()
return ctx.Err()
}
func (w *Unbuffered) initWritersWriteMu() {
w.writersWriteMu = make(chan struct{}, 1)
}
func (w *Unbuffered) getWritersWriteMu() chan struct{} {
w.initWritersWriteMuOnce.Do(w.initWritersWriteMu)
return w.writersWriteMu
}
func (w *Unbuffered) lockWritersWriteMu() {
w.getWritersWriteMu() <- struct{}{}
}
func (w *Unbuffered) unlockWritersWriteMu() {
// this is never called before 'getWritersWriteMu()', so w.writersWriteMu is
// guaranteed to be initialized
<-w.writersWriteMu
}