|
@@ -711,6 +711,38 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+func (c *client) waitServe(ctx context.Context) bool {
|
|
|
+ t := 100 * time.Millisecond
|
|
|
+ delay := time.NewTimer(t)
|
|
|
+ if !delay.Stop() {
|
|
|
+ <-delay.C
|
|
|
+ }
|
|
|
+ defer delay.Stop()
|
|
|
+
|
|
|
+ // `IsServing` will actually block until the service is ready.
|
|
|
+ // However it can return early, so we'll loop with a delay to handle it.
|
|
|
+ for {
|
|
|
+ serving, err := c.client.IsServing(ctx)
|
|
|
+ if err != nil {
|
|
|
+ if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ logrus.WithError(err).Warn("Error while testing if containerd API is ready")
|
|
|
+ }
|
|
|
+
|
|
|
+ if serving {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+ delay.Reset(t)
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return false
|
|
|
+ case <-delay.C:
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
var (
|
|
|
err error
|
|
@@ -719,9 +751,16 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
ei libcontainerdtypes.EventInfo
|
|
|
)
|
|
|
|
|
|
+ // Create a new context specifically for this subscription.
|
|
|
+ // The context must be cancelled to cancel the subscription.
|
|
|
+ // In cases where we have to restart event stream processing,
|
|
|
+ // we'll need the original context b/c this one will be cancelled
|
|
|
+ subCtx, cancel := context.WithCancel(ctx)
|
|
|
+ defer cancel()
|
|
|
+
|
|
|
// Filter on both namespace *and* topic. To create an "and" filter,
|
|
|
// this must be a single, comma-separated string
|
|
|
- eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|")
|
|
|
+ eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")
|
|
|
|
|
|
c.logger.Debug("processing event stream")
|
|
|
|
|
@@ -732,14 +771,11 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
if err != nil {
|
|
|
errStatus, ok := status.FromError(err)
|
|
|
if !ok || errStatus.Code() != codes.Canceled {
|
|
|
- c.logger.WithError(err).Error("failed to get event")
|
|
|
-
|
|
|
- // rate limit
|
|
|
- select {
|
|
|
- case <-time.After(time.Second):
|
|
|
+ c.logger.WithError(err).Error("Failed to get event")
|
|
|
+ c.logger.Info("Waiting for containerd to be ready to restart event processing")
|
|
|
+ if c.waitServe(ctx) {
|
|
|
go c.processEventStream(ctx, ns)
|
|
|
return
|
|
|
- case <-ctx.Done():
|
|
|
}
|
|
|
}
|
|
|
c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
|