123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- package dispatcher
- import (
- "github.com/Sirupsen/logrus"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/api/equality"
- "github.com/docker/swarmkit/manager/state/store"
- )
- // Used as a key in tasksUsingDependency and changes. Only using the
- // ID could cause (rare) collisions between different types of
- // objects, so we also include the type of object in the key.
- type objectType int
- const (
- typeTask objectType = iota
- typeSecret
- typeConfig
- )
- type typeAndID struct {
- id string
- objType objectType
- }
- type assignmentSet struct {
- tasksMap map[string]*api.Task
- tasksUsingDependency map[typeAndID]map[string]struct{}
- changes map[typeAndID]*api.AssignmentChange
- log *logrus.Entry
- }
- func newAssignmentSet(log *logrus.Entry) *assignmentSet {
- return &assignmentSet{
- changes: make(map[typeAndID]*api.AssignmentChange),
- tasksMap: make(map[string]*api.Task),
- tasksUsingDependency: make(map[typeAndID]map[string]struct{}),
- log: log,
- }
- }
- func (a *assignmentSet) addTaskDependencies(readTx store.ReadTx, t *api.Task) {
- var secrets []*api.SecretReference
- container := t.Spec.GetContainer()
- if container != nil {
- secrets = container.Secrets
- }
- for _, secretRef := range secrets {
- secretID := secretRef.SecretID
- mapKey := typeAndID{objType: typeSecret, id: secretID}
- if len(a.tasksUsingDependency[mapKey]) == 0 {
- a.tasksUsingDependency[mapKey] = make(map[string]struct{})
- secret := store.GetSecret(readTx, secretID)
- if secret == nil {
- a.log.WithFields(logrus.Fields{
- "secret.id": secretID,
- "secret.name": secretRef.SecretName,
- }).Debug("secret not found")
- continue
- }
- // If the secret was found, add this secret to
- // our set that we send down.
- a.changes[mapKey] = &api.AssignmentChange{
- Assignment: &api.Assignment{
- Item: &api.Assignment_Secret{
- Secret: secret,
- },
- },
- Action: api.AssignmentChange_AssignmentActionUpdate,
- }
- }
- a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
- }
- var configs []*api.ConfigReference
- if container != nil {
- configs = container.Configs
- }
- for _, configRef := range configs {
- configID := configRef.ConfigID
- mapKey := typeAndID{objType: typeConfig, id: configID}
- if len(a.tasksUsingDependency[mapKey]) == 0 {
- a.tasksUsingDependency[mapKey] = make(map[string]struct{})
- config := store.GetConfig(readTx, configID)
- if config == nil {
- a.log.WithFields(logrus.Fields{
- "config.id": configID,
- "config.name": configRef.ConfigName,
- }).Debug("config not found")
- continue
- }
- // If the config was found, add this config to
- // our set that we send down.
- a.changes[mapKey] = &api.AssignmentChange{
- Assignment: &api.Assignment{
- Item: &api.Assignment_Config{
- Config: config,
- },
- },
- Action: api.AssignmentChange_AssignmentActionUpdate,
- }
- }
- a.tasksUsingDependency[mapKey][t.ID] = struct{}{}
- }
- }
- func (a *assignmentSet) releaseDependency(mapKey typeAndID, assignment *api.Assignment, taskID string) bool {
- delete(a.tasksUsingDependency[mapKey], taskID)
- if len(a.tasksUsingDependency[mapKey]) != 0 {
- return false
- }
- // No tasks are using the dependency anymore
- delete(a.tasksUsingDependency, mapKey)
- a.changes[mapKey] = &api.AssignmentChange{
- Assignment: assignment,
- Action: api.AssignmentChange_AssignmentActionRemove,
- }
- return true
- }
- func (a *assignmentSet) releaseTaskDependencies(t *api.Task) bool {
- var modified bool
- container := t.Spec.GetContainer()
- var secrets []*api.SecretReference
- if container != nil {
- secrets = container.Secrets
- }
- for _, secretRef := range secrets {
- secretID := secretRef.SecretID
- mapKey := typeAndID{objType: typeSecret, id: secretID}
- assignment := &api.Assignment{
- Item: &api.Assignment_Secret{
- Secret: &api.Secret{ID: secretID},
- },
- }
- if a.releaseDependency(mapKey, assignment, t.ID) {
- modified = true
- }
- }
- var configs []*api.ConfigReference
- if container != nil {
- configs = container.Configs
- }
- for _, configRef := range configs {
- configID := configRef.ConfigID
- mapKey := typeAndID{objType: typeConfig, id: configID}
- assignment := &api.Assignment{
- Item: &api.Assignment_Config{
- Config: &api.Config{ID: configID},
- },
- }
- if a.releaseDependency(mapKey, assignment, t.ID) {
- modified = true
- }
- }
- return modified
- }
- func (a *assignmentSet) addOrUpdateTask(readTx store.ReadTx, t *api.Task) bool {
- // We only care about tasks that are ASSIGNED or higher.
- if t.Status.State < api.TaskStateAssigned {
- return false
- }
- if oldTask, exists := a.tasksMap[t.ID]; exists {
- // States ASSIGNED and below are set by the orchestrator/scheduler,
- // not the agent, so tasks in these states need to be sent to the
- // agent even if nothing else has changed.
- if equality.TasksEqualStable(oldTask, t) && t.Status.State > api.TaskStateAssigned {
- // this update should not trigger a task change for the agent
- a.tasksMap[t.ID] = t
- // If this task got updated to a final state, let's release
- // the dependencies that are being used by the task
- if t.Status.State > api.TaskStateRunning {
- // If releasing the dependencies caused us to
- // remove something from the assignment set,
- // mark one modification.
- return a.releaseTaskDependencies(t)
- }
- return false
- }
- } else if t.Status.State <= api.TaskStateRunning {
- // If this task wasn't part of the assignment set before, and it's <= RUNNING
- // add the dependencies it references to the assignment.
- // Task states > RUNNING are worker reported only, are never created in
- // a > RUNNING state.
- a.addTaskDependencies(readTx, t)
- }
- a.tasksMap[t.ID] = t
- a.changes[typeAndID{objType: typeTask, id: t.ID}] = &api.AssignmentChange{
- Assignment: &api.Assignment{
- Item: &api.Assignment_Task{
- Task: t,
- },
- },
- Action: api.AssignmentChange_AssignmentActionUpdate,
- }
- return true
- }
- func (a *assignmentSet) removeTask(t *api.Task) bool {
- if _, exists := a.tasksMap[t.ID]; !exists {
- return false
- }
- a.changes[typeAndID{objType: typeTask, id: t.ID}] = &api.AssignmentChange{
- Assignment: &api.Assignment{
- Item: &api.Assignment_Task{
- Task: &api.Task{ID: t.ID},
- },
- },
- Action: api.AssignmentChange_AssignmentActionRemove,
- }
- delete(a.tasksMap, t.ID)
- // Release the dependencies being used by this task.
- // Ignoring the return here. We will always mark this as a
- // modification, since a task is being removed.
- a.releaseTaskDependencies(t)
- return true
- }
- func (a *assignmentSet) message() api.AssignmentsMessage {
- var message api.AssignmentsMessage
- for _, change := range a.changes {
- message.Changes = append(message.Changes, change)
- }
- // The the set of changes is reinitialized to prepare for formation
- // of the next message.
- a.changes = make(map[typeAndID]*api.AssignmentChange)
- return message
- }
|