|
@@ -20,7 +20,6 @@ import (
|
|
|
"github.com/containerd/containerd/cio"
|
|
|
"github.com/containerd/containerd/content"
|
|
|
cerrdefs "github.com/containerd/containerd/errdefs"
|
|
|
- "github.com/containerd/containerd/events"
|
|
|
"github.com/containerd/containerd/images"
|
|
|
"github.com/containerd/containerd/log"
|
|
|
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
|
|
@@ -614,13 +613,6 @@ func (c *client) waitServe(ctx context.Context) bool {
|
|
|
}
|
|
|
|
|
|
func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
- var (
|
|
|
- err error
|
|
|
- ev *events.Envelope
|
|
|
- et libcontainerdtypes.EventType
|
|
|
- ei libcontainerdtypes.EventInfo
|
|
|
- )
|
|
|
-
|
|
|
// Create a new context specifically for this subscription.
|
|
|
// The context must be cancelled to cancel the subscription.
|
|
|
// In cases where we have to restart event stream processing,
|
|
@@ -636,7 +628,7 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
- case err = <-errC:
|
|
|
+ case err := <-errC:
|
|
|
if err != nil {
|
|
|
errStatus, ok := status.FromError(err)
|
|
|
if !ok || errStatus.Code() != codes.Canceled {
|
|
@@ -650,7 +642,7 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
|
|
|
}
|
|
|
return
|
|
|
- case ev = <-eventStream:
|
|
|
+ case ev := <-eventStream:
|
|
|
if ev.Event == nil {
|
|
|
c.logger.WithField("event", ev).Warn("invalid event")
|
|
|
continue
|
|
@@ -666,74 +658,60 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
|
|
|
|
|
|
switch t := v.(type) {
|
|
|
case *apievents.TaskCreate:
|
|
|
- et = libcontainerdtypes.EventCreate
|
|
|
- ei = libcontainerdtypes.EventInfo{
|
|
|
+ c.processEvent(ctx, libcontainerdtypes.EventCreate, libcontainerdtypes.EventInfo{
|
|
|
ContainerID: t.ContainerID,
|
|
|
ProcessID: t.ContainerID,
|
|
|
Pid: t.Pid,
|
|
|
- }
|
|
|
+ })
|
|
|
case *apievents.TaskStart:
|
|
|
- et = libcontainerdtypes.EventStart
|
|
|
- ei = libcontainerdtypes.EventInfo{
|
|
|
+ c.processEvent(ctx, libcontainerdtypes.EventStart, libcontainerdtypes.EventInfo{
|
|
|
ContainerID: t.ContainerID,
|
|
|
ProcessID: t.ContainerID,
|
|
|
Pid: t.Pid,
|
|
|
- }
|
|
|
+ })
|
|
|
case *apievents.TaskExit:
|
|
|
- et = libcontainerdtypes.EventExit
|
|
|
- ei = libcontainerdtypes.EventInfo{
|
|
|
+ c.processEvent(ctx, libcontainerdtypes.EventExit, libcontainerdtypes.EventInfo{
|
|
|
ContainerID: t.ContainerID,
|
|
|
ProcessID: t.ID,
|
|
|
Pid: t.Pid,
|
|
|
ExitCode: t.ExitStatus,
|
|
|
ExitedAt: t.ExitedAt,
|
|
|
- }
|
|
|
+ })
|
|
|
case *apievents.TaskOOM:
|
|
|
- et = libcontainerdtypes.EventOOM
|
|
|
- ei = libcontainerdtypes.EventInfo{
|
|
|
+ c.processEvent(ctx, libcontainerdtypes.EventOOM, libcontainerdtypes.EventInfo{
|
|
|
ContainerID: t.ContainerID,
|
|
|
- }
|
|
|
+ })
|
|
|
case *apievents.TaskExecAdded:
|
|
|
- et = libcontainerdtypes.EventExecAdded
|
|
|
- ei = libcontainerdtypes.EventInfo{
|
|
|
+ c.processEvent(ctx, libcontainerdtypes.EventExecAdded, libcontainerdtypes.EventInfo{
|
|
|
ContainerID: t.ContainerID,
|
|
|
ProcessID: t.ExecID,
|
|
|
- }
|
|
|
+ })
|
|
|
case *apievents.TaskExecStarted:
|
|
|
- et = libcontainerdtypes.EventExecStarted
|
|
|
- ei = libcontainerdtypes.EventInfo{
|
|
|
+ c.processEvent(ctx, libcontainerdtypes.EventExecStarted, libcontainerdtypes.EventInfo{
|
|
|
ContainerID: t.ContainerID,
|
|
|
ProcessID: t.ExecID,
|
|
|
Pid: t.Pid,
|
|
|
- }
|
|
|
+ })
|
|
|
case *apievents.TaskPaused:
|
|
|
- et = libcontainerdtypes.EventPaused
|
|
|
- ei = libcontainerdtypes.EventInfo{
|
|
|
+ c.processEvent(ctx, libcontainerdtypes.EventPaused, libcontainerdtypes.EventInfo{
|
|
|
ContainerID: t.ContainerID,
|
|
|
- }
|
|
|
+ })
|
|
|
case *apievents.TaskResumed:
|
|
|
- et = libcontainerdtypes.EventResumed
|
|
|
- ei = libcontainerdtypes.EventInfo{
|
|
|
+ c.processEvent(ctx, libcontainerdtypes.EventResumed, libcontainerdtypes.EventInfo{
|
|
|
ContainerID: t.ContainerID,
|
|
|
- }
|
|
|
+ })
|
|
|
case *apievents.TaskDelete:
|
|
|
c.logger.WithFields(log.Fields{
|
|
|
"topic": ev.Topic,
|
|
|
"type": reflect.TypeOf(t),
|
|
|
"container": t.ContainerID,
|
|
|
- },
|
|
|
- ).Info("ignoring event")
|
|
|
- continue
|
|
|
+ }).Info("ignoring event")
|
|
|
default:
|
|
|
c.logger.WithFields(log.Fields{
|
|
|
"topic": ev.Topic,
|
|
|
"type": reflect.TypeOf(t),
|
|
|
- },
|
|
|
- ).Info("ignoring event")
|
|
|
- continue
|
|
|
+ }).Info("ignoring event")
|
|
|
}
|
|
|
-
|
|
|
- c.processEvent(ctx, et, ei)
|
|
|
}
|
|
|
}
|
|
|
}
|