diff --git a/libcontainerd/remote/client.go b/libcontainerd/remote/client.go index 11a5990a7b..f21076afb2 100644 --- a/libcontainerd/remote/client.go +++ b/libcontainerd/remote/client.go @@ -711,6 +711,35 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy }) } +func (c *client) waitServe(ctx context.Context) bool { + t := 100 * time.Millisecond + delay := time.NewTimer(t) + if !delay.Stop() { + <-delay.C + } + defer delay.Stop() + + // `IsServing` will actually block until the service is ready. + // However it can return early, so we'll loop with a delay to handle it. + for { + serving, _ := c.client.IsServing(ctx) + if ctx.Err() != nil { + return false + } + + if serving { + return true + } + + delay.Reset(t) + select { + case <-ctx.Done(): + return false + case <-delay.C: + } + } +} + func (c *client) processEventStream(ctx context.Context, ns string) { var ( err error @@ -732,14 +761,11 @@ func (c *client) processEventStream(ctx context.Context, ns string) { if err != nil { errStatus, ok := status.FromError(err) if !ok || errStatus.Code() != codes.Canceled { - c.logger.WithError(err).Error("failed to get event") - - // rate limit - select { - case <-time.After(time.Second): + c.logger.WithError(err).Error("Failed to get event") + c.logger.Info("Waiting for containerd to be ready to restart event processing") + if c.waitServe(ctx) { go c.processEventStream(ctx, ns) return - case <-ctx.Done(): } } c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")