|
@@ -703,10 +703,16 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
errStatus, ok := status.FromError(err)
|
|
errStatus, ok := status.FromError(err)
|
|
if !ok || errStatus.Code() != codes.Canceled {
|
|
if !ok || errStatus.Code() != codes.Canceled {
|
|
c.logger.WithError(err).Error("failed to get event")
|
|
c.logger.WithError(err).Error("failed to get event")
|
|
- go c.processEventStream(ctx, ns)
|
|
|
|
- } else {
|
|
|
|
- c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
|
|
|
|
|
|
+
|
|
|
|
+ // rate limit
|
|
|
|
+ select {
|
|
|
|
+ case <-time.After(time.Second):
|
|
|
|
+ go c.processEventStream(ctx, ns)
|
|
|
|
+ return
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
|
|
}
|
|
}
|
|
return
|
|
return
|
|
case ev = <-eventStream:
|
|
case ev = <-eventStream:
|