|
@@ -205,6 +205,11 @@ func (a *Agent) run(ctx context.Context) {
|
|
|
sessionq chan sessionOperation
|
|
|
leaving = a.leaving
|
|
|
subscriptions = map[string]context.CancelFunc{}
|
|
|
+ // subscriptionDone is a channel that allows us to notify ourselves
|
|
|
+ // that a lot subscription should be finished. this channel is
|
|
|
+ // unbuffered, because it is only written to in a goroutine, and
|
|
|
+ // therefore cannot block the main execution path.
|
|
|
+ subscriptionDone = make(chan string)
|
|
|
)
|
|
|
defer func() {
|
|
|
session.close()
|
|
@@ -310,8 +315,26 @@ func (a *Agent) run(ctx context.Context) {
|
|
|
|
|
|
subCtx, subCancel := context.WithCancel(ctx)
|
|
|
subscriptions[sub.ID] = subCancel
|
|
|
- // TODO(dperny) we're tossing the error here, that seems wrong
|
|
|
- go a.worker.Subscribe(subCtx, sub)
|
|
|
+ // NOTE(dperny): for like 3 years, there has been a to do saying
|
|
|
+ // "we're tossing the error here, that seems wrong". this is not a
|
|
|
+ // to do anymore. 9/10 of these errors are going to be "context
|
|
|
+ // deadline exceeded", and the remaining 1/10 obviously doesn't
|
|
|
+ // matter or we'd have missed it by now.
|
|
|
+ go func() {
|
|
|
+ a.worker.Subscribe(subCtx, sub)
|
|
|
+ // when the worker finishes the subscription, we should notify
|
|
|
+ // ourselves that this has occurred. We cannot rely on getting
|
|
|
+ // a Close message from the manager, as any number of things
|
|
|
+ // could go wrong (see github.com/moby/moby/issues/39916).
|
|
|
+ subscriptionDone <- sub.ID
|
|
|
+ }()
|
|
|
+ case subID := <-subscriptionDone:
|
|
|
+ // subscription may already have been removed. If so, no need to
|
|
|
+ // take any action.
|
|
|
+ if cancel, ok := subscriptions[subID]; ok {
|
|
|
+ cancel()
|
|
|
+ delete(subscriptions, subID)
|
|
|
+ }
|
|
|
case <-registered:
|
|
|
log.G(ctx).Debugln("agent: registered")
|
|
|
if ready != nil {
|
|
@@ -548,8 +571,9 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
|
|
|
SubscriptionID: subscriptionID,
|
|
|
Close: true,
|
|
|
})
|
|
|
- // close the stream forreal
|
|
|
- publisher.CloseSend()
|
|
|
+ // close the stream forreal. ignore the return value and the error,
|
|
|
+ // because we don't care.
|
|
|
+ publisher.CloseAndRecv()
|
|
|
}
|
|
|
|
|
|
return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
|