Vendor swarmkit for 1.12.0-rc5

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
(cherry picked from commit 60496af711)
Signed-off-by: Tibor Vass <tibor@docker.com>
This commit is contained in:
Aaron Lehmann 2016-07-26 00:15:08 -07:00 committed by Tibor Vass
parent e111f0c7b2
commit 4b986851f7
8 changed files with 126 additions and 45 deletions

View file

@ -139,7 +139,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
clone git github.com/docker/containerd 0ac3cd1be170d180b2baed755e8f0da547ceb267
# cluster
clone git github.com/docker/swarmkit 4d7e44321726f011d010cdb72d2230f5db2b604e
clone git github.com/docker/swarmkit 9d4c2f73124e70f8fa85f9076635b827d17b109f
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028
clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b

View file

@ -1,11 +1,11 @@
package agent
import (
"reflect"
"time"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
)
@ -175,7 +175,7 @@ func (tm *taskManager) run(ctx context.Context) {
case status := <-statusq:
tm.task.Status = *status
case task := <-tm.updateq:
if tasksEqual(task, tm.task) {
if equality.TasksEqualStable(task, tm.task) {
continue // ignore the update
}
@ -241,17 +241,3 @@ func (tm *taskManager) run(ctx context.Context) {
}
}
}
// tasksEqual returns true if the tasks are functionaly equal, ignoring status,
// version and other superfluous fields.
//
// This used to decide whether or not to propagate a task update to a controller.
func tasksEqual(a, b *api.Task) bool {
// shallow copy
copyA, copyB := *a, *b
copyA.Status, copyB.Status = api.TaskStatus{}, api.TaskStatus{}
copyA.Meta, copyB.Meta = api.Meta{}, api.Meta{}
return reflect.DeepEqual(&copyA, &copyB)
}

View file

@ -0,0 +1,21 @@
package equality
import (
"reflect"
"github.com/docker/swarmkit/api"
)
// TasksEqualStable returns true if the tasks are functionaly equal, ignoring status,
// version and other superfluous fields.
//
// This used to decide whether or not to propagate a task update to a controller.
func TasksEqualStable(a, b *api.Task) bool {
// shallow copy
copyA, copyB := *a, *b
copyA.Status, copyB.Status = api.TaskStatus{}, api.TaskStatus{}
copyA.Meta, copyB.Meta = api.Meta{}, api.Meta{}
return reflect.DeepEqual(&copyA, &copyB)
}

View file

@ -14,6 +14,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state"
@ -29,7 +30,7 @@ const (
DefaultHeartBeatPeriod = 5 * time.Second
defaultHeartBeatEpsilon = 500 * time.Millisecond
defaultGracePeriodMultiplier = 3
defaultRateLimitPeriod = 16 * time.Second
defaultRateLimitPeriod = 8 * time.Second
// maxBatchItems is the threshold of queued writes that should
// trigger an actual transaction to commit them to the shared store.
@ -572,20 +573,44 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
return err
}
select {
case event := <-nodeTasks:
switch v := event.(type) {
case state.EventCreateTask:
tasksMap[v.Task.ID] = v.Task
case state.EventUpdateTask:
tasksMap[v.Task.ID] = v.Task
case state.EventDeleteTask:
delete(tasksMap, v.Task.ID)
// bursty events should be processed in batches and sent out snapshot
const modificationBatchLimit = 200
const eventPausedGap = 50 * time.Millisecond
var modificationCnt int
// eventPaused is true when there have been modifications
// but next event has not arrived within eventPausedGap
eventPaused := false
for modificationCnt < modificationBatchLimit && !eventPaused {
select {
case event := <-nodeTasks:
switch v := event.(type) {
case state.EventCreateTask:
tasksMap[v.Task.ID] = v.Task
modificationCnt++
case state.EventUpdateTask:
if oldTask, exists := tasksMap[v.Task.ID]; exists {
if equality.TasksEqualStable(oldTask, v.Task) {
// this update should not trigger action at agent
tasksMap[v.Task.ID] = v.Task
continue
}
}
tasksMap[v.Task.ID] = v.Task
modificationCnt++
case state.EventDeleteTask:
delete(tasksMap, v.Task.ID)
modificationCnt++
}
case <-time.After(eventPausedGap):
if modificationCnt > 0 {
eventPaused = true
}
case <-stream.Context().Done():
return stream.Context().Err()
case <-d.ctx.Done():
return d.ctx.Err()
}
case <-stream.Context().Done():
return stream.Context().Err()
case <-d.ctx.Done():
return d.ctx.Err()
}
}
}

View file

