assignments.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package dispatcher
  2. import (
  3. "fmt"
  4. "github.com/docker/swarmkit/api"
  5. "github.com/docker/swarmkit/api/equality"
  6. "github.com/docker/swarmkit/api/validation"
  7. "github.com/docker/swarmkit/identity"
  8. "github.com/docker/swarmkit/manager/drivers"
  9. "github.com/docker/swarmkit/manager/state/store"
  10. "github.com/sirupsen/logrus"
  11. )
  12. type typeAndID struct {
  13. id string
  14. objType api.ResourceType
  15. }
  16. type assignmentSet struct {
  17. dp *drivers.DriverProvider
  18. tasksMap map[string]*api.Task
  19. tasksUsingDependency map[typeAndID]map[string]struct{}
  20. changes map[typeAndID]*api.AssignmentChange
  21. log *logrus.Entry
  22. }
  23. func newAssignmentSet(log *logrus.Entry, dp *drivers.DriverProvider) *assignmentSet {
  24. return &assignmentSet{
  25. dp: dp,
  26. changes: make(map[typeAndID]*api.AssignmentChange),
  27. tasksMap: make(map[string]*api.Task),
  28. tasksUsingDependency: make(map[typeAndID]map[string]struct{}),
  29. log: log,
  30. }
  31. }
  32. func assignSecret(a *assignmentSet, readTx store.ReadTx, mapKey typeAndID, t *api.Task) {
  33. if _, exists := a.tasksUsingDependency[mapKey]; !exists {
  34. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  35. }
  36. secret, doNotReuse, err := a.secret(readTx, t, mapKey.id)
  37. if err != nil {
  38. a.log.WithFields(logrus.Fields{
  39. "resource.type": "secret",
  40. "secret.id": mapKey.id,
  41. "error": err,
  42. }).Debug("failed to fetch secret")
  43. return
  44. }
  45. // If the secret should not be reused for other tasks, give it a unique ID for the task to allow different values for different tasks.
  46. if doNotReuse {
  47. // Give the secret a new ID and mark it as internal
  48. originalSecretID := secret.ID
  49. taskSpecificID := identity.CombineTwoIDs(originalSecretID, t.ID)
  50. secret.ID = taskSpecificID
  51. secret.Internal = true
  52. // Create a new mapKey with the new ID and insert it into the dependencies map for the task.
  53. // This will make the changes map contain an entry with the new ID rather than the original one.
  54. mapKey = typeAndID{objType: mapKey.objType, id: secret.ID}
  55. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  56. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  57. }
  58. a.changes[mapKey] = &api.AssignmentChange{
  59. Assignment: &api.Assignment{
  60. Item: &api.Assignment_Secret{
  61. Secret: secret,
  62. },
  63. },
  64. Action: api.AssignmentChange_AssignmentActionUpdate,
  65. }
  66. }
  67. func assignConfig(a *assignmentSet, readTx store.ReadTx, mapKey typeAndID) {
  68. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  69. config := store.GetConfig(readTx, mapKey.id)
  70. if config == nil {
  71. a.log.WithFields(logrus.Fields{
  72. "resource.type": "config",
  73. "config.id": mapKey.id,
  74. }).Debug("config not found")
  75. return
  76. }
  77. a.changes[mapKey] = &api.AssignmentChange{
  78. Assignment: &api.Assignment{
  79. Item: &api.Assignment_Config{
  80. Config: config,
  81. },
  82. },
  83. Action: api.AssignmentChange_AssignmentActionUpdate,
  84. }
  85. }
  86. func (a *assignmentSet) addTaskDependencies(readTx store.ReadTx, t *api.Task) {
  87. for _, resourceRef := range t.Spec.ResourceReferences {
  88. mapKey := typeAndID{objType: resourceRef.ResourceType, id: resourceRef.ResourceID}
  89. if len(a.tasksUsingDependency[mapKey]) == 0 {
  90. switch resourceRef.ResourceType {
  91. case api.ResourceType_SECRET:
  92. assignSecret(a, readTx, mapKey, t)
  93. case api.ResourceType_CONFIG:
  94. assignConfig(a, readTx, mapKey)
  95. default:
  96. a.log.WithField(
  97. "resource.type", resourceRef.ResourceType,
  98. ).Debug("invalid resource type for a task dependency, skipping")
  99. continue
  100. }
  101. }
  102. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  103. }
  104. var secrets []*api.SecretReference
  105. container := t.Spec.GetContainer()
  106. if container != nil {
  107. secrets = container.Secrets
  108. }
  109. for _, secretRef := range secrets {
  110. secretID := secretRef.SecretID
  111. mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secretID}
  112. // This checks for the presence of each task in the dependency map for the
  113. // secret. This is currently only done for secrets since the other types of
  114. // dependencies do not support driver plugins. Arguably, the same task would
  115. // not have the same secret as a dependency more than once, but this check
  116. // makes sure the task only gets the secret assigned once.
  117. if _, exists := a.tasksUsingDependency[mapKey][t.ID]; !exists {
  118. assignSecret(a, readTx, mapKey, t)
  119. }
  120. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  121. }
  122. var configs []*api.ConfigReference
  123. if container != nil {
  124. configs = container.Configs
  125. }
  126. for _, configRef := range configs {
  127. configID := configRef.ConfigID
  128. mapKey := typeAndID{objType: api.ResourceType_CONFIG, id: configID}
  129. if len(a.tasksUsingDependency[mapKey]) == 0 {
  130. assignConfig(a, readTx, mapKey)
  131. }
  132. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  133. }
  134. }
  135. func (a *assignmentSet) releaseDependency(mapKey typeAndID, assignment *api.Assignment, taskID string) bool {
  136. delete(a.tasksUsingDependency[mapKey], taskID)
  137. if len(a.tasksUsingDependency[mapKey]) != 0 {
  138. return false
  139. }
  140. // No tasks are using the dependency anymore
  141. delete(a.tasksUsingDependency, mapKey)
  142. a.changes[mapKey] = &api.AssignmentChange{
  143. Assignment: assignment,
  144. Action: api.AssignmentChange_AssignmentActionRemove,
  145. }
  146. return true
  147. }
  148. func (a *assignmentSet) releaseTaskDependencies(t *api.Task) bool {
  149. var modified bool
  150. for _, resourceRef := range t.Spec.ResourceReferences {
  151. var assignment *api.Assignment
  152. switch resourceRef.ResourceType {
  153. case api.ResourceType_SECRET:
  154. assignment = &api.Assignment{
  155. Item: &api.Assignment_Secret{
  156. Secret: &api.Secret{ID: resourceRef.ResourceID},
  157. },
  158. }
  159. case api.ResourceType_CONFIG:
  160. assignment = &api.Assignment{
  161. Item: &api.Assignment_Config{
  162. Config: &api.Config{ID: resourceRef.ResourceID},
  163. },
  164. }
  165. default:
  166. a.log.WithField(
  167. "resource.type", resourceRef.ResourceType,
  168. ).Debug("invalid resource type for a task dependency, skipping")
  169. continue
  170. }
  171. mapKey := typeAndID{objType: resourceRef.ResourceType, id: resourceRef.ResourceID}
  172. if a.releaseDependency(mapKey, assignment, t.ID) {
  173. modified = true
  174. }
  175. }
  176. container := t.Spec.GetContainer()
  177. var secrets []*api.SecretReference
  178. if container != nil {
  179. secrets = container.Secrets
  180. }
  181. for _, secretRef := range secrets {
  182. secretID := secretRef.SecretID
  183. mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secretID}
  184. assignment := &api.Assignment{
  185. Item: &api.Assignment_Secret{
  186. Secret: &api.Secret{ID: secretID},
  187. },
  188. }
  189. if a.releaseDependency(mapKey, assignment, t.ID) {
  190. modified = true
  191. }
  192. }
  193. var configs []*api.ConfigReference
  194. if container != nil {
  195. configs = container.Configs
  196. }
  197. for _, configRef := range configs {
  198. configID := configRef.ConfigID
  199. mapKey := typeAndID{objType: api.ResourceType_CONFIG, id: configID}
  200. assignment := &api.Assignment{
  201. Item: &api.Assignment_Config{
  202. Config: &api.Config{ID: configID},
  203. },
  204. }
  205. if a.releaseDependency(mapKey, assignment, t.ID) {
  206. modified = true
  207. }
  208. }
  209. return modified
  210. }
  211. func (a *assignmentSet) addOrUpdateTask(readTx store.ReadTx, t *api.Task) bool {
  212. // We only care about tasks that are ASSIGNED or higher.
  213. if t.Status.State < api.TaskStateAssigned {
  214. return false
  215. }
  216. if oldTask, exists := a.tasksMap[t.ID]; exists {
  217. // States ASSIGNED and below are set by the orchestrator/scheduler,
  218. // not the agent, so tasks in these states need to be sent to the
  219. // agent even if nothing else has changed.
  220. if equality.TasksEqualStable(oldTask, t) && t.Status.State > api.TaskStateAssigned {
  221. // this update should not trigger a task change for the agent
  222. a.tasksMap[t.ID] = t
  223. // If this task got updated to a final state, let's release
  224. // the dependencies that are being used by the task
  225. if t.Status.State > api.TaskStateRunning {
  226. // If releasing the dependencies caused us to
  227. // remove something from the assignment set,
  228. // mark one modification.
  229. return a.releaseTaskDependencies(t)
  230. }
  231. return false
  232. }
  233. } else if t.Status.State <= api.TaskStateRunning {
  234. // If this task wasn't part of the assignment set before, and it's <= RUNNING
  235. // add the dependencies it references to the assignment.
  236. // Task states > RUNNING are worker reported only, are never created in
  237. // a > RUNNING state.
  238. a.addTaskDependencies(readTx, t)
  239. }
  240. a.tasksMap[t.ID] = t
  241. a.changes[typeAndID{objType: api.ResourceType_TASK, id: t.ID}] = &api.AssignmentChange{
  242. Assignment: &api.Assignment{
  243. Item: &api.Assignment_Task{
  244. Task: t,
  245. },
  246. },
  247. Action: api.AssignmentChange_AssignmentActionUpdate,
  248. }
  249. return true
  250. }
  251. func (a *assignmentSet) removeTask(t *api.Task) bool {
  252. if _, exists := a.tasksMap[t.ID]; !exists {
  253. return false
  254. }
  255. a.changes[typeAndID{objType: api.ResourceType_TASK, id: t.ID}] = &api.AssignmentChange{
  256. Assignment: &api.Assignment{
  257. Item: &api.Assignment_Task{
  258. Task: &api.Task{ID: t.ID},
  259. },
  260. },
  261. Action: api.AssignmentChange_AssignmentActionRemove,
  262. }
  263. delete(a.tasksMap, t.ID)
  264. // Release the dependencies being used by this task.
  265. // Ignoring the return here. We will always mark this as a
  266. // modification, since a task is being removed.
  267. a.releaseTaskDependencies(t)
  268. return true
  269. }
  270. func (a *assignmentSet) message() api.AssignmentsMessage {
  271. var message api.AssignmentsMessage
  272. for _, change := range a.changes {
  273. message.Changes = append(message.Changes, change)
  274. }
  275. // The the set of changes is reinitialized to prepare for formation
  276. // of the next message.
  277. a.changes = make(map[typeAndID]*api.AssignmentChange)
  278. return message
  279. }
  280. // secret populates the secret value from raft store. For external secrets, the value is populated
  281. // from the secret driver. The function returns: a secret object; an indication of whether the value
  282. // is to be reused across tasks; and an error if the secret is not found in the store, if the secret
  283. // driver responds with one or if the payload does not pass validation.
  284. func (a *assignmentSet) secret(readTx store.ReadTx, task *api.Task, secretID string) (*api.Secret, bool, error) {
  285. secret := store.GetSecret(readTx, secretID)
  286. if secret == nil {
  287. return nil, false, fmt.Errorf("secret not found")
  288. }
  289. if secret.Spec.Driver == nil {
  290. return secret, false, nil
  291. }
  292. d, err := a.dp.NewSecretDriver(secret.Spec.Driver)
  293. if err != nil {
  294. return nil, false, err
  295. }
  296. value, doNotReuse, err := d.Get(&secret.Spec, task)
  297. if err != nil {
  298. return nil, false, err
  299. }
  300. if err := validation.ValidateSecretPayload(value); err != nil {
  301. return nil, false, err
  302. }
  303. // Assign the secret
  304. secret.Spec.Data = value
  305. return secret, doNotReuse, nil
  306. }