|
@@ -711,6 +711,35 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+func (c *client) waitServe(ctx context.Context) bool {
|
|
|
+ t := 100 * time.Millisecond
|
|
|
+ delay := time.NewTimer(t)
|
|
|
+ if !delay.Stop() {
|
|
|
+ <-delay.C
|
|
|
+ }
|
|
|
+ defer delay.Stop()
|
|
|
+
|
|
|
+ // `IsServing` will actually block until the service is ready.
|
|
|
+ // However it can return early, so we'll loop with a delay to handle it.
|
|
|
+ for {
|
|
|
+ serving, _ := c.client.IsServing(ctx)
|
|
|
+ if ctx.Err() != nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ if serving {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+ delay.Reset(t)
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return false
|
|
|
+ case <-delay.C:
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
var (
|
|
|
err error
|
|
@@ -732,14 +761,11 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
if err != nil {
|
|
|
errStatus, ok := status.FromError(err)
|
|
|
if !ok || errStatus.Code() != codes.Canceled {
|
|
|
- c.logger.WithError(err).Error("failed to get event")
|
|
|
-
|
|
|
- // rate limit
|
|
|
- select {
|
|
|
- case <-time.After(time.Second):
|
|
|
+ c.logger.WithError(err).Error("Failed to get event")
|
|
|
+ c.logger.Info("Waiting for containerd to be ready to restart event processing")
|
|
|
+ if c.waitServe(ctx) {
|
|
|
go c.processEventStream(ctx, ns)
|
|
|
return
|
|
|
- case <-ctx.Done():
|
|
|
}
|
|
|
}
|
|
|
c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
|