Merge pull request #25546 from tonistiigi/update-swarmkit

vendor: update swarmkit to cb6d8131
This commit is contained in:
Alexander Morozov 2016-08-10 15:03:08 -07:00 committed by GitHub
commit 2a540c18b6
40 changed files with 1421 additions and 577 deletions

View file

@ -276,6 +276,7 @@ func (c *containerConfig) hostConfig() *enginecontainer.HostConfig {
Resources: c.resources(),
Binds: c.binds(),
Tmpfs: c.tmpfs(),
GroupAdd: c.spec().Groups,
}
if c.task.LogDriver != nil {

View file

@ -33,7 +33,7 @@ type controller struct {
var _ exec.Controller = &controller{}
// NewController returns a dockerexec runner for the provided task.
// NewController returns a docker exec runner for the provided task.
func newController(b executorpkg.Backend, task *api.Task) (*controller, error) {
adapter, err := newContainerAdapter(b, task)
if err != nil {

View file

@ -140,7 +140,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
clone git github.com/docker/containerd 0ac3cd1be170d180b2baed755e8f0da547ceb267
# cluster
clone git github.com/docker/swarmkit e1c0d64515d839b76e2ef33d396c74933753ffaf
clone git github.com/docker/swarmkit cb6d81316727941665594f153434e5ce2e425c9b
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028
clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b

View file

@ -4,6 +4,7 @@ import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
"github.com/docker/swarmkit/api"
@ -31,11 +32,13 @@ type Agent struct {
sessionq chan sessionOperation
worker Worker
started chan struct{}
ready chan struct{}
stopped chan struct{} // requests shutdown
closed chan struct{} // only closed in run
err error // read only after closed is closed
started chan struct{}
startOnce sync.Once // start only once
ready chan struct{}
stopped chan struct{} // requests shutdown
stopOnce sync.Once // only allow stop to be called once
closed chan struct{} // only closed in run
err error // read only after closed is closed
}
// New returns a new agent, ready for task dispatch.
@ -59,57 +62,50 @@ func New(config *Config) (*Agent, error) {
// Start begins execution of the agent in the provided context, if not already
// started.
//
// Start returns an error if the agent has already started.
func (a *Agent) Start(ctx context.Context) error {
select {
case <-a.started:
select {
case <-a.closed:
return a.err
case <-a.stopped:
return errAgentStopped
case <-ctx.Done():
return ctx.Err()
default:
return errAgentStarted
}
case <-ctx.Done():
return ctx.Err()
default:
}
err := errAgentStarted
close(a.started)
go a.run(ctx)
a.startOnce.Do(func() {
close(a.started)
go a.run(ctx)
err = nil // clear error above, only once.
})
return nil
return err
}
// Stop shuts down the agent, blocking until full shutdown. If the agent is not
// started, Stop will block until Started.
// started, Stop will block until the agent has fully shutdown.
func (a *Agent) Stop(ctx context.Context) error {
select {
case <-a.started:
select {
case <-a.closed:
return a.err
case <-a.stopped:
select {
case <-a.closed:
return a.err
case <-ctx.Done():
return ctx.Err()
}
case <-ctx.Done():
return ctx.Err()
default:
close(a.stopped)
// recurse and wait for closure
return a.Stop(ctx)
}
case <-ctx.Done():
return ctx.Err()
default:
return errAgentNotStarted
}
a.stop()
// wait till closed or context cancelled
select {
case <-a.closed:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// stop signals the agent shutdown process, returning true if this call was the
// first to actually shutdown the agent.
func (a *Agent) stop() bool {
var stopped bool
a.stopOnce.Do(func() {
close(a.stopped)
stopped = true
})
return stopped
}
// Err returns the error that caused the agent to shutdown or nil. Err blocks
@ -133,7 +129,7 @@ func (a *Agent) run(ctx context.Context) {
defer cancel()
defer close(a.closed) // full shutdown.
ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "agent"))
ctx = log.WithModule(ctx, "agent")
log.G(ctx).Debugf("(*Agent).run")
defer log.G(ctx).Debugf("(*Agent).run exited")
@ -197,11 +193,6 @@ func (a *Agent) run(ctx context.Context) {
sessionq = nil
// if we're here before <-registered, do nothing for that event
registered = nil
// Bounce the connection.
if a.config.Picker != nil {
a.config.Picker.Reset()
}
case <-session.closed:
log.G(ctx).Debugf("agent: rebuild session")
@ -218,6 +209,7 @@ func (a *Agent) run(ctx context.Context) {
if a.err == nil {
a.err = ctx.Err()
}
session.close()
return
}

View file

@ -7,7 +7,7 @@ import (
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/picker"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Config provides values for an Agent.
@ -19,15 +19,6 @@ type Config struct {
// updated with managers weights as observed by the agent.
Managers picker.Remotes
// Conn specifies the client connection Agent will use.
Conn *grpc.ClientConn
// Picker is the picker used by Conn.
// TODO(aaronl): This is only part of the config to allow resetting the
// GRPC connection. This should be refactored to address the coupling
// between Conn and Picker.
Picker *picker.Picker
// Executor specifies the executor to use for the agent.
Executor exec.Executor
@ -36,11 +27,14 @@ type Config struct {
// NotifyRoleChange channel receives new roles from session messages.
NotifyRoleChange chan<- api.NodeRole
// Credentials is credentials for grpc connection to manager.
Credentials credentials.TransportAuthenticator
}
func (c *Config) validate() error {
if c.Conn == nil {
return fmt.Errorf("agent: Connection is required")
if c.Credentials == nil {
return fmt.Errorf("agent: Credentials is required")
}
if c.Executor == nil {

View file

@ -10,10 +10,11 @@ var (
ErrClosed = errors.New("agent: closed")
errNodeNotRegistered = fmt.Errorf("node not registered")
errNodeStarted = errors.New("node: already started")
errNodeNotStarted = errors.New("node: not started")
errAgentNotStarted = errors.New("agent: not started")
errAgentStarted = errors.New("agent: already started")
errAgentStopped = errors.New("agent: stopped")
errAgentNotStarted = errors.New("agent: not started")
errTaskNoContoller = errors.New("agent: no task controller")
errTaskNotAssigned = errors.New("agent: task not assigned")

View file

@ -2,11 +2,11 @@ package exec
import (
"fmt"
"reflect"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/pkg/errors"
@ -186,7 +186,7 @@ func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus,
defer func() {
logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
if !reflect.DeepEqual(status, task.Status) {
if !equality.TaskStatusesEqualStable(status, &task.Status) {
status.Timestamp = ptypes.MustTimestampProto(time.Now())
}
}()

View file

@ -3,7 +3,7 @@ package agent
import "golang.org/x/net/context"
// runctx blocks until the function exits, closed is closed, or the context is
// cancelled. Call as part os go statement.
// cancelled. Call as part of go statement.
func runctx(ctx context.Context, closed chan struct{}, errs chan error, fn func(ctx context.Context) error) {
select {
case errs <- fn(ctx):

View file

@ -89,7 +89,9 @@ type Node struct {
nodeID string
nodeMembership api.NodeSpec_Membership
started chan struct{}
startOnce sync.Once
stopped chan struct{}
stopOnce sync.Once
ready chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
certificateRequested chan struct{} // closed when certificate issue request has been sent by node
closed chan struct{}
@ -137,26 +139,15 @@ func NewNode(c *NodeConfig) (*Node, error) {
// Start starts a node instance.
func (n *Node) Start(ctx context.Context) error {
select {
case <-n.started:
select {
case <-n.closed:
return n.err
case <-n.stopped:
return errAgentStopped
case <-ctx.Done():
return ctx.Err()
default:
return errAgentStarted
}
case <-ctx.Done():
return ctx.Err()
default:
}
err := errNodeStarted
close(n.started)
go n.run(ctx)
return nil
n.startOnce.Do(func() {
close(n.started)
go n.run(ctx)
err = nil // clear error above, only once.
})
return err
}
func (n *Node) run(ctx context.Context) (err error) {
@ -166,7 +157,7 @@ func (n *Node) run(ctx context.Context) (err error) {
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "node"))
ctx = log.WithModule(ctx, "node")
go func() {
select {
@ -325,27 +316,19 @@ func (n *Node) run(ctx context.Context) (err error) {
func (n *Node) Stop(ctx context.Context) error {
select {
case <-n.started:
select {
case <-n.closed:
return n.err
case <-n.stopped:
select {
case <-n.closed:
return n.err
case <-ctx.Done():
return ctx.Err()
}
case <-ctx.Done():
return ctx.Err()
default:
close(n.stopped)
// recurse and wait for closure
return n.Stop(ctx)
}
default:
return errNodeNotStarted
}
n.stopOnce.Do(func() {
close(n.stopped)
})
select {
case <-n.closed:
return nil
case <-ctx.Done():
return ctx.Err()
default:
return errAgentNotStarted
}
}
@ -361,31 +344,21 @@ func (n *Node) Err(ctx context.Context) error {
}
func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
var manager api.Peer
select {
case <-ctx.Done():
case manager = <-n.remotes.WaitSelect(ctx):
case <-n.remotes.WaitSelect(ctx):
}
if ctx.Err() != nil {
return ctx.Err()
}
picker := picker.NewPicker(n.remotes, manager.Addr)
conn, err := grpc.Dial(manager.Addr,
grpc.WithPicker(picker),
grpc.WithTransportCredentials(creds),
grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
if err != nil {
return err
}
agent, err := New(&Config{
Hostname: n.config.Hostname,
Managers: n.remotes,
Executor: n.config.Executor,
DB: db,
Conn: conn,
Picker: picker,
NotifyRoleChange: n.roleChangeReq,
Credentials: creds,
})
if err != nil {
return err

View file

@ -6,6 +6,7 @@ import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/picker"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -27,6 +28,9 @@ var (
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
conn *grpc.ClientConn
addr string
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
@ -41,12 +45,27 @@ type session struct {
func newSession(ctx context.Context, agent *Agent, delay time.Duration) *session {
s := &session{
agent: agent,
errs: make(chan error),
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
tasks: make(chan *api.TasksMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
}
peer, err := agent.config.Managers.Select()
if err != nil {
s.errs <- err
return s
}
cc, err := grpc.Dial(peer.Addr,
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
if err != nil {
s.errs <- err
return s
}
s.addr = peer.Addr
s.conn = cc
go s.run(ctx, delay)
return s
@ -77,8 +96,6 @@ func (s *session) run(ctx context.Context, delay time.Duration) {
func (s *session) start(ctx context.Context) error {
log.G(ctx).Debugf("(*session).start")
client := api.NewDispatcherClient(s.agent.config.Conn)
description, err := s.agent.config.Executor.Describe(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
@ -103,6 +120,8 @@ func (s *session) start(ctx context.Context) error {
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
client := api.NewDispatcherClient(s.conn)
stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
})
@ -133,7 +152,7 @@ func (s *session) start(ctx context.Context) error {
func (s *session) heartbeat(ctx context.Context) error {
log.G(ctx).Debugf("(*session).heartbeat")
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()
@ -195,7 +214,7 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
func (s *session) watch(ctx context.Context) error {
log.G(ctx).Debugf("(*session).watch")
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
watch, err := client.Tasks(ctx, &api.TasksRequest{
SessionID: s.sessionID})
if err != nil {
@ -221,7 +240,7 @@ func (s *session) watch(ctx context.Context) error {
// sendTaskStatus uses the current session to send the status of a single task.
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
@ -262,7 +281,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa
return updates, ctx.Err()
}
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
n := batchSize
if len(updates) < n {
@ -285,6 +304,10 @@ func (s *session) close() error {
case <-s.closed:
return errSessionClosed
default:
if s.conn != nil {
s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -picker.DefaultObservationWeight)
s.conn.Close()
}
close(s.closed)
return nil
}

View file

@ -68,7 +68,7 @@ func (tm *taskManager) run(ctx context.Context) {
ctx, cancelAll := context.WithCancel(ctx)
defer cancelAll() // cancel all child operations on exit.
ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "taskmanager"))
ctx = log.WithModule(ctx, "taskmanager")
var (
opctx context.Context

View file

@ -57,7 +57,7 @@ func (w *worker) Init(ctx context.Context) error {
w.mu.Lock()
defer w.mu.Unlock()
ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "worker"))
ctx = log.WithModule(ctx, "worker")
// TODO(stevvooe): Start task cleanup process.

View file

@ -668,12 +668,12 @@ func encodeVarintCa(data []byte, offset int, v uint64) int {
type raftProxyCAServer struct {
local CAServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyCAServer(local CAServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) CAServer {
func NewRaftProxyCAServer(local CAServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) CAServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -724,17 +724,30 @@ func (p *raftProxyCAServer) GetRootCACertificate(ctx context.Context, r *GetRoot
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewCAClient(conn).GetRootCACertificate(ctx, r)
}
type raftProxyNodeCAServer struct {
local NodeCAServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyNodeCAServer(local NodeCAServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) NodeCAServer {
func NewRaftProxyNodeCAServer(local NodeCAServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) NodeCAServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -785,6 +798,19 @@ func (p *raftProxyNodeCAServer) IssueNodeCertificate(ctx context.Context, r *Iss
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewNodeCAClient(conn).IssueNodeCertificate(ctx, r)
}
@ -801,6 +827,19 @@ func (p *raftProxyNodeCAServer) NodeCertificateStatus(ctx context.Context, r *No
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewNodeCAClient(conn).NodeCertificateStatus(ctx, r)
}

View file

@ -4239,12 +4239,12 @@ func encodeVarintControl(data []byte, offset int, v uint64) int {
type raftProxyControlServer struct {
local ControlServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyControlServer(local ControlServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) ControlServer {
func NewRaftProxyControlServer(local ControlServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) ControlServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -4295,6 +4295,19 @@ func (p *raftProxyControlServer) GetNode(ctx context.Context, r *GetNodeRequest)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetNode(ctx, r)
}
@ -4311,6 +4324,19 @@ func (p *raftProxyControlServer) ListNodes(ctx context.Context, r *ListNodesRequ
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListNodes(ctx, r)
}
@ -4327,6 +4353,19 @@ func (p *raftProxyControlServer) UpdateNode(ctx context.Context, r *UpdateNodeRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).UpdateNode(ctx, r)
}
@ -4343,6 +4382,19 @@ func (p *raftProxyControlServer) RemoveNode(ctx context.Context, r *RemoveNodeRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).RemoveNode(ctx, r)
}
@ -4359,6 +4411,19 @@ func (p *raftProxyControlServer) GetTask(ctx context.Context, r *GetTaskRequest)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetTask(ctx, r)
}
@ -4375,6 +4440,19 @@ func (p *raftProxyControlServer) ListTasks(ctx context.Context, r *ListTasksRequ
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListTasks(ctx, r)
}
@ -4391,6 +4469,19 @@ func (p *raftProxyControlServer) RemoveTask(ctx context.Context, r *RemoveTaskRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).RemoveTask(ctx, r)
}
@ -4407,6 +4498,19 @@ func (p *raftProxyControlServer) GetService(ctx context.Context, r *GetServiceRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetService(ctx, r)
}
@ -4423,6 +4527,19 @@ func (p *raftProxyControlServer) ListServices(ctx context.Context, r *ListServic
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListServices(ctx, r)
}
@ -4439,6 +4556,19 @@ func (p *raftProxyControlServer) CreateService(ctx context.Context, r *CreateSer
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).CreateService(ctx, r)
}
@ -4455,6 +4585,19 @@ func (p *raftProxyControlServer) UpdateService(ctx context.Context, r *UpdateSer
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).UpdateService(ctx, r)
}
@ -4471,6 +4614,19 @@ func (p *raftProxyControlServer) RemoveService(ctx context.Context, r *RemoveSer
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).RemoveService(ctx, r)
}
@ -4487,6 +4643,19 @@ func (p *raftProxyControlServer) GetNetwork(ctx context.Context, r *GetNetworkRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetNetwork(ctx, r)
}
@ -4503,6 +4672,19 @@ func (p *raftProxyControlServer) ListNetworks(ctx context.Context, r *ListNetwor
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListNetworks(ctx, r)
}
@ -4519,6 +4701,19 @@ func (p *raftProxyControlServer) CreateNetwork(ctx context.Context, r *CreateNet
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).CreateNetwork(ctx, r)
}
@ -4535,6 +4730,19 @@ func (p *raftProxyControlServer) RemoveNetwork(ctx context.Context, r *RemoveNet
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).RemoveNetwork(ctx, r)
}
@ -4551,6 +4759,19 @@ func (p *raftProxyControlServer) GetCluster(ctx context.Context, r *GetClusterRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetCluster(ctx, r)
}
@ -4567,6 +4788,19 @@ func (p *raftProxyControlServer) ListClusters(ctx context.Context, r *ListCluste
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListClusters(ctx, r)
}
@ -4583,6 +4817,19 @@ func (p *raftProxyControlServer) UpdateCluster(ctx context.Context, r *UpdateClu
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).UpdateCluster(ctx, r)
}

View file

@ -1072,12 +1072,12 @@ func encodeVarintDispatcher(data []byte, offset int, v uint64) int {
type raftProxyDispatcherServer struct {
local DispatcherServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyDispatcherServer(local DispatcherServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) DispatcherServer {
func NewRaftProxyDispatcherServer(local DispatcherServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) DispatcherServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -1128,6 +1128,19 @@ func (p *raftProxyDispatcherServer) Session(r *SessionRequest, stream Dispatcher
if err != nil {
return err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
clientStream, err := NewDispatcherClient(conn).Session(ctx, r)
if err != nil {
@ -1162,6 +1175,19 @@ func (p *raftProxyDispatcherServer) Heartbeat(ctx context.Context, r *HeartbeatR
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewDispatcherClient(conn).Heartbeat(ctx, r)
}
@ -1178,6 +1204,19 @@ func (p *raftProxyDispatcherServer) UpdateTaskStatus(ctx context.Context, r *Upd
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewDispatcherClient(conn).UpdateTaskStatus(ctx, r)
}
@ -1194,6 +1233,19 @@ func (p *raftProxyDispatcherServer) Tasks(r *TasksRequest, stream Dispatcher_Tas
if err != nil {
return err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
clientStream, err := NewDispatcherClient(conn).Tasks(ctx, r)
if err != nil {

View file

@ -19,3 +19,11 @@ func TasksEqualStable(a, b *api.Task) bool {
return reflect.DeepEqual(&copyA, &copyB)
}
// TaskStatusesEqualStable compares the task status excluding timestamp fields.
func TaskStatusesEqualStable(a, b *api.TaskStatus) bool {
copyA, copyB := *a, *b
copyA.Timestamp, copyB.Timestamp = nil, nil
return reflect.DeepEqual(&copyA, &copyB)
}

View file

@ -319,12 +319,12 @@ func encodeVarintHealth(data []byte, offset int, v uint64) int {
type raftProxyHealthServer struct {
local HealthServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyHealthServer(local HealthServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) HealthServer {
func NewRaftProxyHealthServer(local HealthServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) HealthServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -375,6 +375,19 @@ func (p *raftProxyHealthServer) Check(ctx context.Context, r *HealthCheckRequest
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewHealthClient(conn).Check(ctx, r)
}

View file

@ -1438,12 +1438,12 @@ func encodeVarintRaft(data []byte, offset int, v uint64) int {
type raftProxyRaftServer struct {
local RaftServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyRaftServer(local RaftServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftServer {
func NewRaftProxyRaftServer(local RaftServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -1494,6 +1494,19 @@ func (p *raftProxyRaftServer) ProcessRaftMessage(ctx context.Context, r *Process
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewRaftClient(conn).ProcessRaftMessage(ctx, r)
}
@ -1510,17 +1523,30 @@ func (p *raftProxyRaftServer) ResolveAddress(ctx context.Context, r *ResolveAddr
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewRaftClient(conn).ResolveAddress(ctx, r)
}
type raftProxyRaftMembershipServer struct {
local RaftMembershipServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyRaftMembershipServer(local RaftMembershipServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftMembershipServer {
func NewRaftProxyRaftMembershipServer(local RaftMembershipServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftMembershipServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -1571,6 +1597,19 @@ func (p *raftProxyRaftMembershipServer) Join(ctx context.Context, r *JoinRequest
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewRaftMembershipClient(conn).Join(ctx, r)
}
@ -1587,6 +1626,19 @@ func (p *raftProxyRaftMembershipServer) Leave(ctx context.Context, r *LeaveReque
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewRaftMembershipClient(conn).Leave(ctx, r)
}

View file

@ -432,8 +432,14 @@ type ContainerSpec struct {
// Dir defines the working directory to set for the container process.
Dir string `protobuf:"bytes,6,opt,name=dir,proto3" json:"dir,omitempty"`
// User specifies the user that should be employed to run the container.
User string `protobuf:"bytes,7,opt,name=user,proto3" json:"user,omitempty"`
Mounts []Mount `protobuf:"bytes,8,rep,name=mounts" json:"mounts"`
//
// Note that the primary group may be specified by appending the group name
// or id to the user name, separated by a `:`. This syntax is
// `<user>:<group>`.
User string `protobuf:"bytes,7,opt,name=user,proto3" json:"user,omitempty"`
// Groups specifies supplementary groups available to the user.
Groups []string `protobuf:"bytes,11,rep,name=groups" json:"groups,omitempty"`
Mounts []Mount `protobuf:"bytes,8,rep,name=mounts" json:"mounts"`
// StopGracePeriod the grace period for stopping the container before
// forcefully killing the container.
StopGracePeriod *docker_swarmkit_v11.Duration `protobuf:"bytes,9,opt,name=stop_grace_period,json=stopGracePeriod" json:"stop_grace_period,omitempty"`
@ -688,6 +694,13 @@ func (m *ContainerSpec) Copy() *ContainerSpec {
}
}
if m.Groups != nil {
o.Groups = make([]string, 0, len(m.Groups))
for _, v := range m.Groups {
o.Groups = append(o.Groups, v)
}
}
if m.Mounts != nil {
o.Mounts = make([]Mount, 0, len(m.Mounts))
for _, v := range m.Mounts {
@ -881,7 +894,7 @@ func (this *ContainerSpec) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 14)
s := make([]string, 0, 15)
s = append(s, "&api.ContainerSpec{")
s = append(s, "Image: "+fmt.Sprintf("%#v", this.Image)+",\n")
keysForLabels := make([]string, 0, len(this.Labels))
@ -902,6 +915,7 @@ func (this *ContainerSpec) GoString() string {
s = append(s, "Env: "+fmt.Sprintf("%#v", this.Env)+",\n")
s = append(s, "Dir: "+fmt.Sprintf("%#v", this.Dir)+",\n")
s = append(s, "User: "+fmt.Sprintf("%#v", this.User)+",\n")
s = append(s, "Groups: "+fmt.Sprintf("%#v", this.Groups)+",\n")
if this.Mounts != nil {
s = append(s, "Mounts: "+fmt.Sprintf("%#v", this.Mounts)+",\n")
}
@ -1424,6 +1438,21 @@ func (m *ContainerSpec) MarshalTo(data []byte) (int, error) {
}
i += n16
}
if len(m.Groups) > 0 {
for _, s := range m.Groups {
data[i] = 0x5a
i++
l = len(s)
for l >= 1<<7 {
data[i] = uint8(uint64(l)&0x7f | 0x80)
l >>= 7
i++
}
data[i] = uint8(l)
i++
i += copy(data[i:], s)
}
}
return i, nil
}
@ -1838,6 +1867,12 @@ func (m *ContainerSpec) Size() (n int) {
l = m.PullOptions.Size()
n += 1 + l + sovSpecs(uint64(l))
}
if len(m.Groups) > 0 {
for _, s := range m.Groups {
l = len(s)
n += 1 + l + sovSpecs(uint64(l))
}
}
return n
}
@ -2048,6 +2083,7 @@ func (this *ContainerSpec) String() string {
`Mounts:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Mounts), "Mount", "Mount", 1), `&`, ``, 1) + `,`,
`StopGracePeriod:` + strings.Replace(fmt.Sprintf("%v", this.StopGracePeriod), "Duration", "docker_swarmkit_v11.Duration", 1) + `,`,
`PullOptions:` + strings.Replace(fmt.Sprintf("%v", this.PullOptions), "ContainerSpec_PullOptions", "ContainerSpec_PullOptions", 1) + `,`,
`Groups:` + fmt.Sprintf("%v", this.Groups) + `,`,
`}`,
}, "")
return s
@ -3371,6 +3407,35 @@ func (m *ContainerSpec) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 11:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Groups", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSpecs
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSpecs
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Groups = append(m.Groups, string(data[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSpecs(data[iNdEx:])
@ -4123,89 +4188,89 @@ var (
)
var fileDescriptorSpecs = []byte{
// 1332 bytes of a gzipped FileDescriptorProto
// 1344 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x57, 0x4f, 0x6f, 0x1b, 0x45,
0x14, 0x8f, 0x93, 0x8d, 0xe3, 0xbc, 0x75, 0xda, 0x74, 0x54, 0x5a, 0xd7, 0x2d, 0x49, 0x6a, 0x0a,
0x14, 0x24, 0x1c, 0x30, 0xa8, 0x7f, 0xf8, 0x23, 0x70, 0x6c, 0x93, 0x86, 0x92, 0x74, 0x35, 0x69,
0x2b, 0x71, 0xb2, 0x26, 0xbb, 0x53, 0x67, 0x95, 0xf5, 0xee, 0x32, 0x3b, 0xeb, 0x2a, 0x37, 0x8e,
0x15, 0x07, 0x6e, 0x70, 0xe3, 0x84, 0xc4, 0x47, 0xe0, 0x33, 0xf4, 0xc8, 0x05, 0x89, 0x53, 0x45,
0xfb, 0x09, 0x90, 0xf8, 0x02, 0xbc, 0x99, 0x1d, 0xdb, 0x6b, 0xba, 0x69, 0x39, 0xf4, 0x60, 0x69,
0xe6, 0xcd, 0xef, 0xf7, 0x66, 0xe6, 0xbd, 0xdf, 0xbc, 0xb7, 0x06, 0x3b, 0x89, 0xb9, 0x9b, 0x34,
0x63, 0x11, 0xc9, 0x88, 0x10, 0x2f, 0x72, 0x8f, 0xb8, 0x68, 0x26, 0x0f, 0x99, 0x18, 0x1e, 0xf9,
0xb2, 0x39, 0xfa, 0xa0, 0x6e, 0xcb, 0xe3, 0x98, 0x1b, 0x40, 0xfd, 0xec, 0x20, 0x1a, 0x44, 0x7a,
0xb8, 0xa9, 0x46, 0xc6, 0x7a, 0xde, 0x4b, 0x05, 0x93, 0x7e, 0x14, 0x6e, 0x8e, 0x07, 0xd9, 0x42,
0xe3, 0x07, 0x0b, 0x2a, 0x7b, 0x91, 0xc7, 0xf7, 0x71, 0x0f, 0xb2, 0x0d, 0x36, 0x0b, 0xc3, 0x48,
0x6a, 0x40, 0x52, 0x2b, 0x6d, 0x94, 0xae, 0xda, 0xad, 0xf5, 0xe6, 0xf3, 0x5b, 0x36, 0xdb, 0x53,
0xd8, 0x96, 0xf5, 0xf8, 0xc9, 0xfa, 0x1c, 0xcd, 0x33, 0xc9, 0xfb, 0x60, 0x89, 0x28, 0xe0, 0xb5,
0x79, 0xf4, 0x70, 0xaa, 0x75, 0xa9, 0xc8, 0x83, 0xda, 0x94, 0x22, 0x86, 0x6a, 0x24, 0x6e, 0x0d,
0x43, 0x3e, 0x3c, 0xe0, 0x22, 0x39, 0xf4, 0xe3, 0xda, 0x82, 0xe6, 0xbd, 0x7d, 0x12, 0x4f, 0x1d,
0xb6, 0xb9, 0x3b, 0x81, 0xd3, 0x1c, 0x95, 0xec, 0x42, 0x95, 0x8d, 0x98, 0x1f, 0xb0, 0x03, 0x3f,
0xf0, 0xe5, 0x71, 0xcd, 0xd2, 0xae, 0xde, 0x79, 0xa1, 0xab, 0x76, 0x8e, 0x40, 0x67, 0xe8, 0x0d,
0x0f, 0x60, 0xba, 0x11, 0x79, 0x0b, 0x96, 0x9c, 0xde, 0x5e, 0x77, 0x67, 0x6f, 0x7b, 0x75, 0xae,
0x7e, 0xe1, 0xfb, 0x9f, 0x37, 0x5e, 0x53, 0x3e, 0xa6, 0x00, 0x87, 0x87, 0x9e, 0x1f, 0x0e, 0xc8,
0x55, 0xa8, 0xb4, 0x3b, 0x9d, 0x9e, 0x73, 0xb7, 0xd7, 0x5d, 0x2d, 0xd5, 0xeb, 0x08, 0x3c, 0x37,
0x0b, 0x6c, 0xbb, 0x2e, 0x8f, 0x25, 0xf7, 0xea, 0xd6, 0xa3, 0x5f, 0xd6, 0xe6, 0x1a, 0x8f, 0x4a,
0x50, 0xcd, 0x1f, 0x02, 0x37, 0x2a, 0xb7, 0x3b, 0x77, 0x77, 0xee, 0xf7, 0x70, 0x9f, 0x09, 0x3d,
0x8f, 0x68, 0xbb, 0xd2, 0x1f, 0x71, 0x72, 0x05, 0x16, 0x9d, 0xf6, 0xbd, 0xfd, 0x1e, 0xee, 0x32,
0x39, 0x4e, 0x1e, 0xe6, 0xb0, 0x34, 0xd1, 0xa8, 0x2e, 0x6d, 0xef, 0xec, 0xad, 0xce, 0x17, 0xa3,
0xba, 0x82, 0xf9, 0xa1, 0x39, 0xca, 0x6f, 0x16, 0xd8, 0xfb, 0x5c, 0x8c, 0x7c, 0xf7, 0x15, 0x6b,
0xe2, 0x1a, 0x58, 0x92, 0x25, 0x47, 0x5a, 0x13, 0x76, 0xb1, 0x26, 0xee, 0xe2, 0xba, 0xda, 0xd4,
0xd0, 0x35, 0x5e, 0x29, 0x43, 0xf0, 0x38, 0xf0, 0x5d, 0x86, 0xf1, 0xd2, 0xca, 0xb0, 0x5b, 0x6f,
0x16, 0xb1, 0xe9, 0x04, 0x65, 0xce, 0x7f, 0x6b, 0x8e, 0xe6, 0xa8, 0xe4, 0x13, 0x28, 0x0f, 0x82,
0xe8, 0x80, 0x05, 0x5a, 0x13, 0x76, 0xeb, 0x72, 0x91, 0x93, 0x6d, 0x8d, 0x98, 0x3a, 0x30, 0x14,
0x72, 0x03, 0xca, 0x69, 0xec, 0xa1, 0x9f, 0x5a, 0x59, 0x93, 0x37, 0x8a, 0xc8, 0xf7, 0x34, 0xa2,
0x13, 0x85, 0x0f, 0xfc, 0x01, 0x35, 0x78, 0xb2, 0x0f, 0x95, 0x90, 0xcb, 0x87, 0x91, 0x38, 0x4a,
0x6a, 0x4b, 0x1b, 0x0b, 0xc8, 0xbd, 0x5e, 0xc4, 0xcd, 0xc5, 0xbc, 0xb9, 0x97, 0xe1, 0xdb, 0x52,
0x32, 0xf7, 0x70, 0xc8, 0x43, 0x69, 0x5c, 0x4e, 0x1c, 0x91, 0x4f, 0xa1, 0x82, 0x52, 0x8b, 0x23,
0x3f, 0x94, 0xb5, 0xca, 0xc9, 0x07, 0xea, 0x19, 0x8c, 0xf2, 0x4a, 0x27, 0x8c, 0xfa, 0x6d, 0x38,
0x7f, 0xc2, 0x16, 0xe4, 0x1c, 0x94, 0x25, 0x13, 0x03, 0x2e, 0x75, 0xa6, 0x97, 0xa9, 0x99, 0x91,
0x1a, 0x2c, 0xb1, 0xc0, 0x67, 0x09, 0x4f, 0x30, 0x81, 0x0b, 0xb8, 0x30, 0x9e, 0x6e, 0x95, 0xc1,
0x1a, 0xa2, 0x9e, 0x1a, 0x9b, 0x70, 0xe6, 0xb9, 0x0c, 0x90, 0x3a, 0x54, 0x4c, 0x06, 0x32, 0xe9,
0x58, 0x74, 0x32, 0x6f, 0x9c, 0x86, 0x95, 0x99, 0x68, 0x37, 0xfe, 0x98, 0x87, 0xca, 0x58, 0x02,
0xa4, 0x0d, 0xcb, 0x6e, 0x14, 0x4a, 0x14, 0x26, 0x17, 0x46, 0x75, 0x85, 0x09, 0xeb, 0x8c, 0x41,
0x8a, 0x85, 0x09, 0x9b, 0xb2, 0xc8, 0x97, 0xb0, 0x2c, 0x78, 0x12, 0xa5, 0xc2, 0xd5, 0xa7, 0x56,
0x2e, 0xae, 0x16, 0x0b, 0x27, 0x03, 0x51, 0xfe, 0x6d, 0xea, 0x0b, 0xae, 0xa2, 0x91, 0xd0, 0x29,
0x15, 0x85, 0xb3, 0x84, 0x13, 0x0c, 0x84, 0x7c, 0x91, 0x72, 0x68, 0x06, 0x71, 0x22, 0xbc, 0xdd,
0x31, 0x1d, 0x33, 0x90, 0xbc, 0x1c, 0x07, 0xcc, 0xd5, 0x5e, 0x6b, 0x8b, 0x9a, 0xfe, 0x7a, 0x11,
0xdd, 0x19, 0x83, 0xe8, 0x14, 0x4f, 0x6e, 0x02, 0x04, 0xd1, 0xa0, 0xef, 0x09, 0x7c, 0xeb, 0xc2,
0x28, 0xaf, 0x5e, 0xc4, 0xee, 0x6a, 0x04, 0x5d, 0x46, 0x74, 0x36, 0xdc, 0x5a, 0xc6, 0x43, 0xa7,
0xa1, 0xf4, 0x87, 0xbc, 0xf1, 0x93, 0x05, 0x2b, 0x33, 0x61, 0x22, 0x67, 0x61, 0xd1, 0x1f, 0xb2,
0x01, 0x37, 0x49, 0xce, 0x26, 0xa4, 0x07, 0x65, 0xac, 0x08, 0x3c, 0xc8, 0x52, 0x6c, 0xb7, 0xde,
0x7b, 0x69, 0xbc, 0x9b, 0x5f, 0x6b, 0x7c, 0x2f, 0x94, 0xe2, 0x98, 0x1a, 0xb2, 0x92, 0x8a, 0x1b,
0x0d, 0x87, 0x2c, 0x54, 0xaf, 0x55, 0x4b, 0xc5, 0x4c, 0x09, 0x01, 0x0b, 0xd5, 0x94, 0x60, 0x14,
0x95, 0x59, 0x8f, 0xc9, 0x2a, 0x2c, 0xf0, 0x70, 0x84, 0x91, 0x51, 0x26, 0x35, 0x54, 0x16, 0xcf,
0xcf, 0x6e, 0x8b, 0x16, 0x1c, 0x2a, 0x1e, 0x96, 0x31, 0x81, 0xcf, 0x47, 0x99, 0xf4, 0x98, 0x5c,
0x87, 0xf2, 0x30, 0xc2, 0x0b, 0x26, 0xa8, 0x7f, 0x75, 0xd8, 0x0b, 0x45, 0x87, 0xdd, 0x55, 0x08,
0x53, 0x4d, 0x0c, 0x9c, 0xdc, 0x82, 0x33, 0x89, 0x8c, 0xe2, 0xfe, 0x40, 0x60, 0x94, 0xfb, 0x31,
0x17, 0x7e, 0xe4, 0xd5, 0x96, 0x4f, 0x2e, 0x4a, 0x5d, 0xd3, 0x30, 0xe9, 0x69, 0x45, 0xdb, 0x56,
0x2c, 0x47, 0x93, 0x88, 0x03, 0xd5, 0x38, 0x0d, 0x82, 0x7e, 0x14, 0x67, 0xb5, 0x11, 0xb4, 0x93,
0xff, 0x11, 0x35, 0x07, 0x59, 0x77, 0x32, 0x12, 0xb5, 0xe3, 0xe9, 0xa4, 0x7e, 0x13, 0xec, 0x5c,
0x44, 0x55, 0x24, 0x8e, 0xf8, 0xb1, 0x49, 0x92, 0x1a, 0xaa, 0xc4, 0x8d, 0x58, 0x90, 0x66, 0x9d,
0x15, 0x13, 0xa7, 0x27, 0x1f, 0xcf, 0xdf, 0x28, 0xd5, 0x5b, 0x60, 0xe7, 0xdc, 0x92, 0x37, 0x60,
0x45, 0xf0, 0x81, 0x9f, 0xa0, 0x9b, 0x3e, 0x4b, 0xe5, 0x61, 0xed, 0x0b, 0x4d, 0xa8, 0x8e, 0x8d,
0x6d, 0xb4, 0x35, 0xfe, 0xc1, 0xb6, 0x93, 0x2f, 0x11, 0xa4, 0x93, 0xbd, 0x65, 0xbd, 0xe3, 0xa9,
0xd6, 0xe6, 0xcb, 0x4a, 0x8a, 0x7e, 0x39, 0x41, 0xaa, 0x76, 0xdc, 0x55, 0xed, 0x5c, 0x93, 0xc9,
0x47, 0xb0, 0x18, 0x47, 0x42, 0x8e, 0x55, 0xb4, 0x56, 0xa8, 0x76, 0x04, 0x98, 0xa2, 0x96, 0x81,
0x1b, 0x87, 0x70, 0x6a, 0xd6, 0x1b, 0x76, 0xad, 0x85, 0xfb, 0x3b, 0x0e, 0x36, 0xc0, 0x8b, 0xd8,
0xb3, 0xce, 0xcf, 0x2e, 0xde, 0xf7, 0x85, 0x4c, 0x59, 0xb0, 0xe3, 0x90, 0x77, 0xb1, 0xb7, 0xed,
0xed, 0x53, 0x8a, 0x1d, 0x70, 0x1d, 0x71, 0x17, 0x67, 0x71, 0x6a, 0x09, 0xd3, 0xee, 0xd1, 0xe8,
0x60, 0xd2, 0xe1, 0x7e, 0x9c, 0x07, 0xdb, 0x94, 0xbf, 0x57, 0xdb, 0xe1, 0x3e, 0x87, 0x95, 0xec,
0xa5, 0xf6, 0x5d, 0x7d, 0x35, 0x53, 0x73, 0x5e, 0xf4, 0x60, 0xab, 0x19, 0xc1, 0x14, 0xdf, 0xcb,
0x50, 0xf5, 0xe3, 0xd1, 0xb5, 0x3e, 0x0f, 0xd9, 0x41, 0x60, 0x9a, 0x5d, 0x85, 0xda, 0xca, 0xd6,
0xcb, 0x4c, 0xaa, 0xa0, 0x62, 0xf0, 0xb9, 0x08, 0x4d, 0x1b, 0xab, 0xd0, 0xc9, 0x9c, 0x7c, 0x06,
0x96, 0x1f, 0xb3, 0xa1, 0xa9, 0x32, 0x85, 0x37, 0xd8, 0x71, 0xda, 0xbb, 0x46, 0x22, 0x5b, 0x95,
0x67, 0x4f, 0xd6, 0x2d, 0x65, 0xa0, 0x9a, 0xd6, 0xf8, 0x15, 0x3b, 0x7f, 0x27, 0x48, 0x13, 0x69,
0x8a, 0xc4, 0x2b, 0x8b, 0xcb, 0x37, 0x70, 0x86, 0xe9, 0xef, 0x1d, 0x16, 0xaa, 0x17, 0xa7, 0x0b,
0xa4, 0x89, 0xcd, 0x95, 0x42, 0x77, 0x13, 0x70, 0x56, 0x4c, 0xb7, 0xca, 0xca, 0x67, 0xad, 0x44,
0x57, 0xd9, 0x7f, 0x56, 0xb0, 0xb9, 0xae, 0x44, 0xc2, 0x3d, 0xc4, 0x5a, 0x9b, 0x3d, 0x52, 0xf3,
0x7d, 0x50, 0xf8, 0xe5, 0x78, 0x27, 0x0f, 0xcc, 0x22, 0x6e, 0x4e, 0x3b, 0xeb, 0x03, 0x7b, 0xbd,
0x25, 0xd8, 0x83, 0x71, 0xb1, 0x2f, 0xd4, 0x2f, 0xc5, 0xf5, 0x19, 0x17, 0x9a, 0x41, 0xbe, 0x02,
0xf0, 0xfc, 0x24, 0x66, 0x12, 0xdd, 0x09, 0x93, 0x87, 0xc2, 0x2b, 0x76, 0x27, 0xa8, 0x19, 0x2f,
0x39, 0x36, 0xb9, 0x8d, 0x0d, 0x90, 0x8d, 0x95, 0x54, 0x3e, 0xb9, 0x3e, 0x75, 0xda, 0xc6, 0xc5,
0xaa, 0x72, 0x81, 0x39, 0xad, 0x8c, 0x2d, 0xb4, 0xe2, 0x32, 0xa3, 0xac, 0xdb, 0xb0, 0xa2, 0x3e,
0xa6, 0xfa, 0x1e, 0x7f, 0xc0, 0xd2, 0x40, 0x26, 0xba, 0x94, 0x9e, 0xf0, 0xd1, 0xa0, 0x5a, 0x70,
0xd7, 0xe0, 0xcc, 0xb9, 0xaa, 0x32, 0x6f, 0xbb, 0xf4, 0xf8, 0xe9, 0xda, 0xdc, 0x9f, 0xf8, 0xfb,
0xfb, 0xe9, 0x5a, 0xe9, 0xbb, 0x67, 0x6b, 0xa5, 0xc7, 0xf8, 0xfb, 0x1d, 0x7f, 0x7f, 0xe1, 0xef,
0xa0, 0xac, 0xff, 0x58, 0x7c, 0xf8, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xcc, 0x10, 0x79, 0x5b,
0xb7, 0x0c, 0x00, 0x00,
0x14, 0x24, 0x1c, 0x30, 0xa8, 0x7f, 0xf8, 0x23, 0x70, 0x6c, 0x93, 0x86, 0x92, 0xd4, 0x9a, 0xb4,
0x95, 0x38, 0x59, 0x93, 0xf5, 0xd4, 0x59, 0x65, 0xbd, 0xb3, 0xcc, 0xce, 0xba, 0xca, 0x8d, 0x63,
0xc5, 0x81, 0x1b, 0x47, 0x4e, 0x48, 0x1c, 0x39, 0xf2, 0x19, 0x7a, 0xe4, 0x82, 0xc4, 0xa9, 0xa2,
0xfd, 0x04, 0x48, 0x7c, 0x01, 0xde, 0xcc, 0x8e, 0xed, 0x35, 0xdd, 0xb4, 0x1c, 0x7a, 0xb0, 0x34,
0x7f, 0x7e, 0xbf, 0xf7, 0x66, 0xdf, 0xfb, 0xcd, 0x7b, 0x63, 0x70, 0xe3, 0x88, 0x7b, 0x71, 0x3d,
0x92, 0x42, 0x09, 0x42, 0xfa, 0xc2, 0x3b, 0xe2, 0xb2, 0x1e, 0x3f, 0x64, 0x72, 0x78, 0xe4, 0xab,
0xfa, 0xe8, 0x83, 0xaa, 0xab, 0x8e, 0x23, 0x6e, 0x01, 0xd5, 0xb3, 0x03, 0x31, 0x10, 0x66, 0xb8,
0xa9, 0x47, 0x76, 0xf5, 0x7c, 0x3f, 0x91, 0x4c, 0xf9, 0x22, 0xdc, 0x1c, 0x0f, 0xd2, 0x8d, 0xda,
0x0f, 0x0e, 0x94, 0xf6, 0x44, 0x9f, 0xef, 0xa3, 0x0f, 0xb2, 0x0d, 0x2e, 0x0b, 0x43, 0xa1, 0x0c,
0x20, 0xae, 0x14, 0x36, 0x0a, 0x57, 0xdd, 0xc6, 0x7a, 0xfd, 0x79, 0x97, 0xf5, 0xe6, 0x14, 0xb6,
0xe5, 0x3c, 0x7e, 0xb2, 0x3e, 0x47, 0xb3, 0x4c, 0xf2, 0x3e, 0x38, 0x52, 0x04, 0xbc, 0x32, 0x8f,
0x16, 0x4e, 0x35, 0x2e, 0xe5, 0x59, 0xd0, 0x4e, 0x29, 0x62, 0xa8, 0x41, 0xa2, 0x6b, 0x18, 0xf2,
0xe1, 0x01, 0x97, 0xf1, 0xa1, 0x1f, 0x55, 0x16, 0x0c, 0xef, 0xed, 0x93, 0x78, 0xfa, 0xb0, 0xf5,
0xdd, 0x09, 0x9c, 0x66, 0xa8, 0x64, 0x17, 0xca, 0x6c, 0xc4, 0xfc, 0x80, 0x1d, 0xf8, 0x81, 0xaf,
0x8e, 0x2b, 0x8e, 0x31, 0xf5, 0xce, 0x0b, 0x4d, 0x35, 0x33, 0x04, 0x3a, 0x43, 0xaf, 0xf5, 0x01,
0xa6, 0x8e, 0xc8, 0x5b, 0xb0, 0xd4, 0xed, 0xec, 0xb5, 0x77, 0xf6, 0xb6, 0x57, 0xe7, 0xaa, 0x17,
0xbe, 0xff, 0x69, 0xe3, 0x35, 0x6d, 0x63, 0x0a, 0xe8, 0xf2, 0xb0, 0xef, 0x87, 0x03, 0x72, 0x15,
0x4a, 0xcd, 0x56, 0xab, 0xd3, 0xbd, 0xdb, 0x69, 0xaf, 0x16, 0xaa, 0x55, 0x04, 0x9e, 0x9b, 0x05,
0x36, 0x3d, 0x8f, 0x47, 0x8a, 0xf7, 0xab, 0xce, 0xa3, 0x9f, 0xd7, 0xe6, 0x6a, 0x8f, 0x0a, 0x50,
0xce, 0x1e, 0x02, 0x1d, 0x15, 0x9b, 0xad, 0xbb, 0x3b, 0xf7, 0x3b, 0xe8, 0x67, 0x42, 0xcf, 0x22,
0x9a, 0x9e, 0xf2, 0x47, 0x9c, 0x5c, 0x81, 0xc5, 0x6e, 0xf3, 0xde, 0x7e, 0x07, 0xbd, 0x4c, 0x8e,
0x93, 0x85, 0x75, 0x59, 0x12, 0x1b, 0x54, 0x9b, 0x36, 0x77, 0xf6, 0x56, 0xe7, 0xf3, 0x51, 0x6d,
0xc9, 0xfc, 0xd0, 0x1e, 0xe5, 0x37, 0x07, 0xdc, 0x7d, 0x2e, 0x47, 0xbe, 0xf7, 0x8a, 0x35, 0x71,
0x0d, 0x1c, 0xc5, 0xe2, 0x23, 0xa3, 0x09, 0x37, 0x5f, 0x13, 0x77, 0x71, 0x5f, 0x3b, 0xb5, 0x74,
0x83, 0xd7, 0xca, 0x90, 0x3c, 0x0a, 0x7c, 0x8f, 0x61, 0xbc, 0x8c, 0x32, 0xdc, 0xc6, 0x9b, 0x79,
0x6c, 0x3a, 0x41, 0xd9, 0xf3, 0xdf, 0x9a, 0xa3, 0x19, 0x2a, 0xf9, 0x04, 0x8a, 0x83, 0x40, 0x1c,
0xb0, 0xc0, 0x68, 0xc2, 0x6d, 0x5c, 0xce, 0x33, 0xb2, 0x6d, 0x10, 0x53, 0x03, 0x96, 0x42, 0x6e,
0x40, 0x31, 0x89, 0xfa, 0x68, 0xa7, 0x52, 0x34, 0xe4, 0x8d, 0x3c, 0xf2, 0x3d, 0x83, 0x68, 0x89,
0xf0, 0x81, 0x3f, 0xa0, 0x16, 0x4f, 0xf6, 0xa1, 0x14, 0x72, 0xf5, 0x50, 0xc8, 0xa3, 0xb8, 0xb2,
0xb4, 0xb1, 0x80, 0xdc, 0xeb, 0x79, 0xdc, 0x4c, 0xcc, 0xeb, 0x7b, 0x29, 0xbe, 0xa9, 0x14, 0xf3,
0x0e, 0x87, 0x3c, 0x54, 0xd6, 0xe4, 0xc4, 0x10, 0xf9, 0x14, 0x4a, 0x28, 0xb5, 0x48, 0xf8, 0xa1,
0xaa, 0x94, 0x4e, 0x3e, 0x50, 0xc7, 0x62, 0xb4, 0x55, 0x3a, 0x61, 0x54, 0x6f, 0xc3, 0xf9, 0x13,
0x5c, 0x90, 0x73, 0x50, 0x54, 0x4c, 0x0e, 0xb8, 0x32, 0x99, 0x5e, 0xa6, 0x76, 0x46, 0x2a, 0xb0,
0xc4, 0x02, 0x9f, 0xc5, 0x3c, 0xc6, 0x04, 0x2e, 0xe0, 0xc6, 0x78, 0xba, 0x55, 0x04, 0x67, 0x88,
0x7a, 0xaa, 0x6d, 0xc2, 0x99, 0xe7, 0x32, 0x40, 0xaa, 0x50, 0xb2, 0x19, 0x48, 0xa5, 0xe3, 0xd0,
0xc9, 0xbc, 0x76, 0x1a, 0x56, 0x66, 0xa2, 0x5d, 0xfb, 0x63, 0x1e, 0x4a, 0x63, 0x09, 0x90, 0x26,
0x2c, 0x7b, 0x22, 0x54, 0x28, 0x4c, 0x2e, 0xad, 0xea, 0x72, 0x13, 0xd6, 0x1a, 0x83, 0x34, 0x0b,
0x13, 0x36, 0x65, 0x91, 0x2f, 0x61, 0x59, 0xf2, 0x58, 0x24, 0xd2, 0x33, 0xa7, 0xd6, 0x26, 0xae,
0xe6, 0x0b, 0x27, 0x05, 0x51, 0xfe, 0x6d, 0xe2, 0x4b, 0xae, 0xa3, 0x11, 0xd3, 0x29, 0x15, 0x85,
0xb3, 0x84, 0x13, 0x0c, 0x84, 0x7a, 0x91, 0x72, 0x68, 0x0a, 0xe9, 0x0a, 0xfc, 0xba, 0x63, 0x3a,
0x66, 0x20, 0x79, 0x39, 0x0a, 0x98, 0x67, 0xac, 0x56, 0x16, 0x0d, 0xfd, 0xf5, 0x3c, 0x7a, 0x77,
0x0c, 0xa2, 0x53, 0x3c, 0xb9, 0x09, 0x10, 0x88, 0x41, 0xaf, 0x2f, 0xf1, 0xae, 0x4b, 0xab, 0xbc,
0x6a, 0x1e, 0xbb, 0x6d, 0x10, 0x74, 0x19, 0xd1, 0xe9, 0x70, 0x6b, 0x19, 0x0f, 0x9d, 0x84, 0xca,
0x1f, 0xf2, 0xda, 0xaf, 0x0e, 0xac, 0xcc, 0x84, 0x89, 0x9c, 0x85, 0x45, 0x7f, 0xc8, 0x06, 0xdc,
0x26, 0x39, 0x9d, 0x90, 0x0e, 0x14, 0xb1, 0x22, 0xf0, 0x20, 0x4d, 0xb1, 0xdb, 0x78, 0xef, 0xa5,
0xf1, 0xae, 0x7f, 0x6d, 0xf0, 0x9d, 0x50, 0xc9, 0x63, 0x6a, 0xc9, 0x5a, 0x2a, 0x9e, 0x18, 0x0e,
0x59, 0xa8, 0x6f, 0xab, 0x91, 0x8a, 0x9d, 0x12, 0x02, 0x0e, 0xaa, 0x29, 0xc6, 0x28, 0xea, 0x65,
0x33, 0x26, 0xab, 0xb0, 0xc0, 0xc3, 0x11, 0x46, 0x46, 0x2f, 0xe9, 0xa1, 0x5e, 0xe9, 0xfb, 0xe9,
0xd7, 0xe2, 0x0a, 0x0e, 0x35, 0x0f, 0xcb, 0x98, 0xc4, 0xeb, 0xa3, 0x97, 0xcc, 0x98, 0x5c, 0x87,
0xe2, 0x50, 0xe0, 0x07, 0xc6, 0xa8, 0x7f, 0x7d, 0xd8, 0x0b, 0x79, 0x87, 0xdd, 0xd5, 0x08, 0x5b,
0x4d, 0x2c, 0x9c, 0xdc, 0x82, 0x33, 0xb1, 0x12, 0x51, 0x6f, 0x20, 0x31, 0xca, 0xbd, 0x88, 0x4b,
0x5f, 0xf4, 0x2b, 0xcb, 0x27, 0x17, 0xa5, 0xb6, 0x6d, 0x98, 0xf4, 0xb4, 0xa6, 0x6d, 0x6b, 0x56,
0xd7, 0x90, 0x48, 0x17, 0xca, 0x51, 0x12, 0x04, 0x3d, 0x11, 0xa5, 0xb5, 0x11, 0x8c, 0x91, 0xff,
0x11, 0xb5, 0x2e, 0xb2, 0xee, 0xa4, 0x24, 0xea, 0x46, 0xd3, 0x89, 0xbe, 0x7d, 0x03, 0x29, 0x92,
0x28, 0xae, 0xb8, 0x26, 0x1e, 0x76, 0x56, 0xbd, 0x09, 0x6e, 0x26, 0xd2, 0x3a, 0x42, 0x47, 0xfc,
0xd8, 0x26, 0x4f, 0x0f, 0x75, 0x42, 0x47, 0x2c, 0x48, 0xd2, 0x8e, 0x8b, 0x09, 0x35, 0x93, 0x8f,
0xe7, 0x6f, 0x14, 0xaa, 0x0d, 0x70, 0x33, 0xee, 0xc8, 0x1b, 0xb0, 0x22, 0xf9, 0xc0, 0x8f, 0xd1,
0x4c, 0x8f, 0x25, 0xea, 0xb0, 0xf2, 0x85, 0x21, 0x94, 0xc7, 0x8b, 0x4d, 0x5c, 0xab, 0xfd, 0x83,
0xed, 0x28, 0x5b, 0x3a, 0x48, 0x2b, 0xbd, 0xe3, 0xc6, 0xe3, 0xa9, 0xc6, 0xe6, 0xcb, 0x4a, 0x8d,
0xb9, 0x51, 0x41, 0xa2, 0x3d, 0xee, 0xea, 0x36, 0x6f, 0xc8, 0xe4, 0x23, 0x58, 0x8c, 0x84, 0x54,
0x63, 0x75, 0xad, 0xe5, 0xde, 0x02, 0x04, 0xd8, 0x62, 0x97, 0x82, 0x6b, 0x87, 0x70, 0x6a, 0xd6,
0x1a, 0x76, 0xb3, 0x85, 0xfb, 0x3b, 0x5d, 0x6c, 0x8c, 0x17, 0xb1, 0x97, 0x9d, 0x9f, 0xdd, 0xbc,
0xef, 0x4b, 0x95, 0xb0, 0x60, 0xa7, 0x4b, 0xde, 0xc5, 0x9e, 0xb7, 0xb7, 0x4f, 0x29, 0x76, 0xc6,
0x75, 0xc4, 0x5d, 0x9c, 0xc5, 0xe9, 0x2d, 0x94, 0x43, 0x9f, 0x8a, 0x83, 0x49, 0xe7, 0xfb, 0x71,
0x1e, 0x5c, 0x5b, 0x16, 0x5f, 0x6d, 0xe7, 0xfb, 0x1c, 0x56, 0xd2, 0x1b, 0xdc, 0xf3, 0xcc, 0xa7,
0xd9, 0x5a, 0xf4, 0xa2, 0x8b, 0x5c, 0x4e, 0x09, 0xb6, 0x28, 0x5f, 0x86, 0xb2, 0x1f, 0x8d, 0xae,
0xf5, 0x78, 0xc8, 0x0e, 0x02, 0xdb, 0x04, 0x4b, 0xd4, 0xd5, 0x6b, 0x9d, 0x74, 0x49, 0x17, 0x5a,
0x0c, 0x3e, 0x97, 0xa1, 0x6d, 0x6f, 0x25, 0x3a, 0x99, 0x93, 0xcf, 0xc0, 0xf1, 0x23, 0x36, 0xb4,
0xd5, 0x27, 0xf7, 0x0b, 0x76, 0xba, 0xcd, 0x5d, 0x2b, 0x91, 0xad, 0xd2, 0xb3, 0x27, 0xeb, 0x8e,
0x5e, 0xa0, 0x86, 0x56, 0xfb, 0x05, 0x5f, 0x04, 0xad, 0x20, 0x89, 0x95, 0x2d, 0x1e, 0xaf, 0x2c,
0x2e, 0xdf, 0xc0, 0x19, 0x66, 0xde, 0x41, 0x2c, 0xd4, 0x37, 0xd1, 0x14, 0x4e, 0x1b, 0x9b, 0x2b,
0xb9, 0xe6, 0x26, 0xe0, 0xb4, 0xc8, 0x6e, 0x15, 0xb5, 0xcd, 0x4a, 0x81, 0xae, 0xb2, 0xff, 0xec,
0x60, 0xd3, 0x5d, 0x11, 0xd2, 0x3b, 0xc4, 0x1a, 0x9c, 0x5e, 0x5e, 0xfb, 0x6e, 0xc8, 0x7d, 0x51,
0xde, 0xc9, 0x02, 0xd3, 0x88, 0xdb, 0xd3, 0xce, 0xda, 0xc0, 0x37, 0x80, 0x23, 0xd9, 0x83, 0x71,
0x13, 0xc8, 0xd5, 0x2f, 0xc5, 0xfd, 0x19, 0x13, 0x86, 0x41, 0xbe, 0x02, 0xe8, 0xfb, 0x71, 0xc4,
0x14, 0x9a, 0x93, 0x36, 0x0f, 0xb9, 0x9f, 0xd8, 0x9e, 0xa0, 0x66, 0xac, 0x64, 0xd8, 0xe4, 0x36,
0x36, 0x46, 0x36, 0x56, 0x52, 0xf1, 0xe4, 0xba, 0xd5, 0x6a, 0x5a, 0x13, 0xab, 0xda, 0x04, 0xe6,
0xb4, 0x34, 0x5e, 0xa1, 0x25, 0x8f, 0x59, 0x65, 0xdd, 0x86, 0x15, 0xfd, 0xc8, 0xea, 0xf5, 0xf9,
0x03, 0x96, 0x04, 0x2a, 0x36, 0x25, 0xf6, 0x84, 0xc7, 0x84, 0x6e, 0xcd, 0x6d, 0x8b, 0xb3, 0xe7,
0x2a, 0xab, 0xec, 0xda, 0xa5, 0xc7, 0x4f, 0xd7, 0xe6, 0xfe, 0xc4, 0xdf, 0xdf, 0x4f, 0xd7, 0x0a,
0xdf, 0x3d, 0x5b, 0x2b, 0x3c, 0xc6, 0xdf, 0xef, 0xf8, 0xfb, 0x0b, 0x7f, 0x07, 0x45, 0xf3, 0x87,
0xe3, 0xc3, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xa4, 0x46, 0x7c, 0xf6, 0xcf, 0x0c, 0x00, 0x00,
}

View file

@ -160,8 +160,15 @@ message ContainerSpec {
string dir = 6;
// User specifies the user that should be employed to run the container.
//
// Note that the primary group may be specified by appending the group name
// or id to the user name, separated by a `:`. This syntax is
// `<user>:<group>`.
string user = 7;
// Groups specifies supplementary groups available to the user.
repeated string groups = 11;
repeated Mount mounts = 8 [(gogoproto.nullable) = false];
// StopGracePeriod the grace period for stopping the container before

View file

@ -57,19 +57,17 @@ const (
RootCAExpiration = "630720000s"
// DefaultNodeCertExpiration represents the default expiration for node certificates (3 months)
DefaultNodeCertExpiration = 2160 * time.Hour
// CertBackdate represents the amount of time each certificate is backdated to try to avoid
// clock drift issues.
CertBackdate = 1 * time.Hour
// CertLowerRotationRange represents the minimum fraction of time that we will wait when randomly
// choosing our next certificate rotation
CertLowerRotationRange = 0.5
// CertUpperRotationRange represents the maximum fraction of time that we will wait when randomly
// choosing our next certificate rotation
CertUpperRotationRange = 0.8
// MinNodeCertExpiration represents the minimum expiration for node certificates (25 + 5 minutes)
// X - 5 > CertUpperRotationRange * X <=> X < 5/(1 - CertUpperRotationRange)
// Since we're issuing certificates 5 minutes in the past to get around clock drifts, and
// we're selecting a random rotation distribution range from CertLowerRotationRange to
// CertUpperRotationRange, we need to ensure that we don't accept an expiration time that will
// make a node able to randomly choose the next rotation after the expiration of the certificate.
MinNodeCertExpiration = 30 * time.Minute
// MinNodeCertExpiration represents the minimum expiration for node certificates
MinNodeCertExpiration = 1 * time.Hour
)
// ErrNoLocalRootCA is an error type used to indicate that the local root CA

View file

@ -109,12 +109,6 @@ func (s *SecurityConfig) UpdateRootCA(cert, key []byte, certExpiry time.Duration
return err
}
// DefaultPolicy is the default policy used by the signers to ensure that the only fields
// from the remote CSRs we trust are: PublicKey, PublicKeyAlgorithm and SignatureAlgorithm.
func DefaultPolicy() *cfconfig.Signing {
return SigningPolicy(DefaultNodeCertExpiration)
}
// SigningPolicy creates a policy used by the signer to ensure that the only fields
// from the remote CSRs we trust are: PublicKey, PublicKeyAlgorithm and SignatureAlgorithm.
// It receives the duration a certificate will be valid for
@ -124,10 +118,14 @@ func SigningPolicy(certExpiry time.Duration) *cfconfig.Signing {
certExpiry = DefaultNodeCertExpiration
}
// Add the backdate
certExpiry = certExpiry + CertBackdate
return &cfconfig.Signing{
Default: &cfconfig.SigningProfile{
Usage: []string{"signing", "key encipherment", "server auth", "client auth"},
Expiry: certExpiry,
Usage: []string{"signing", "key encipherment", "server auth", "client auth"},
Expiry: certExpiry,
Backdate: CertBackdate,
// Only trust the key components from the CSR. Everything else should
// come directly from API call params.
CSRWhitelist: &cfconfig.CSRWhitelist{
@ -396,7 +394,7 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, baseCertDir string,
// calculateRandomExpiry returns a random duration between 50% and 80% of the original
// duration
func calculateRandomExpiry(expiresIn time.Duration) time.Duration {
if expiresIn.Minutes() < 1 {
if expiresIn.Minutes() <= 1 {
return time.Second
}

View file

@ -306,8 +306,7 @@ func (s *Server) Run(ctx context.Context) error {
s.mu.Unlock()
defer s.wg.Done()
logger := log.G(ctx).WithField("module", "ca")
ctx = log.WithLogger(ctx, logger)
ctx = log.WithModule(ctx, "ca")
// Retrieve the channels to keep track of changes in the cluster
// Retrieve all the currently registered nodes

View file

@ -1,6 +1,8 @@
package log
import (
"path"
"github.com/Sirupsen/logrus"
"golang.org/x/net/context"
)
@ -16,7 +18,10 @@ var (
L = logrus.NewEntry(logrus.StandardLogger())
)
type loggerKey struct{}
type (
loggerKey struct{}
moduleKey struct{}
)
// WithLogger returns a new context with the provided logger. Use in
// combination with logger.WithField(s) for great effect.
@ -35,3 +40,42 @@ func GetLogger(ctx context.Context) *logrus.Entry {
return logger.(*logrus.Entry)
}
// WithModule adds the module to the context, appending it with a slash if a
// module already exists. A module is just an roughly correlated defined by the
// call tree for a given context.
//
// As an example, we might have a "node" module already part of a context. If
// this function is called with "tls", the new value of module will be
// "node/tls".
//
// Modules represent the call path. If the new module and last module are the
// same, a new module entry will not be created. If the new module and old
// older module are the same but separated by other modules, the cycle will be
// represented by the module path.
func WithModule(ctx context.Context, module string) context.Context {
parent := GetModulePath(ctx)
if parent != "" {
// don't re-append module when module is the same.
if path.Base(parent) == module {
return ctx
}
module = path.Join(parent, module)
}
ctx = WithLogger(ctx, GetLogger(ctx).WithField("module", module))
return context.WithValue(ctx, moduleKey{}, module)
}
// GetModulePath returns the module path for the provided context. If no module
// is set, an empty string is returned.
func GetModulePath(ctx context.Context) string {
module := ctx.Value(moduleKey{})
if module == nil {
return ""
}
return module.(string)
}

View file

@ -9,10 +9,10 @@
// allocation, they all have to agree on that. The way this achieved
// in `allocator` is by creating a `taskBallot` to which all task
// allocators register themselves as mandatory voters. For each task
// that needs allocation, each allocator indepdently votes to indicate
// that needs allocation, each allocator independently votes to indicate
// the completion of their allocation. Once all registered voters have
// voted then the task is moved to ALLOCATED state.
//
// Other than the coordination needed for task ALLOCATED state, all
// the allocators function fairly indepdently.
// the allocators function fairly independently.
package allocator

View file

@ -193,7 +193,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
}
for _, s := range services {
if !serviceAllocationNeeded(s, nc) {
if nc.nwkAllocator.IsServiceAllocated(s) {
continue
}
@ -304,7 +304,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventCreateService:
s := v.Service.Copy()
if !serviceAllocationNeeded(s, nc) {
if nc.nwkAllocator.IsServiceAllocated(s) {
break
}
@ -315,7 +315,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventUpdateService:
s := v.Service.Copy()
if !serviceAllocationNeeded(s, nc) {
if nc.nwkAllocator.IsServiceAllocated(s) {
break
}
@ -326,13 +326,13 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventDeleteService:
s := v.Service.Copy()
if serviceAllocationNeeded(s, nc) {
break
}
if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
log.G(ctx).Errorf("Failed deallocation during delete of service %s: %v", s.ID, err)
}
// Remove it from unallocatedServices just in case
// it's still there.
delete(nc.unallocatedServices, s.ID)
case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
a.doNodeAlloc(ctx, nc, ev)
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
@ -382,23 +382,6 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
}
}
// serviceAllocationNeeded returns if a service needs allocation or not.
func serviceAllocationNeeded(s *api.Service, nc *networkContext) bool {
// Service needs allocation if:
// Spec has network attachments and endpoint resolution mode is VIP OR
// Spec has non-zero number of exposed ports and ingress routing is SwarmPort
if (len(s.Spec.Networks) != 0 &&
(s.Spec.Endpoint == nil ||
(s.Spec.Endpoint != nil &&
s.Spec.Endpoint.Mode == api.ResolutionModeVirtualIP))) ||
(s.Spec.Endpoint != nil &&
len(s.Spec.Endpoint.Ports) != 0) {
return !nc.nwkAllocator.IsServiceAllocated(s)
}
return false
}
// taskRunning checks whether a task is either actively running, or in the
// process of starting up.
func taskRunning(t *api.Task) bool {
@ -420,7 +403,7 @@ func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bo
// network configured or service endpoints have been
// allocated.
return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
(s == nil || !serviceAllocationNeeded(s, nc))
(s == nil || nc.nwkAllocator.IsServiceAllocated(s))
}
func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
@ -599,6 +582,22 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
return err
}
// If the service doesn't expose ports any more and if we have
// any lingering virtual IP references for ingress network
// clean them up here.
if s.Spec.Endpoint == nil || len(s.Spec.Endpoint.Ports) == 0 {
if s.Endpoint != nil {
for i, vip := range s.Endpoint.VirtualIPs {
if vip.NetworkID == nc.ingressNetwork.ID {
n := len(s.Endpoint.VirtualIPs)
s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
break
}
}
}
}
if err := a.store.Update(func(tx store.Tx) error {
for {
err := store.UpdateService(tx, s)
@ -670,7 +669,7 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
return nil, fmt.Errorf("could not find service %s", t.ServiceID)
}
if serviceAllocationNeeded(s, nc) {
if !nc.nwkAllocator.IsServiceAllocated(s) {
return nil, fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
}
@ -733,7 +732,7 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkCont
func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkContext) {
for _, s := range nc.unallocatedServices {
if serviceAllocationNeeded(s, nc) {
if !nc.nwkAllocator.IsServiceAllocated(s) {
if err := a.allocateService(ctx, nc, s); err != nil {
log.G(ctx).Debugf("Failed allocation of unallocated service %s: %v", s.ID, err)
continue

View file

@ -165,15 +165,29 @@ func (na *NetworkAllocator) ServiceAllocate(s *api.Service) (err error) {
}
}()
// If ResolutionMode is DNSRR do not try allocating VIPs.
if s.Spec.Endpoint != nil && s.Spec.Endpoint.Mode == api.ResolutionModeDNSRoundRobin {
return
}
if s.Endpoint == nil {
s.Endpoint = &api.Endpoint{
Spec: s.Spec.Endpoint.Copy(),
s.Endpoint = &api.Endpoint{}
}
s.Endpoint.Spec = s.Spec.Endpoint.Copy()
// If ResolutionMode is DNSRR do not try allocating VIPs, but
// free any VIP from previous state.
if s.Spec.Endpoint != nil && s.Spec.Endpoint.Mode == api.ResolutionModeDNSRoundRobin {
if s.Endpoint != nil {
for _, vip := range s.Endpoint.VirtualIPs {
if err := na.deallocateVIP(vip); err != nil {
// don't bail here, deallocate as many as possible.
log.L.WithError(err).
WithField("vip.network", vip.NetworkID).
WithField("vip.addr", vip.Addr).Error("error deallocating vip")
}
}
s.Endpoint.VirtualIPs = nil
}
delete(na.services, s.ID)
return
}
// First allocate VIPs for all the pre-populated endpoint attachments
@ -198,7 +212,6 @@ outer:
s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs, vip)
}
s.Endpoint.Spec = s.Spec.Endpoint.Copy()
na.services[s.ID] = struct{}{}
return
@ -232,7 +245,7 @@ func (na *NetworkAllocator) IsAllocated(n *api.Network) bool {
return ok
}
// IsTaskAllocated returns if the passed task has it's network resources allocated or not.
// IsTaskAllocated returns if the passed task has its network resources allocated or not.
func (na *NetworkAllocator) IsTaskAllocated(t *api.Task) bool {
// If the task is not found in the allocated set, then it is
// not allocated.
@ -245,7 +258,7 @@ func (na *NetworkAllocator) IsTaskAllocated(t *api.Task) bool {
return false
}
// To determine whether the task has it's resources allocated,
// To determine whether the task has its resources allocated,
// we just need to look at one network(in case of
// multi-network attachment). This is because we make sure we
// allocate for every network or we allocate for none.
@ -269,13 +282,30 @@ func (na *NetworkAllocator) IsTaskAllocated(t *api.Task) bool {
return true
}
// IsServiceAllocated returns if the passed service has it's network resources allocated or not.
// IsServiceAllocated returns if the passed service has its network resources allocated or not.
func (na *NetworkAllocator) IsServiceAllocated(s *api.Service) bool {
if _, ok := na.services[s.ID]; !ok {
return false
// If endpoint mode is VIP and allocator does not have the
// service in VIP allocated set then it is not allocated.
if len(s.Spec.Networks) != 0 &&
(s.Spec.Endpoint == nil ||
s.Spec.Endpoint.Mode == api.ResolutionModeVirtualIP) {
if _, ok := na.services[s.ID]; !ok {
return false
}
}
if s.Spec.Endpoint != nil {
// If the endpoint mode is DNSRR and allocator has the service
// in VIP allocated set then we return not allocated to make
// sure the allocator triggers networkallocator to free up the
// resources if any.
if s.Spec.Endpoint != nil && s.Spec.Endpoint.Mode == api.ResolutionModeDNSRoundRobin {
if _, ok := na.services[s.ID]; ok {
return false
}
}
if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) ||
(s.Endpoint != nil && len(s.Endpoint.Ports) != 0) {
return na.portAllocator.isPortsAllocated(s)
}

View file

@ -186,7 +186,7 @@ func (s *Server) ListClusters(ctx context.Context, request *api.ListClustersRequ
}
// redactClusters is a method that enforces a whitelist of fields that are ok to be
// returned in the Cluster object. It should filter out all senstive information.
// returned in the Cluster object. It should filter out all sensitive information.
func redactClusters(clusters []*api.Cluster) []*api.Cluster {
var redactedClusters []*api.Cluster
// Only add public fields to the new clusters

View file

@ -0,0 +1,12 @@
package hackpicker
// AddrSelector is interface which should track cluster for its leader address.
type AddrSelector interface {
LeaderAddr() (string, error)
}
// RaftCluster is interface which combines useful methods for clustering.
type RaftCluster interface {
AddrSelector
IsLeader() bool
}

View file

@ -0,0 +1,141 @@
// Package hackpicker is temporary solution to provide more seamless experience
// for controlapi. It has drawback of slow reaction to leader change, but it
// tracks leader automatically without erroring out to client.
package hackpicker
import (
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/transport"
)
// picker always picks address of cluster leader.
type picker struct {
mu sync.Mutex
addr string
raft AddrSelector
conn *grpc.Conn
cc *grpc.ClientConn
}
// Init does initial processing for the Picker, e.g., initiate some connections.
func (p *picker) Init(cc *grpc.ClientConn) error {
p.cc = cc
return nil
}
func (p *picker) initConn() error {
if p.conn == nil {
conn, err := grpc.NewConn(p.cc)
if err != nil {
return err
}
p.conn = conn
}
return nil
}
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
// or some error happens.
func (p *picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Lock()
if err := p.initConn(); err != nil {
p.mu.Unlock()
return nil, err
}
p.mu.Unlock()
addr, err := p.raft.LeaderAddr()
if err != nil {
return nil, err
}
p.mu.Lock()
if p.addr != addr {
p.addr = addr
p.conn.NotifyReset()
}
p.mu.Unlock()
return p.conn.Wait(ctx)
}
// PickAddr picks a peer address for connecting. This will be called repeated for
// connecting/reconnecting.
func (p *picker) PickAddr() (string, error) {
addr, err := p.raft.LeaderAddr()
if err != nil {
return "", err
}
p.mu.Lock()
p.addr = addr
p.mu.Unlock()
return addr, nil
}
// State returns the connectivity state of the underlying connections.
func (p *picker) State() (grpc.ConnectivityState, error) {
return p.conn.State(), nil
}
// WaitForStateChange blocks until the state changes to something other than
// the sourceState. It returns the new state or error.
func (p *picker) WaitForStateChange(ctx context.Context, sourceState grpc.ConnectivityState) (grpc.ConnectivityState, error) {
return p.conn.WaitForStateChange(ctx, sourceState)
}
// Reset the current connection and force a reconnect to another address.
func (p *picker) Reset() error {
p.conn.NotifyReset()
return nil
}
// Close closes all the Conn's owned by this Picker.
func (p *picker) Close() error {
return p.conn.Close()
}
// ConnSelector is struct for obtaining connection with raftpicker.
type ConnSelector struct {
mu sync.Mutex
cc *grpc.ClientConn
cluster RaftCluster
opts []grpc.DialOption
}
// NewConnSelector returns new ConnSelector with cluster and grpc.DialOpts which
// will be used for Dial on first call of Conn.
func NewConnSelector(cluster RaftCluster, opts ...grpc.DialOption) *ConnSelector {
return &ConnSelector{
cluster: cluster,
opts: opts,
}
}
// Conn returns *grpc.ClientConn with picker which picks raft cluster leader.
// Internal connection estabilished lazily on this call.
// It can return error if cluster wasn't ready at the moment of initial call.
func (c *ConnSelector) Conn() (*grpc.ClientConn, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.cc != nil {
return c.cc, nil
}
addr, err := c.cluster.LeaderAddr()
if err != nil {
return nil, err
}
picker := &picker{raft: c.cluster, addr: addr}
opts := append(c.opts, grpc.WithPicker(picker))
cc, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
c.cc = cc
return c.cc, nil
}
// Reset does nothing for hackpicker.
func (c *ConnSelector) Reset() error {
return nil
}

View file

@ -182,7 +182,7 @@ func validateServiceSpec(spec *api.ServiceSpec) error {
// checkPortConflicts does a best effort to find if the passed in spec has port
// conflicts with existing services.
func (s *Server) checkPortConflicts(spec *api.ServiceSpec) error {
func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) error {
if spec.Endpoint == nil {
return nil
}
@ -215,17 +215,21 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec) error {
}
for _, service := range services {
// If service ID is the same (and not "") then this is an update
if serviceID != "" && serviceID == service.ID {
continue
}
if service.Spec.Endpoint != nil {
for _, pc := range service.Spec.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service %s", pc.PublishedPort, service.ID)
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
}
}
}
if service.Endpoint != nil {
for _, pc := range service.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service %s", pc.PublishedPort, service.ID)
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
}
}
}
@ -243,7 +247,7 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
return nil, err
}
if err := s.checkPortConflicts(request.Spec); err != nil {
if err := s.checkPortConflicts(request.Spec, ""); err != nil {
return nil, err
}
@ -309,7 +313,7 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}
if request.Spec.Endpoint != nil && !reflect.DeepEqual(request.Spec.Endpoint, service.Spec.Endpoint) {
if err := s.checkPortConflicts(request.Spec); err != nil {
if err := s.checkPortConflicts(request.Spec, request.ServiceID); err != nil {
return nil, err
}
}

View file

@ -59,7 +59,7 @@ var (
)
// Config is configuration for Dispatcher. For default you should use
// DefautConfig.
// DefaultConfig.
type Config struct {
HeartbeatPeriod time.Duration
HeartbeatEpsilon time.Duration
@ -79,13 +79,20 @@ func DefaultConfig() *Config {
}
}
// Cluster is interface which represent raft cluster. mananger/state/raft.Node
// is implenents it. This interface needed only for easier unit-testing.
// Cluster is interface which represent raft cluster. manager/state/raft.Node
// is implements it. This interface needed only for easier unit-testing.
type Cluster interface {
GetMemberlist() map[uint64]*api.RaftMember
MemoryStore() *store.MemoryStore
}
// nodeUpdate provides a new status and/or description to apply to a node
// object.
type nodeUpdate struct {
status *api.NodeStatus
description *api.NodeDescription
}
// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
mu sync.Mutex
@ -103,7 +110,14 @@ type Dispatcher struct {
taskUpdates map[string]*api.TaskStatus // indexed by task ID
taskUpdatesLock sync.Mutex
processTaskUpdatesTrigger chan struct{}
nodeUpdates map[string]nodeUpdate // indexed by node ID
nodeUpdatesLock sync.Mutex
processUpdatesTrigger chan struct{}
// for waiting for the next task/node batch update
processUpdatesLock sync.Mutex
processUpdatesCond *sync.Cond
}
// weightedPeerByNodeID is a sort wrapper for []*api.WeightedPeer
@ -118,16 +132,21 @@ func (b weightedPeerByNodeID) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
// New returns Dispatcher with cluster interface(usually raft.Node).
// NOTE: each handler which does something with raft must add to Dispatcher.wg
func New(cluster Cluster, c *Config) *Dispatcher {
return &Dispatcher{
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
store: cluster.MemoryStore(),
cluster: cluster,
mgrQueue: watch.NewQueue(16),
keyMgrQueue: watch.NewQueue(16),
taskUpdates: make(map[string]*api.TaskStatus),
processTaskUpdatesTrigger: make(chan struct{}, 1),
config: c,
d := &Dispatcher{
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
store: cluster.MemoryStore(),
cluster: cluster,
mgrQueue: watch.NewQueue(16),
keyMgrQueue: watch.NewQueue(16),
taskUpdates: make(map[string]*api.TaskStatus),
nodeUpdates: make(map[string]nodeUpdate),
processUpdatesTrigger: make(chan struct{}, 1),
config: c,
}
d.processUpdatesCond = sync.NewCond(&d.processUpdatesLock)
return d
}
func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
@ -157,10 +176,9 @@ func (d *Dispatcher) Run(ctx context.Context) error {
d.mu.Unlock()
return fmt.Errorf("dispatcher is already running")
}
logger := log.G(ctx).WithField("module", "dispatcher")
ctx = log.WithLogger(ctx, logger)
ctx = log.WithModule(ctx, "dispatcher")
if err := d.markNodesUnknown(ctx); err != nil {
logger.Errorf(`failed to move all nodes to "unknown" state: %v`, err)
log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
}
configWatcher, cancel, err := store.ViewAndWatch(
d.store,
@ -214,11 +232,11 @@ func (d *Dispatcher) Run(ctx context.Context) error {
select {
case <-publishTicker.C:
publishManagers()
case <-d.processTaskUpdatesTrigger:
d.processTaskUpdates()
case <-d.processUpdatesTrigger:
d.processUpdates()
batchTimer.Reset(maxBatchInterval)
case <-batchTimer.C:
d.processTaskUpdates()
d.processUpdates()
batchTimer.Reset(maxBatchInterval)
case v := <-configWatcher:
cluster := v.(state.EventUpdateCluster)
@ -251,6 +269,14 @@ func (d *Dispatcher) Stop() error {
d.cancel()
d.mu.Unlock()
d.nodes.Clean()
d.processUpdatesLock.Lock()
// In case there are any waiters. There is no chance of any starting
// after this point, because they check if the context is canceled
// before waiting.
d.processUpdatesCond.Broadcast()
d.processUpdatesLock.Unlock()
return nil
}
@ -340,26 +366,39 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
return "", err
}
// create or update node in store
// TODO(stevvooe): Validate node specification.
var node *api.Node
err := d.store.Update(func(tx store.Tx) error {
d.store.View(func(tx store.ReadTx) {
node = store.GetNode(tx, nodeID)
if node == nil {
return ErrNodeNotFound
}
node.Description = description
node.Status = api.NodeStatus{
State: api.NodeStatus_READY,
}
return store.UpdateNode(tx, node)
})
if err != nil {
return "", err
if node == nil {
return "", ErrNodeNotFound
}
d.nodeUpdatesLock.Lock()
d.nodeUpdates[nodeID] = nodeUpdate{status: &api.NodeStatus{State: api.NodeStatus_READY}, description: description}
numUpdates := len(d.nodeUpdates)
d.nodeUpdatesLock.Unlock()
if numUpdates >= maxBatchItems {
select {
case d.processUpdatesTrigger <- struct{}{}:
case <-d.ctx.Done():
return "", d.ctx.Err()
}
}
// Wait until the node update batch happens before unblocking register.
d.processUpdatesLock.Lock()
select {
case <-d.ctx.Done():
return "", d.ctx.Err()
default:
}
d.processUpdatesCond.Wait()
d.processUpdatesLock.Unlock()
expireFunc := func() {
nodeStatus := api.NodeStatus{State: api.NodeStatus_DOWN, Message: "heartbeat failure"}
log.G(ctx).Debugf("heartbeat expiration")
@ -444,23 +483,39 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
d.taskUpdatesLock.Unlock()
if numUpdates >= maxBatchItems {
d.processTaskUpdatesTrigger <- struct{}{}
select {
case d.processUpdatesTrigger <- struct{}{}:
case <-d.ctx.Done():
}
}
return nil, nil
}
func (d *Dispatcher) processTaskUpdates() {
func (d *Dispatcher) processUpdates() {
var (
taskUpdates map[string]*api.TaskStatus
nodeUpdates map[string]nodeUpdate
)
d.taskUpdatesLock.Lock()
if len(d.taskUpdates) == 0 {
d.taskUpdatesLock.Unlock()
return
if len(d.taskUpdates) != 0 {
taskUpdates = d.taskUpdates
d.taskUpdates = make(map[string]*api.TaskStatus)
}
taskUpdates := d.taskUpdates
d.taskUpdates = make(map[string]*api.TaskStatus)
d.taskUpdatesLock.Unlock()
d.nodeUpdatesLock.Lock()
if len(d.nodeUpdates) != 0 {
nodeUpdates = d.nodeUpdates
d.nodeUpdates = make(map[string]nodeUpdate)
}
d.nodeUpdatesLock.Unlock()
if len(taskUpdates) == 0 && len(nodeUpdates) == 0 {
return
}
log := log.G(d.ctx).WithFields(logrus.Fields{
"method": "(*Dispatcher).processTaskUpdates",
"method": "(*Dispatcher).processUpdates",
})
_, err := d.store.Batch(func(batch *store.Batch) error {
@ -494,14 +549,45 @@ func (d *Dispatcher) processTaskUpdates() {
return nil
})
if err != nil {
log.WithError(err).Error("dispatcher transaction failed")
log.WithError(err).Error("dispatcher task update transaction failed")
}
}
for nodeID, nodeUpdate := range nodeUpdates {
err := batch.Update(func(tx store.Tx) error {
logger := log.WithField("node.id", nodeID)
node := store.GetNode(tx, nodeID)
if node == nil {
logger.Errorf("node unavailable")
return nil
}
if nodeUpdate.status != nil {
node.Status = *nodeUpdate.status
}
if nodeUpdate.description != nil {
node.Description = nodeUpdate.description
}
if err := store.UpdateNode(tx, node); err != nil {
logger.WithError(err).Error("failed to update node status")
return nil
}
logger.Debug("node status updated")
return nil
})
if err != nil {
log.WithError(err).Error("dispatcher node update transaction failed")
}
}
return nil
})
if err != nil {
log.WithError(err).Error("dispatcher batch failed")
}
d.processUpdatesCond.Broadcast()
}
// Tasks is a stream of tasks state for node. Each message contains full list
@ -595,7 +681,10 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
modificationCnt++
case state.EventUpdateTask:
if oldTask, exists := tasksMap[v.Task.ID]; exists {
if equality.TasksEqualStable(oldTask, v.Task) {
// States ASSIGNED and below are set by the orchestrator/scheduler,
// not the agent, so tasks in these states need to be sent to the
// agent even if nothing else has changed.
if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
// this update should not trigger action at agent
tasksMap[v.Task.ID] = v.Task
continue
@ -632,17 +721,17 @@ func (d *Dispatcher) nodeRemove(id string, status api.NodeStatus) error {
if err := d.isRunningLocked(); err != nil {
return err
}
// TODO(aaronl): Is it worth batching node removals?
err := d.store.Update(func(tx store.Tx) error {
node := store.GetNode(tx, id)
if node == nil {
return errors.New("node not found")
d.nodeUpdatesLock.Lock()
d.nodeUpdates[id] = nodeUpdate{status: status.Copy(), description: d.nodeUpdates[id].description}
numUpdates := len(d.nodeUpdates)
d.nodeUpdatesLock.Unlock()
if numUpdates >= maxBatchItems {
select {
case d.processUpdatesTrigger <- struct{}{}:
case <-d.ctx.Done():
}
node.Status = status
return store.UpdateNode(tx, node)
})
if err != nil {
return fmt.Errorf("failed to update node %s status to down: %v", id, err)
}
if rn := d.nodes.Delete(id); rn == nil {

View file

@ -122,7 +122,6 @@ func (k *KeyManager) updateKey(cluster *api.Cluster) error {
}
func (k *KeyManager) rotateKey(ctx context.Context) error {
log := log.G(ctx).WithField("module", "keymanager")
var (
clusters []*api.Cluster
err error
@ -132,7 +131,7 @@ func (k *KeyManager) rotateKey(ctx context.Context) error {
})
if err != nil {
log.Errorf("reading cluster config failed, %v", err)
log.G(ctx).Errorf("reading cluster config failed, %v", err)
return err
}
@ -173,7 +172,7 @@ func (k *KeyManager) rotateKey(ctx context.Context) error {
// Run starts the keymanager, it doesn't return
func (k *KeyManager) Run(ctx context.Context) error {
k.mu.Lock()
log := log.G(ctx).WithField("module", "keymanager")
ctx = log.WithModule(ctx, "keymanager")
var (
clusters []*api.Cluster
err error
@ -183,7 +182,7 @@ func (k *KeyManager) Run(ctx context.Context) error {
})
if err != nil {
log.Errorf("reading cluster config failed, %v", err)
log.G(ctx).Errorf("reading cluster config failed, %v", err)
k.mu.Unlock()
return err
}
@ -196,7 +195,7 @@ func (k *KeyManager) Run(ctx context.Context) error {
}
}
if err := k.updateKey(cluster); err != nil {
log.Errorf("store update failed %v", err)
log.G(ctx).Errorf("store update failed %v", err)
}
} else {
k.keyRing.lClock = cluster.EncryptionKeyLamportClock

View file

@ -17,6 +17,7 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/allocator"
"github.com/docker/swarmkit/manager/controlapi"
"github.com/docker/swarmkit/manager/controlapi/hackpicker"
"github.com/docker/swarmkit/manager/dispatcher"
"github.com/docker/swarmkit/manager/health"
"github.com/docker/swarmkit/manager/keymanager"
@ -350,11 +351,13 @@ func (m *Manager) Run(parent context.Context) error {
// creating the allocator but then use it anyway.
}
go func(keyManager *keymanager.KeyManager) {
if err := keyManager.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("keymanager failed with an error")
}
}(m.keyManager)
if m.keyManager != nil {
go func(keyManager *keymanager.KeyManager) {
if err := keyManager.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("keymanager failed with an error")
}
}(m.keyManager)
}
go func(d *dispatcher.Dispatcher) {
if err := d.Run(ctx); err != nil {
@ -385,14 +388,17 @@ func (m *Manager) Run(parent context.Context) error {
log.G(ctx).WithError(err).Error("scheduler exited with an error")
}
}(m.scheduler)
go func(taskReaper *orchestrator.TaskReaper) {
taskReaper.Run()
}(m.taskReaper)
go func(orchestrator *orchestrator.ReplicatedOrchestrator) {
if err := orchestrator.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
}
}(m.replicatedOrchestrator)
go func(globalOrchestrator *orchestrator.GlobalOrchestrator) {
if err := globalOrchestrator.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
@ -420,21 +426,34 @@ func (m *Manager) Run(parent context.Context) error {
m.scheduler.Stop()
m.scheduler = nil
m.keyManager.Stop()
m.keyManager = nil
if m.keyManager != nil {
m.keyManager.Stop()
m.keyManager = nil
}
}
m.mu.Unlock()
}
}()
proxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithTimeout(5 * time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}
cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs
// We need special connSelector for controlapi because it provides automatic
// leader tracking.
// Other APIs are using connSelector which errors out on leader change, but
// allows to react quickly to reelections.
controlAPIProxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}
controlAPIConnSelector := hackpicker.NewConnSelector(m.RaftNode, controlAPIProxyOpts...)
authorize := func(ctx context.Context, roles []string) error {
// Authorize the remote roles, ensure they can only be forwarded by managers
_, err := ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization())
@ -464,7 +483,7 @@ func (m *Manager) Run(parent context.Context) error {
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, controlAPIConnSelector, m.RaftNode, forwardAsOwnRequest)
// Everything registered on m.server should be an authenticated
// wrapper, or a proxy wrapping an authenticated wrapper!

View file

@ -2,6 +2,7 @@ package orchestrator
import (
"container/list"
"errors"
"sync"
"time"
@ -76,6 +77,9 @@ func (r *RestartSupervisor) waitRestart(ctx context.Context, oldDelay *delayedSt
if t == nil {
return nil
}
if t.DesiredState > api.TaskStateRunning {
return nil
}
service := store.GetService(tx, t.ServiceID)
if service == nil {
return nil
@ -108,6 +112,13 @@ func (r *RestartSupervisor) Restart(ctx context.Context, tx store.Tx, cluster *a
}
r.mu.Unlock()
// Sanity check: was the task shut down already by a separate call to
// Restart? If so, we must avoid restarting it, because this will create
// an extra task. This should never happen unless there is a bug.
if t.DesiredState > api.TaskStateRunning {
return errors.New("Restart called on task that was already shut down")
}
t.DesiredState = api.TaskStateShutdown
err := store.UpdateTask(tx, &t)
if err != nil {

View file

@ -163,7 +163,7 @@ func (r *ReplicatedOrchestrator) tickTasks(ctx context.Context) {
})
if err != nil {
log.G(ctx).WithError(err).Errorf("orchestator task removal batch failed")
log.G(ctx).WithError(err).Errorf("orchestrator task removal batch failed")
}
r.restartTasks = make(map[string]struct{})

View file

@ -4,136 +4,43 @@ import (
"sync"
"time"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/transport"
)
// picker always picks address of cluster leader.
type picker struct {
mu sync.Mutex
addr string
raft AddrSelector
conn *grpc.Conn
stop chan struct{}
done chan struct{}
// Interface is interface to replace implementation with controlapi/hackpicker.
// TODO: it should be done cooler.
type Interface interface {
Conn() (*grpc.ClientConn, error)
Reset() error
}
func newPicker(raft AddrSelector, addr string) *picker {
return &picker{
raft: raft,
addr: addr,
stop: make(chan struct{}),
done: make(chan struct{}),
}
}
// Init does initial processing for the Picker, e.g., initiate some connections.
func (p *picker) Init(cc *grpc.ClientConn) error {
conn, err := grpc.NewConn(cc)
if err != nil {
return err
}
p.conn = conn
return nil
}
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
// or some error happens.
func (p *picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
if err := p.updateConn(); err != nil {
return nil, err
}
return p.conn.Wait(ctx)
}
// PickAddr picks a peer address for connecting. This will be called repeated for
// connecting/reconnecting.
func (p *picker) PickAddr() (string, error) {
addr, err := p.raft.LeaderAddr()
if err != nil {
return "", err
}
p.mu.Lock()
p.addr = addr
p.mu.Unlock()
return addr, nil
}
// State returns the connectivity state of the underlying connections.
func (p *picker) State() (grpc.ConnectivityState, error) {
return p.conn.State(), nil
}
// WaitForStateChange blocks until the state changes to something other than
// the sourceState. It returns the new state or error.
func (p *picker) WaitForStateChange(ctx context.Context, sourceState grpc.ConnectivityState) (grpc.ConnectivityState, error) {
return p.conn.WaitForStateChange(ctx, sourceState)
}
// Reset the current connection and force a reconnect to another address.
func (p *picker) Reset() error {
p.conn.NotifyReset()
return nil
}
// Close closes all the Conn's owned by this Picker.
func (p *picker) Close() error {
close(p.stop)
<-p.done
return p.conn.Close()
}
func (p *picker) updateConn() error {
addr, err := p.raft.LeaderAddr()
if err != nil {
return err
}
p.mu.Lock()
if p.addr != addr {
p.addr = addr
p.Reset()
}
p.mu.Unlock()
return nil
}
func (p *picker) updateLoop() {
defer close(p.done)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.updateConn()
case <-p.stop:
return
}
}
}
// ConnSelector is struct for obtaining connection with raftpicker.
// ConnSelector is struct for obtaining connection connected to cluster leader.
type ConnSelector struct {
mu sync.Mutex
cc *grpc.ClientConn
cluster RaftCluster
opts []grpc.DialOption
picker *picker
cc *grpc.ClientConn
addr string
stop chan struct{}
}
// NewConnSelector returns new ConnSelector with cluster and grpc.DialOpts which
// will be used for Dial on first call of Conn.
// will be used for connection create.
func NewConnSelector(cluster RaftCluster, opts ...grpc.DialOption) *ConnSelector {
return &ConnSelector{
cs := &ConnSelector{
cluster: cluster,
opts: opts,
stop: make(chan struct{}),
}
go cs.updateLoop()
return cs
}
// Conn returns *grpc.ClientConn with picker which picks raft cluster leader.
// Internal connection estabilished lazily on this call.
// Conn returns *grpc.ClientConn which connected to cluster leader.
// It can return error if cluster wasn't ready at the moment of initial call.
func (c *ConnSelector) Conn() (*grpc.ClientConn, error) {
c.mu.Lock()
@ -145,23 +52,76 @@ func (c *ConnSelector) Conn() (*grpc.ClientConn, error) {
if err != nil {
return nil, err
}
c.picker = newPicker(c.cluster, addr)
go c.picker.updateLoop()
opts := append(c.opts, grpc.WithPicker(c.picker))
cc, err := grpc.Dial(addr, opts...)
cc, err := grpc.Dial(addr, c.opts...)
if err != nil {
return nil, err
}
c.cc = cc
return c.cc, nil
c.addr = addr
return cc, nil
}
// Stop cancels tracking loop for raftpicker and closes it.
func (c *ConnSelector) Stop() {
// Reset recreates underlying connection.
func (c *ConnSelector) Reset() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.picker == nil {
return
if c.cc != nil {
c.cc.Close()
c.cc = nil
}
addr, err := c.cluster.LeaderAddr()
if err != nil {
logrus.WithError(err).Errorf("error obtaining leader address")
return err
}
cc, err := grpc.Dial(addr, c.opts...)
if err != nil {
logrus.WithError(err).Errorf("error reestabilishing connection to leader")
return err
}
c.cc = cc
c.addr = addr
return nil
}
// Stop cancels updating connection loop.
func (c *ConnSelector) Stop() {
close(c.stop)
}
func (c *ConnSelector) updateConn() error {
addr, err := c.cluster.LeaderAddr()
if err != nil {
return err
}
c.mu.Lock()
defer c.mu.Unlock()
if c.addr != addr {
if c.cc != nil {
c.cc.Close()
c.cc = nil
}
conn, err := grpc.Dial(addr, c.opts...)
if err != nil {
return err
}
c.cc = conn
c.addr = addr
}
return nil
}
func (c *ConnSelector) updateLoop() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.updateConn(); err != nil {
logrus.WithError(err).Errorf("error reestabilishing connection to leader")
}
case <-c.stop:
return
}
}
c.picker.Close()
}

View file

@ -61,8 +61,8 @@ func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
tasksByNode := make(map[string]map[string]*api.Task)
for _, t := range tasks {
// Ignore all tasks that have not reached ALLOCATED
// state.
if t.Status.State < api.TaskStateAllocated {
// state and tasks that no longer consume resources.
if t.Status.State < api.TaskStateAllocated || t.Status.State > api.TaskStateRunning {
continue
}
@ -109,8 +109,31 @@ func (s *Scheduler) Run(ctx context.Context) error {
// Queue all unassigned tasks before processing changes.
s.tick(ctx)
const (
// commitDebounceGap is the amount of time to wait between
// commit events to debounce them.
commitDebounceGap = 50 * time.Millisecond
// maxLatency is a time limit on the debouncing.
maxLatency = time.Second
)
var (
debouncingStarted time.Time
commitDebounceTimer *time.Timer
commitDebounceTimeout <-chan time.Time
)
pendingChanges := 0
schedule := func() {
if len(s.preassignedTasks) > 0 {
s.processPreassignedTasks(ctx)
}
if pendingChanges > 0 {
s.tick(ctx)
pendingChanges = 0
}
}
// Watch for changes.
for {
select {
@ -131,15 +154,25 @@ func (s *Scheduler) Run(ctx context.Context) error {
case state.EventDeleteNode:
s.nodeHeap.remove(v.Node.ID)
case state.EventCommit:
if len(s.preassignedTasks) > 0 {
s.processPreassignedTasks(ctx)
}
if pendingChanges > 0 {
s.tick(ctx)
pendingChanges = 0
if commitDebounceTimer != nil {
if time.Since(debouncingStarted) > maxLatency {
commitDebounceTimer.Stop()
commitDebounceTimer = nil
commitDebounceTimeout = nil
schedule()
} else {
commitDebounceTimer.Reset(commitDebounceGap)
}
} else {
commitDebounceTimer = time.NewTimer(commitDebounceGap)
commitDebounceTimeout = commitDebounceTimer.C
debouncingStarted = time.Now()
}
}
case <-commitDebounceTimeout:
schedule()
commitDebounceTimer = nil
commitDebounceTimeout = nil
case <-s.stopChan:
return nil
}

View file

@ -87,27 +87,23 @@ type Node struct {
StateDir string
Error error
raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
Config *raft.Config
opts NewNodeOptions
reqIDGen *idutil.Generator
wait *wait
wal *wal.WAL
snapshotter *snap.Snapshotter
wasLeader bool
restored bool
isMember uint32
joinAddr string
raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
Config *raft.Config
opts NewNodeOptions
reqIDGen *idutil.Generator
wait *wait
wal *wal.WAL
snapshotter *snap.Snapshotter
restored bool
signalledLeadership uint32
isMember uint32
joinAddr string
// waitProp waits for all the proposals to be terminated before
// shutting down the node.
waitProp sync.WaitGroup
// forceNewCluster is a special flag used to recover from disaster
// scenario by pointing to an existing or backed up data directory.
forceNewCluster bool
confState raftpb.ConfState
appliedIndex uint64
snapshotIndex uint64
@ -118,7 +114,7 @@ type Node struct {
doneCh chan struct{}
// removeRaftCh notifies about node deletion from raft cluster
removeRaftCh chan struct{}
removeRaftOnce sync.Once
removeRaftFunc func()
leadershipBroadcast *events.Broadcaster
// used to coordinate shutdown
@ -192,7 +188,6 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
MaxInflightMsgs: cfg.MaxInflightMsgs,
Logger: cfg.Logger,
},
forceNewCluster: opts.ForceNewCluster,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
removeRaftCh: make(chan struct{}),
@ -215,6 +210,15 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
n.wait = newWait()
n.removeRaftFunc = func(n *Node) func() {
var removeRaftOnce sync.Once
return func() {
removeRaftOnce.Do(func() {
close(n.removeRaftCh)
})
}
}(n)
return n
}
@ -329,6 +333,8 @@ func (n *Node) Run(ctx context.Context) error {
close(n.doneCh)
}()
wasLeader := false
for {
select {
case <-n.ticker.C():
@ -348,9 +354,11 @@ func (n *Node) Run(ctx context.Context) error {
n.Config.Logger.Error(err)
}
// Send raft messages to peers
if err := n.send(rd.Messages); err != nil {
n.Config.Logger.Error(err)
if len(rd.Messages) != 0 {
// Send raft messages to peers
if err := n.send(rd.Messages); err != nil {
n.Config.Logger.Error(err)
}
}
// Apply snapshot to memory store. The snapshot
@ -358,7 +366,7 @@ func (n *Node) Run(ctx context.Context) error {
// saveToStorage.
if !raft.IsEmptySnap(rd.Snapshot) {
// Load the snapshot data into the store
if err := n.restoreFromSnapshot(rd.Snapshot.Data, n.forceNewCluster); err != nil {
if err := n.restoreFromSnapshot(rd.Snapshot.Data, false); err != nil {
n.Config.Logger.Error(err)
}
n.appliedIndex = rd.Snapshot.Metadata.Index
@ -387,12 +395,23 @@ func (n *Node) Run(ctx context.Context) error {
// if that happens we will apply them as any
// follower would.
if rd.SoftState != nil {
if n.wasLeader && rd.SoftState.RaftState != raft.StateLeader {
n.wasLeader = false
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
n.wait.cancelAll()
n.leadershipBroadcast.Write(IsFollower)
} else if !n.wasLeader && rd.SoftState.RaftState == raft.StateLeader {
n.wasLeader = true
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
n.leadershipBroadcast.Write(IsFollower)
}
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
wasLeader = true
}
}
if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
// If all the entries in the log have become
// committed, broadcast our leadership status.
if n.caughtUp() {
atomic.StoreUint32(&n.signalledLeadership, 1)
n.leadershipBroadcast.Write(IsLeader)
}
}
@ -451,17 +470,6 @@ func (n *Node) Shutdown() {
}
}
// isShutdown indicates if node was shut down.
// This method should be called under n.stopMu to avoid races with n.stop().
func (n *Node) isShutdown() bool {
select {
case <-n.Ctx.Done():
return true
default:
return false
}
}
func (n *Node) stop() {
n.stopMu.Lock()
defer n.stopMu.Unlock()
@ -476,16 +484,18 @@ func (n *Node) stop() {
_ = member.Conn.Close()
}
}
n.Stop()
n.ticker.Stop()
if err := n.wal.Close(); err != nil {
n.Config.Logger.Errorf("raft: error closing WAL: %v", err)
}
atomic.StoreUint32(&n.isMember, 0)
// TODO(stevvooe): Handle ctx.Done()
}
// IsLeader checks if we are the leader or not
func (n *Node) IsLeader() bool {
// isLeader checks if we are the leader or not, without the protection of lock
func (n *Node) isLeader() bool {
if !n.IsMember() {
return false
}
@ -496,14 +506,43 @@ func (n *Node) IsLeader() bool {
return false
}
// Leader returns the id of the leader
func (n *Node) Leader() uint64 {
// IsLeader checks if we are the leader or not, with the protection of lock
func (n *Node) IsLeader() bool {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
return n.isLeader()
}
// leader returns the id of the leader, without the protection of lock
func (n *Node) leader() uint64 {
if !n.IsMember() {
return 0
}
return n.Node.Status().Lead
}
// Leader returns the id of the leader, with the protection of lock
func (n *Node) Leader() uint64 {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
return n.leader()
}
// ReadyForProposals returns true if the node has broadcasted a message
// saying that it has become the leader. This means it is ready to accept
// proposals.
func (n *Node) ReadyForProposals() bool {
return atomic.LoadUint32(&n.signalledLeadership) == 1
}
func (n *Node) caughtUp() bool {
// obnoxious function that always returns a nil error
lastIndex, _ := n.raftStore.LastIndex()
return n.appliedIndex >= lastIndex
}
// Join asks to a member of the raft to propose
// a configuration change and add us as a member thus
// beginning the log replication process. This method
@ -534,12 +573,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, ErrNoRaftMember
}
if n.IsStopped() {
log.WithError(ErrStopped).Errorf(ErrStopped.Error())
return nil, ErrStopped
}
if !n.IsLeader() {
if !n.isLeader() {
return nil, ErrLostLeadership
}
@ -670,11 +704,7 @@ func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResp
return nil, ErrNoRaftMember
}
if n.IsStopped() {
return nil, ErrStopped
}
if !n.IsLeader() {
if !n.isLeader() {
return nil, ErrLostLeadership
}
@ -717,12 +747,24 @@ func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if msg == nil || msg.Message == nil {
return nil, grpc.Errorf(codes.InvalidArgument, "no message provided")
}
// Don't process the message if this comes from
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
return nil, ErrMemberRemoved
}
if msg.Message.Type == raftpb.MsgProp {
// We don't accepted forwarded proposals. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted")
}
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()
@ -731,10 +773,6 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
return nil, ErrNoRaftMember
}
if n.IsStopped() {
return nil, ErrStopped
}
if err := n.Step(n.Ctx, *msg.Message); err != nil {
return nil, err
}
@ -772,21 +810,16 @@ func (n *Node) ResolveAddress(ctx context.Context, msg *api.ResolveAddressReques
// LeaderAddr returns address of current cluster leader.
// With this method Node satisfies raftpicker.AddrSelector interface.
func (n *Node) LeaderAddr() (string, error) {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if n.isShutdown() {
return "", fmt.Errorf("raft node is shut down")
}
ctx, cancel := context.WithTimeout(n.Ctx, 10*time.Second)
defer cancel()
if err := WaitForLeader(ctx, n); err != nil {
return "", ErrNoClusterLeader
}
if n.IsStopped() {
return "", ErrStopped
if !n.IsMember() {
return "", ErrNoRaftMember
}
ms := n.cluster.Members()
l := ms[n.Leader()]
l := ms[n.leader()]
if l == nil {
return "", ErrNoClusterLeader
}
@ -864,6 +897,13 @@ func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction,
// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if !n.IsMember() {
return nil
}
status := n.Node.Status()
return &api.Version{Index: status.Commit}
}
@ -921,14 +961,6 @@ func (n *Node) IsMember() bool {
return atomic.LoadUint32(&n.isMember) == 1
}
// IsStopped checks if the raft node is stopped or not
func (n *Node) IsStopped() bool {
if n.Node == nil {
return true
}
return false
}
// canSubmitProposal defines if any more proposals
// could be submitted and processed.
func (n *Node) canSubmitProposal() bool {
@ -980,6 +1012,14 @@ func (n *Node) send(messages []raftpb.Message) error {
continue
}
if m.Type == raftpb.MsgProp {
// We don't forward proposals to the leader. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
continue
}
n.asyncTasks.Add(1)
go n.sendToMember(members, m)
}
@ -1044,15 +1084,14 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
_, err := conn.ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if err != nil {
if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
n.removeRaftOnce.Do(func() {
close(n.removeRaftCh)
})
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {
n.ReportSnapshot(m.To, raft.SnapshotFailure)
}
if n.IsStopped() {
panic("node is nil")
if !n.IsMember() {
// node is removed from cluster or stopped
return
}
n.ReportUnreachable(m.To)
@ -1091,7 +1130,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
ch := n.wait.register(r.ID, cb)
// Do this check after calling register to avoid a race.
if !n.IsLeader() {
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}
@ -1262,13 +1301,15 @@ func (n *Node) applyRemoveNode(cc raftpb.ConfChange) (err error) {
// a follower and the leader steps down, Campaign
// to be the leader.
if cc.NodeID == n.Leader() && !n.IsLeader() {
if cc.NodeID == n.leader() && !n.isLeader() {
if err = n.Campaign(n.Ctx); err != nil {
return err
}
}
if cc.NodeID == n.Config.ID {
n.removeRaftFunc()
// wait the commit ack to be sent before closing connection
n.asyncTasks.Wait()

View file

@ -185,7 +185,7 @@ func clip(x float64) float64 {
func (mwr *remotesWeightedRandom) observe(peer api.Peer, weight float64) {
// While we have a decent, ad-hoc approach here to weight subsequent
// observerations, we may want to look into applying forward decay:
// observations, we may want to look into applying forward decay:
//
// http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
//