libcontainer: client.processEventStream: use locally scoped variables

- use local variables and remove some intermediate variables
- handle the events inside the switch itself; this makes all the
  switch branches use the same logic, instead of "some" using
  a `continue`, and others falling through to have the event handled
  outside of the switch.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2023-09-17 14:28:33 +02:00
parent e598ffcdf8
commit 96faee9762
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C

View file

@ -20,7 +20,6 @@ import (
"github.com/containerd/containerd/cio" "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/containerd/errdefs" cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options" v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
@ -614,13 +613,6 @@ func (c *client) waitServe(ctx context.Context) bool {
} }
func (c *client) processEventStream(ctx context.Context, ns string) { func (c *client) processEventStream(ctx context.Context, ns string) {
var (
err error
ev *events.Envelope
et libcontainerdtypes.EventType
ei libcontainerdtypes.EventInfo
)
// Create a new context specifically for this subscription. // Create a new context specifically for this subscription.
// The context must be cancelled to cancel the subscription. // The context must be cancelled to cancel the subscription.
// In cases where we have to restart event stream processing, // In cases where we have to restart event stream processing,
@ -636,7 +628,7 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
for { for {
select { select {
case err = <-errC: case err := <-errC:
if err != nil { if err != nil {
errStatus, ok := status.FromError(err) errStatus, ok := status.FromError(err)
if !ok || errStatus.Code() != codes.Canceled { if !ok || errStatus.Code() != codes.Canceled {
@ -650,7 +642,7 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown") c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
} }
return return
case ev = <-eventStream: case ev := <-eventStream:
if ev.Event == nil { if ev.Event == nil {
c.logger.WithField("event", ev).Warn("invalid event") c.logger.WithField("event", ev).Warn("invalid event")
continue continue
@ -666,74 +658,60 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
switch t := v.(type) { switch t := v.(type) {
case *apievents.TaskCreate: case *apievents.TaskCreate:
et = libcontainerdtypes.EventCreate c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID, ContainerID: t.ContainerID,
ProcessID: t.ContainerID, ProcessID: t.ContainerID,
Pid: t.Pid, Pid: t.Pid,
} })
case *apievents.TaskStart: case *apievents.TaskStart:
et = libcontainerdtypes.EventStart c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID, ContainerID: t.ContainerID,
ProcessID: t.ContainerID, ProcessID: t.ContainerID,
Pid: t.Pid, Pid: t.Pid,
} })
case *apievents.TaskExit: case *apievents.TaskExit:
et = libcontainerdtypes.EventExit c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID, ContainerID: t.ContainerID,
ProcessID: t.ID, ProcessID: t.ID,
Pid: t.Pid, Pid: t.Pid,
ExitCode: t.ExitStatus, ExitCode: t.ExitStatus,
ExitedAt: t.ExitedAt, ExitedAt: t.ExitedAt,
} })
case *apievents.TaskOOM: case *apievents.TaskOOM:
et = libcontainerdtypes.EventOOM c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID, ContainerID: t.ContainerID,
} })
case *apievents.TaskExecAdded: case *apievents.TaskExecAdded:
et = libcontainerdtypes.EventExecAdded c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID, ContainerID: t.ContainerID,
ProcessID: t.ExecID, ProcessID: t.ExecID,
} })
case *apievents.TaskExecStarted: case *apievents.TaskExecStarted:
et = libcontainerdtypes.EventExecStarted c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID, ContainerID: t.ContainerID,
ProcessID: t.ExecID, ProcessID: t.ExecID,
Pid: t.Pid, Pid: t.Pid,
} })
case *apievents.TaskPaused: case *apievents.TaskPaused:
et = libcontainerdtypes.EventPaused c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID, ContainerID: t.ContainerID,
} })
case *apievents.TaskResumed: case *apievents.TaskResumed:
et = libcontainerdtypes.EventResumed c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
ei = libcontainerdtypes.EventInfo{
ContainerID: t.ContainerID, ContainerID: t.ContainerID,
} })
case *apievents.TaskDelete: case *apievents.TaskDelete:
c.logger.WithFields(log.Fields{ c.logger.WithFields(log.Fields{
"topic": ev.Topic, "topic": ev.Topic,
"type": reflect.TypeOf(t), "type": reflect.TypeOf(t),
"container": t.ContainerID, "container": t.ContainerID,
}, }).Info("ignoring event")
).Info("ignoring event")
continue
default: default:
c.logger.WithFields(log.Fields{ c.logger.WithFields(log.Fields{
"topic": ev.Topic, "topic": ev.Topic,
"type": reflect.TypeOf(t), "type": reflect.TypeOf(t),
}, }).Info("ignoring event")
).Info("ignoring event")
continue
} }
c.processEvent(ctx, et, ei)
} }
} }
} }