Merge pull request #46338 from thaJeztah/daemon_events_cleanup

daemon: clean up event handling-code, and remove some dead code
This commit is contained in:
Sebastiaan van Stijn 2023-08-28 13:12:10 +02:00 committed by GitHub
commit 8569e8684f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 68 deletions

View file

@ -16,12 +16,6 @@ import (
swarmapi "github.com/moby/swarmkit/v2/api"
)
var clusterEventAction = map[swarmapi.WatchActionKind]string{
swarmapi.WatchActionKindCreate: "create",
swarmapi.WatchActionKindUpdate: "update",
swarmapi.WatchActionKindRemove: "remove",
}
// LogContainerEvent generates an event related to a container with only the default attributes.
func (daemon *Daemon) LogContainerEvent(container *container.Container, action string) {
daemon.LogContainerEventWithAttributes(container, action, map[string]string{})
@ -34,36 +28,26 @@ func (daemon *Daemon) LogContainerEventWithAttributes(container *container.Conta
attributes["image"] = container.Config.Image
}
attributes["name"] = strings.TrimLeft(container.Name, "/")
actor := events.Actor{
daemon.EventsService.Log(action, events.ContainerEventType, events.Actor{
ID: container.ID,
Attributes: attributes,
}
daemon.EventsService.Log(action, events.ContainerEventType, actor)
})
}
// LogPluginEvent generates an event related to a plugin with only the default attributes.
func (daemon *Daemon) LogPluginEvent(pluginID, refName, action string) {
daemon.LogPluginEventWithAttributes(pluginID, refName, action, map[string]string{})
}
// LogPluginEventWithAttributes generates an event related to a plugin with specific given attributes.
func (daemon *Daemon) LogPluginEventWithAttributes(pluginID, refName, action string, attributes map[string]string) {
attributes["name"] = refName
actor := events.Actor{
daemon.EventsService.Log(action, events.PluginEventType, events.Actor{
ID: pluginID,
Attributes: attributes,
}
daemon.EventsService.Log(action, events.PluginEventType, actor)
Attributes: map[string]string{"name": refName},
})
}
// LogVolumeEvent generates an event related to a volume.
func (daemon *Daemon) LogVolumeEvent(volumeID, action string, attributes map[string]string) {
actor := events.Actor{
daemon.EventsService.Log(action, events.VolumeEventType, events.Actor{
ID: volumeID,
Attributes: attributes,
}
daemon.EventsService.Log(action, events.VolumeEventType, actor)
})
}
// LogNetworkEvent generates an event related to a network with only the default attributes.
@ -75,11 +59,10 @@ func (daemon *Daemon) LogNetworkEvent(nw *libnetwork.Network, action string) {
func (daemon *Daemon) LogNetworkEventWithAttributes(nw *libnetwork.Network, action string, attributes map[string]string) {
attributes["name"] = nw.Name()
attributes["type"] = nw.Type()
actor := events.Actor{
daemon.EventsService.Log(action, events.NetworkEventType, events.Actor{
ID: nw.ID(),
Attributes: attributes,
}
daemon.EventsService.Log(action, events.NetworkEventType, actor)
})
}
// LogDaemonEventWithAttributes generates an event related to the daemon itself with specific given attributes.
@ -97,8 +80,7 @@ func (daemon *Daemon) LogDaemonEventWithAttributes(action string, attributes map
// SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events.
func (daemon *Daemon) SubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan interface{}) {
ef := daemonevents.NewFilter(filter)
return daemon.EventsService.SubscribeTopic(since, until, ef)
return daemon.EventsService.SubscribeTopic(since, until, daemonevents.NewFilter(filter))
}
// UnsubscribeFromEvents stops the event subscription for a client by closing the
@ -145,39 +127,33 @@ func (daemon *Daemon) generateClusterEvent(msg *swarmapi.WatchMessage) {
case *swarmapi.Object_Service:
daemon.logServiceEvent(event.Action, v.Service, event.OldObject.GetService())
case *swarmapi.Object_Network:
daemon.logNetworkEvent(event.Action, v.Network, event.OldObject.GetNetwork())
daemon.logNetworkEvent(event.Action, v.Network)
case *swarmapi.Object_Secret:
daemon.logSecretEvent(event.Action, v.Secret, event.OldObject.GetSecret())
daemon.logSecretEvent(event.Action, v.Secret)
case *swarmapi.Object_Config:
daemon.logConfigEvent(event.Action, v.Config, event.OldObject.GetConfig())
daemon.logConfigEvent(event.Action, v.Config)
default:
log.G(context.TODO()).Warnf("unrecognized event: %v", event)
}
}
}
func (daemon *Daemon) logNetworkEvent(action swarmapi.WatchActionKind, net *swarmapi.Network, oldNet *swarmapi.Network) {
attributes := map[string]string{
func (daemon *Daemon) logNetworkEvent(action swarmapi.WatchActionKind, net *swarmapi.Network) {
daemon.logClusterEvent(action, net.ID, events.NetworkEventType, eventTimestamp(net.Meta, action), map[string]string{
"name": net.Spec.Annotations.Name,
}
eventTime := eventTimestamp(net.Meta, action)
daemon.logClusterEvent(action, net.ID, "network", attributes, eventTime)
})
}
func (daemon *Daemon) logSecretEvent(action swarmapi.WatchActionKind, secret *swarmapi.Secret, oldSecret *swarmapi.Secret) {
attributes := map[string]string{
func (daemon *Daemon) logSecretEvent(action swarmapi.WatchActionKind, secret *swarmapi.Secret) {
daemon.logClusterEvent(action, secret.ID, events.SecretEventType, eventTimestamp(secret.Meta, action), map[string]string{
"name": secret.Spec.Annotations.Name,
}
eventTime := eventTimestamp(secret.Meta, action)
daemon.logClusterEvent(action, secret.ID, "secret", attributes, eventTime)
})
}
func (daemon *Daemon) logConfigEvent(action swarmapi.WatchActionKind, config *swarmapi.Config, oldConfig *swarmapi.Config) {
attributes := map[string]string{
func (daemon *Daemon) logConfigEvent(action swarmapi.WatchActionKind, config *swarmapi.Config) {
daemon.logClusterEvent(action, config.ID, events.ConfigEventType, eventTimestamp(config.Meta, action), map[string]string{
"name": config.Spec.Annotations.Name,
}
eventTime := eventTimestamp(config.Meta, action)
daemon.logClusterEvent(action, config.ID, "config", attributes, eventTime)
})
}
func (daemon *Daemon) logNodeEvent(action swarmapi.WatchActionKind, node *swarmapi.Node, oldNode *swarmapi.Node) {
@ -222,7 +198,7 @@ func (daemon *Daemon) logNodeEvent(action swarmapi.WatchActionKind, node *swarma
}
}
daemon.logClusterEvent(action, node.ID, "node", attributes, eventTime)
daemon.logClusterEvent(action, node.ID, events.NodeEventType, eventTime, attributes)
}
func (daemon *Daemon) logServiceEvent(action swarmapi.WatchActionKind, service *swarmapi.Service, oldService *swarmapi.Service) {
@ -269,24 +245,27 @@ func (daemon *Daemon) logServiceEvent(action swarmapi.WatchActionKind, service *
}
}
}
daemon.logClusterEvent(action, service.ID, "service", attributes, eventTime)
daemon.logClusterEvent(action, service.ID, events.ServiceEventType, eventTime, attributes)
}
func (daemon *Daemon) logClusterEvent(action swarmapi.WatchActionKind, id, eventType string, attributes map[string]string, eventTime time.Time) {
actor := events.Actor{
ID: id,
Attributes: attributes,
}
var clusterEventAction = map[swarmapi.WatchActionKind]string{
swarmapi.WatchActionKindCreate: "create",
swarmapi.WatchActionKindUpdate: "update",
swarmapi.WatchActionKindRemove: "remove",
}
jm := events.Message{
Action: clusterEventAction[action],
Type: eventType,
Actor: actor,
func (daemon *Daemon) logClusterEvent(action swarmapi.WatchActionKind, id string, eventType events.Type, eventTime time.Time, attributes map[string]string) {
daemon.EventsService.PublishMessage(events.Message{
Action: clusterEventAction[action],
Type: eventType,
Actor: events.Actor{
ID: id,
Attributes: attributes,
},
Scope: "swarm",
Time: eventTime.UTC().Unix(),
TimeNano: eventTime.UTC().UnixNano(),
}
daemon.EventsService.PublishMessage(jm)
})
}
func eventTimestamp(meta swarmapi.Meta, action swarmapi.WatchActionKind) time.Time {

View file

@ -190,14 +190,14 @@ func TestLoadBufferedEvents(t *testing.T) {
t.Fatal(err)
}
events := &Events{
evts := &Events{
events: []events.Message{*m1, *m2, *m3},
}
since := time.Unix(s, sNano)
until := time.Time{}
out := events.loadBufferedEvents(since, until, nil)
out := evts.loadBufferedEvents(since, until, nil)
if len(out) != 1 {
t.Fatalf("expected 1 message, got %d: %v", len(out), out)
}
@ -236,19 +236,19 @@ func TestLoadBufferedEventsOnlyFromPast(t *testing.T) {
t.Fatal(err)
}
events := &Events{
evts := &Events{
events: []events.Message{*m1, *m2, *m3},
}
since := time.Unix(s, sNano)
until := time.Unix(u, uNano)
out := events.loadBufferedEvents(since, until, nil)
out := evts.loadBufferedEvents(since, until, nil)
if len(out) != 1 {
t.Fatalf("expected 1 message, got %d: %v", len(out), out)
}
if out[0].Type != "network" {
if out[0].Type != events.NetworkEventType {
t.Fatalf("expected network event, got %s", out[0].Type)
}
}
@ -268,14 +268,14 @@ func TestIgnoreBufferedWhenNoTimes(t *testing.T) {
t.Fatal(err)
}
events := &Events{
evts := &Events{
events: []events.Message{*m1, *m2, *m3},
}
since := time.Time{}
until := time.Time{}
out := events.loadBufferedEvents(since, until, nil)
out := evts.loadBufferedEvents(since, until, nil)
if len(out) != 0 {
t.Fatalf("expected 0 buffered events, got %q", out)
}

View file

@ -48,7 +48,7 @@ func TestPause(t *testing.T) {
messages, errs := apiClient.Events(ctx, types.EventsOptions{
Since: since,
Until: until,
Filters: filters.NewArgs(filters.Arg("container", cID)),
Filters: filters.NewArgs(filters.Arg(events.ContainerEventType, cID)),
})
assert.Check(t, is.DeepEqual([]string{"pause", "unpause"}, getEventActions(t, messages, errs)))
}

View file

@ -60,7 +60,7 @@ func TestEventsExecDie(t *testing.T) {
select {
case m := <-msg:
assert.Equal(t, m.Type, "container")
assert.Equal(t, m.Type, events.ContainerEventType)
assert.Equal(t, m.Actor.ID, cID)
assert.Equal(t, m.Action, "exec_die")
assert.Equal(t, m.Actor.Attributes["execID"], id.ID)