|
@@ -426,14 +426,19 @@ func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
|
|
|
}
|
|
|
|
|
|
func (w *worker) startTask(ctx context.Context, tx *bolt.Tx, task *api.Task) error {
|
|
|
- w.taskevents.Publish(task.Copy())
|
|
|
_, err := w.taskManager(ctx, tx, task) // side-effect taskManager creation.
|
|
|
|
|
|
if err != nil {
|
|
|
log.G(ctx).WithError(err).Error("failed to start taskManager")
|
|
|
+ // we ignore this error: it gets reported in the taskStatus within
|
|
|
+ // `newTaskManager`. We log it here and move on. If their is an
|
|
|
+ // attempted restart, the lack of taskManager will have this retry
|
|
|
+ // again.
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
- // TODO(stevvooe): Add start method for taskmanager
|
|
|
+ // only publish if controller resolution was successful.
|
|
|
+ w.taskevents.Publish(task.Copy())
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -464,7 +469,7 @@ func (w *worker) newTaskManager(ctx context.Context, tx *bolt.Tx, task *api.Task
|
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
|
- log.G(ctx).Error("controller resolution failed")
|
|
|
+ log.G(ctx).WithError(err).Error("controller resolution failed")
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
@@ -568,9 +573,14 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe
|
|
|
case v := <-ch:
|
|
|
task := v.(*api.Task)
|
|
|
if match(task) {
|
|
|
- w.mu.Lock()
|
|
|
- go w.taskManagers[task.ID].Logs(ctx, *subscription.Options, publisher)
|
|
|
- w.mu.Unlock()
|
|
|
+ w.mu.RLock()
|
|
|
+ tm, ok := w.taskManagers[task.ID]
|
|
|
+ w.mu.RUnlock()
|
|
|
+ if !ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ go tm.Logs(ctx, *subscription.Options, publisher)
|
|
|
}
|
|
|
case <-ctx.Done():
|
|
|
return ctx.Err()
|