task.go

package orchestrator

import (
	"reflect"
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/defaults"
	"github.com/docker/swarmkit/identity"
	"github.com/docker/swarmkit/manager/constraint"
	"github.com/docker/swarmkit/protobuf/ptypes"
	google_protobuf "github.com/gogo/protobuf/types"
)
// NewTask creates a new task.
func NewTask(cluster *api.Cluster, service *api.Service, slot uint64, nodeID string) *api.Task {
	var logDriver *api.Driver
	if service.Spec.Task.LogDriver != nil {
		// use the log driver specific to the task, if we have it.
		logDriver = service.Spec.Task.LogDriver
	} else if cluster != nil {
		// pick up the cluster default, if available.
		logDriver = cluster.Spec.TaskDefaults.LogDriver // nil is okay here.
	}

	taskID := identity.NewID()
	task := api.Task{
		ID:                 taskID,
		ServiceAnnotations: service.Spec.Annotations,
		Spec:               service.Spec.Task,
		SpecVersion:        service.SpecVersion,
		ServiceID:          service.ID,
		Slot:               slot,
		Status: api.TaskStatus{
			State:     api.TaskStateNew,
			Timestamp: ptypes.MustTimestampProto(time.Now()),
			Message:   "created",
		},
		Endpoint: &api.Endpoint{
			Spec: service.Spec.Endpoint.Copy(),
		},
		DesiredState: api.TaskStateRunning,
		LogDriver:    logDriver,
	}

	// In global mode we also set the NodeID
	if nodeID != "" {
		task.NodeID = nodeID
	}

	return &task
}
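
// Illustrative usage sketch (not part of the original file): a replicated
// orchestrator might create one task per slot roughly like this. The
// variable names and the store call are assumptions, not code from this
// package.
//
//	task := NewTask(cluster, service, slot, "")
//	// persist it inside a store transaction, e.g. store.CreateTask(tx, task),
//	// and let the scheduler pick a node, since nodeID was left empty.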

// RestartCondition returns the restart condition to apply to this task.
func RestartCondition(task *api.Task) api.RestartPolicy_RestartCondition {
	restartCondition := defaults.Service.Task.Restart.Condition
	if task.Spec.Restart != nil {
		restartCondition = task.Spec.Restart.Condition
	}
	return restartCondition
}
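
// Illustrative usage sketch (not part of the original file): a restart
// supervisor could branch on the effective restart condition like this;
// the task variable is an assumption.
//
//	switch RestartCondition(task) {
//	case api.RestartOnAny:
//		// restart regardless of exit status
//	case api.RestartOnFailure:
//		// restart only after a failed exit
//	case api.RestartOnNone:
//		// never restart
//	}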

// IsTaskDirty determines whether a task matches the given service's spec and
// whether the given node satisfies the placement constraints.
// Returns false if the spec version didn't change,
// only the task placement constraints changed and the assigned node
// satisfies the new constraints, or the service task spec and the endpoint spec
// didn't change at all.
// Returns true otherwise.
// Note: for non-failed tasks with a container spec runtime that have already
// pulled the required image (i.e., current state is between READY and
// RUNNING inclusively), the value of the `PullOptions` field is ignored.
func IsTaskDirty(s *api.Service, t *api.Task, n *api.Node) bool {
	// If the spec version matches, we know the task is not dirty. However,
	// if it does not match, that doesn't mean the task is dirty, since
	// only a portion of the spec is included in the comparison.
	if t.SpecVersion != nil && s.SpecVersion != nil && *s.SpecVersion == *t.SpecVersion {
		return false
	}

	// Make a deep copy of the service's task spec for the comparison.
	serviceTaskSpec := *s.Spec.Task.Copy()

	// The task is not dirty if only the placement constraints changed
	// and the currently assigned node can satisfy the changed constraints.
	if IsTaskDirtyPlacementConstraintsOnly(serviceTaskSpec, t) && nodeMatches(s, n) {
		return false
	}

	// For non-failed tasks with a container spec runtime that have already
	// pulled the required image (i.e., current state is between READY and
	// RUNNING inclusively), ignore the value of the `PullOptions` field by
	// setting the copied service task spec to have the same PullOptions value
	// as the task. A difference in only the `PullOptions` field should not
	// cause a running (or ready to run) task to be considered 'dirty' when we
	// handle updates.
	// See https://github.com/docker/swarmkit/issues/971
	currentState := t.Status.State
	// Ignore PullOpts if the task is desired to be in a "runnable" state
	// and its last known current state is between READY and RUNNING, in
	// which case we know that the task either successfully pulled its
	// container image or didn't need to.
	ignorePullOpts := t.DesiredState <= api.TaskStateRunning &&
		currentState >= api.TaskStateReady &&
		currentState <= api.TaskStateRunning
	if ignorePullOpts && serviceTaskSpec.GetContainer() != nil && t.Spec.GetContainer() != nil {
		// Modify the copied container spec.
		serviceTaskSpec.GetContainer().PullOptions = t.Spec.GetContainer().PullOptions
	}

	return !reflect.DeepEqual(serviceTaskSpec, t.Spec) ||
		(t.Endpoint != nil && !reflect.DeepEqual(s.Spec.Endpoint, t.Endpoint.Spec))
}
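
// Illustrative usage sketch (not part of the original file): during an update
// pass an orchestrator could use IsTaskDirty to decide whether an existing
// task must be replaced; the variable names are assumptions.
//
//	if IsTaskDirty(service, task, node) {
//		// the task no longer matches the service spec (or its node no
//		// longer satisfies the placement constraints): replace it
//	}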

// nodeMatches checks whether the currently assigned node satisfies the
// Placement.Constraints specified in the task spec for Updater.newService.
func nodeMatches(s *api.Service, n *api.Node) bool {
	if n == nil {
		return false
	}

	constraints, _ := constraint.Parse(s.Spec.Task.Placement.Constraints)
	return constraint.NodeMatches(constraints, n)
}

// IsTaskDirtyPlacementConstraintsOnly checks whether the Placement field
// alone has changed in the spec.
func IsTaskDirtyPlacementConstraintsOnly(serviceTaskSpec api.TaskSpec, t *api.Task) bool {
	// Compare the task placement constraints.
	if reflect.DeepEqual(serviceTaskSpec.Placement, t.Spec.Placement) {
		return false
	}

	// Copy the task's placement into the spec so that the remaining
	// comparison covers only the fields other than placement.
	serviceTaskSpec.Placement = t.Spec.Placement
	return reflect.DeepEqual(serviceTaskSpec, t.Spec)
}

// InvalidNode is true if the node is nil, down, or drained.
func InvalidNode(n *api.Node) bool {
	return n == nil ||
		n.Status.State == api.NodeStatus_DOWN ||
		n.Spec.Availability == api.NodeAvailabilityDrain
}
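
// Illustrative usage sketch (not part of the original file): restart logic
// commonly pairs InvalidNode with a task state check; the variable names are
// assumptions.
//
//	if InvalidNode(node) && task.DesiredState <= api.TaskStateRunning {
//		// the node is gone, down, or drained; move the task elsewhere
//	}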

// taskTimestamp returns the task's applied timestamp if available, otherwise
// its status timestamp.
func taskTimestamp(t *api.Task) *google_protobuf.Timestamp {
	if t.Status.AppliedAt != nil {
		return t.Status.AppliedAt
	}

	return t.Status.Timestamp
}

// TasksByTimestamp sorts tasks by applied timestamp if available, otherwise
// status timestamp.
type TasksByTimestamp []*api.Task

// Len implements the Len method for sorting.
func (t TasksByTimestamp) Len() int {
	return len(t)
}

// Swap implements the Swap method for sorting.
func (t TasksByTimestamp) Swap(i, j int) {
	t[i], t[j] = t[j], t[i]
}

// Less implements the Less method for sorting.
func (t TasksByTimestamp) Less(i, j int) bool {
	iTimestamp := taskTimestamp(t[i])
	jTimestamp := taskTimestamp(t[j])

	if iTimestamp == nil {
		return true
	}
	if jTimestamp == nil {
		return false
	}

	if iTimestamp.Seconds < jTimestamp.Seconds {
		return true
	}
	if iTimestamp.Seconds > jTimestamp.Seconds {
		return false
	}
	return iTimestamp.Nanos < jTimestamp.Nanos
}
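
// Illustrative usage sketch (not part of the original file): TasksByTimestamp
// implements sort.Interface, so callers can order tasks oldest-first; the
// tasks slice is an assumption.
//
//	sort.Sort(TasksByTimestamp(tasks))
//	// tasks[0] now holds the earliest applied (or status) timestamp;
//	// tasks with no timestamp at all sort first.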