assignments.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package dispatcher
  2. import (
  3. "fmt"
  4. "github.com/Sirupsen/logrus"
  5. "github.com/docker/swarmkit/api"
  6. "github.com/docker/swarmkit/api/equality"
  7. "github.com/docker/swarmkit/api/validation"
  8. "github.com/docker/swarmkit/manager/drivers"
  9. "github.com/docker/swarmkit/manager/state/store"
  10. )
  11. // Used as a key in tasksUsingDependency and changes. Only using the
  12. // ID could cause (rare) collisions between different types of
  13. // objects, so we also include the type of object in the key.
  14. type objectType int
  15. const (
  16. typeTask objectType = iota
  17. typeSecret
  18. typeConfig
  19. )
  20. type typeAndID struct {
  21. id string
  22. objType objectType
  23. }
  24. type assignmentSet struct {
  25. dp *drivers.DriverProvider
  26. tasksMap map[string]*api.Task
  27. tasksUsingDependency map[typeAndID]map[string]struct{}
  28. changes map[typeAndID]*api.AssignmentChange
  29. log *logrus.Entry
  30. }
  31. func newAssignmentSet(log *logrus.Entry, dp *drivers.DriverProvider) *assignmentSet {
  32. return &assignmentSet{
  33. dp: dp,
  34. changes: make(map[typeAndID]*api.AssignmentChange),
  35. tasksMap: make(map[string]*api.Task),
  36. tasksUsingDependency: make(map[typeAndID]map[string]struct{}),
  37. log: log,
  38. }
  39. }
  40. func (a *assignmentSet) addTaskDependencies(readTx store.ReadTx, t *api.Task) {
  41. var secrets []*api.SecretReference
  42. container := t.Spec.GetContainer()
  43. if container != nil {
  44. secrets = container.Secrets
  45. }
  46. for _, secretRef := range secrets {
  47. secretID := secretRef.SecretID
  48. mapKey := typeAndID{objType: typeSecret, id: secretID}
  49. if len(a.tasksUsingDependency[mapKey]) == 0 {
  50. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  51. secret, err := a.secret(readTx, secretID)
  52. if err != nil {
  53. a.log.WithFields(logrus.Fields{
  54. "secret.id": secretID,
  55. "secret.name": secretRef.SecretName,
  56. "error": err,
  57. }).Error("failed to fetch secret")
  58. continue
  59. }
  60. // If the secret was found, add this secret to
  61. // our set that we send down.
  62. a.changes[mapKey] = &api.AssignmentChange{
  63. Assignment: &api.Assignment{
  64. Item: &api.Assignment_Secret{
  65. Secret: secret,
  66. },
  67. },
  68. Action: api.AssignmentChange_AssignmentActionUpdate,
  69. }
  70. }
  71. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  72. }
  73. var configs []*api.ConfigReference
  74. if container != nil {
  75. configs = container.Configs
  76. }
  77. for _, configRef := range configs {
  78. configID := configRef.ConfigID
  79. mapKey := typeAndID{objType: typeConfig, id: configID}
  80. if len(a.tasksUsingDependency[mapKey]) == 0 {
  81. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  82. config := store.GetConfig(readTx, configID)
  83. if config == nil {
  84. a.log.WithFields(logrus.Fields{
  85. "config.id": configID,
  86. "config.name": configRef.ConfigName,
  87. }).Debug("config not found")
  88. continue
  89. }
  90. // If the config was found, add this config to
  91. // our set that we send down.
  92. a.changes[mapKey] = &api.AssignmentChange{
  93. Assignment: &api.Assignment{
  94. Item: &api.Assignment_Config{
  95. Config: config,
  96. },
  97. },
  98. Action: api.AssignmentChange_AssignmentActionUpdate,
  99. }
  100. }
  101. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  102. }
  103. }
  104. func (a *assignmentSet) releaseDependency(mapKey typeAndID, assignment *api.Assignment, taskID string) bool {
  105. delete(a.tasksUsingDependency[mapKey], taskID)
  106. if len(a.tasksUsingDependency[mapKey]) != 0 {
  107. return false
  108. }
  109. // No tasks are using the dependency anymore
  110. delete(a.tasksUsingDependency, mapKey)
  111. a.changes[mapKey] = &api.AssignmentChange{
  112. Assignment: assignment,
  113. Action: api.AssignmentChange_AssignmentActionRemove,
  114. }
  115. return true
  116. }
  117. func (a *assignmentSet) releaseTaskDependencies(t *api.Task) bool {
  118. var modified bool
  119. container := t.Spec.GetContainer()
  120. var secrets []*api.SecretReference
  121. if container != nil {
  122. secrets = container.Secrets
  123. }
  124. for _, secretRef := range secrets {
  125. secretID := secretRef.SecretID
  126. mapKey := typeAndID{objType: typeSecret, id: secretID}
  127. assignment := &api.Assignment{
  128. Item: &api.Assignment_Secret{
  129. Secret: &api.Secret{ID: secretID},
  130. },
  131. }
  132. if a.releaseDependency(mapKey, assignment, t.ID) {
  133. modified = true
  134. }
  135. }
  136. var configs []*api.ConfigReference
  137. if container != nil {
  138. configs = container.Configs
  139. }
  140. for _, configRef := range configs {
  141. configID := configRef.ConfigID
  142. mapKey := typeAndID{objType: typeConfig, id: configID}
  143. assignment := &api.Assignment{
  144. Item: &api.Assignment_Config{
  145. Config: &api.Config{ID: configID},
  146. },
  147. }
  148. if a.releaseDependency(mapKey, assignment, t.ID) {
  149. modified = true
  150. }
  151. }
  152. return modified
  153. }
  154. func (a *assignmentSet) addOrUpdateTask(readTx store.ReadTx, t *api.Task) bool {
  155. // We only care about tasks that are ASSIGNED or higher.
  156. if t.Status.State < api.TaskStateAssigned {
  157. return false
  158. }
  159. if oldTask, exists := a.tasksMap[t.ID]; exists {
  160. // States ASSIGNED and below are set by the orchestrator/scheduler,
  161. // not the agent, so tasks in these states need to be sent to the
  162. // agent even if nothing else has changed.
  163. if equality.TasksEqualStable(oldTask, t) && t.Status.State > api.TaskStateAssigned {
  164. // this update should not trigger a task change for the agent
  165. a.tasksMap[t.ID] = t
  166. // If this task got updated to a final state, let's release
  167. // the dependencies that are being used by the task
  168. if t.Status.State > api.TaskStateRunning {
  169. // If releasing the dependencies caused us to
  170. // remove something from the assignment set,
  171. // mark one modification.
  172. return a.releaseTaskDependencies(t)
  173. }
  174. return false
  175. }
  176. } else if t.Status.State <= api.TaskStateRunning {
  177. // If this task wasn't part of the assignment set before, and it's <= RUNNING
  178. // add the dependencies it references to the assignment.
  179. // Task states > RUNNING are worker reported only, are never created in
  180. // a > RUNNING state.
  181. a.addTaskDependencies(readTx, t)
  182. }
  183. a.tasksMap[t.ID] = t
  184. a.changes[typeAndID{objType: typeTask, id: t.ID}] = &api.AssignmentChange{
  185. Assignment: &api.Assignment{
  186. Item: &api.Assignment_Task{
  187. Task: t,
  188. },
  189. },
  190. Action: api.AssignmentChange_AssignmentActionUpdate,
  191. }
  192. return true
  193. }
  194. func (a *assignmentSet) removeTask(t *api.Task) bool {
  195. if _, exists := a.tasksMap[t.ID]; !exists {
  196. return false
  197. }
  198. a.changes[typeAndID{objType: typeTask, id: t.ID}] = &api.AssignmentChange{
  199. Assignment: &api.Assignment{
  200. Item: &api.Assignment_Task{
  201. Task: &api.Task{ID: t.ID},
  202. },
  203. },
  204. Action: api.AssignmentChange_AssignmentActionRemove,
  205. }
  206. delete(a.tasksMap, t.ID)
  207. // Release the dependencies being used by this task.
  208. // Ignoring the return here. We will always mark this as a
  209. // modification, since a task is being removed.
  210. a.releaseTaskDependencies(t)
  211. return true
  212. }
  213. func (a *assignmentSet) message() api.AssignmentsMessage {
  214. var message api.AssignmentsMessage
  215. for _, change := range a.changes {
  216. message.Changes = append(message.Changes, change)
  217. }
  218. // The the set of changes is reinitialized to prepare for formation
  219. // of the next message.
  220. a.changes = make(map[typeAndID]*api.AssignmentChange)
  221. return message
  222. }
  223. // secret populates the secret value from raft store. For external secrets, the value is populated
  224. // from the secret driver.
  225. func (a *assignmentSet) secret(readTx store.ReadTx, secretID string) (*api.Secret, error) {
  226. secret := store.GetSecret(readTx, secretID)
  227. if secret == nil {
  228. return nil, fmt.Errorf("secret not found")
  229. }
  230. if secret.Spec.Driver == nil {
  231. return secret, nil
  232. }
  233. d, err := a.dp.NewSecretDriver(secret.Spec.Driver)
  234. if err != nil {
  235. return nil, err
  236. }
  237. value, err := d.Get(&secret.Spec)
  238. if err != nil {
  239. return nil, err
  240. }
  241. if err := validation.ValidateSecretPayload(value); err != nil {
  242. return nil, err
  243. }
  244. // Assign the secret
  245. secret.Spec.Data = value
  246. return secret, nil
  247. }