Update SwarmKit to ed384f3b3957f65e3111bd020f9815f3d4296fa2

This fix updates SwarmKit to ed384f3b3957f65e3111bd020f9815f3d4296fa2.
Notable changes since last update (3ca4775ba4a5519e2225c3337c7db8901ec39d26):

1. Fix duplicated ports allocation with restarted swarm. (Docker issue #29247)
2. Topology-aware scheduling (Docker PR #30725)

Docker issue #29247 was labeled 1.13.1, though it is advised that
related SwarmKit changes only to be merged to master
(based on the feedback https://github.com/docker/swarmkit/pull/1802#issuecomment-274143500)

This fix fixes #29247 (master only).
This fix is related to #30725.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
This commit is contained in:
Yong Tang 2017-02-08 11:58:22 -08:00
parent d4be5a2f67
commit 6fb3a9d33c
11 changed files with 1047 additions and 351 deletions

View file

@ -102,7 +102,7 @@ github.com/docker/containerd 78fb8f45890a601e0fd9051cf9f9f74923e950fd
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit 3ca4775ba4a5519e2225c3337c7db8901ec39d26
github.com/docker/swarmkit ed384f3b3957f65e3111bd020f9815f3d4296fa2
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf 8d70fb3182befc465c4a1eac8ad4d38ff49778e2
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

File diff suppressed because it is too large Load diff

View file

@ -709,10 +709,29 @@ message EncryptionConfig {
bool auto_lock_managers = 1;
}
message SpreadOver {
string spread_descriptor = 1; // label descriptor, such as engine.labels.az
// TODO: support node information beyond engine and node labels
// TODO: in the future, add a map that provides weights for weighted
// spreading.
}
message PlacementPreference {
oneof Preference {
SpreadOver spread = 1;
}
}
// Placement specifies task distribution constraints.
message Placement {
// constraints specifies a set of requirements a node should meet for a task.
// Constraints specifies a set of requirements a node should meet for a task.
repeated string constraints = 1;
// Preferences provide a way to make the scheduler aware of factors
// such as topology. They are provided in order from highest to lowest
// precedence.
repeated PlacementPreference preferences = 2;
}
// JoinToken contains the join tokens for workers and managers.

View file

@ -231,7 +231,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
var allocatedServices []*api.Service
for _, s := range services {
if nc.nwkAllocator.IsServiceAllocated(s) {
if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) {
continue
}

View file

@ -289,8 +289,25 @@ func (na *NetworkAllocator) PortsAllocatedInHostPublishMode(s *api.Service) bool
return na.portAllocator.portsAllocatedInHostPublishMode(s)
}
// ServiceAllocationOpts is struct used for functional options in IsServiceAllocated
type ServiceAllocationOpts struct {
OnInit bool
}
// OnInit is called for allocator initialization stage
func OnInit(options *ServiceAllocationOpts) {
options.OnInit = true
}
// IsServiceAllocated returns if the passed service has its network resources allocated or not.
func (na *NetworkAllocator) IsServiceAllocated(s *api.Service) bool {
// init bool indicates if the func is called during allocator initialization stage.
func (na *NetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(*ServiceAllocationOpts)) bool {
var options ServiceAllocationOpts
for _, flag := range flags {
flag(&options)
}
// If endpoint mode is VIP and allocator does not have the
// service in VIP allocated set then it is not allocated.
if (len(s.Spec.Task.Networks) != 0 || len(s.Spec.Networks) != 0) &&
@ -313,7 +330,7 @@ func (na *NetworkAllocator) IsServiceAllocated(s *api.Service) bool {
if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) ||
(s.Endpoint != nil && len(s.Endpoint.Ports) != 0) {
return na.portAllocator.isPortsAllocated(s)
return na.portAllocator.isPortsAllocatedOnInit(s, options.OnInit)
}
return true

View file

@ -297,6 +297,10 @@ func (pa *portAllocator) portsAllocatedInHostPublishMode(s *api.Service) bool {
}
func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
return pa.isPortsAllocatedOnInit(s, false)
}
func (pa *portAllocator) isPortsAllocatedOnInit(s *api.Service, onInit bool) bool {
// If service has no user-defined endpoint and allocated endpoint,
// we assume it is allocated and return true.
if s.Endpoint == nil && s.Spec.Endpoint == nil {
@ -345,6 +349,13 @@ func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
if portConfig.PublishedPort == 0 && portStates.delState(portConfig) == nil {
return false
}
// If SwarmPort was not defined by user and the func
// is called during allocator initialization state then
// we are not allocated.
if portConfig.PublishedPort == 0 && onInit {
return false
}
}
return true

View file

@ -13,8 +13,10 @@ const (
eq = iota
noteq
nodeLabelPrefix = "node.labels."
engineLabelPrefix = "engine.labels."
// NodeLabelPrefix is the constraint key prefix for node labels.
NodeLabelPrefix = "node.labels."
// EngineLabelPrefix is the constraint key prefix for engine labels.
EngineLabelPrefix = "engine.labels."
)
var (
@ -168,14 +170,14 @@ func NodeMatches(constraints []Constraint, n *api.Node) bool {
}
// node labels constraint in form like 'node.labels.key==value'
case len(constraint.key) > len(nodeLabelPrefix) && strings.EqualFold(constraint.key[:len(nodeLabelPrefix)], nodeLabelPrefix):
case len(constraint.key) > len(NodeLabelPrefix) && strings.EqualFold(constraint.key[:len(NodeLabelPrefix)], NodeLabelPrefix):
if n.Spec.Annotations.Labels == nil {
if !constraint.Match("") {
return false
}
continue
}
label := constraint.key[len(nodeLabelPrefix):]
label := constraint.key[len(NodeLabelPrefix):]
// label itself is case sensitive
val := n.Spec.Annotations.Labels[label]
if !constraint.Match(val) {
@ -183,14 +185,14 @@ func NodeMatches(constraints []Constraint, n *api.Node) bool {
}
// engine labels constraint in form like 'engine.labels.key!=value'
case len(constraint.key) > len(engineLabelPrefix) && strings.EqualFold(constraint.key[:len(engineLabelPrefix)], engineLabelPrefix):
case len(constraint.key) > len(EngineLabelPrefix) && strings.EqualFold(constraint.key[:len(EngineLabelPrefix)], EngineLabelPrefix):
if n.Description == nil || n.Description.Engine == nil || n.Description.Engine.Labels == nil {
if !constraint.Match("") {
return false
}
continue
}
label := constraint.key[len(engineLabelPrefix):]
label := constraint.key[len(EngineLabelPrefix):]
val := n.Description.Engine.Labels[label]
if !constraint.Match(val) {
return false

View file

@ -0,0 +1,55 @@
package scheduler
import (
"container/heap"
)
type decisionTree struct {
// Count of tasks for the service scheduled to this subtree
tasks int
// Non-leaf point to the next level of the tree. The key is the
// value that the subtree covers.
next map[string]*decisionTree
// Leaf nodes contain a list of nodes
nodeHeap nodeMaxHeap
}
// orderedNodes returns the nodes in this decision tree entry, sorted best
// (lowest) first according to the sorting function. Must be called on a leaf
// of the decision tree.
//
// The caller may modify the nodes in the returned slice. This has the effect
// of changing the nodes in the decision tree entry. The next node to
// findBestNodes on this decisionTree entry will take into account the changes
// that were made to the nodes.
func (dt *decisionTree) orderedNodes(meetsConstraints func(*NodeInfo) bool, nodeLess func(*NodeInfo, *NodeInfo) bool) []NodeInfo {
if dt.nodeHeap.length != len(dt.nodeHeap.nodes) {
// We already collapsed the heap into a sorted slice, so
// re-heapify. There may have been modifications to the nodes
// so we can't return dt.nodeHeap.nodes as-is. We also need to
// reevaluate constraints because of the possible modifications.
for i := 0; i < len(dt.nodeHeap.nodes); {
if meetsConstraints(&dt.nodeHeap.nodes[i]) {
i++
} else {
last := len(dt.nodeHeap.nodes) - 1
dt.nodeHeap.nodes[i] = dt.nodeHeap.nodes[last]
dt.nodeHeap.nodes = dt.nodeHeap.nodes[:last]
}
}
dt.nodeHeap.length = len(dt.nodeHeap.nodes)
heap.Init(&dt.nodeHeap)
}
// Popping every element orders the nodes from best to worst. The
// first pop gets the worst node (since this a max-heap), and puts it
// at position n-1. Then the next pop puts the next-worst at n-2, and
// so on.
for dt.nodeHeap.Len() > 0 {
heap.Pop(&dt.nodeHeap)
}
return dt.nodeHeap.nodes
}

View file

@ -0,0 +1,31 @@
package scheduler
type nodeMaxHeap struct {
nodes []NodeInfo
lessFunc func(*NodeInfo, *NodeInfo) bool
length int
}
func (h nodeMaxHeap) Len() int {
return h.length
}
func (h nodeMaxHeap) Swap(i, j int) {
h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i]
}
func (h nodeMaxHeap) Less(i, j int) bool {
// reversed to make a max-heap
return h.lessFunc(&h.nodes[j], &h.nodes[i])
}
func (h *nodeMaxHeap) Push(x interface{}) {
h.nodes = append(h.nodes, x.(NodeInfo))
h.length++
}
func (h *nodeMaxHeap) Pop() interface{} {
h.length--
// return value is never used
return nil
}

View file

@ -3,9 +3,11 @@ package scheduler
import (
"container/heap"
"errors"
"strings"
"time"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/constraint"
)
var errNodeNotFound = errors.New("node not found in scheduler dataset")
@ -56,73 +58,74 @@ func (ns *nodeSet) remove(nodeID string) {
delete(ns.nodes, nodeID)
}
type nodeMaxHeap struct {
nodes []NodeInfo
lessFunc func(*NodeInfo, *NodeInfo) bool
length int
}
func (ns *nodeSet) tree(serviceID string, preferences []*api.PlacementPreference, maxAssignments int, meetsConstraints func(*NodeInfo) bool, nodeLess func(*NodeInfo, *NodeInfo) bool) decisionTree {
var root decisionTree
func (h nodeMaxHeap) Len() int {
return h.length
}
func (h nodeMaxHeap) Swap(i, j int) {
h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i]
}
func (h nodeMaxHeap) Less(i, j int) bool {
// reversed to make a max-heap
return h.lessFunc(&h.nodes[j], &h.nodes[i])
}
func (h *nodeMaxHeap) Push(x interface{}) {
h.nodes = append(h.nodes, x.(NodeInfo))
h.length++
}
func (h *nodeMaxHeap) Pop() interface{} {
h.length--
// return value is never used
return nil
}
// findBestNodes returns n nodes (or < n if fewer nodes are available) that
// rank best (lowest) according to the sorting function.
func (ns *nodeSet) findBestNodes(n int, meetsConstraints func(*NodeInfo) bool, nodeLess func(*NodeInfo, *NodeInfo) bool) []NodeInfo {
if n == 0 {
return []NodeInfo{}
if maxAssignments == 0 {
return root
}
nodeHeap := nodeMaxHeap{lessFunc: nodeLess}
// TODO(aaronl): Is is possible to avoid checking constraints on every
// node? Perhaps we should try to schedule with n*2 nodes that weren't
// prescreened, and repeat the selection if there weren't enough nodes
// meeting the constraints.
for _, node := range ns.nodes {
// If there are fewer then n nodes in the heap, we add this
// node if it meets the constraints. Otherwise, the heap has
// n nodes, and if this node is better than the worst node in
// the heap, we replace the worst node and then fix the heap.
if nodeHeap.Len() < n {
if meetsConstraints(&node) {
heap.Push(&nodeHeap, node)
tree := &root
for _, pref := range preferences {
// Only spread is supported so far
spread := pref.GetSpread()
if spread == nil {
continue
}
} else if nodeLess(&node, &nodeHeap.nodes[0]) {
descriptor := spread.SpreadDescriptor
var value string
switch {
case len(descriptor) > len(constraint.NodeLabelPrefix) && strings.EqualFold(descriptor[:len(constraint.NodeLabelPrefix)], constraint.NodeLabelPrefix):
if node.Spec.Annotations.Labels != nil {
value = node.Spec.Annotations.Labels[descriptor[len(constraint.NodeLabelPrefix):]]
}
case len(descriptor) > len(constraint.EngineLabelPrefix) && strings.EqualFold(descriptor[:len(constraint.EngineLabelPrefix)], constraint.EngineLabelPrefix):
if node.Description != nil && node.Description.Engine != nil && node.Description.Engine.Labels != nil {
value = node.Description.Engine.Labels[descriptor[len(constraint.EngineLabelPrefix):]]
}
// TODO(aaronl): Support other items from constraint
// syntax like node ID, hostname, os/arch, etc?
default:
continue
}
// If value is still uninitialized, the value used for
// the node at this level of the tree is "". This makes
// sure that the tree structure is not affected by
// which properties nodes have and don't have.
if node.DesiredRunningTasksCountByService != nil {
tree.tasks += node.DesiredRunningTasksCountByService[serviceID]
}
if tree.next == nil {
tree.next = make(map[string]*decisionTree)
}
next := tree.next[value]
if next == nil {
next = &decisionTree{}
tree.next[value] = next
}
tree = next
}
if tree.nodeHeap.lessFunc == nil {
tree.nodeHeap.lessFunc = nodeLess
}
if tree.nodeHeap.Len() < maxAssignments {
if meetsConstraints(&node) {
nodeHeap.nodes[0] = node
heap.Fix(&nodeHeap, 0)
heap.Push(&tree.nodeHeap, node)
}
} else if nodeLess(&node, &tree.nodeHeap.nodes[0]) {
if meetsConstraints(&node) {
tree.nodeHeap.nodes[0] = node
heap.Fix(&tree.nodeHeap, 0)
}
}
}
// Popping every element orders the nodes from best to worst. The
// first pop gets the worst node (since this a max-heap), and puts it
// at position n-1. Then the next pop puts the next-worst at n-2, and
// so on.
for nodeHeap.Len() > 0 {
heap.Pop(&nodeHeap)
}
return nodeHeap.nodes
return root
}

View file

@ -531,21 +531,82 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
return a.DesiredRunningTasksCount < b.DesiredRunningTasksCount
}
nodes := s.nodeSet.findBestNodes(len(taskGroup), s.pipeline.Process, nodeLess)
nodeCount := len(nodes)
if nodeCount == 0 {
s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
return
var prefs []*api.PlacementPreference
if t.Spec.Placement != nil {
prefs = t.Spec.Placement.Preferences
}
tree := s.nodeSet.tree(t.ServiceID, prefs, len(taskGroup), s.pipeline.Process, nodeLess)
s.scheduleNTasksOnSubtree(ctx, len(taskGroup), taskGroup, &tree, schedulingDecisions, nodeLess)
if len(taskGroup) != 0 {
s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
}
}
func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGroup map[string]*api.Task, tree *decisionTree, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
if tree.next == nil {
nodes := tree.orderedNodes(s.pipeline.Process, nodeLess)
if len(nodes) == 0 {
return 0
}
return s.scheduleNTasksOnNodes(ctx, n, taskGroup, nodes, schedulingDecisions, nodeLess)
}
// Walk the tree and figure out how the tasks should be split at each
// level.
tasksScheduled := 0
tasksInUsableBranches := tree.tasks
var noRoom map[*decisionTree]struct{}
// Try to make branches even until either all branches are
// full, or all tasks have been scheduled.
for tasksScheduled != n && len(noRoom) != len(tree.next) {
desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))
for _, subtree := range tree.next {
if noRoom != nil {
if _, ok := noRoom[subtree]; ok {
continue
}
}
subtreeTasks := subtree.tasks
if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
tasksToAssign := desiredTasksPerBranch - subtreeTasks
if remainder > 0 {
tasksToAssign++
}
res := s.scheduleNTasksOnSubtree(ctx, tasksToAssign, taskGroup, subtree, schedulingDecisions, nodeLess)
if res < tasksToAssign {
if noRoom == nil {
noRoom = make(map[*decisionTree]struct{})
}
noRoom[subtree] = struct{}{}
tasksInUsableBranches -= subtreeTasks
} else if remainder > 0 {
remainder--
}
tasksScheduled += res
}
}
}
return tasksScheduled
}
func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup map[string]*api.Task, nodes []NodeInfo, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
tasksScheduled := 0
failedConstraints := make(map[int]bool) // key is index in nodes slice
nodeIter := 0
nodeCount := len(nodes)
for taskID, t := range taskGroup {
n := &nodes[nodeIter%nodeCount]
node := &nodes[nodeIter%nodeCount]
log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", n.ID)
log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", node.ID)
newT := *t
newT.NodeID = n.ID
newT.NodeID = node.ID
newT.Status = api.TaskStatus{
State: api.TaskStateAssigned,
Timestamp: ptypes.MustTimestampProto(time.Now()),
@ -553,7 +614,7 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
}
s.allTasks[t.ID] = &newT
nodeInfo, err := s.nodeSet.nodeInfo(n.ID)
nodeInfo, err := s.nodeSet.nodeInfo(node.ID)
if err == nil && nodeInfo.addTask(&newT) {
s.nodeSet.updateNode(nodeInfo)
nodes[nodeIter%nodeCount] = nodeInfo
@ -561,6 +622,10 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
schedulingDecisions[taskID] = schedulingDecision{old: t, new: &newT}
delete(taskGroup, taskID)
tasksScheduled++
if tasksScheduled == n {
return tasksScheduled
}
if nodeIter+1 < nodeCount {
// First pass fills the nodes until they have the same
@ -581,11 +646,12 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string]
nodeIter++
if nodeIter-origNodeIter == nodeCount {
// None of the nodes meet the constraints anymore.
s.noSuitableNode(ctx, taskGroup, schedulingDecisions)
return
return tasksScheduled
}
}
}
return tasksScheduled
}
func (s *Scheduler) noSuitableNode(ctx context.Context, taskGroup map[string]*api.Task, schedulingDecisions map[string]schedulingDecision) {