nodeinfo.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package scheduler
  2. import (
  3. "time"
  4. "github.com/docker/swarmkit/api"
  5. "github.com/docker/swarmkit/log"
  6. "golang.org/x/net/context"
  7. )
  8. // hostPortSpec specifies a used host port.
  9. type hostPortSpec struct {
  10. protocol api.PortConfig_Protocol
  11. publishedPort uint32
  12. }
  13. // NodeInfo contains a node and some additional metadata.
  14. type NodeInfo struct {
  15. *api.Node
  16. Tasks map[string]*api.Task
  17. ActiveTasksCount int
  18. ActiveTasksCountByService map[string]int
  19. AvailableResources api.Resources
  20. usedHostPorts map[hostPortSpec]struct{}
  21. // recentFailures is a map from service ID to the timestamps of the
  22. // most recent failures the node has experienced from replicas of that
  23. // service.
  24. // TODO(aaronl): When spec versioning is supported, this should track
  25. // the version of the spec that failed.
  26. recentFailures map[string][]time.Time
  27. }
  28. func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api.Resources) NodeInfo {
  29. nodeInfo := NodeInfo{
  30. Node: n,
  31. Tasks: make(map[string]*api.Task),
  32. ActiveTasksCountByService: make(map[string]int),
  33. AvailableResources: availableResources,
  34. usedHostPorts: make(map[hostPortSpec]struct{}),
  35. recentFailures: make(map[string][]time.Time),
  36. }
  37. for _, t := range tasks {
  38. nodeInfo.addTask(t)
  39. }
  40. return nodeInfo
  41. }
  42. // removeTask removes a task from nodeInfo if it's tracked there, and returns true
  43. // if nodeInfo was modified.
  44. func (nodeInfo *NodeInfo) removeTask(t *api.Task) bool {
  45. oldTask, ok := nodeInfo.Tasks[t.ID]
  46. if !ok {
  47. return false
  48. }
  49. delete(nodeInfo.Tasks, t.ID)
  50. if oldTask.DesiredState <= api.TaskStateRunning {
  51. nodeInfo.ActiveTasksCount--
  52. nodeInfo.ActiveTasksCountByService[t.ServiceID]--
  53. }
  54. reservations := taskReservations(t.Spec)
  55. nodeInfo.AvailableResources.MemoryBytes += reservations.MemoryBytes
  56. nodeInfo.AvailableResources.NanoCPUs += reservations.NanoCPUs
  57. if t.Endpoint != nil {
  58. for _, port := range t.Endpoint.Ports {
  59. if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
  60. portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
  61. delete(nodeInfo.usedHostPorts, portSpec)
  62. }
  63. }
  64. }
  65. return true
  66. }
  67. // addTask adds or updates a task on nodeInfo, and returns true if nodeInfo was
  68. // modified.
  69. func (nodeInfo *NodeInfo) addTask(t *api.Task) bool {
  70. oldTask, ok := nodeInfo.Tasks[t.ID]
  71. if ok {
  72. if t.DesiredState <= api.TaskStateRunning && oldTask.DesiredState > api.TaskStateRunning {
  73. nodeInfo.Tasks[t.ID] = t
  74. nodeInfo.ActiveTasksCount++
  75. nodeInfo.ActiveTasksCountByService[t.ServiceID]++
  76. return true
  77. } else if t.DesiredState > api.TaskStateRunning && oldTask.DesiredState <= api.TaskStateRunning {
  78. nodeInfo.Tasks[t.ID] = t
  79. nodeInfo.ActiveTasksCount--
  80. nodeInfo.ActiveTasksCountByService[t.ServiceID]--
  81. return true
  82. }
  83. return false
  84. }
  85. nodeInfo.Tasks[t.ID] = t
  86. reservations := taskReservations(t.Spec)
  87. nodeInfo.AvailableResources.MemoryBytes -= reservations.MemoryBytes
  88. nodeInfo.AvailableResources.NanoCPUs -= reservations.NanoCPUs
  89. if t.Endpoint != nil {
  90. for _, port := range t.Endpoint.Ports {
  91. if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
  92. portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
  93. nodeInfo.usedHostPorts[portSpec] = struct{}{}
  94. }
  95. }
  96. }
  97. if t.DesiredState <= api.TaskStateRunning {
  98. nodeInfo.ActiveTasksCount++
  99. nodeInfo.ActiveTasksCountByService[t.ServiceID]++
  100. }
  101. return true
  102. }
  103. func taskReservations(spec api.TaskSpec) (reservations api.Resources) {
  104. if spec.Resources != nil && spec.Resources.Reservations != nil {
  105. reservations = *spec.Resources.Reservations
  106. }
  107. return
  108. }
  109. // taskFailed records a task failure from a given service.
  110. func (nodeInfo *NodeInfo) taskFailed(ctx context.Context, serviceID string) {
  111. expired := 0
  112. now := time.Now()
  113. for _, timestamp := range nodeInfo.recentFailures[serviceID] {
  114. if now.Sub(timestamp) < monitorFailures {
  115. break
  116. }
  117. expired++
  118. }
  119. if len(nodeInfo.recentFailures[serviceID])-expired == maxFailures-1 {
  120. log.G(ctx).Warnf("underweighting node %s for service %s because it experienced %d failures or rejections within %s", nodeInfo.ID, serviceID, maxFailures, monitorFailures.String())
  121. }
  122. nodeInfo.recentFailures[serviceID] = append(nodeInfo.recentFailures[serviceID][expired:], now)
  123. }
  124. // countRecentFailures returns the number of times the service has failed on
  125. // this node within the lookback window monitorFailures.
  126. func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, serviceID string) int {
  127. recentFailureCount := len(nodeInfo.recentFailures[serviceID])
  128. for i := recentFailureCount - 1; i >= 0; i-- {
  129. if now.Sub(nodeInfo.recentFailures[serviceID][i]) > monitorFailures {
  130. recentFailureCount -= i + 1
  131. break
  132. }
  133. }
  134. return recentFailureCount
  135. }