assignments.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package dispatcher
  2. import (
  3. "github.com/Sirupsen/logrus"
  4. "github.com/docker/swarmkit/api"
  5. "github.com/docker/swarmkit/api/equality"
  6. "github.com/docker/swarmkit/manager/state/store"
  7. )
  8. // Used as a key in tasksUsingDependency and changes. Only using the
  9. // ID could cause (rare) collisions between different types of
  10. // objects, so we also include the type of object in the key.
  11. type objectType int
  12. const (
  13. typeTask objectType = iota
  14. typeSecret
  15. typeConfig
  16. )
  17. type typeAndID struct {
  18. id string
  19. objType objectType
  20. }
  21. type assignmentSet struct {
  22. tasksMap map[string]*api.Task
  23. tasksUsingDependency map[typeAndID]map[string]struct{}
  24. changes map[typeAndID]*api.AssignmentChange
  25. log *logrus.Entry
  26. }
  27. func newAssignmentSet(log *logrus.Entry) *assignmentSet {
  28. return &assignmentSet{
  29. changes: make(map[typeAndID]*api.AssignmentChange),
  30. tasksMap: make(map[string]*api.Task),
  31. tasksUsingDependency: make(map[typeAndID]map[string]struct{}),
  32. log: log,
  33. }
  34. }
  35. func (a *assignmentSet) addTaskDependencies(readTx store.ReadTx, t *api.Task) {
  36. var secrets []*api.SecretReference
  37. container := t.Spec.GetContainer()
  38. if container != nil {
  39. secrets = container.Secrets
  40. }
  41. for _, secretRef := range secrets {
  42. secretID := secretRef.SecretID
  43. mapKey := typeAndID{objType: typeSecret, id: secretID}
  44. if len(a.tasksUsingDependency[mapKey]) == 0 {
  45. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  46. secret := store.GetSecret(readTx, secretID)
  47. if secret == nil {
  48. a.log.WithFields(logrus.Fields{
  49. "secret.id": secretID,
  50. "secret.name": secretRef.SecretName,
  51. }).Debug("secret not found")
  52. continue
  53. }
  54. // If the secret was found, add this secret to
  55. // our set that we send down.
  56. a.changes[mapKey] = &api.AssignmentChange{
  57. Assignment: &api.Assignment{
  58. Item: &api.Assignment_Secret{
  59. Secret: secret,
  60. },
  61. },
  62. Action: api.AssignmentChange_AssignmentActionUpdate,
  63. }
  64. }
  65. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  66. }
  67. var configs []*api.ConfigReference
  68. if container != nil {
  69. configs = container.Configs
  70. }
  71. for _, configRef := range configs {
  72. configID := configRef.ConfigID
  73. mapKey := typeAndID{objType: typeConfig, id: configID}
  74. if len(a.tasksUsingDependency[mapKey]) == 0 {
  75. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  76. config := store.GetConfig(readTx, configID)
  77. if config == nil {
  78. a.log.WithFields(logrus.Fields{
  79. "config.id": configID,
  80. "config.name": configRef.ConfigName,
  81. }).Debug("config not found")
  82. continue
  83. }
  84. // If the config was found, add this config to
  85. // our set that we send down.
  86. a.changes[mapKey] = &api.AssignmentChange{
  87. Assignment: &api.Assignment{
  88. Item: &api.Assignment_Config{
  89. Config: config,
  90. },
  91. },
  92. Action: api.AssignmentChange_AssignmentActionUpdate,
  93. }
  94. }
  95. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  96. }
  97. }
  98. func (a *assignmentSet) releaseDependency(mapKey typeAndID, assignment *api.Assignment, taskID string) bool {
  99. delete(a.tasksUsingDependency[mapKey], taskID)
  100. if len(a.tasksUsingDependency[mapKey]) != 0 {
  101. return false
  102. }
  103. // No tasks are using the dependency anymore
  104. delete(a.tasksUsingDependency, mapKey)
  105. a.changes[mapKey] = &api.AssignmentChange{
  106. Assignment: assignment,
  107. Action: api.AssignmentChange_AssignmentActionRemove,
  108. }
  109. return true
  110. }
  111. func (a *assignmentSet) releaseTaskDependencies(t *api.Task) bool {
  112. var modified bool
  113. container := t.Spec.GetContainer()
  114. var secrets []*api.SecretReference
  115. if container != nil {
  116. secrets = container.Secrets
  117. }
  118. for _, secretRef := range secrets {
  119. secretID := secretRef.SecretID
  120. mapKey := typeAndID{objType: typeSecret, id: secretID}
  121. assignment := &api.Assignment{
  122. Item: &api.Assignment_Secret{
  123. Secret: &api.Secret{ID: secretID},
  124. },
  125. }
  126. if a.releaseDependency(mapKey, assignment, t.ID) {
  127. modified = true
  128. }
  129. }
  130. var configs []*api.ConfigReference
  131. if container != nil {
  132. configs = container.Configs
  133. }
  134. for _, configRef := range configs {
  135. configID := configRef.ConfigID
  136. mapKey := typeAndID{objType: typeConfig, id: configID}
  137. assignment := &api.Assignment{
  138. Item: &api.Assignment_Config{
  139. Config: &api.Config{ID: configID},
  140. },
  141. }
  142. if a.releaseDependency(mapKey, assignment, t.ID) {
  143. modified = true
  144. }
  145. }
  146. return modified
  147. }
  148. func (a *assignmentSet) addOrUpdateTask(readTx store.ReadTx, t *api.Task) bool {
  149. // We only care about tasks that are ASSIGNED or higher.
  150. if t.Status.State < api.TaskStateAssigned {
  151. return false
  152. }
  153. if oldTask, exists := a.tasksMap[t.ID]; exists {
  154. // States ASSIGNED and below are set by the orchestrator/scheduler,
  155. // not the agent, so tasks in these states need to be sent to the
  156. // agent even if nothing else has changed.
  157. if equality.TasksEqualStable(oldTask, t) && t.Status.State > api.TaskStateAssigned {
  158. // this update should not trigger a task change for the agent
  159. a.tasksMap[t.ID] = t
  160. // If this task got updated to a final state, let's release
  161. // the dependencies that are being used by the task
  162. if t.Status.State > api.TaskStateRunning {
  163. // If releasing the dependencies caused us to
  164. // remove something from the assignment set,
  165. // mark one modification.
  166. return a.releaseTaskDependencies(t)
  167. }
  168. return false
  169. }
  170. } else if t.Status.State <= api.TaskStateRunning {
  171. // If this task wasn't part of the assignment set before, and it's <= RUNNING
  172. // add the dependencies it references to the assignment.
  173. // Task states > RUNNING are worker reported only, are never created in
  174. // a > RUNNING state.
  175. a.addTaskDependencies(readTx, t)
  176. }
  177. a.tasksMap[t.ID] = t
  178. a.changes[typeAndID{objType: typeTask, id: t.ID}] = &api.AssignmentChange{
  179. Assignment: &api.Assignment{
  180. Item: &api.Assignment_Task{
  181. Task: t,
  182. },
  183. },
  184. Action: api.AssignmentChange_AssignmentActionUpdate,
  185. }
  186. return true
  187. }
  188. func (a *assignmentSet) removeTask(t *api.Task) bool {
  189. if _, exists := a.tasksMap[t.ID]; !exists {
  190. return false
  191. }
  192. a.changes[typeAndID{objType: typeTask, id: t.ID}] = &api.AssignmentChange{
  193. Assignment: &api.Assignment{
  194. Item: &api.Assignment_Task{
  195. Task: &api.Task{ID: t.ID},
  196. },
  197. },
  198. Action: api.AssignmentChange_AssignmentActionRemove,
  199. }
  200. delete(a.tasksMap, t.ID)
  201. // Release the dependencies being used by this task.
  202. // Ignoring the return here. We will always mark this as a
  203. // modification, since a task is being removed.
  204. a.releaseTaskDependencies(t)
  205. return true
  206. }
  207. func (a *assignmentSet) message() api.AssignmentsMessage {
  208. var message api.AssignmentsMessage
  209. for _, change := range a.changes {
  210. message.Changes = append(message.Changes, change)
  211. }
  212. // The the set of changes is reinitialized to prepare for formation
  213. // of the next message.
  214. a.changes = make(map[typeAndID]*api.AssignmentChange)
  215. return message
  216. }