events.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package daemon
  2. import (
  3. "context"
  4. "strconv"
  5. "strings"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/docker/api/types/events"
  9. "github.com/docker/docker/api/types/filters"
  10. "github.com/docker/docker/container"
  11. daemonevents "github.com/docker/docker/daemon/events"
  12. "github.com/docker/libnetwork"
  13. swarmapi "github.com/docker/swarmkit/api"
  14. gogotypes "github.com/gogo/protobuf/types"
  15. )
  16. var (
  17. clusterEventAction = map[swarmapi.WatchActionKind]string{
  18. swarmapi.WatchActionKindCreate: "create",
  19. swarmapi.WatchActionKindUpdate: "update",
  20. swarmapi.WatchActionKindRemove: "remove",
  21. }
  22. )
  23. // LogContainerEvent generates an event related to a container with only the default attributes.
  24. func (daemon *Daemon) LogContainerEvent(container *container.Container, action string) {
  25. daemon.LogContainerEventWithAttributes(container, action, map[string]string{})
  26. }
  27. // LogContainerEventWithAttributes generates an event related to a container with specific given attributes.
  28. func (daemon *Daemon) LogContainerEventWithAttributes(container *container.Container, action string, attributes map[string]string) {
  29. copyAttributes(attributes, container.Config.Labels)
  30. if container.Config.Image != "" {
  31. attributes["image"] = container.Config.Image
  32. }
  33. attributes["name"] = strings.TrimLeft(container.Name, "/")
  34. actor := events.Actor{
  35. ID: container.ID,
  36. Attributes: attributes,
  37. }
  38. daemon.EventsService.Log(action, events.ContainerEventType, actor)
  39. }
  40. // LogImageEvent generates an event related to an image with only the default attributes.
  41. func (daemon *Daemon) LogImageEvent(imageID, refName, action string) {
  42. daemon.LogImageEventWithAttributes(imageID, refName, action, map[string]string{})
  43. }
  44. // LogImageEventWithAttributes generates an event related to an image with specific given attributes.
  45. func (daemon *Daemon) LogImageEventWithAttributes(imageID, refName, action string, attributes map[string]string) {
  46. img, err := daemon.GetImage(imageID)
  47. if err == nil && img.Config != nil {
  48. // image has not been removed yet.
  49. // it could be missing if the event is `delete`.
  50. copyAttributes(attributes, img.Config.Labels)
  51. }
  52. if refName != "" {
  53. attributes["name"] = refName
  54. }
  55. actor := events.Actor{
  56. ID: imageID,
  57. Attributes: attributes,
  58. }
  59. daemon.EventsService.Log(action, events.ImageEventType, actor)
  60. }
  61. // LogPluginEvent generates an event related to a plugin with only the default attributes.
  62. func (daemon *Daemon) LogPluginEvent(pluginID, refName, action string) {
  63. daemon.LogPluginEventWithAttributes(pluginID, refName, action, map[string]string{})
  64. }
  65. // LogPluginEventWithAttributes generates an event related to a plugin with specific given attributes.
  66. func (daemon *Daemon) LogPluginEventWithAttributes(pluginID, refName, action string, attributes map[string]string) {
  67. attributes["name"] = refName
  68. actor := events.Actor{
  69. ID: pluginID,
  70. Attributes: attributes,
  71. }
  72. daemon.EventsService.Log(action, events.PluginEventType, actor)
  73. }
  74. // LogVolumeEvent generates an event related to a volume.
  75. func (daemon *Daemon) LogVolumeEvent(volumeID, action string, attributes map[string]string) {
  76. actor := events.Actor{
  77. ID: volumeID,
  78. Attributes: attributes,
  79. }
  80. daemon.EventsService.Log(action, events.VolumeEventType, actor)
  81. }
  82. // LogNetworkEvent generates an event related to a network with only the default attributes.
  83. func (daemon *Daemon) LogNetworkEvent(nw libnetwork.Network, action string) {
  84. daemon.LogNetworkEventWithAttributes(nw, action, map[string]string{})
  85. }
  86. // LogNetworkEventWithAttributes generates an event related to a network with specific given attributes.
  87. func (daemon *Daemon) LogNetworkEventWithAttributes(nw libnetwork.Network, action string, attributes map[string]string) {
  88. attributes["name"] = nw.Name()
  89. attributes["type"] = nw.Type()
  90. actor := events.Actor{
  91. ID: nw.ID(),
  92. Attributes: attributes,
  93. }
  94. daemon.EventsService.Log(action, events.NetworkEventType, actor)
  95. }
  96. // LogDaemonEventWithAttributes generates an event related to the daemon itself with specific given attributes.
  97. func (daemon *Daemon) LogDaemonEventWithAttributes(action string, attributes map[string]string) {
  98. if daemon.EventsService != nil {
  99. if info, err := daemon.SystemInfo(); err == nil && info.Name != "" {
  100. attributes["name"] = info.Name
  101. }
  102. actor := events.Actor{
  103. ID: daemon.ID,
  104. Attributes: attributes,
  105. }
  106. daemon.EventsService.Log(action, events.DaemonEventType, actor)
  107. }
  108. }
  109. // SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events.
  110. func (daemon *Daemon) SubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan interface{}) {
  111. ef := daemonevents.NewFilter(filter)
  112. return daemon.EventsService.SubscribeTopic(since, until, ef)
  113. }
  114. // UnsubscribeFromEvents stops the event subscription for a client by closing the
  115. // channel where the daemon sends events to.
  116. func (daemon *Daemon) UnsubscribeFromEvents(listener chan interface{}) {
  117. daemon.EventsService.Evict(listener)
  118. }
  // copyAttributes copies every entry of labels into attributes, overwriting
  // entries that share a key. labels is only read, which guarantees that labels
  // are not mutated by event triggers. A nil or empty labels map is a no-op.
  func copyAttributes(attributes, labels map[string]string) {
  	// Ranging over a nil map performs no iterations, so no nil check is needed.
  	for label, value := range labels {
  		attributes[label] = value
  	}
  }
  128. // ProcessClusterNotifications gets changes from store and add them to event list
  129. func (daemon *Daemon) ProcessClusterNotifications(ctx context.Context, watchStream chan *swarmapi.WatchMessage) {
  130. for {
  131. select {
  132. case <-ctx.Done():
  133. return
  134. case message, ok := <-watchStream:
  135. if !ok {
  136. logrus.Debug("cluster event channel has stopped")
  137. return
  138. }
  139. daemon.generateClusterEvent(message)
  140. }
  141. }
  142. }
  143. func (daemon *Daemon) generateClusterEvent(msg *swarmapi.WatchMessage) {
  144. for _, event := range msg.Events {
  145. if event.Object == nil {
  146. logrus.Errorf("event without object: %v", event)
  147. continue
  148. }
  149. switch v := event.Object.GetObject().(type) {
  150. case *swarmapi.Object_Node:
  151. daemon.logNodeEvent(event.Action, v.Node, event.OldObject.GetNode())
  152. case *swarmapi.Object_Service:
  153. daemon.logServiceEvent(event.Action, v.Service, event.OldObject.GetService())
  154. case *swarmapi.Object_Network:
  155. daemon.logNetworkEvent(event.Action, v.Network, event.OldObject.GetNetwork())
  156. case *swarmapi.Object_Secret:
  157. daemon.logSecretEvent(event.Action, v.Secret, event.OldObject.GetSecret())
  158. default:
  159. logrus.Warnf("unrecognized event: %v", event)
  160. }
  161. }
  162. }
  163. func (daemon *Daemon) logNetworkEvent(action swarmapi.WatchActionKind, net *swarmapi.Network, oldNet *swarmapi.Network) {
  164. attributes := map[string]string{
  165. "name": net.Spec.Annotations.Name,
  166. }
  167. eventTime := eventTimestamp(net.Meta, action)
  168. daemon.logClusterEvent(action, net.ID, "network", attributes, eventTime)
  169. }
  170. func (daemon *Daemon) logSecretEvent(action swarmapi.WatchActionKind, secret *swarmapi.Secret, oldSecret *swarmapi.Secret) {
  171. attributes := map[string]string{
  172. "name": secret.Spec.Annotations.Name,
  173. }
  174. eventTime := eventTimestamp(secret.Meta, action)
  175. daemon.logClusterEvent(action, secret.ID, "secret", attributes, eventTime)
  176. }
  177. func (daemon *Daemon) logNodeEvent(action swarmapi.WatchActionKind, node *swarmapi.Node, oldNode *swarmapi.Node) {
  178. name := node.Spec.Annotations.Name
  179. if name == "" && node.Description != nil {
  180. name = node.Description.Hostname
  181. }
  182. attributes := map[string]string{
  183. "name": name,
  184. }
  185. eventTime := eventTimestamp(node.Meta, action)
  186. // In an update event, display the changes in attributes
  187. if action == swarmapi.WatchActionKindUpdate && oldNode != nil {
  188. if node.Spec.Availability != oldNode.Spec.Availability {
  189. attributes["availability.old"] = strings.ToLower(oldNode.Spec.Availability.String())
  190. attributes["availability.new"] = strings.ToLower(node.Spec.Availability.String())
  191. }
  192. if node.Role != oldNode.Role {
  193. attributes["role.old"] = strings.ToLower(oldNode.Role.String())
  194. attributes["role.new"] = strings.ToLower(node.Role.String())
  195. }
  196. if node.Status.State != oldNode.Status.State {
  197. attributes["state.old"] = strings.ToLower(oldNode.Status.State.String())
  198. attributes["state.new"] = strings.ToLower(node.Status.State.String())
  199. }
  200. // This handles change within manager role
  201. if node.ManagerStatus != nil && oldNode.ManagerStatus != nil {
  202. // leader change
  203. if node.ManagerStatus.Leader != oldNode.ManagerStatus.Leader {
  204. if node.ManagerStatus.Leader {
  205. attributes["leader.old"] = "false"
  206. attributes["leader.new"] = "true"
  207. } else {
  208. attributes["leader.old"] = "true"
  209. attributes["leader.new"] = "false"
  210. }
  211. }
  212. if node.ManagerStatus.Reachability != oldNode.ManagerStatus.Reachability {
  213. attributes["reachability.old"] = strings.ToLower(oldNode.ManagerStatus.Reachability.String())
  214. attributes["reachability.new"] = strings.ToLower(node.ManagerStatus.Reachability.String())
  215. }
  216. }
  217. }
  218. daemon.logClusterEvent(action, node.ID, "node", attributes, eventTime)
  219. }
  220. func (daemon *Daemon) logServiceEvent(action swarmapi.WatchActionKind, service *swarmapi.Service, oldService *swarmapi.Service) {
  221. attributes := map[string]string{
  222. "name": service.Spec.Annotations.Name,
  223. }
  224. eventTime := eventTimestamp(service.Meta, action)
  225. if action == swarmapi.WatchActionKindUpdate && oldService != nil {
  226. // check image
  227. if x, ok := service.Spec.Task.GetRuntime().(*swarmapi.TaskSpec_Container); ok {
  228. containerSpec := x.Container
  229. if y, ok := oldService.Spec.Task.GetRuntime().(*swarmapi.TaskSpec_Container); ok {
  230. oldContainerSpec := y.Container
  231. if containerSpec.Image != oldContainerSpec.Image {
  232. attributes["image.old"] = oldContainerSpec.Image
  233. attributes["image.new"] = containerSpec.Image
  234. }
  235. } else {
  236. // This should not happen.
  237. logrus.Errorf("service %s runtime changed from %T to %T", service.Spec.Annotations.Name, oldService.Spec.Task.GetRuntime(), service.Spec.Task.GetRuntime())
  238. }
  239. }
  240. // check replicated count change
  241. if x, ok := service.Spec.GetMode().(*swarmapi.ServiceSpec_Replicated); ok {
  242. replicas := x.Replicated.Replicas
  243. if y, ok := oldService.Spec.GetMode().(*swarmapi.ServiceSpec_Replicated); ok {
  244. oldReplicas := y.Replicated.Replicas
  245. if replicas != oldReplicas {
  246. attributes["replicas.old"] = strconv.FormatUint(oldReplicas, 10)
  247. attributes["replicas.new"] = strconv.FormatUint(replicas, 10)
  248. }
  249. } else {
  250. // This should not happen.
  251. logrus.Errorf("service %s mode changed from %T to %T", service.Spec.Annotations.Name, oldService.Spec.GetMode(), service.Spec.GetMode())
  252. }
  253. }
  254. if service.UpdateStatus != nil {
  255. if oldService.UpdateStatus == nil {
  256. attributes["updatestate.new"] = strings.ToLower(service.UpdateStatus.State.String())
  257. } else if service.UpdateStatus.State != oldService.UpdateStatus.State {
  258. attributes["updatestate.old"] = strings.ToLower(oldService.UpdateStatus.State.String())
  259. attributes["updatestate.new"] = strings.ToLower(service.UpdateStatus.State.String())
  260. }
  261. }
  262. }
  263. daemon.logClusterEvent(action, service.ID, "service", attributes, eventTime)
  264. }
  265. func (daemon *Daemon) logClusterEvent(action swarmapi.WatchActionKind, id, eventType string, attributes map[string]string, eventTime time.Time) {
  266. actor := events.Actor{
  267. ID: id,
  268. Attributes: attributes,
  269. }
  270. jm := events.Message{
  271. Action: clusterEventAction[action],
  272. Type: eventType,
  273. Actor: actor,
  274. Scope: "swarm",
  275. Time: eventTime.UTC().Unix(),
  276. TimeNano: eventTime.UTC().UnixNano(),
  277. }
  278. daemon.EventsService.PublishMessage(jm)
  279. }
  280. func eventTimestamp(meta swarmapi.Meta, action swarmapi.WatchActionKind) time.Time {
  281. var eventTime time.Time
  282. switch action {
  283. case swarmapi.WatchActionKindCreate:
  284. eventTime, _ = gogotypes.TimestampFromProto(meta.CreatedAt)
  285. case swarmapi.WatchActionKindUpdate:
  286. eventTime, _ = gogotypes.TimestampFromProto(meta.UpdatedAt)
  287. case swarmapi.WatchActionKindRemove:
  288. // There is no timestamp from store message for remove operations.
  289. // Use current time.
  290. eventTime = time.Now()
  291. }
  292. return eventTime
  293. }