@ -12,10 +12,13 @@ import (
"github.com/docker/swarmkit/manager/dispatcher/heartbeat"
)
const rateLimitCount = 3
type registeredNode struct {
SessionID string
Heartbeat *heartbeat.Heartbeat
Registered time.Time
Attempts int
Node *api.Node
Disconnect chan struct{} // signal to disconnect
mu sync.Mutex
@ -86,9 +89,14 @@ func (s *nodeStore) CheckRateLimit(id string) error {
s.mu.Lock()
defer s.mu.Unlock()
if existRn, ok := s.nodes[id]; ok {
if time.Since(existRn.Registered) < s.rateLimitPeriod {
return grpc.Errorf(codes.Unavailable, "node %s attempted registration too recently", id)
if time.Since(existRn.Registered) > s.rateLimitPeriod {
existRn.Attempts = 0
}
existRn.Attempts++
if existRn.Attempts > rateLimitCount {
return grpc.Errorf(codes.Unavailable, "node %s exceeded rate limit count of registrations", id)
}
existRn.Registered = time.Now()
}
return nil
}
@ -97,14 +105,22 @@ func (s *nodeStore) CheckRateLimit(id string) error {
func (s *nodeStore) Add(n *api.Node, expireFunc func()) *registeredNode {
s.mu.Lock()
defer s.mu.Unlock()
var attempts int
var registered time.Time
if existRn, ok := s.nodes[n.ID]; ok {
attempts = existRn.Attempts
registered = existRn.Registered
existRn.Heartbeat.Stop()
delete(s.nodes, n.ID)
}
if registered.IsZero() {
registered = time.Now()
}
rn := &registeredNode{
SessionID: identity.NewID(), // session ID is local to the dispatcher.
Node: n,
Registered: time.Now(),
Registered: registered,
Attempts: attempts,
Disconnect: make(chan struct{}),
}
s.nodes[n.ID] = rn

View file

@ -3,7 +3,6 @@ package manager
import (
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"net"
"os"
@ -121,16 +120,26 @@ func New(config *Config) (*Manager, error) {
config.ProtoAddr["tcp"] = config.ProtoListener["tcp"].Addr().String()
}
tcpAddr := config.ProtoAddr["tcp"]
if config.AdvertiseAddr != "" {
tcpAddr = config.AdvertiseAddr
}
// If an AdvertiseAddr was specified, we use that as our
// externally-reachable address.
tcpAddr := config.AdvertiseAddr
if tcpAddr == "" {
return nil, errors.New("no tcp listen address or listener provided")
// Otherwise, we know we are joining an existing swarm. Use a
// wildcard address to trigger remote autodetection of our
// address.
_, tcpAddrPort, err := net.SplitHostPort(config.ProtoAddr["tcp"])
if err != nil {
return nil, fmt.Errorf("missing or invalid listen address %s", config.ProtoAddr["tcp"])
}
// Even with an IPv6 listening address, it's okay to use
// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
// be substituted with the actual source address.
tcpAddr = net.JoinHostPort("0.0.0.0", tcpAddrPort)
}
// FIXME(aaronl): Remove this. It appears to be unused.
dispatcherConfig.Addr = tcpAddr
err := os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700)

View file

@ -7,7 +7,12 @@ var (
// Always check for readiness first.
&ReadyFilter{},
&ResourceFilter{},
&PluginFilter{},
// TODO(stevvooe): Do not filter based on plugins since they are lazy
// loaded in the engine. We can add this back when we can schedule
// plugins in the future.
// &PluginFilter{},
&ConstraintFilter{},
}
)

View file

@ -401,7 +401,9 @@ func (n *Node) Run(ctx context.Context) error {
// restoring from the state, campaign to be the
// leader.
if !n.restored {
if len(n.cluster.Members()) <= 1 {
// Node ID should be in the progress list to Campaign
_, ok := n.Node.Status().Progress[n.Config.ID]
if len(n.cluster.Members()) <= 1 && ok {
if err := n.Campaign(n.Ctx); err != nil {
panic("raft: cannot campaign to be the leader on node restore")
}
@ -779,10 +781,27 @@ func (n *Node) LeaderAddr() (string, error) {
// registerNode registers a new node on the cluster memberlist
func (n *Node) registerNode(node *api.RaftMember) error {
if n.cluster.IsIDRemoved(node.RaftID) {
return nil
}
member := &membership.Member{}
if n.cluster.GetMember(node.RaftID) != nil || n.cluster.IsIDRemoved(node.RaftID) {
// member already exists
existingMember := n.cluster.GetMember(node.RaftID)
if existingMember != nil {
// Member already exists
// If the address is different from what we thought it was,
// update it. This can happen if we just joined a cluster
// and are adding ourself now with the remotely-reachable
// address.
if existingMember.Addr != node.Addr {
member.RaftMember = node
member.RaftClient = existingMember.RaftClient
member.Conn = existingMember.Conn
n.cluster.AddMember(member)
}
return nil
}