@@ -0,0 +1,409 @@
+package progress
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"os/signal"
+	"time"
+
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/filters"
+	"github.com/docker/docker/api/types/swarm"
+	"github.com/docker/docker/client"
+	"github.com/docker/docker/pkg/progress"
+	"github.com/docker/docker/pkg/streamformatter"
+	"github.com/docker/docker/pkg/stringid"
+	"golang.org/x/net/context"
+)
+
+var (
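+	// numberedStates maps each task state to its position in the normal
+	// lifecycle of a task. States absent from this map (e.g. terminal
+	// states) get the zero value, which callers use to skip tasks.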
+	numberedStates = map[swarm.TaskState]int64{
+		swarm.TaskStateNew:       1,
+		swarm.TaskStateAllocated: 2,
+		swarm.TaskStatePending:   3,
+		swarm.TaskStateAssigned:  4,
+		swarm.TaskStateAccepted:  5,
+		swarm.TaskStatePreparing: 6,
+		swarm.TaskStateReady:     7,
+		swarm.TaskStateStarting:  8,
+		swarm.TaskStateRunning:   9,
+	}
+
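+	// longestState is the length of the longest state name in
+	// numberedStates; init computes it so progress lines can pad state
+	// names to a fixed width.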
+	longestState int
+)
+
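+// maxProgress is the highest value in numberedStates (the running state);
+// maxProgressBars caps how many per-task progress bars are drawn before
+// falling back to only the overall progress line.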
+const (
+	maxProgress     = 9
+	maxProgressBars = 20
+)
+
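+// progressUpdater renders task progress for one service mode and reports
+// whether the service has converged.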
+type progressUpdater interface {
+	update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error)
+}
+
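+// init finds the longest state name so that progress lines can pad every
+// state to the same width.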
+func init() {
+	for state := range numberedStates {
+		if len(state) > longestState {
+			longestState = len(state)
+		}
+	}
+}
+
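+// stateToProgress converts a task state into a position on the progress
+// bar. During a rollback the scale is inverted, so the bar appears to run
+// backwards toward the starting state.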
+func stateToProgress(state swarm.TaskState, rollback bool) int64 {
+	if !rollback {
+		return numberedStates[state]
+	}
+	return int64(len(numberedStates)) - numberedStates[state]
+}
+
+// ServiceProgress outputs progress information for convergence of a service.
+func ServiceProgress(ctx context.Context, client client.APIClient, serviceID string, progressWriter io.WriteCloser) error {
+	defer progressWriter.Close()
+
+	progressOut := streamformatter.NewJSONStreamFormatter().NewProgressOutput(progressWriter, false)
+
+	sigint := make(chan os.Signal, 1)
+	signal.Notify(sigint, os.Interrupt)
+	defer signal.Stop(sigint)
+
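+	// "_up-to-date" is an internal filter interpreted by the daemon; it
+	// limits the list to tasks that match the service's current spec
+	// version.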
+	taskFilter := filters.NewArgs()
+	taskFilter.Add("service", serviceID)
+	taskFilter.Add("_up-to-date", "true")
+
+	getUpToDateTasks := func() ([]swarm.Task, error) {
+		return client.TaskList(ctx, types.TaskListOptions{Filters: taskFilter})
+	}
+
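+	// monitor is how long a converged service must stay converged before
+	// we declare success; the 5-second default is overridden below by the
+	// service's UpdateConfig.Monitor when set.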
+	var (
+		updater     progressUpdater
+		converged   bool
+		convergedAt time.Time
+		monitor     = 5 * time.Second
+		rollback    bool
+	)
+
+	for {
+		service, _, err := client.ServiceInspectWithRaw(ctx, serviceID)
+		if err != nil {
+			return err
+		}
+
+		if service.Spec.UpdateConfig != nil && service.Spec.UpdateConfig.Monitor != 0 {
+			monitor = service.Spec.UpdateConfig.Monitor
+		}
+
+		if updater == nil {
+			updater, err = initializeUpdater(service, progressOut)
+			if err != nil {
+				return err
+			}
+		}
+
+		if service.UpdateStatus != nil {
+			switch service.UpdateStatus.State {
+			case swarm.UpdateStateUpdating:
+				rollback = false
+			case swarm.UpdateStateCompleted:
+				if !converged {
+					return nil
+				}
+			case swarm.UpdateStatePaused:
+				return fmt.Errorf("service update paused: %s", service.UpdateStatus.Message)
+			case swarm.UpdateStateRollbackStarted:
+				if !rollback && service.UpdateStatus.Message != "" {
+					progressOut.WriteProgress(progress.Progress{
+						ID:     "rollback",
+						Action: service.UpdateStatus.Message,
+					})
+				}
+				rollback = true
+			case swarm.UpdateStateRollbackPaused:
+				return fmt.Errorf("service rollback paused: %s", service.UpdateStatus.Message)
+			case swarm.UpdateStateRollbackCompleted:
+				if !converged {
+					return fmt.Errorf("service rolled back: %s", service.UpdateStatus.Message)
+				}
+			}
+		}
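+		// Once converged, require the service to stay converged for the
+		// full monitor interval before returning success.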
+		if converged && time.Since(convergedAt) >= monitor {
+			return nil
+		}
+
+		tasks, err := getUpToDateTasks()
+		if err != nil {
+			return err
+		}
+
+		activeNodes, err := getActiveNodes(ctx, client)
+		if err != nil {
+			return err
+		}
+
+		converged, err = updater.update(service, tasks, activeNodes, rollback)
+		if err != nil {
+			return err
+		}
+		if converged {
+			if convergedAt.IsZero() {
+				convergedAt = time.Now()
+			}
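+			// Report how much of the verification window remains, rounded
+			// up to the next whole second.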
+			wait := monitor - time.Since(convergedAt)
+			if wait >= 0 {
+				progressOut.WriteProgress(progress.Progress{
+					// Ideally this would have no ID, but
+					// the progress rendering code behaves
+					// poorly on an "action" with no ID. It
+					// returns the cursor to the beginning
+					// of the line, so the first character
+					// may be difficult to read. Then the
+					// output is overwritten by the shell
+					// prompt when the command finishes.
+					ID:     "verify",
+					Action: fmt.Sprintf("Waiting %d seconds to verify that tasks are stable...", wait/time.Second+1),
+				})
+			}
+		} else {
+			if !convergedAt.IsZero() {
+				progressOut.WriteProgress(progress.Progress{
+					ID:     "verify",
+					Action: "Detected task failure",
+				})
+			}
+			convergedAt = time.Time{}
+		}
+
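+		// Poll again after 200ms, or stop early on Ctrl+C; the operation
+		// itself continues on the server side.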
+		select {
+		case <-time.After(200 * time.Millisecond):
+		case <-sigint:
+			if !converged {
+				progress.Message(progressOut, "", "Operation continuing in background.")
+				progress.Messagef(progressOut, "", "Use `docker service ps %s` to check progress.", serviceID)
+			}
+			return nil
+		}
+	}
+}
+
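+// getActiveNodes returns all swarm nodes that are not down, keyed by node
+// ID.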
+func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) {
+	nodes, err := client.NodeList(ctx, types.NodeListOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	activeNodes := make(map[string]swarm.Node)
+	for _, n := range nodes {
+		if n.Status.State != swarm.NodeStateDown {
+			activeNodes[n.ID] = n
+		}
+	}
+	return activeNodes, nil
+}
+
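+// initializeUpdater picks the updater matching the service mode: per-slot
+// progress bars for replicated services, per-node bars for global ones.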
+func initializeUpdater(service swarm.Service, progressOut progress.Output) (progressUpdater, error) {
+	if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
+		return &replicatedProgressUpdater{
+			progressOut: progressOut,
+		}, nil
+	}
+	if service.Spec.Mode.Global != nil {
+		return &globalProgressUpdater{
+			progressOut: progressOut,
+		}, nil
+	}
+	return nil, errors.New("unrecognized service mode")
+}
+
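+// writeOverallProgress renders the summary line shown above the per-task
+// progress bars.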
+func writeOverallProgress(progressOut progress.Output, numerator, denominator int, rollback bool) {
+	if rollback {
+		progressOut.WriteProgress(progress.Progress{
+			ID:     "overall progress",
+			Action: fmt.Sprintf("rolling back update: %d out of %d tasks", numerator, denominator),
+		})
+		return
+	}
+	progressOut.WriteProgress(progress.Progress{
+		ID:     "overall progress",
+		Action: fmt.Sprintf("%d out of %d tasks", numerator, denominator),
+	})
+}
+
+type replicatedProgressUpdater struct {
+	progressOut progress.Output
+
+	// used for mapping slots to a contiguous space
+	// this also causes progress bars to appear in order
+	slotMap map[int]int
+
+	initialized bool
+	done        bool
+}
+
+func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
+	if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil {
+		return false, errors.New("no replica count")
+	}
+	replicas := *service.Spec.Mode.Replicated.Replicas
+
+	if !u.initialized {
+		u.slotMap = make(map[int]int)
+
+		// Draw progress bars in order
+		writeOverallProgress(u.progressOut, 0, int(replicas), rollback)
+
+		if replicas <= maxProgressBars {
+			for i := uint64(1); i <= replicas; i++ {
+				progress.Update(u.progressOut, fmt.Sprintf("%d/%d", i, replicas), " ")
+			}
+		}
+		u.initialized = true
+	}
+
+	// If there are multiple tasks with the same slot number, favor the one
+	// with the *lowest* desired state. This can happen in restart
+	// scenarios.
+	tasksBySlot := make(map[int]swarm.Task)
+	for _, task := range tasks {
+		if numberedStates[task.DesiredState] == 0 {
+			continue
+		}
+		if existingTask, ok := tasksBySlot[task.Slot]; ok {
+			if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
+				continue
+			}
+		}
+		if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
+			tasksBySlot[task.Slot] = task
+		}
+	}
+
+	// If we had reached a converged state, check if we are still converged.
+	if u.done {
+		for _, task := range tasksBySlot {
+			if task.Status.State != swarm.TaskStateRunning {
+				u.done = false
+				break
+			}
+		}
+	}
+
+	running := uint64(0)
+
+	for _, task := range tasksBySlot {
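+		// Map the slot number to a 1-based contiguous index; the zero
+		// value means this slot has not been seen yet.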
+		mappedSlot := u.slotMap[task.Slot]
+		if mappedSlot == 0 {
+			mappedSlot = len(u.slotMap) + 1
+			u.slotMap[task.Slot] = mappedSlot
+		}
+
+		if !u.done && replicas <= maxProgressBars && uint64(mappedSlot) <= replicas {
+			u.progressOut.WriteProgress(progress.Progress{
+				ID:         fmt.Sprintf("%d/%d", mappedSlot, replicas),
+				Action:     fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
+				Current:    stateToProgress(task.Status.State, rollback),
+				Total:      maxProgress,
+				HideCounts: true,
+			})
+		}
+		if task.Status.State == swarm.TaskStateRunning {
+			running++
+		}
+	}
+
+	if !u.done {
+		writeOverallProgress(u.progressOut, int(running), int(replicas), rollback)
+
+		if running == replicas {
+			u.done = true
+		}
+	}
+
+	return running == replicas, nil
+}
+
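+// globalProgressUpdater tracks convergence of a global service, which runs
+// one task on every active node.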
+type globalProgressUpdater struct {
+	progressOut progress.Output
+
+	initialized bool
+	done        bool
+}
+
+func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
+	// If there are multiple tasks with the same node ID, favor the one
+	// with the *lowest* desired state. This can happen in restart
+	// scenarios.
+	tasksByNode := make(map[string]swarm.Task)
+	for _, task := range tasks {
+		if numberedStates[task.DesiredState] == 0 {
+			continue
+		}
+		if existingTask, ok := tasksByNode[task.NodeID]; ok {
+			if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] {
+				continue
+			}
+		}
+		tasksByNode[task.NodeID] = task
+	}
+
+	// We don't have perfect knowledge of how many nodes meet the
+	// constraints for this service. But the orchestrator creates tasks
+	// for all eligible nodes at the same time, so we should see all those
+	// nodes represented among the up-to-date tasks.
+	nodeCount := len(tasksByNode)
+
+	if !u.initialized {
+		if nodeCount == 0 {
+			// Two possibilities: either the orchestrator hasn't created
+			// the tasks yet, or the service doesn't meet constraints for
+			// any node. Either way, we wait.
+			u.progressOut.WriteProgress(progress.Progress{
+				ID:     "overall progress",
+				Action: "waiting for new tasks",
+			})
+			return false, nil
+		}
+
+		writeOverallProgress(u.progressOut, 0, nodeCount, rollback)
+		u.initialized = true
+	}
+
+	// If we had reached a converged state, check if we are still converged.
+	if u.done {
+		for _, task := range tasksByNode {
+			if task.Status.State != swarm.TaskStateRunning {
+				u.done = false
+				break
+			}
+		}
+	}
+
+	running := 0
+
+	for _, task := range tasksByNode {
+		if node, nodeActive := activeNodes[task.NodeID]; nodeActive {
+			if !u.done && nodeCount <= maxProgressBars {
+				u.progressOut.WriteProgress(progress.Progress{
+					ID:         stringid.TruncateID(node.ID),
+					Action:     fmt.Sprintf("%-[1]*s", longestState, task.Status.State),
+					Current:    stateToProgress(task.Status.State, rollback),
+					Total:      maxProgress,
+					HideCounts: true,
+				})
+			}
+			if task.Status.State == swarm.TaskStateRunning {
+				running++
+			}
+		}
+	}
+
+	if !u.done {
+		writeOverallProgress(u.progressOut, running, nodeCount, rollback)
+
+		if running == nodeCount {
+			u.done = true
+		}
+	}
+
+	return running == nodeCount, nil
+}
|