assignments.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. package dispatcher
  2. import (
  3. "fmt"
  4. "github.com/docker/swarmkit/api"
  5. "github.com/docker/swarmkit/api/equality"
  6. "github.com/docker/swarmkit/api/validation"
  7. "github.com/docker/swarmkit/identity"
  8. "github.com/docker/swarmkit/manager/drivers"
  9. "github.com/docker/swarmkit/manager/state/store"
  10. "github.com/sirupsen/logrus"
  11. )
  12. type typeAndID struct {
  13. id string
  14. objType api.ResourceType
  15. }
  16. type assignmentSet struct {
  17. nodeID string
  18. dp *drivers.DriverProvider
  19. tasksMap map[string]*api.Task
  20. // volumesMap keeps track of the VolumePublishStatus of the given volumes.
  21. // this tells us both which volumes are assigned to the node, and what the
  22. // last known VolumePublishStatus was, so we can understand if we need to
  23. // send an update.
  24. volumesMap map[string]*api.VolumePublishStatus
  25. // tasksUsingDependency tracks both tasks and volumes using a given
  26. // dependency. this works because the ID generated for swarm comes from a
  27. // large enough space that it is reliably astronomically unlikely that IDs
  28. // will ever collide.
  29. tasksUsingDependency map[typeAndID]map[string]struct{}
  30. changes map[typeAndID]*api.AssignmentChange
  31. log *logrus.Entry
  32. }
  33. func newAssignmentSet(nodeID string, log *logrus.Entry, dp *drivers.DriverProvider) *assignmentSet {
  34. return &assignmentSet{
  35. nodeID: nodeID,
  36. dp: dp,
  37. changes: make(map[typeAndID]*api.AssignmentChange),
  38. tasksMap: make(map[string]*api.Task),
  39. volumesMap: make(map[string]*api.VolumePublishStatus),
  40. tasksUsingDependency: make(map[typeAndID]map[string]struct{}),
  41. log: log,
  42. }
  43. }
  44. func assignSecret(a *assignmentSet, readTx store.ReadTx, mapKey typeAndID, t *api.Task) {
  45. if _, exists := a.tasksUsingDependency[mapKey]; !exists {
  46. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  47. }
  48. secret, doNotReuse, err := a.secret(readTx, t, mapKey.id)
  49. if err != nil {
  50. a.log.WithFields(logrus.Fields{
  51. "resource.type": "secret",
  52. "secret.id": mapKey.id,
  53. "error": err,
  54. }).Debug("failed to fetch secret")
  55. return
  56. }
  57. // If the secret should not be reused for other tasks, give it a unique ID
  58. // for the task to allow different values for different tasks.
  59. if doNotReuse {
  60. // Give the secret a new ID and mark it as internal
  61. originalSecretID := secret.ID
  62. taskSpecificID := identity.CombineTwoIDs(originalSecretID, t.ID)
  63. secret.ID = taskSpecificID
  64. secret.Internal = true
  65. // Create a new mapKey with the new ID and insert it into the
  66. // dependencies map for the task. This will make the changes map
  67. // contain an entry with the new ID rather than the original one.
  68. mapKey = typeAndID{objType: mapKey.objType, id: secret.ID}
  69. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  70. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  71. }
  72. a.changes[mapKey] = &api.AssignmentChange{
  73. Assignment: &api.Assignment{
  74. Item: &api.Assignment_Secret{
  75. Secret: secret,
  76. },
  77. },
  78. Action: api.AssignmentChange_AssignmentActionUpdate,
  79. }
  80. }
  81. func assignConfig(a *assignmentSet, readTx store.ReadTx, mapKey typeAndID) {
  82. a.tasksUsingDependency[mapKey] = make(map[string]struct{})
  83. config := store.GetConfig(readTx, mapKey.id)
  84. if config == nil {
  85. a.log.WithFields(logrus.Fields{
  86. "resource.type": "config",
  87. "config.id": mapKey.id,
  88. }).Debug("config not found")
  89. return
  90. }
  91. a.changes[mapKey] = &api.AssignmentChange{
  92. Assignment: &api.Assignment{
  93. Item: &api.Assignment_Config{
  94. Config: config,
  95. },
  96. },
  97. Action: api.AssignmentChange_AssignmentActionUpdate,
  98. }
  99. }
  100. func (a *assignmentSet) addTaskDependencies(readTx store.ReadTx, t *api.Task) {
  101. // first, we go through all ResourceReferences, which give us the necessary
  102. // information about which secrets and configs are in use.
  103. for _, resourceRef := range t.Spec.ResourceReferences {
  104. mapKey := typeAndID{objType: resourceRef.ResourceType, id: resourceRef.ResourceID}
  105. // if there are no tasks using this dependency yet, then we can assign
  106. // it.
  107. if len(a.tasksUsingDependency[mapKey]) == 0 {
  108. switch resourceRef.ResourceType {
  109. case api.ResourceType_SECRET:
  110. assignSecret(a, readTx, mapKey, t)
  111. case api.ResourceType_CONFIG:
  112. assignConfig(a, readTx, mapKey)
  113. default:
  114. a.log.WithField(
  115. "resource.type", resourceRef.ResourceType,
  116. ).Debug("invalid resource type for a task dependency, skipping")
  117. continue
  118. }
  119. }
  120. // otherwise, we don't need to add a new assignment. we just need to
  121. // track the fact that another task is now using this dependency.
  122. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  123. }
  124. var secrets []*api.SecretReference
  125. container := t.Spec.GetContainer()
  126. if container != nil {
  127. secrets = container.Secrets
  128. }
  129. for _, secretRef := range secrets {
  130. secretID := secretRef.SecretID
  131. mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secretID}
  132. // This checks for the presence of each task in the dependency map for the
  133. // secret. This is currently only done for secrets since the other types of
  134. // dependencies do not support driver plugins. Arguably, the same task would
  135. // not have the same secret as a dependency more than once, but this check
  136. // makes sure the task only gets the secret assigned once.
  137. if _, exists := a.tasksUsingDependency[mapKey][t.ID]; !exists {
  138. assignSecret(a, readTx, mapKey, t)
  139. }
  140. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  141. }
  142. var configs []*api.ConfigReference
  143. if container != nil {
  144. configs = container.Configs
  145. }
  146. for _, configRef := range configs {
  147. configID := configRef.ConfigID
  148. mapKey := typeAndID{objType: api.ResourceType_CONFIG, id: configID}
  149. if len(a.tasksUsingDependency[mapKey]) == 0 {
  150. assignConfig(a, readTx, mapKey)
  151. }
  152. a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
  153. }
  154. }
  155. func (a *assignmentSet) releaseDependency(mapKey typeAndID, assignment *api.Assignment, taskID string) bool {
  156. delete(a.tasksUsingDependency[mapKey], taskID)
  157. if len(a.tasksUsingDependency[mapKey]) != 0 {
  158. return false
  159. }
  160. // No tasks are using the dependency anymore
  161. delete(a.tasksUsingDependency, mapKey)
  162. a.changes[mapKey] = &api.AssignmentChange{
  163. Assignment: assignment,
  164. Action: api.AssignmentChange_AssignmentActionRemove,
  165. }
  166. return true
  167. }
  168. // releaseTaskDependencies needs a store transaction because volumes have
  169. // associated Secrets which need to be released.
  170. func (a *assignmentSet) releaseTaskDependencies(readTx store.ReadTx, t *api.Task) bool {
  171. var modified bool
  172. for _, resourceRef := range t.Spec.ResourceReferences {
  173. var assignment *api.Assignment
  174. switch resourceRef.ResourceType {
  175. case api.ResourceType_SECRET:
  176. assignment = &api.Assignment{
  177. Item: &api.Assignment_Secret{
  178. Secret: &api.Secret{ID: resourceRef.ResourceID},
  179. },
  180. }
  181. case api.ResourceType_CONFIG:
  182. assignment = &api.Assignment{
  183. Item: &api.Assignment_Config{
  184. Config: &api.Config{ID: resourceRef.ResourceID},
  185. },
  186. }
  187. default:
  188. a.log.WithField(
  189. "resource.type", resourceRef.ResourceType,
  190. ).Debug("invalid resource type for a task dependency, skipping")
  191. continue
  192. }
  193. mapKey := typeAndID{objType: resourceRef.ResourceType, id: resourceRef.ResourceID}
  194. if a.releaseDependency(mapKey, assignment, t.ID) {
  195. modified = true
  196. }
  197. }
  198. container := t.Spec.GetContainer()
  199. var secrets []*api.SecretReference
  200. if container != nil {
  201. secrets = container.Secrets
  202. }
  203. for _, secretRef := range secrets {
  204. secretID := secretRef.SecretID
  205. mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secretID}
  206. assignment := &api.Assignment{
  207. Item: &api.Assignment_Secret{
  208. Secret: &api.Secret{ID: secretID},
  209. },
  210. }
  211. if a.releaseDependency(mapKey, assignment, t.ID) {
  212. modified = true
  213. }
  214. }
  215. var configs []*api.ConfigReference
  216. if container != nil {
  217. configs = container.Configs
  218. }
  219. for _, configRef := range configs {
  220. configID := configRef.ConfigID
  221. mapKey := typeAndID{objType: api.ResourceType_CONFIG, id: configID}
  222. assignment := &api.Assignment{
  223. Item: &api.Assignment_Config{
  224. Config: &api.Config{ID: configID},
  225. },
  226. }
  227. if a.releaseDependency(mapKey, assignment, t.ID) {
  228. modified = true
  229. }
  230. }
  231. return modified
  232. }
  233. func (a *assignmentSet) addOrUpdateTask(readTx store.ReadTx, t *api.Task) bool {
  234. // We only care about tasks that are ASSIGNED or higher.
  235. if t.Status.State < api.TaskStateAssigned {
  236. return false
  237. }
  238. if oldTask, exists := a.tasksMap[t.ID]; exists {
  239. // States ASSIGNED and below are set by the orchestrator/scheduler,
  240. // not the agent, so tasks in these states need to be sent to the
  241. // agent even if nothing else has changed.
  242. if equality.TasksEqualStable(oldTask, t) && t.Status.State > api.TaskStateAssigned {
  243. // this update should not trigger a task change for the agent
  244. a.tasksMap[t.ID] = t
  245. // If this task got updated to a final state, let's release
  246. // the dependencies that are being used by the task
  247. if t.Status.State > api.TaskStateRunning {
  248. // If releasing the dependencies caused us to
  249. // remove something from the assignment set,
  250. // mark one modification.
  251. return a.releaseTaskDependencies(readTx, t)
  252. }
  253. return false
  254. }
  255. } else if t.Status.State <= api.TaskStateRunning {
  256. // If this task wasn't part of the assignment set before, and it's <= RUNNING
  257. // add the dependencies it references to the assignment.
  258. // Task states > RUNNING are worker reported only, are never created in
  259. // a > RUNNING state.
  260. a.addTaskDependencies(readTx, t)
  261. }
  262. a.tasksMap[t.ID] = t
  263. a.changes[typeAndID{objType: api.ResourceType_TASK, id: t.ID}] = &api.AssignmentChange{
  264. Assignment: &api.Assignment{
  265. Item: &api.Assignment_Task{
  266. Task: t,
  267. },
  268. },
  269. Action: api.AssignmentChange_AssignmentActionUpdate,
  270. }
  271. return true
  272. }
  273. // addOrUpdateVolume tracks a Volume assigned to a node.
  274. func (a *assignmentSet) addOrUpdateVolume(readTx store.ReadTx, v *api.Volume) bool {
  275. var publishStatus *api.VolumePublishStatus
  276. for _, status := range v.PublishStatus {
  277. if status.NodeID == a.nodeID {
  278. publishStatus = status
  279. break
  280. }
  281. }
  282. // if there is no publishStatus for this Volume on this Node, or if the
  283. // Volume has not yet been published to this node, then we do not need to
  284. // track this assignment.
  285. if publishStatus == nil || publishStatus.State < api.VolumePublishStatus_PUBLISHED {
  286. return false
  287. }
  288. // check if we are already tracking this volume, and what its old status
  289. // is. if the states are identical, then we don't have any update to make.
  290. if oldStatus, ok := a.volumesMap[v.ID]; ok && oldStatus.State == publishStatus.State {
  291. return false
  292. }
  293. // if the volume has already been confirmed as unpublished, we can stop
  294. // tracking it and remove its dependencies.
  295. if publishStatus.State > api.VolumePublishStatus_PENDING_NODE_UNPUBLISH {
  296. return a.removeVolume(readTx, v)
  297. }
  298. for _, secret := range v.Spec.Secrets {
  299. mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secret.Secret}
  300. if len(a.tasksUsingDependency[mapKey]) == 0 {
  301. // we can call assignSecret with task being nil, but it does mean
  302. // that any secret that uses a driver will not work. we'll call
  303. // that a limitation of volumes for now.
  304. assignSecret(a, readTx, mapKey, nil)
  305. }
  306. a.tasksUsingDependency[mapKey][v.ID] = struct{}{}
  307. }
  308. // volumes are sent to nodes as VolumeAssignments. This is because a node
  309. // needs node-specific information (the PublishContext from
  310. // ControllerPublishVolume).
  311. assignment := &api.VolumeAssignment{
  312. ID: v.ID,
  313. VolumeID: v.VolumeInfo.VolumeID,
  314. Driver: v.Spec.Driver,
  315. VolumeContext: v.VolumeInfo.VolumeContext,
  316. PublishContext: publishStatus.PublishContext,
  317. AccessMode: v.Spec.AccessMode,
  318. Secrets: v.Spec.Secrets,
  319. }
  320. volumeKey := typeAndID{objType: api.ResourceType_VOLUME, id: v.ID}
  321. // assignmentChange is the whole assignment without the action, which we
  322. // will set next
  323. assignmentChange := &api.AssignmentChange{
  324. Assignment: &api.Assignment{
  325. Item: &api.Assignment_Volume{
  326. Volume: assignment,
  327. },
  328. },
  329. }
  330. // if we're in state PENDING_NODE_UNPUBLISH, we actually need to send a
  331. // remove message. we do this every time, even if the node never got the
  332. // first add assignment. This is because the node might not know that it
  333. // has a volume published; for example, the node may be restarting, and
  334. // the in-memory store does not have knowledge of the volume.
  335. if publishStatus.State == api.VolumePublishStatus_PENDING_NODE_UNPUBLISH {
  336. assignmentChange.Action = api.AssignmentChange_AssignmentActionRemove
  337. } else {
  338. assignmentChange.Action = api.AssignmentChange_AssignmentActionUpdate
  339. }
  340. a.changes[volumeKey] = assignmentChange
  341. a.volumesMap[v.ID] = publishStatus
  342. return true
  343. }
  344. func (a *assignmentSet) removeVolume(readTx store.ReadTx, v *api.Volume) bool {
  345. if _, exists := a.volumesMap[v.ID]; !exists {
  346. return false
  347. }
  348. modified := false
  349. // if the volume does exists, we can release its secrets
  350. for _, secret := range v.Spec.Secrets {
  351. mapKey := typeAndID{objType: api.ResourceType_SECRET, id: secret.Secret}
  352. assignment := &api.Assignment{
  353. Item: &api.Assignment_Secret{
  354. Secret: &api.Secret{ID: secret.Secret},
  355. },
  356. }
  357. if a.releaseDependency(mapKey, assignment, v.ID) {
  358. modified = true
  359. }
  360. }
  361. // we don't need to add a removal message. the removal of the
  362. // VolumeAssignment will have already happened.
  363. delete(a.volumesMap, v.ID)
  364. return modified
  365. }
  366. func (a *assignmentSet) removeTask(readTx store.ReadTx, t *api.Task) bool {
  367. if _, exists := a.tasksMap[t.ID]; !exists {
  368. return false
  369. }
  370. a.changes[typeAndID{objType: api.ResourceType_TASK, id: t.ID}] = &api.AssignmentChange{
  371. Assignment: &api.Assignment{
  372. Item: &api.Assignment_Task{
  373. Task: &api.Task{ID: t.ID},
  374. },
  375. },
  376. Action: api.AssignmentChange_AssignmentActionRemove,
  377. }
  378. delete(a.tasksMap, t.ID)
  379. // Release the dependencies being used by this task.
  380. // Ignoring the return here. We will always mark this as a
  381. // modification, since a task is being removed.
  382. a.releaseTaskDependencies(readTx, t)
  383. return true
  384. }
  385. func (a *assignmentSet) message() api.AssignmentsMessage {
  386. var message api.AssignmentsMessage
  387. for _, change := range a.changes {
  388. message.Changes = append(message.Changes, change)
  389. }
  390. // The the set of changes is reinitialized to prepare for formation
  391. // of the next message.
  392. a.changes = make(map[typeAndID]*api.AssignmentChange)
  393. return message
  394. }
  395. // secret populates the secret value from raft store. For external secrets, the value is populated
  396. // from the secret driver. The function returns: a secret object; an indication of whether the value
  397. // is to be reused across tasks; and an error if the secret is not found in the store, if the secret
  398. // driver responds with one or if the payload does not pass validation.
  399. func (a *assignmentSet) secret(readTx store.ReadTx, task *api.Task, secretID string) (*api.Secret, bool, error) {
  400. secret := store.GetSecret(readTx, secretID)
  401. if secret == nil {
  402. return nil, false, fmt.Errorf("secret not found")
  403. }
  404. if secret.Spec.Driver == nil {
  405. return secret, false, nil
  406. }
  407. d, err := a.dp.NewSecretDriver(secret.Spec.Driver)
  408. if err != nil {
  409. return nil, false, err
  410. }
  411. value, doNotReuse, err := d.Get(&secret.Spec, task)
  412. if err != nil {
  413. return nil, false, err
  414. }
  415. if err := validation.ValidateSecretPayload(value); err != nil {
  416. return nil, false, err
  417. }
  418. // Assign the secret
  419. secret.Spec.Data = value
  420. return secret, doNotReuse, nil
  421. }