|
@@ -130,7 +130,11 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) {
|
|
c.mu.Lock()
|
|
c.mu.Lock()
|
|
// detect case where caller has just returned, let it clean up before
|
|
// detect case where caller has just returned, let it clean up before
|
|
select {
|
|
select {
|
|
- case <-c.ready: // could return if no error
|
|
|
|
|
|
+ case <-c.ready:
|
|
|
|
+ c.mu.Unlock()
|
|
|
|
+ <-c.cleaned
|
|
|
|
+ return nil, errRetry
|
|
|
|
+ case <-c.ctx.done: // could return if no error
|
|
c.mu.Unlock()
|
|
c.mu.Unlock()
|
|
<-c.cleaned
|
|
<-c.cleaned
|
|
return nil, errRetry
|
|
return nil, errRetry
|
|
@@ -141,6 +145,10 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) {
|
|
if ok {
|
|
if ok {
|
|
c.progressState.add(pw)
|
|
c.progressState.add(pw)
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ ctx, cancel := context.WithCancel(ctx)
|
|
|
|
+ defer cancel()
|
|
|
|
+
|
|
c.ctxs = append(c.ctxs, ctx)
|
|
c.ctxs = append(c.ctxs, ctx)
|
|
|
|
|
|
c.mu.Unlock()
|
|
c.mu.Unlock()
|
|
@@ -149,18 +157,16 @@ func (c *call) wait(ctx context.Context) (v interface{}, err error) {
|
|
|
|
|
|
select {
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
- select {
|
|
|
|
- case <-c.ctx.Done():
|
|
|
|
|
|
+ if c.ctx.checkDone() {
|
|
// if this cancelled the last context, then wait for function to shut down
|
|
// if this cancelled the last context, then wait for function to shut down
|
|
// and don't accept any more callers
|
|
// and don't accept any more callers
|
|
<-c.ready
|
|
<-c.ready
|
|
return c.result, c.err
|
|
return c.result, c.err
|
|
- default:
|
|
|
|
- if ok {
|
|
|
|
- c.progressState.close(pw)
|
|
|
|
- }
|
|
|
|
- return nil, ctx.Err()
|
|
|
|
}
|
|
}
|
|
|
|
+ if ok {
|
|
|
|
+ c.progressState.close(pw)
|
|
|
|
+ }
|
|
|
|
+ return nil, ctx.Err()
|
|
case <-c.ready:
|
|
case <-c.ready:
|
|
return c.result, c.err // shared not implemented yet
|
|
return c.result, c.err // shared not implemented yet
|
|
}
|
|
}
|
|
@@ -183,9 +189,6 @@ func (c *call) Deadline() (deadline time.Time, ok bool) {
|
|
}
|
|
}
|
|
|
|
|
|
func (c *call) Done() <-chan struct{} {
|
|
func (c *call) Done() <-chan struct{} {
|
|
- c.mu.Lock()
|
|
|
|
- c.ctx.signal()
|
|
|
|
- c.mu.Unlock()
|
|
|
|
return c.ctx.done
|
|
return c.ctx.done
|
|
}
|
|
}
|
|
|
|
|
|
@@ -238,23 +241,28 @@ func newContext(c *call) *sharedContext {
|
|
return &sharedContext{call: c, done: make(chan struct{})}
|
|
return &sharedContext{call: c, done: make(chan struct{})}
|
|
}
|
|
}
|
|
|
|
|
|
-// call with lock
|
|
|
|
-func (c *sharedContext) signal() {
|
|
|
|
|
|
+func (sc *sharedContext) checkDone() bool {
|
|
|
|
+ sc.mu.Lock()
|
|
select {
|
|
select {
|
|
- case <-c.done:
|
|
|
|
|
|
+ case <-sc.done:
|
|
|
|
+ sc.mu.Unlock()
|
|
|
|
+ return true
|
|
default:
|
|
default:
|
|
- var err error
|
|
|
|
- for _, ctx := range c.ctxs {
|
|
|
|
- select {
|
|
|
|
- case <-ctx.Done():
|
|
|
|
- err = ctx.Err()
|
|
|
|
- default:
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
+ var err error
|
|
|
|
+ for _, ctx := range sc.ctxs {
|
|
|
|
+ select {
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
+ err = ctx.Err()
|
|
|
|
+ default:
|
|
|
|
+ sc.mu.Unlock()
|
|
|
|
+ return false
|
|
}
|
|
}
|
|
- c.err = err
|
|
|
|
- close(c.done)
|
|
|
|
}
|
|
}
|
|
|
|
+ sc.err = err
|
|
|
|
+ close(sc.done)
|
|
|
|
+ sc.mu.Unlock()
|
|
|
|
+ return true
|
|
}
|
|
}
|
|
|
|
|
|
type rawProgressWriter interface {
|
|
type rawProgressWriter interface {
|