Vendor engine-api and swarmkit for 1.12.1

Signed-off-by: Tibor Vass <tibor@docker.com>
This commit is contained in:
Tibor Vass 2016-08-11 12:45:17 -07:00
parent af4ff2541a
commit bdc0a24156
33 changed files with 1244 additions and 387 deletions

View file

@ -60,7 +60,7 @@ clone git golang.org/x/net 2beffdc2e92c8a3027590f898fe88f69af48a3f8 https://gith
clone git golang.org/x/sys eb2c74142fd19a79b3f237334c7384d5167b1b46 https://github.com/golang/sys.git
clone git github.com/docker/go-units 651fc226e7441360384da338d0fd37f2440ffbe3
clone git github.com/docker/go-connections fa2850ff103453a9ad190da0df0af134f0314b3d
clone git github.com/docker/engine-api 3d1601b9d2436a70b0dfc045a23f6503d19195df
clone git github.com/docker/engine-api 4eca04ae18f4f93f40196a17b9aa6e11262a7269
clone git github.com/RackSec/srslog 259aed10dfa74ea2961eddd1d9847619f6e98837
clone git github.com/imdario/mergo 0.2.1
@ -139,7 +139,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
clone git github.com/docker/containerd 0ac3cd1be170d180b2baed755e8f0da547ceb267
# cluster
clone git github.com/docker/swarmkit 9d4c2f73124e70f8fa85f9076635b827d17b109f
clone git github.com/docker/swarmkit 3708fb309aacfff321759bcdcc99b0f57806d27f
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028
clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b

View file

@ -94,7 +94,7 @@ type NetworkAPIClient interface {
type NodeAPIClient interface {
NodeInspectWithRaw(ctx context.Context, nodeID string) (swarm.Node, []byte, error)
NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error)
NodeRemove(ctx context.Context, nodeID string) error
NodeRemove(ctx context.Context, nodeID string, options types.NodeRemoveOptions) error
NodeUpdate(ctx context.Context, nodeID string, version swarm.Version, node swarm.NodeSpec) error
}

View file

@ -1,10 +1,21 @@
package client
import "golang.org/x/net/context"
import (
"net/url"
"github.com/docker/engine-api/types"
"golang.org/x/net/context"
)
// NodeRemove removes a Node.
func (cli *Client) NodeRemove(ctx context.Context, nodeID string) error {
resp, err := cli.delete(ctx, "/nodes/"+nodeID, nil, nil)
func (cli *Client) NodeRemove(ctx context.Context, nodeID string, options types.NodeRemoveOptions) error {
query := url.Values{}
if options.Force {
query.Set("force", "1")
}
resp, err := cli.delete(ctx, "/nodes/"+nodeID, query, nil)
ensureReaderClosed(resp)
return err
}

View file

@ -246,6 +246,11 @@ type NodeListOptions struct {
Filter filters.Args
}
// NodeRemoveOptions holds parameters to remove nodes with.
type NodeRemoveOptions struct {
Force bool
}
// ServiceCreateOptions contains the options to use when creating a service.
type ServiceCreateOptions struct {
// EncodedRegistryAuth is the encoded registry authorization credentials to

View file

@ -12,7 +12,7 @@ import (
)
const (
initialSessionFailureBackoff = time.Second
initialSessionFailureBackoff = 100 * time.Millisecond
maxSessionFailureBackoff = 8 * time.Second
)
@ -197,11 +197,6 @@ func (a *Agent) run(ctx context.Context) {
sessionq = nil
// if we're here before <-registered, do nothing for that event
registered = nil
// Bounce the connection.
if a.config.Picker != nil {
a.config.Picker.Reset()
}
case <-session.closed:
log.G(ctx).Debugf("agent: rebuild session")
@ -218,6 +213,7 @@ func (a *Agent) run(ctx context.Context) {
if a.err == nil {
a.err = ctx.Err()
}
session.close()
return
}

View file

@ -7,7 +7,7 @@ import (
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/picker"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Config provides values for an Agent.
@ -19,15 +19,6 @@ type Config struct {
// updated with managers weights as observed by the agent.
Managers picker.Remotes
// Conn specifies the client connection Agent will use.
Conn *grpc.ClientConn
// Picker is the picker used by Conn.
// TODO(aaronl): This is only part of the config to allow resetting the
// GRPC connection. This should be refactored to address the coupling
// between Conn and Picker.
Picker *picker.Picker
// Executor specifies the executor to use for the agent.
Executor exec.Executor
@ -36,11 +27,14 @@ type Config struct {
// NotifyRoleChange channel receives new roles from session messages.
NotifyRoleChange chan<- api.NodeRole
// Credentials is credentials for grpc connection to manager.
Credentials credentials.TransportAuthenticator
}
func (c *Config) validate() error {
if c.Conn == nil {
return fmt.Errorf("agent: Connection is required")
if c.Credentials == nil {
return fmt.Errorf("agent: Credentials is required")
}
if c.Executor == nil {

View file

@ -2,11 +2,11 @@ package exec
import (
"fmt"
"reflect"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/pkg/errors"
@ -186,7 +186,7 @@ func Do(ctx context.Context, task *api.Task, ctlr Controller) (*api.TaskStatus,
defer func() {
logStateChange(ctx, task.DesiredState, task.Status.State, status.State)
if !reflect.DeepEqual(status, task.Status) {
if !equality.TaskStatusesEqualStable(status, &task.Status) {
status.Timestamp = ptypes.MustTimestampProto(time.Now())
}
}()

View file

@ -187,7 +187,7 @@ func (n *Node) run(ctx context.Context) (err error) {
if n.config.JoinAddr != "" || n.config.ForceNewCluster {
n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
if n.config.JoinAddr != "" {
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, 1)
n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, picker.DefaultObservationWeight)
}
}
@ -361,31 +361,21 @@ func (n *Node) Err(ctx context.Context) error {
}
func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
var manager api.Peer
select {
case <-ctx.Done():
case manager = <-n.remotes.WaitSelect(ctx):
case <-n.remotes.WaitSelect(ctx):
}
if ctx.Err() != nil {
return ctx.Err()
}
picker := picker.NewPicker(n.remotes, manager.Addr)
conn, err := grpc.Dial(manager.Addr,
grpc.WithPicker(picker),
grpc.WithTransportCredentials(creds),
grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
if err != nil {
return err
}
agent, err := New(&Config{
Hostname: n.config.Hostname,
Managers: n.remotes,
Executor: n.config.Executor,
DB: db,
Conn: conn,
Picker: picker,
NotifyRoleChange: n.roleChangeReq,
Credentials: creds,
})
if err != nil {
return err
@ -647,7 +637,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
go func(ready chan struct{}) {
select {
case <-ready:
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, 5)
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
case <-connCtx.Done():
}
}(ready)

View file

@ -6,6 +6,7 @@ import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/picker"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -27,6 +28,9 @@ var (
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
conn *grpc.ClientConn
addr string
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
@ -41,12 +45,27 @@ type session struct {
func newSession(ctx context.Context, agent *Agent, delay time.Duration) *session {
s := &session{
agent: agent,
errs: make(chan error),
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
tasks: make(chan *api.TasksMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
}
peer, err := agent.config.Managers.Select()
if err != nil {
s.errs <- err
return s
}
cc, err := grpc.Dial(peer.Addr,
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
if err != nil {
s.errs <- err
return s
}
s.addr = peer.Addr
s.conn = cc
go s.run(ctx, delay)
return s
@ -77,8 +96,6 @@ func (s *session) run(ctx context.Context, delay time.Duration) {
func (s *session) start(ctx context.Context) error {
log.G(ctx).Debugf("(*session).start")
client := api.NewDispatcherClient(s.agent.config.Conn)
description, err := s.agent.config.Executor.Describe(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
@ -103,6 +120,8 @@ func (s *session) start(ctx context.Context) error {
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
client := api.NewDispatcherClient(s.conn)
stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
})
@ -133,7 +152,7 @@ func (s *session) start(ctx context.Context) error {
func (s *session) heartbeat(ctx context.Context) error {
log.G(ctx).Debugf("(*session).heartbeat")
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()
@ -195,7 +214,7 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess
func (s *session) watch(ctx context.Context) error {
log.G(ctx).Debugf("(*session).watch")
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
watch, err := client.Tasks(ctx, &api.TasksRequest{
SessionID: s.sessionID})
if err != nil {
@ -221,7 +240,7 @@ func (s *session) watch(ctx context.Context) error {
// sendTaskStatus uses the current session to send the status of a single task.
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
@ -262,7 +281,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa
return updates, ctx.Err()
}
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
n := batchSize
if len(updates) < n {
@ -285,6 +304,10 @@ func (s *session) close() error {
case <-s.closed:
return errSessionClosed
default:
if s.conn != nil {
s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -picker.DefaultObservationWeight)
s.conn.Close()
}
close(s.closed)
return nil
}

View file

@ -668,12 +668,12 @@ func encodeVarintCa(data []byte, offset int, v uint64) int {
type raftProxyCAServer struct {
local CAServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyCAServer(local CAServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) CAServer {
func NewRaftProxyCAServer(local CAServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) CAServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -724,17 +724,30 @@ func (p *raftProxyCAServer) GetRootCACertificate(ctx context.Context, r *GetRoot
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewCAClient(conn).GetRootCACertificate(ctx, r)
}
type raftProxyNodeCAServer struct {
local NodeCAServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyNodeCAServer(local NodeCAServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) NodeCAServer {
func NewRaftProxyNodeCAServer(local NodeCAServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) NodeCAServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -785,6 +798,19 @@ func (p *raftProxyNodeCAServer) IssueNodeCertificate(ctx context.Context, r *Iss
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewNodeCAClient(conn).IssueNodeCertificate(ctx, r)
}
@ -801,6 +827,19 @@ func (p *raftProxyNodeCAServer) NodeCertificateStatus(ctx context.Context, r *No
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewNodeCAClient(conn).NodeCertificateStatus(ctx, r)
}

View file

@ -106,6 +106,7 @@ func (*UpdateNodeResponse) Descriptor() ([]byte, []int) { return fileDescriptorC
// RemoveNodeRequest requests to delete the specified node from store.
type RemoveNodeRequest struct {
NodeID string `protobuf:"bytes,1,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"`
Force bool `protobuf:"varint,2,opt,name=force,proto3" json:"force,omitempty"`
}
func (m *RemoveNodeRequest) Reset() { *m = RemoveNodeRequest{} }
@ -786,6 +787,7 @@ func (m *RemoveNodeRequest) Copy() *RemoveNodeRequest {
o := &RemoveNodeRequest{
NodeID: m.NodeID,
Force: m.Force,
}
return o
@ -1473,9 +1475,10 @@ func (this *RemoveNodeRequest) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 5)
s := make([]string, 0, 6)
s = append(s, "&api.RemoveNodeRequest{")
s = append(s, "NodeID: "+fmt.Sprintf("%#v", this.NodeID)+",\n")
s = append(s, "Force: "+fmt.Sprintf("%#v", this.Force)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -2938,6 +2941,16 @@ func (m *RemoveNodeRequest) MarshalTo(data []byte) (int, error) {
i = encodeVarintControl(data, i, uint64(len(m.NodeID)))
i += copy(data[i:], m.NodeID)
}
if m.Force {
data[i] = 0x10
i++
if m.Force {
data[i] = 1
} else {
data[i] = 0
}
i++
}
return i, nil
}
@ -4226,12 +4239,12 @@ func encodeVarintControl(data []byte, offset int, v uint64) int {
type raftProxyControlServer struct {
local ControlServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyControlServer(local ControlServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) ControlServer {
func NewRaftProxyControlServer(local ControlServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) ControlServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -4282,6 +4295,19 @@ func (p *raftProxyControlServer) GetNode(ctx context.Context, r *GetNodeRequest)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetNode(ctx, r)
}
@ -4298,6 +4324,19 @@ func (p *raftProxyControlServer) ListNodes(ctx context.Context, r *ListNodesRequ
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListNodes(ctx, r)
}
@ -4314,6 +4353,19 @@ func (p *raftProxyControlServer) UpdateNode(ctx context.Context, r *UpdateNodeRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).UpdateNode(ctx, r)
}
@ -4330,6 +4382,19 @@ func (p *raftProxyControlServer) RemoveNode(ctx context.Context, r *RemoveNodeRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).RemoveNode(ctx, r)
}
@ -4346,6 +4411,19 @@ func (p *raftProxyControlServer) GetTask(ctx context.Context, r *GetTaskRequest)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetTask(ctx, r)
}
@ -4362,6 +4440,19 @@ func (p *raftProxyControlServer) ListTasks(ctx context.Context, r *ListTasksRequ
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListTasks(ctx, r)
}
@ -4378,6 +4469,19 @@ func (p *raftProxyControlServer) RemoveTask(ctx context.Context, r *RemoveTaskRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).RemoveTask(ctx, r)
}
@ -4394,6 +4498,19 @@ func (p *raftProxyControlServer) GetService(ctx context.Context, r *GetServiceRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetService(ctx, r)
}
@ -4410,6 +4527,19 @@ func (p *raftProxyControlServer) ListServices(ctx context.Context, r *ListServic
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListServices(ctx, r)
}
@ -4426,6 +4556,19 @@ func (p *raftProxyControlServer) CreateService(ctx context.Context, r *CreateSer
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).CreateService(ctx, r)
}
@ -4442,6 +4585,19 @@ func (p *raftProxyControlServer) UpdateService(ctx context.Context, r *UpdateSer
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).UpdateService(ctx, r)
}
@ -4458,6 +4614,19 @@ func (p *raftProxyControlServer) RemoveService(ctx context.Context, r *RemoveSer
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).RemoveService(ctx, r)
}
@ -4474,6 +4643,19 @@ func (p *raftProxyControlServer) GetNetwork(ctx context.Context, r *GetNetworkRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetNetwork(ctx, r)
}
@ -4490,6 +4672,19 @@ func (p *raftProxyControlServer) ListNetworks(ctx context.Context, r *ListNetwor
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListNetworks(ctx, r)
}
@ -4506,6 +4701,19 @@ func (p *raftProxyControlServer) CreateNetwork(ctx context.Context, r *CreateNet
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).CreateNetwork(ctx, r)
}
@ -4522,6 +4730,19 @@ func (p *raftProxyControlServer) RemoveNetwork(ctx context.Context, r *RemoveNet
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).RemoveNetwork(ctx, r)
}
@ -4538,6 +4759,19 @@ func (p *raftProxyControlServer) GetCluster(ctx context.Context, r *GetClusterRe
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).GetCluster(ctx, r)
}
@ -4554,6 +4788,19 @@ func (p *raftProxyControlServer) ListClusters(ctx context.Context, r *ListCluste
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).ListClusters(ctx, r)
}
@ -4570,6 +4817,19 @@ func (p *raftProxyControlServer) UpdateCluster(ctx context.Context, r *UpdateClu
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewControlClient(conn).UpdateCluster(ctx, r)
}
@ -4692,6 +4952,9 @@ func (m *RemoveNodeRequest) Size() (n int) {
if l > 0 {
n += 1 + l + sovControl(uint64(l))
}
if m.Force {
n += 2
}
return n
}
@ -5286,6 +5549,7 @@ func (this *RemoveNodeRequest) String() string {
}
s := strings.Join([]string{`&RemoveNodeRequest{`,
`NodeID:` + fmt.Sprintf("%v", this.NodeID) + `,`,
`Force:` + fmt.Sprintf("%v", this.Force) + `,`,
`}`,
}, "")
return s
@ -6617,6 +6881,26 @@ func (m *RemoveNodeRequest) Unmarshal(data []byte) error {
}
m.NodeID = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Force", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowControl
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Force = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipControl(data[iNdEx:])
@ -10521,99 +10805,100 @@ var (
)
var fileDescriptorControl = []byte{
// 1498 bytes of a gzipped FileDescriptorProto
// 1512 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x59, 0xcf, 0x6f, 0x1b, 0xc5,
0x17, 0xaf, 0x9d, 0x34, 0x8e, 0x9f, 0x6b, 0xb7, 0x9e, 0xba, 0xfa, 0x46, 0x6e, 0xbf, 0x09, 0xda,
0x17, 0xaf, 0x9d, 0x34, 0x8e, 0x9f, 0x6b, 0xb7, 0x9e, 0xba, 0xfa, 0x46, 0x6e, 0xbf, 0x0d, 0xda,
0xd2, 0x34, 0x91, 0x82, 0x03, 0x8e, 0x2a, 0x02, 0x48, 0x20, 0x9c, 0xd0, 0xca, 0xd0, 0x86, 0x6a,
0xd3, 0x02, 0xb7, 0xc8, 0xb1, 0xa7, 0x61, 0xf1, 0x8f, 0x35, 0xbb, 0x9b, 0xb4, 0x11, 0x17, 0x38,
0xd3, 0x02, 0xb7, 0xc8, 0xb1, 0x27, 0x61, 0xf1, 0x8f, 0x35, 0xbb, 0x9b, 0xb4, 0x11, 0x17, 0x38,
0x20, 0xf1, 0x27, 0x70, 0xe5, 0xca, 0x81, 0x7f, 0x81, 0x6b, 0xc4, 0x89, 0x0b, 0x12, 0xa7, 0x88,
0xf6, 0xc4, 0x09, 0xf1, 0x17, 0x20, 0xe6, 0xc7, 0x9b, 0xdd, 0xf5, 0x7a, 0x76, 0x6d, 0x27, 0x41,
0xe9, 0xc1, 0xca, 0xee, 0xcc, 0xe7, 0xfd, 0x98, 0x79, 0x9f, 0xf7, 0xf6, 0xcd, 0x04, 0xf2, 0x4d,
0xbb, 0xe7, 0x39, 0x76, 0xa7, 0xd2, 0x77, 0x6c, 0xcf, 0x26, 0xa4, 0x65, 0x37, 0xdb, 0xd4, 0xa9,
0xb8, 0x4f, 0x1b, 0x4e, 0xb7, 0x6d, 0x79, 0x95, 0x83, 0x37, 0xca, 0x39, 0xb7, 0x4f, 0x9b, 0xae,
0x04, 0x94, 0xf3, 0xf6, 0xee, 0x17, 0xb4, 0xe9, 0xa9, 0xd7, 0x9c, 0x77, 0xd8, 0xa7, 0xea, 0xa5,
0xb4, 0x67, 0xef, 0xd9, 0xe2, 0x71, 0x95, 0x3f, 0xe1, 0xe8, 0xd5, 0x7e, 0x67, 0x7f, 0xcf, 0xea,
0xad, 0xca, 0x3f, 0x72, 0xd0, 0xb8, 0x03, 0x85, 0x7b, 0xd4, 0xdb, 0xb2, 0x5b, 0xd4, 0xa4, 0x5f,
0xee, 0x53, 0xd7, 0x23, 0x37, 0x21, 0xd3, 0x63, 0xaf, 0x3b, 0x56, 0x6b, 0x2e, 0xf5, 0x4a, 0x6a,
0x29, 0x5b, 0x83, 0x17, 0xc7, 0x0b, 0x33, 0x1c, 0x51, 0xdf, 0x34, 0x67, 0xf8, 0x54, 0xbd, 0x65,
0xbc, 0x07, 0x97, 0x7d, 0x31, 0xb7, 0x6f, 0xf7, 0x5c, 0x4a, 0x56, 0x60, 0x9a, 0x4f, 0x0a, 0xa1,
0x5c, 0x75, 0xae, 0x32, 0xbc, 0x80, 0x8a, 0xc0, 0x0b, 0x94, 0x71, 0x3c, 0x05, 0x57, 0xee, 0x5b,
0xae, 0x50, 0xe1, 0x2a, 0xd3, 0x77, 0x21, 0xf3, 0xc4, 0xea, 0x78, 0xd4, 0x71, 0x51, 0xcb, 0x8a,
0x4e, 0x4b, 0x54, 0xac, 0x72, 0x57, 0xca, 0x98, 0x4a, 0xb8, 0xfc, 0xcd, 0x14, 0x64, 0x70, 0x90,
0x94, 0xe0, 0x62, 0xaf, 0xd1, 0xa5, 0x5c, 0xe3, 0xd4, 0x52, 0xd6, 0x94, 0x2f, 0x64, 0x15, 0x72,
0x56, 0x6b, 0xa7, 0xef, 0xd0, 0x27, 0xd6, 0x33, 0x36, 0x97, 0xe6, 0x73, 0xb5, 0x02, 0x5b, 0x28,
0xd4, 0x37, 0x1f, 0xe2, 0xa8, 0x09, 0x56, 0x4b, 0x3d, 0x93, 0x87, 0x30, 0xd3, 0x69, 0xec, 0xd2,
0x8e, 0x3b, 0x37, 0xc5, 0xb0, 0xb9, 0xea, 0xfa, 0x24, 0x9e, 0x55, 0xee, 0x0b, 0xd1, 0x0f, 0x58,
0x80, 0x0f, 0x4d, 0xd4, 0x43, 0xea, 0x90, 0xeb, 0xd2, 0xee, 0x2e, 0x9b, 0xfe, 0xdc, 0xea, 0xbb,
0x73, 0xd3, 0x4c, 0x6d, 0xa1, 0x7a, 0x3b, 0x6e, 0xdb, 0xb6, 0x59, 0xe8, 0x2b, 0x0f, 0x7c, 0xbc,
0x19, 0x96, 0x25, 0x55, 0xb8, 0xc8, 0x98, 0xc3, 0xd6, 0x71, 0x51, 0x28, 0xb9, 0x11, 0xbb, 0xf7,
0x0c, 0x64, 0x4a, 0x28, 0x0b, 0x73, 0x9e, 0x6f, 0x45, 0xb0, 0x07, 0x33, 0x62, 0x7f, 0x2e, 0xf1,
0x41, 0xb5, 0xea, 0xf2, 0x5b, 0x90, 0x0b, 0xb9, 0x4e, 0xae, 0xc0, 0x54, 0x9b, 0x1e, 0x4a, 0x5a,
0x98, 0xfc, 0x91, 0xef, 0xee, 0x41, 0xa3, 0xb3, 0x4f, 0xd9, 0x0e, 0xf2, 0x31, 0xf9, 0xf2, 0x76,
0x7a, 0x3d, 0x65, 0x6c, 0x40, 0x31, 0xb4, 0x1d, 0xc8, 0x91, 0x0a, 0x0b, 0x06, 0x1f, 0x10, 0xc1,
0x48, 0x22, 0x89, 0x84, 0x19, 0x3f, 0xa6, 0xa0, 0xf8, 0xb8, 0xdf, 0x6a, 0x78, 0x74, 0x52, 0x86,
0x92, 0x77, 0xe1, 0x92, 0x00, 0x1d, 0xb0, 0x4d, 0xb2, 0xec, 0x9e, 0x70, 0x30, 0x57, 0xbd, 0xae,
0xb3, 0xf8, 0x89, 0x84, 0x98, 0x39, 0x2e, 0x80, 0x2f, 0xe4, 0x75, 0x98, 0xe6, 0xe9, 0xc6, 0xc2,
0xcd, 0xe5, 0x6e, 0x24, 0xc5, 0xc5, 0x14, 0x48, 0xa3, 0x06, 0x24, 0xec, 0xeb, 0x89, 0xd2, 0x62,
0x1d, 0x8a, 0x26, 0xed, 0xda, 0x07, 0x13, 0xaf, 0xd7, 0x28, 0x01, 0x09, 0x4b, 0x4a, 0xeb, 0x98,
0xde, 0x8f, 0x1a, 0x6e, 0x3b, 0xa4, 0xcc, 0x63, 0xaf, 0x11, 0x65, 0x1c, 0xc1, 0x95, 0xf1, 0x29,
0x3f, 0xbd, 0xa5, 0x58, 0xb0, 0x0e, 0x3e, 0x99, 0xb4, 0x0e, 0x81, 0x17, 0xa8, 0x60, 0x1d, 0x13,
0x9b, 0xf6, 0xd7, 0x11, 0xb6, 0x6e, 0xfc, 0x83, 0xe5, 0x82, 0x0f, 0x9e, 0xa0, 0x5c, 0x84, 0xc5,
0x86, 0xcb, 0xc5, 0x0f, 0xe7, 0x58, 0x2e, 0x74, 0x9e, 0x69, 0xcb, 0x05, 0x73, 0xc1, 0xa5, 0xce,
0x81, 0xd5, 0xe4, 0x3c, 0x90, 0xe5, 0x02, 0x5d, 0xd8, 0x96, 0xc3, 0xf5, 0x4d, 0xe6, 0x02, 0x42,
0xea, 0x2d, 0x97, 0x2c, 0xc2, 0x2c, 0xb2, 0x46, 0xd6, 0x85, 0x6c, 0x2d, 0xc7, 0xd0, 0x19, 0x49,
0x1b, 0xb6, 0x7a, 0xc9, 0x1b, 0x97, 0x6c, 0x42, 0x81, 0xa5, 0x9a, 0xe5, 0xd0, 0xd6, 0x8e, 0xeb,
0x31, 0xf6, 0xca, 0x4a, 0x50, 0xa8, 0xfe, 0x3f, 0x2e, 0xc4, 0xdb, 0x1c, 0x65, 0xe6, 0x51, 0x48,
0xbc, 0x69, 0xca, 0x49, 0xe6, 0x3f, 0x29, 0x27, 0xb8, 0x5d, 0x41, 0x39, 0xe1, 0xac, 0x49, 0x2c,
0x27, 0x82, 0x46, 0x12, 0x66, 0x7c, 0x04, 0xa5, 0x0d, 0x87, 0x32, 0x7f, 0x71, 0xcb, 0x14, 0x91,
0xd6, 0x30, 0xd7, 0x25, 0x8b, 0x16, 0x74, 0x6a, 0x50, 0x22, 0x94, 0xee, 0x5b, 0x70, 0x2d, 0xa2,
0x0c, 0xbd, 0xba, 0x03, 0x19, 0x0c, 0x03, 0x2a, 0xbc, 0x9e, 0xa0, 0xd0, 0x54, 0x58, 0xe3, 0x7d,
0x28, 0xb2, 0x9c, 0x8b, 0x78, 0xb6, 0x02, 0x10, 0x44, 0x1d, 0xb3, 0x26, 0xcf, 0xc2, 0x98, 0xf5,
0x83, 0x6e, 0x66, 0xfd, 0x98, 0xb3, 0xf5, 0x91, 0xb0, 0x8a, 0xd3, 0xf9, 0xf3, 0x73, 0x0a, 0x4a,
0xb2, 0x9e, 0x9d, 0xc6, 0x27, 0x46, 0xaf, 0xcb, 0x0a, 0x3d, 0x41, 0x29, 0x2e, 0xa0, 0x8c, 0xaa,
0xc6, 0x6b, 0x03, 0xd5, 0x78, 0xfc, 0x08, 0x45, 0x16, 0x70, 0xba, 0x1d, 0xd9, 0x84, 0x92, 0x2c,
0x4d, 0xa7, 0x0a, 0xd2, 0xff, 0xe0, 0x5a, 0x44, 0x0b, 0xd6, 0xb8, 0x3f, 0xd3, 0x70, 0x95, 0x73,
0x1c, 0xc7, 0xfd, 0x32, 0x57, 0x8f, 0x96, 0xb9, 0xd5, 0xb8, 0x62, 0x12, 0x91, 0x1c, 0xae, 0x74,
0xdf, 0xa6, 0xcf, 0xbc, 0xd2, 0x6d, 0x47, 0x2a, 0xdd, 0x3b, 0x13, 0x3a, 0xa7, 0x2d, 0x76, 0x43,
0xd5, 0x64, 0xfa, 0x6c, 0xab, 0xc9, 0xc7, 0x50, 0x1a, 0x74, 0x09, 0x89, 0xf1, 0x26, 0xcc, 0x62,
0xa0, 0x54, 0x4d, 0x49, 0x64, 0x86, 0x0f, 0x0e, 0x2a, 0xcb, 0x16, 0xf5, 0x9e, 0xda, 0x4e, 0x7b,
0x82, 0xca, 0x82, 0x12, 0xba, 0xca, 0xe2, 0x2b, 0x0b, 0x78, 0xdb, 0x93, 0x43, 0x49, 0xbc, 0x55,
0x52, 0x0a, 0x6b, 0x3c, 0x16, 0x95, 0x25, 0xe2, 0x19, 0x61, 0x7d, 0x09, 0xdb, 0x4d, 0xdc, 0x2f,
0xf1, 0xcc, 0x89, 0x8c, 0x32, 0x9c, 0xc8, 0xe9, 0x80, 0xc8, 0x28, 0xcb, 0x89, 0x8c, 0x00, 0xbf,
0xda, 0x9c, 0x91, 0x8f, 0x9f, 0xa9, 0xdc, 0x3a, 0x73, 0x37, 0xfd, 0x7c, 0x8b, 0x78, 0xea, 0xe7,
0x1b, 0x8e, 0x9f, 0x20, 0xdf, 0x22, 0x92, 0x2f, 0x57, 0xbe, 0xc5, 0x38, 0x77, 0x9e, 0xf9, 0x16,
0xb8, 0x14, 0xe4, 0x1b, 0x06, 0x2a, 0x31, 0xdf, 0x54, 0xe4, 0x7c, 0x30, 0x7e, 0x2c, 0x37, 0x3a,
0xfb, 0x2e, 0x5b, 0x53, 0xa8, 0x0e, 0x37, 0xe5, 0x48, 0xa4, 0x0e, 0x23, 0x8e, 0xf3, 0x02, 0x01,
0x3e, 0x7d, 0x7d, 0x15, 0x01, 0x7d, 0x11, 0x92, 0x44, 0x5f, 0x25, 0xa5, 0xb0, 0x3e, 0x97, 0x70,
0xe2, 0x04, 0x5c, 0x8a, 0x48, 0xbe, 0x5c, 0x5c, 0x8a, 0x71, 0xee, 0x3c, 0xb9, 0x14, 0xb8, 0x14,
0x70, 0x09, 0xa3, 0x91, 0xc8, 0x25, 0x15, 0x3a, 0x1f, 0x6c, 0xec, 0x43, 0xf1, 0x43, 0xdb, 0xea,
0x3d, 0xb2, 0xdb, 0xb4, 0x67, 0xda, 0xac, 0x9d, 0xe5, 0x0d, 0x47, 0x05, 0xae, 0x3a, 0xfc, 0x99,
0xee, 0x70, 0xc2, 0x31, 0x46, 0x79, 0x7c, 0x5a, 0x78, 0x38, 0x6b, 0x16, 0xe5, 0xd4, 0xa7, 0x62,
0x46, 0xc8, 0xb1, 0xe3, 0x62, 0x09, 0xf1, 0xdd, 0x46, 0xaf, 0xb1, 0xe7, 0x0b, 0xa4, 0x85, 0x00,
0x91, 0x73, 0x0f, 0xe4, 0x94, 0x90, 0x30, 0xbe, 0x4b, 0xab, 0xfe, 0xea, 0x34, 0x34, 0xe6, 0xfd,
0x95, 0x42, 0x4f, 0xd2, 0x5f, 0xa1, 0xcc, 0x04, 0xfd, 0x15, 0x5a, 0x0f, 0xbe, 0x53, 0xe4, 0x1e,
0xcc, 0x3a, 0xb8, 0x5f, 0x2c, 0xc8, 0x5c, 0xf0, 0x96, 0x4e, 0x70, 0x68, 0x73, 0x6b, 0xd3, 0x47,
0xc7, 0x0b, 0x17, 0x4c, 0x5f, 0x38, 0x68, 0xd4, 0xce, 0x26, 0x1b, 0xab, 0xbf, 0x15, 0x21, 0xb3,
0x21, 0xaf, 0xd3, 0x88, 0x05, 0x19, 0xbc, 0xa9, 0x22, 0x86, 0x4e, 0x78, 0xf0, 0xf6, 0xab, 0x7c,
0x33, 0x11, 0x83, 0x5f, 0x8e, 0x6b, 0xbf, 0xfc, 0xf4, 0xd7, 0xf7, 0xe9, 0xcb, 0x90, 0x17, 0xa0,
0xd7, 0x30, 0xe2, 0xc4, 0x86, 0xac, 0x7f, 0xe5, 0x41, 0x5e, 0x1d, 0xe7, 0x82, 0xa8, 0x7c, 0x6b,
0x04, 0x2a, 0xd9, 0xa0, 0x03, 0x10, 0xdc, 0x38, 0x10, 0xad, 0xae, 0xa1, 0xdb, 0x93, 0xf2, 0xe2,
0x28, 0xd8, 0x48, 0x9b, 0xc1, 0x3d, 0x83, 0xde, 0xe6, 0xd0, 0x0d, 0x86, 0xde, 0xa6, 0xe6, 0xba,
0x22, 0xc6, 0xa6, 0x8c, 0x21, 0x3f, 0xc9, 0xc5, 0xc6, 0x30, 0x74, 0xcf, 0x10, 0x1b, 0xc3, 0x81,
0x1b, 0x85, 0xe4, 0x18, 0x8a, 0x73, 0x66, 0x7c, 0x0c, 0xc3, 0xa7, 0xf6, 0xf8, 0x18, 0x0e, 0x1c,
0x56, 0x47, 0xee, 0xa7, 0x58, 0x5e, 0xc2, 0x7e, 0x86, 0x57, 0xb8, 0x38, 0x0a, 0x36, 0xd2, 0x66,
0x70, 0x4e, 0xd4, 0xdb, 0x1c, 0x3a, 0x8a, 0xea, 0x6d, 0x0e, 0x1f, 0x37, 0xe3, 0x6c, 0x3e, 0x83,
0x4b, 0xe1, 0x96, 0x9b, 0xdc, 0x1e, 0xf3, 0x9c, 0x50, 0x5e, 0x1a, 0x0d, 0x4c, 0xb6, 0xfc, 0x15,
0xe4, 0x07, 0x0e, 0xea, 0x44, 0xab, 0x51, 0x77, 0x31, 0x50, 0x5e, 0x1e, 0x03, 0x39, 0xd2, 0xf8,
0xc0, 0x19, 0x54, 0x6f, 0x5c, 0x77, 0xce, 0xd6, 0x1b, 0xd7, 0x1e, 0x68, 0x13, 0x8c, 0x0f, 0x1c,
0x35, 0xf5, 0xc6, 0x75, 0x67, 0x5a, 0xbd, 0x71, 0xfd, 0xb9, 0x35, 0x91, 0x64, 0xd8, 0xba, 0xc5,
0x92, 0x6c, 0xb0, 0xdd, 0x8f, 0x25, 0x59, 0xb4, 0x77, 0x4f, 0x26, 0x99, 0xea, 0x33, 0xe3, 0x49,
0x16, 0x69, 0x8e, 0xe3, 0x49, 0x16, 0x6d, 0x59, 0x47, 0x92, 0x4c, 0x2d, 0x38, 0x81, 0x64, 0x91,
0x35, 0x2f, 0x8f, 0x81, 0x1c, 0x33, 0xce, 0x89, 0xc6, 0x75, 0xe7, 0xab, 0xa4, 0x38, 0x8f, 0x69,
0x5c, 0xc6, 0x19, 0xbf, 0xc1, 0xb1, 0x71, 0x1e, 0xec, 0x71, 0x62, 0xe3, 0x1c, 0x69, 0x00, 0x46,
0xc4, 0x59, 0xf5, 0x80, 0xf1, 0x71, 0x8e, 0x34, 0xae, 0xf1, 0x71, 0x8e, 0xb6, 0x93, 0x23, 0xf3,
0x59, 0x2d, 0x38, 0x21, 0x9f, 0x23, 0x6b, 0x5e, 0x1e, 0x03, 0x99, 0x68, 0xbc, 0x76, 0xe3, 0xe8,
0xf9, 0xfc, 0x85, 0xdf, 0xd9, 0xef, 0xef, 0xe7, 0xf3, 0xa9, 0xaf, 0x5f, 0xcc, 0xa7, 0x8e, 0xd8,
0xef, 0x57, 0xf6, 0xfb, 0x83, 0xfd, 0x76, 0x67, 0xc4, 0x7f, 0xf4, 0xd6, 0xfe, 0x0d, 0x00, 0x00,
0xff, 0xff, 0xf3, 0xcc, 0x22, 0xcd, 0x4a, 0x1c, 0x00, 0x00,
0xe9, 0xc1, 0xca, 0xee, 0xcc, 0xe7, 0xcd, 0x7b, 0x33, 0x9f, 0xcf, 0xbc, 0x7d, 0x33, 0x81, 0x7c,
0xd3, 0xee, 0x79, 0x8e, 0xdd, 0xa9, 0xf4, 0x1d, 0xdb, 0xb3, 0x09, 0x69, 0xd9, 0xcd, 0x36, 0x75,
0x2a, 0xee, 0xd3, 0x86, 0xd3, 0x6d, 0x5b, 0x5e, 0xe5, 0xe0, 0x8d, 0x72, 0xce, 0xed, 0xd3, 0xa6,
0x2b, 0x01, 0xe5, 0xbc, 0xbd, 0xf3, 0x05, 0x6d, 0x7a, 0xea, 0x35, 0xe7, 0x1d, 0xf6, 0xa9, 0x7a,
0x29, 0xed, 0xd9, 0x7b, 0xb6, 0x78, 0x5c, 0xe1, 0x4f, 0xd8, 0x7a, 0xb5, 0xdf, 0xd9, 0xdf, 0xb3,
0x7a, 0x2b, 0xf2, 0x8f, 0x6c, 0x34, 0xee, 0x42, 0xe1, 0x3e, 0xf5, 0x36, 0xed, 0x16, 0x35, 0xe9,
0x97, 0xfb, 0xd4, 0xf5, 0xc8, 0x2d, 0xc8, 0xf4, 0xd8, 0xeb, 0xb6, 0xd5, 0x9a, 0x4b, 0xbd, 0x92,
0x5a, 0xcc, 0xd6, 0xe0, 0xc5, 0xf1, 0xfc, 0x0c, 0x47, 0xd4, 0x37, 0xcc, 0x19, 0xde, 0x55, 0x6f,
0x19, 0xef, 0xc1, 0x65, 0xdf, 0xcc, 0xed, 0xdb, 0x3d, 0x97, 0x92, 0x65, 0x98, 0xe6, 0x9d, 0xc2,
0x28, 0x57, 0x9d, 0xab, 0x0c, 0x4f, 0xa0, 0x22, 0xf0, 0x02, 0x65, 0x1c, 0x4f, 0xc1, 0x95, 0x07,
0x96, 0x2b, 0x86, 0x70, 0x95, 0xeb, 0x7b, 0x90, 0xd9, 0xb5, 0x3a, 0x1e, 0x75, 0x5c, 0x1c, 0x65,
0x59, 0x37, 0x4a, 0xd4, 0xac, 0x72, 0x4f, 0xda, 0x98, 0xca, 0xb8, 0xfc, 0xcd, 0x14, 0x64, 0xb0,
0x91, 0x94, 0xe0, 0x62, 0xaf, 0xd1, 0xa5, 0x7c, 0xc4, 0xa9, 0xc5, 0xac, 0x29, 0x5f, 0xc8, 0x0a,
0xe4, 0xac, 0xd6, 0x76, 0xdf, 0xa1, 0xbb, 0xd6, 0x33, 0xd6, 0x97, 0xe6, 0x7d, 0xb5, 0x02, 0x9b,
0x28, 0xd4, 0x37, 0x1e, 0x61, 0xab, 0x09, 0x56, 0x4b, 0x3d, 0x93, 0x47, 0x30, 0xd3, 0x69, 0xec,
0xd0, 0x8e, 0x3b, 0x37, 0xc5, 0xb0, 0xb9, 0xea, 0xda, 0x24, 0x91, 0x55, 0x1e, 0x08, 0xd3, 0x0f,
0x18, 0xc1, 0x87, 0x26, 0x8e, 0x43, 0xea, 0x90, 0xeb, 0xd2, 0xee, 0x0e, 0xeb, 0xfe, 0xdc, 0xea,
0xbb, 0x73, 0xd3, 0x6c, 0xd8, 0x42, 0xf5, 0x4e, 0xdc, 0xb2, 0x6d, 0x31, 0xea, 0x2b, 0x0f, 0x7d,
0xbc, 0x19, 0xb6, 0x25, 0x55, 0xb8, 0xc8, 0x94, 0xc3, 0xe6, 0x71, 0x51, 0x0c, 0x72, 0x23, 0x76,
0xed, 0x19, 0xc8, 0x94, 0x50, 0x46, 0x73, 0x9e, 0x2f, 0x45, 0xb0, 0x06, 0x33, 0x62, 0x7d, 0x2e,
0xf1, 0x46, 0x35, 0xeb, 0xf2, 0x5b, 0x90, 0x0b, 0x85, 0x4e, 0xae, 0xc0, 0x54, 0x9b, 0x1e, 0x4a,
0x59, 0x98, 0xfc, 0x91, 0xaf, 0xee, 0x41, 0xa3, 0xb3, 0x4f, 0xd9, 0x0a, 0xf2, 0x36, 0xf9, 0xf2,
0x76, 0x7a, 0x2d, 0x65, 0xac, 0x43, 0x31, 0xb4, 0x1c, 0xa8, 0x91, 0x0a, 0x23, 0x83, 0x37, 0x08,
0x32, 0x92, 0x44, 0x22, 0x61, 0xc6, 0x8f, 0x29, 0x28, 0x3e, 0xe9, 0xb7, 0x1a, 0x1e, 0x9d, 0x54,
0xa1, 0xe4, 0x5d, 0xb8, 0x24, 0x40, 0x07, 0x6c, 0x91, 0x2c, 0xbb, 0x27, 0x02, 0xcc, 0x55, 0xaf,
0xeb, 0x3c, 0x7e, 0x22, 0x21, 0x66, 0x8e, 0x1b, 0xe0, 0x0b, 0x79, 0x1d, 0xa6, 0xf9, 0x76, 0x63,
0x74, 0x73, 0xbb, 0x1b, 0x49, 0xbc, 0x98, 0x02, 0x69, 0xd4, 0x80, 0x84, 0x63, 0x3d, 0xd1, 0xb6,
0xd8, 0x84, 0xa2, 0x49, 0xbb, 0xf6, 0xc1, 0xe4, 0xf3, 0x65, 0x4c, 0xec, 0xda, 0x4e, 0x53, 0x32,
0x31, 0x6b, 0xca, 0x17, 0xa3, 0x04, 0x24, 0x3c, 0x9e, 0x8c, 0x09, 0x37, 0xfd, 0xe3, 0x86, 0xdb,
0x0e, 0xb9, 0xf0, 0xd8, 0x6b, 0xc4, 0x05, 0x47, 0x70, 0x17, 0xbc, 0xcb, 0xdf, 0xf4, 0xd2, 0x2c,
0x98, 0x1d, 0xef, 0x4c, 0x9a, 0x9d, 0xc0, 0x0b, 0x94, 0xb1, 0xa6, 0x66, 0x37, 0xb1, 0x6b, 0x7f,
0x1e, 0x61, 0xef, 0xc6, 0x3f, 0x98, 0x44, 0x78, 0xe3, 0x09, 0x92, 0x48, 0xd8, 0x6c, 0x38, 0x89,
0xfc, 0x70, 0x8e, 0x49, 0x44, 0x17, 0x99, 0x36, 0x89, 0xb0, 0x10, 0x5c, 0xea, 0x1c, 0x58, 0x4d,
0xae, 0x0e, 0x99, 0x44, 0x30, 0x84, 0x2d, 0xd9, 0x5c, 0xdf, 0x60, 0x21, 0x20, 0xa4, 0xde, 0x72,
0xc9, 0x02, 0xcc, 0xa2, 0x96, 0x64, 0xb6, 0xc8, 0xd6, 0x72, 0x0c, 0x9d, 0x91, 0x62, 0x62, 0xb3,
0x97, 0x6a, 0x72, 0xc9, 0x06, 0x14, 0xd8, 0x06, 0xb4, 0x1c, 0xda, 0xda, 0x76, 0x3d, 0xa6, 0x69,
0x99, 0x1f, 0x0a, 0xd5, 0xff, 0xc7, 0x51, 0xbc, 0xc5, 0x51, 0x66, 0x1e, 0x8d, 0xc4, 0x9b, 0x26,
0xc9, 0x64, 0xfe, 0x93, 0x24, 0x83, 0xcb, 0x15, 0x24, 0x19, 0xae, 0x9a, 0xc4, 0x24, 0x23, 0x64,
0x24, 0x61, 0xc6, 0x47, 0x50, 0x5a, 0x77, 0x28, 0x8b, 0x17, 0x97, 0x4c, 0x09, 0x69, 0x15, 0x33,
0x80, 0x54, 0xd1, 0xbc, 0x6e, 0x18, 0xb4, 0x08, 0x25, 0x81, 0x4d, 0xb8, 0x16, 0x19, 0x0c, 0xa3,
0xba, 0x0b, 0x19, 0xa4, 0x01, 0x07, 0xbc, 0x9e, 0x30, 0xa0, 0xa9, 0xb0, 0xc6, 0xfb, 0x50, 0x64,
0x7b, 0x2e, 0x12, 0xd9, 0x32, 0x40, 0xc0, 0x3a, 0xee, 0x9a, 0x3c, 0xa3, 0x31, 0xeb, 0x93, 0x6e,
0x66, 0x7d, 0xce, 0xd9, 0xfc, 0x48, 0x78, 0x88, 0xd3, 0xc5, 0xf3, 0x73, 0x0a, 0x4a, 0x32, 0xcb,
0x9d, 0x26, 0x26, 0x26, 0xaf, 0xcb, 0x0a, 0x3d, 0x41, 0x82, 0x2e, 0xa0, 0x8d, 0xca, 0xd1, 0xab,
0x03, 0x39, 0x7a, 0x7c, 0x86, 0x22, 0x13, 0x38, 0xdd, 0x8a, 0x6c, 0x40, 0x49, 0xa6, 0xa6, 0x53,
0x91, 0xf4, 0x3f, 0xb8, 0x16, 0x19, 0x05, 0x73, 0xdc, 0x9f, 0x69, 0xb8, 0xca, 0x35, 0x8e, 0xed,
0x7e, 0x9a, 0xab, 0x47, 0xd3, 0xdc, 0x4a, 0x5c, 0x32, 0x89, 0x58, 0x0e, 0x67, 0xba, 0x6f, 0xd3,
0x67, 0x9e, 0xe9, 0xb6, 0x22, 0x99, 0xee, 0x9d, 0x09, 0x83, 0xd3, 0x26, 0xbb, 0xa1, 0x6c, 0x32,
0x7d, 0xb6, 0xd9, 0xe4, 0x63, 0x28, 0x0d, 0x86, 0x84, 0xc2, 0x78, 0x13, 0x66, 0x91, 0x28, 0x95,
0x53, 0x12, 0x95, 0xe1, 0x83, 0x83, 0xcc, 0xb2, 0x49, 0xbd, 0xa7, 0xb6, 0xd3, 0x9e, 0x20, 0xb3,
0xa0, 0x85, 0x2e, 0xb3, 0xf8, 0x83, 0x05, 0xba, 0xed, 0xc9, 0xa6, 0x24, 0xdd, 0x2a, 0x2b, 0x85,
0x35, 0x9e, 0x88, 0xcc, 0x12, 0x89, 0x8c, 0xb0, 0x6a, 0x85, 0xad, 0x26, 0xae, 0x97, 0x78, 0xe6,
0x42, 0x46, 0x1b, 0x2e, 0xe4, 0x74, 0x20, 0x64, 0xb4, 0xe5, 0x42, 0x46, 0x80, 0x9f, 0x6d, 0xce,
0x28, 0xc6, 0xcf, 0xd4, 0xde, 0x3a, 0xf3, 0x30, 0xfd, 0xfd, 0x16, 0x89, 0xd4, 0xdf, 0x6f, 0xd8,
0x7e, 0x82, 0xfd, 0x16, 0xb1, 0x7c, 0xb9, 0xf6, 0x5b, 0x4c, 0x70, 0xe7, 0xb9, 0xdf, 0x82, 0x90,
0x82, 0xfd, 0x86, 0x44, 0x25, 0xee, 0x37, 0xc5, 0x9c, 0x0f, 0xc6, 0x8f, 0xe5, 0x7a, 0x67, 0xdf,
0x65, 0x73, 0x0a, 0xe5, 0xe1, 0xa6, 0x6c, 0x89, 0xe4, 0x61, 0xc4, 0x71, 0x5d, 0x20, 0xc0, 0x97,
0xaf, 0x3f, 0x44, 0x20, 0x5f, 0x84, 0x24, 0xc9, 0x57, 0x59, 0x29, 0xac, 0xaf, 0x25, 0xec, 0x38,
0x81, 0x96, 0x22, 0x96, 0x2f, 0x97, 0x96, 0x62, 0x82, 0x3b, 0x4f, 0x2d, 0x05, 0x21, 0x05, 0x5a,
0x42, 0x36, 0x12, 0xb5, 0xa4, 0xa8, 0xf3, 0xc1, 0xc6, 0x3e, 0x14, 0x3f, 0xb4, 0xad, 0xde, 0x63,
0xbb, 0x4d, 0x7b, 0xa6, 0xcd, 0xca, 0x59, 0x5e, 0x70, 0x54, 0xe0, 0xaa, 0xc3, 0x9f, 0xe9, 0x36,
0x17, 0x1c, 0x53, 0x94, 0xc7, 0xbb, 0x45, 0x84, 0xb3, 0x66, 0x51, 0x76, 0x7d, 0x2a, 0x7a, 0x84,
0x1d, 0x3b, 0x44, 0x96, 0x10, 0xdf, 0x6d, 0xf4, 0x1a, 0x7b, 0xbe, 0x81, 0x3c, 0xa3, 0x11, 0xd9,
0xf7, 0x50, 0x76, 0x09, 0x0b, 0xe3, 0xbb, 0xb4, 0xaa, 0xaf, 0x4e, 0x23, 0x63, 0x5e, 0x5f, 0x29,
0xf4, 0x24, 0xf5, 0x15, 0xda, 0x4c, 0x50, 0x5f, 0xa1, 0xf7, 0xe0, 0x3b, 0x45, 0xee, 0xc3, 0xac,
0x83, 0xeb, 0xc5, 0x48, 0xe6, 0x86, 0xb7, 0x75, 0x86, 0x43, 0x8b, 0x5b, 0x9b, 0x3e, 0x3a, 0x9e,
0xbf, 0x60, 0xfa, 0xc6, 0x41, 0xa1, 0x76, 0x36, 0xbb, 0xb1, 0xfa, 0x5b, 0x11, 0x32, 0xeb, 0xf2,
0x92, 0x8d, 0x58, 0x90, 0xc1, 0xfb, 0x2b, 0x62, 0xe8, 0x8c, 0x07, 0xef, 0xc4, 0xca, 0xb7, 0x12,
0x31, 0xf8, 0xe5, 0xb8, 0xf6, 0xcb, 0x4f, 0x7f, 0x7d, 0x9f, 0xbe, 0x0c, 0x79, 0x01, 0x7a, 0x0d,
0x19, 0x27, 0x36, 0x64, 0xfd, 0x8b, 0x10, 0xf2, 0xea, 0x38, 0xd7, 0x46, 0xe5, 0xdb, 0x23, 0x50,
0xc9, 0x0e, 0x1d, 0x80, 0xe0, 0x1e, 0x82, 0x68, 0xc7, 0x1a, 0xba, 0x53, 0x29, 0x2f, 0x8c, 0x82,
0x8d, 0xf4, 0x19, 0xdc, 0x33, 0xe8, 0x7d, 0x0e, 0xdd, 0x6b, 0xe8, 0x7d, 0x6a, 0xae, 0x2b, 0x62,
0x7c, 0x4a, 0x0e, 0xf9, 0x49, 0x2e, 0x96, 0xc3, 0xd0, 0x3d, 0x43, 0x2c, 0x87, 0x03, 0x37, 0x0a,
0xc9, 0x1c, 0x8a, 0x73, 0x66, 0x3c, 0x87, 0xe1, 0x53, 0x7b, 0x3c, 0x87, 0x03, 0x87, 0xd5, 0x91,
0xeb, 0x29, 0xa6, 0x97, 0xb0, 0x9e, 0xe1, 0x19, 0x2e, 0x8c, 0x82, 0x8d, 0xf4, 0x19, 0x9c, 0x13,
0xf5, 0x3e, 0x87, 0x8e, 0xa2, 0x7a, 0x9f, 0xc3, 0xc7, 0xcd, 0x38, 0x9f, 0xcf, 0xe0, 0x52, 0xb8,
0xe4, 0x26, 0x77, 0xc6, 0x3c, 0x27, 0x94, 0x17, 0x47, 0x03, 0x93, 0x3d, 0x7f, 0x05, 0xf9, 0x81,
0x83, 0x3a, 0xd1, 0x8e, 0xa8, 0xbb, 0x18, 0x28, 0x2f, 0x8d, 0x81, 0x1c, 0xe9, 0x7c, 0xe0, 0x0c,
0xaa, 0x77, 0xae, 0x3b, 0x67, 0xeb, 0x9d, 0x6b, 0x0f, 0xb4, 0x09, 0xce, 0x07, 0x8e, 0x9a, 0x7a,
0xe7, 0xba, 0x33, 0xad, 0xde, 0xb9, 0xfe, 0xdc, 0x9a, 0x28, 0x32, 0x2c, 0xdd, 0x62, 0x45, 0x36,
0x58, 0xee, 0xc7, 0x8a, 0x2c, 0x5a, 0xbb, 0x27, 0x8b, 0x4c, 0xd5, 0x99, 0xf1, 0x22, 0x8b, 0x14,
0xc7, 0xf1, 0x22, 0x8b, 0x96, 0xac, 0x23, 0x45, 0xa6, 0x26, 0x9c, 0x20, 0xb2, 0xc8, 0x9c, 0x97,
0xc6, 0x40, 0x8e, 0xc9, 0x73, 0xa2, 0x73, 0xdd, 0xf9, 0x2a, 0x89, 0xe7, 0x31, 0x9d, 0x4b, 0x9e,
0xf1, 0x1b, 0x1c, 0xcb, 0xf3, 0x60, 0x8d, 0x13, 0xcb, 0x73, 0xa4, 0x00, 0x18, 0xc1, 0xb3, 0xaa,
0x01, 0xe3, 0x79, 0x8e, 0x14, 0xae, 0xf1, 0x3c, 0x47, 0xcb, 0xc9, 0x91, 0xfb, 0x59, 0x4d, 0x38,
0x61, 0x3f, 0x47, 0xe6, 0xbc, 0x34, 0x06, 0x32, 0xd1, 0x79, 0xed, 0xc6, 0xd1, 0xf3, 0x9b, 0x17,
0x7e, 0x67, 0xbf, 0xbf, 0x9f, 0xdf, 0x4c, 0x7d, 0xfd, 0xe2, 0x66, 0xea, 0x88, 0xfd, 0x7e, 0x65,
0xbf, 0x3f, 0xd8, 0x6f, 0x67, 0x46, 0xfc, 0x9f, 0x6f, 0xf5, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff,
0xbb, 0x97, 0x43, 0x36, 0x60, 0x1c, 0x00, 0x00,
}

View file

@ -115,6 +115,7 @@ message UpdateNodeResponse {
// RemoveNodeRequest requests to delete the specified node from store.
message RemoveNodeRequest {
string node_id = 1 [(gogoproto.customname) = "NodeID"];
bool force = 2;
}
message RemoveNodeResponse {

View file

@ -1072,12 +1072,12 @@ func encodeVarintDispatcher(data []byte, offset int, v uint64) int {
type raftProxyDispatcherServer struct {
local DispatcherServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyDispatcherServer(local DispatcherServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) DispatcherServer {
func NewRaftProxyDispatcherServer(local DispatcherServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) DispatcherServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -1128,6 +1128,19 @@ func (p *raftProxyDispatcherServer) Session(r *SessionRequest, stream Dispatcher
if err != nil {
return err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
clientStream, err := NewDispatcherClient(conn).Session(ctx, r)
if err != nil {
@ -1162,6 +1175,19 @@ func (p *raftProxyDispatcherServer) Heartbeat(ctx context.Context, r *HeartbeatR
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewDispatcherClient(conn).Heartbeat(ctx, r)
}
@ -1178,6 +1204,19 @@ func (p *raftProxyDispatcherServer) UpdateTaskStatus(ctx context.Context, r *Upd
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewDispatcherClient(conn).UpdateTaskStatus(ctx, r)
}
@ -1194,6 +1233,19 @@ func (p *raftProxyDispatcherServer) Tasks(r *TasksRequest, stream Dispatcher_Tas
if err != nil {
return err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
clientStream, err := NewDispatcherClient(conn).Tasks(ctx, r)
if err != nil {

View file

@ -19,3 +19,11 @@ func TasksEqualStable(a, b *api.Task) bool {
return reflect.DeepEqual(&copyA, &copyB)
}
// TaskStatusesEqualStable compares the task status excluding timestamp fields.
func TaskStatusesEqualStable(a, b *api.TaskStatus) bool {
copyA, copyB := *a, *b
copyA.Timestamp, copyB.Timestamp = nil, nil
return reflect.DeepEqual(&copyA, &copyB)
}

View file

@ -319,12 +319,12 @@ func encodeVarintHealth(data []byte, offset int, v uint64) int {
type raftProxyHealthServer struct {
local HealthServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyHealthServer(local HealthServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) HealthServer {
func NewRaftProxyHealthServer(local HealthServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) HealthServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -375,6 +375,19 @@ func (p *raftProxyHealthServer) Check(ctx context.Context, r *HealthCheckRequest
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewHealthClient(conn).Check(ctx, r)
}

View file

@ -1438,12 +1438,12 @@ func encodeVarintRaft(data []byte, offset int, v uint64) int {
type raftProxyRaftServer struct {
local RaftServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyRaftServer(local RaftServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftServer {
func NewRaftProxyRaftServer(local RaftServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -1494,6 +1494,19 @@ func (p *raftProxyRaftServer) ProcessRaftMessage(ctx context.Context, r *Process
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewRaftClient(conn).ProcessRaftMessage(ctx, r)
}
@ -1510,17 +1523,30 @@ func (p *raftProxyRaftServer) ResolveAddress(ctx context.Context, r *ResolveAddr
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewRaftClient(conn).ResolveAddress(ctx, r)
}
type raftProxyRaftMembershipServer struct {
local RaftMembershipServer
connSelector *raftpicker.ConnSelector
connSelector raftpicker.Interface
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
func NewRaftProxyRaftMembershipServer(local RaftMembershipServer, connSelector *raftpicker.ConnSelector, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftMembershipServer {
func NewRaftProxyRaftMembershipServer(local RaftMembershipServer, connSelector raftpicker.Interface, cluster raftpicker.RaftCluster, ctxMod func(context.Context) (context.Context, error)) RaftMembershipServer {
redirectChecker := func(ctx context.Context) (context.Context, error) {
s, ok := transport.StreamFromContext(ctx)
if !ok {
@ -1571,6 +1597,19 @@ func (p *raftProxyRaftMembershipServer) Join(ctx context.Context, r *JoinRequest
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewRaftMembershipClient(conn).Join(ctx, r)
}
@ -1587,6 +1626,19 @@ func (p *raftProxyRaftMembershipServer) Leave(ctx context.Context, r *LeaveReque
if err != nil {
return nil, err
}
defer func() {
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, grpc.ErrClientConnClosing.Error()) ||
strings.Contains(errStr, grpc.ErrClientConnTimeout.Error()) ||
strings.Contains(errStr, "connection error") ||
grpc.Code(err) == codes.Internal {
p.connSelector.Reset()
}
}
}()
return NewRaftMembershipClient(conn).Leave(ctx, r)
}

View file

@ -57,19 +57,17 @@ const (
RootCAExpiration = "630720000s"
// DefaultNodeCertExpiration represents the default expiration for node certificates (3 months)
DefaultNodeCertExpiration = 2160 * time.Hour
// CertBackdate represents the amount of time each certificate is backdated to try to avoid
// clock drift issues.
CertBackdate = 1 * time.Hour
// CertLowerRotationRange represents the minimum fraction of time that we will wait when randomly
// choosing our next certificate rotation
CertLowerRotationRange = 0.5
// CertUpperRotationRange represents the maximum fraction of time that we will wait when randomly
// choosing our next certificate rotation
CertUpperRotationRange = 0.8
// MinNodeCertExpiration represents the minimum expiration for node certificates (25 + 5 minutes)
// X - 5 > CertUpperRotationRange * X <=> X < 5/(1 - CertUpperRotationRange)
// Since we're issuing certificates 5 minutes in the past to get around clock drifts, and
// we're selecting a random rotation distribution range from CertLowerRotationRange to
// CertUpperRotationRange, we need to ensure that we don't accept an expiration time that will
// make a node able to randomly choose the next rotation after the expiration of the certificate.
MinNodeCertExpiration = 30 * time.Minute
// MinNodeCertExpiration represents the minimum expiration for node certificates
MinNodeCertExpiration = 1 * time.Hour
)
// ErrNoLocalRootCA is an error type used to indicate that the local root CA

View file

@ -109,12 +109,6 @@ func (s *SecurityConfig) UpdateRootCA(cert, key []byte, certExpiry time.Duration
return err
}
// DefaultPolicy is the default policy used by the signers to ensure that the only fields
// from the remote CSRs we trust are: PublicKey, PublicKeyAlgorithm and SignatureAlgorithm.
func DefaultPolicy() *cfconfig.Signing {
return SigningPolicy(DefaultNodeCertExpiration)
}
// SigningPolicy creates a policy used by the signer to ensure that the only fields
// from the remote CSRs we trust are: PublicKey, PublicKeyAlgorithm and SignatureAlgorithm.
// It receives the duration a certificate will be valid for
@ -124,10 +118,14 @@ func SigningPolicy(certExpiry time.Duration) *cfconfig.Signing {
certExpiry = DefaultNodeCertExpiration
}
// Add the backdate
certExpiry = certExpiry + CertBackdate
return &cfconfig.Signing{
Default: &cfconfig.SigningProfile{
Usage: []string{"signing", "key encipherment", "server auth", "client auth"},
Expiry: certExpiry,
Backdate: CertBackdate,
// Only trust the key components from the CSR. Everything else should
// come directly from API call params.
CSRWhitelist: &cfconfig.CSRWhitelist{

View file

@ -193,7 +193,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
}
for _, s := range services {
if !serviceAllocationNeeded(s, nc) {
if nc.nwkAllocator.IsServiceAllocated(s) {
continue
}
@ -304,7 +304,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventCreateService:
s := v.Service.Copy()
if !serviceAllocationNeeded(s, nc) {
if nc.nwkAllocator.IsServiceAllocated(s) {
break
}
@ -315,7 +315,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventUpdateService:
s := v.Service.Copy()
if !serviceAllocationNeeded(s, nc) {
if nc.nwkAllocator.IsServiceAllocated(s) {
break
}
@ -326,13 +326,13 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventDeleteService:
s := v.Service.Copy()
if serviceAllocationNeeded(s, nc) {
break
}
if err := nc.nwkAllocator.ServiceDeallocate(s); err != nil {
log.G(ctx).Errorf("Failed deallocation during delete of service %s: %v", s.ID, err)
}
// Remove it from unallocatedServices just in case
// it's still there.
delete(nc.unallocatedServices, s.ID)
case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
a.doNodeAlloc(ctx, nc, ev)
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
@ -382,23 +382,6 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
}
}
// serviceAllocationNeeded returns if a service needs allocation or not.
func serviceAllocationNeeded(s *api.Service, nc *networkContext) bool {
// Service needs allocation if:
// Spec has network attachments and endpoint resolution mode is VIP OR
// Spec has non-zero number of exposed ports and ingress routing is SwarmPort
if (len(s.Spec.Networks) != 0 &&
(s.Spec.Endpoint == nil ||
(s.Spec.Endpoint != nil &&
s.Spec.Endpoint.Mode == api.ResolutionModeVirtualIP))) ||
(s.Spec.Endpoint != nil &&
len(s.Spec.Endpoint.Ports) != 0) {
return !nc.nwkAllocator.IsServiceAllocated(s)
}
return false
}
// taskRunning checks whether a task is either actively running, or in the
// process of starting up.
func taskRunning(t *api.Task) bool {
@ -420,7 +403,7 @@ func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bo
// network configured or service endpoints have been
// allocated.
return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) &&
(s == nil || !serviceAllocationNeeded(s, nc))
(s == nil || nc.nwkAllocator.IsServiceAllocated(s))
}
func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) {
@ -599,6 +582,22 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
return err
}
// If the service doesn't expose ports any more and if we have
// any lingering virtual IP references for ingress network
// clean them up here.
if s.Spec.Endpoint == nil || len(s.Spec.Endpoint.Ports) == 0 {
if s.Endpoint != nil {
for i, vip := range s.Endpoint.VirtualIPs {
if vip.NetworkID == nc.ingressNetwork.ID {
n := len(s.Endpoint.VirtualIPs)
s.Endpoint.VirtualIPs[i], s.Endpoint.VirtualIPs[n-1] = s.Endpoint.VirtualIPs[n-1], nil
s.Endpoint.VirtualIPs = s.Endpoint.VirtualIPs[:n-1]
break
}
}
}
}
if err := a.store.Update(func(tx store.Tx) error {
for {
err := store.UpdateService(tx, s)
@ -670,7 +669,7 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
return nil, fmt.Errorf("could not find service %s", t.ServiceID)
}
if serviceAllocationNeeded(s, nc) {
if !nc.nwkAllocator.IsServiceAllocated(s) {
return nil, fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID)
}
@ -733,7 +732,7 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkCont
func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkContext) {
for _, s := range nc.unallocatedServices {
if serviceAllocationNeeded(s, nc) {
if !nc.nwkAllocator.IsServiceAllocated(s) {
if err := a.allocateService(ctx, nc, s); err != nil {
log.G(ctx).Debugf("Failed allocation of unallocated service %s: %v", s.ID, err)
continue

View file

@ -165,15 +165,29 @@ func (na *NetworkAllocator) ServiceAllocate(s *api.Service) (err error) {
}
}()
// If ResolutionMode is DNSRR do not try allocating VIPs.
if s.Endpoint == nil {
s.Endpoint = &api.Endpoint{}
}
s.Endpoint.Spec = s.Spec.Endpoint.Copy()
// If ResolutionMode is DNSRR do not try allocating VIPs, but
// free any VIP from previous state.
if s.Spec.Endpoint != nil && s.Spec.Endpoint.Mode == api.ResolutionModeDNSRoundRobin {
return
if s.Endpoint != nil {
for _, vip := range s.Endpoint.VirtualIPs {
if err := na.deallocateVIP(vip); err != nil {
// don't bail here, deallocate as many as possible.
log.L.WithError(err).
WithField("vip.network", vip.NetworkID).
WithField("vip.addr", vip.Addr).Error("error deallocating vip")
}
}
if s.Endpoint == nil {
s.Endpoint = &api.Endpoint{
Spec: s.Spec.Endpoint.Copy(),
s.Endpoint.VirtualIPs = nil
}
delete(na.services, s.ID)
return
}
// First allocate VIPs for all the pre-populated endpoint attachments
@ -198,7 +212,6 @@ outer:
s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs, vip)
}
s.Endpoint.Spec = s.Spec.Endpoint.Copy()
na.services[s.ID] = struct{}{}
return
@ -271,11 +284,28 @@ func (na *NetworkAllocator) IsTaskAllocated(t *api.Task) bool {
// IsServiceAllocated returns if the passed service has it's network resources allocated or not.
func (na *NetworkAllocator) IsServiceAllocated(s *api.Service) bool {
// If endpoint mode is VIP and allocator does not have the
// service in VIP allocated set then it is not allocated.
if len(s.Spec.Networks) != 0 &&
(s.Spec.Endpoint == nil ||
s.Spec.Endpoint.Mode == api.ResolutionModeVirtualIP) {
if _, ok := na.services[s.ID]; !ok {
return false
}
}
if s.Spec.Endpoint != nil {
// If the endpoint mode is DNSRR and allocator has the service
// in VIP allocated set then we return not allocated to make
// sure the allocator triggers networkallocator to free up the
// resources if any.
if s.Spec.Endpoint != nil && s.Spec.Endpoint.Mode == api.ResolutionModeDNSRoundRobin {
if _, ok := na.services[s.ID]; ok {
return false
}
}
if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) ||
(s.Endpoint != nil && len(s.Endpoint.Ports) != 0) {
return na.portAllocator.isPortsAllocated(s)
}

View file

@ -0,0 +1,12 @@
package hackpicker
// AddrSelector is interface which should track cluster for its leader address.
type AddrSelector interface {
LeaderAddr() (string, error)
}
// RaftCluster is interface which combines useful methods for clustering.
type RaftCluster interface {
AddrSelector
IsLeader() bool
}

View file

@ -0,0 +1,141 @@
// Package hackpicker is temporary solution to provide more seamless experience
// for controlapi. It has drawback of slow reaction to leader change, but it
// tracks leader automatically without erroring out to client.
package hackpicker
import (
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/transport"
)
// picker always picks address of cluster leader.
type picker struct {
mu sync.Mutex
addr string
raft AddrSelector
conn *grpc.Conn
cc *grpc.ClientConn
}
// Init does initial processing for the Picker, e.g., initiate some connections.
func (p *picker) Init(cc *grpc.ClientConn) error {
p.cc = cc
return nil
}
func (p *picker) initConn() error {
if p.conn == nil {
conn, err := grpc.NewConn(p.cc)
if err != nil {
return err
}
p.conn = conn
}
return nil
}
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
// or some error happens.
func (p *picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Lock()
if err := p.initConn(); err != nil {
p.mu.Unlock()
return nil, err
}
p.mu.Unlock()
addr, err := p.raft.LeaderAddr()
if err != nil {
return nil, err
}
p.mu.Lock()
if p.addr != addr {
p.addr = addr
p.conn.NotifyReset()
}
p.mu.Unlock()
return p.conn.Wait(ctx)
}
// PickAddr picks a peer address for connecting. This will be called repeated for
// connecting/reconnecting.
func (p *picker) PickAddr() (string, error) {
addr, err := p.raft.LeaderAddr()
if err != nil {
return "", err
}
p.mu.Lock()
p.addr = addr
p.mu.Unlock()
return addr, nil
}
// State returns the connectivity state of the underlying connections.
func (p *picker) State() (grpc.ConnectivityState, error) {
return p.conn.State(), nil
}
// WaitForStateChange blocks until the state changes to something other than
// the sourceState. It returns the new state or error.
func (p *picker) WaitForStateChange(ctx context.Context, sourceState grpc.ConnectivityState) (grpc.ConnectivityState, error) {
return p.conn.WaitForStateChange(ctx, sourceState)
}
// Reset the current connection and force a reconnect to another address.
func (p *picker) Reset() error {
p.conn.NotifyReset()
return nil
}
// Close closes all the Conn's owned by this Picker.
func (p *picker) Close() error {
return p.conn.Close()
}
// ConnSelector is struct for obtaining connection with raftpicker.
type ConnSelector struct {
mu sync.Mutex
cc *grpc.ClientConn
cluster RaftCluster
opts []grpc.DialOption
}
// NewConnSelector returns new ConnSelector with cluster and grpc.DialOpts which
// will be used for Dial on first call of Conn.
func NewConnSelector(cluster RaftCluster, opts ...grpc.DialOption) *ConnSelector {
return &ConnSelector{
cluster: cluster,
opts: opts,
}
}
// Conn returns *grpc.ClientConn with picker which picks raft cluster leader.
// Internal connection estabilished lazily on this call.
// It can return error if cluster wasn't ready at the moment of initial call.
func (c *ConnSelector) Conn() (*grpc.ClientConn, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.cc != nil {
return c.cc, nil
}
addr, err := c.cluster.LeaderAddr()
if err != nil {
return nil, err
}
picker := &picker{raft: c.cluster, addr: addr}
opts := append(c.opts, grpc.WithPicker(picker))
cc, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
c.cc = cc
return c.cc, nil
}
// Reset does nothing for hackpicker.
func (c *ConnSelector) Reset() error {
return nil
}

View file

@ -283,7 +283,7 @@ func (s *Server) RemoveNode(ctx context.Context, request *api.RemoveNodeRequest)
return grpc.Errorf(codes.FailedPrecondition, "node %s is a cluster manager and is a member of the raft cluster. It must be demoted to worker before removal", request.NodeID)
}
}
if node.Status.State == api.NodeStatus_READY {
if !request.Force && node.Status.State == api.NodeStatus_READY {
return grpc.Errorf(codes.FailedPrecondition, "node %s is not down and can't be removed", request.NodeID)
}
return store.DeleteNode(tx, request.NodeID)

View file

@ -182,7 +182,7 @@ func validateServiceSpec(spec *api.ServiceSpec) error {
// checkPortConflicts does a best effort to find if the passed in spec has port
// conflicts with existing services.
func (s *Server) checkPortConflicts(spec *api.ServiceSpec) error {
func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) error {
if spec.Endpoint == nil {
return nil
}
@ -215,17 +215,21 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec) error {
}
for _, service := range services {
// If service ID is the same (and not "") then this is an update
if serviceID != "" && serviceID == service.ID {
continue
}
if service.Spec.Endpoint != nil {
for _, pc := range service.Spec.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service %s", pc.PublishedPort, service.ID)
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
}
}
}
if service.Endpoint != nil {
for _, pc := range service.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service %s", pc.PublishedPort, service.ID)
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
}
}
}
@ -243,7 +247,7 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
return nil, err
}
if err := s.checkPortConflicts(request.Spec); err != nil {
if err := s.checkPortConflicts(request.Spec, ""); err != nil {
return nil, err
}
@ -309,7 +313,7 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}
if request.Spec.Endpoint != nil && !reflect.DeepEqual(request.Spec.Endpoint, service.Spec.Endpoint) {
if err := s.checkPortConflicts(request.Spec); err != nil {
if err := s.checkPortConflicts(request.Spec, request.ServiceID); err != nil {
return nil, err
}
}

View file

@ -20,6 +20,7 @@ import (
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/state/watch"
"github.com/docker/swarmkit/picker"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
)
@ -142,7 +143,11 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
NodeID: m.NodeID,
Addr: m.Addr,
},
Weight: 1,
// TODO(stevvooe): Calculate weight of manager selection based on
// cluster-level observations, such as number of connections and
// load.
Weight: picker.DefaultObservationWeight,
})
}
return mgrs
@ -590,7 +595,10 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
modificationCnt++
case state.EventUpdateTask:
if oldTask, exists := tasksMap[v.Task.ID]; exists {
if equality.TasksEqualStable(oldTask, v.Task) {
// States ASSIGNED and below are set by the orchestrator/scheduler,
// not the agent, so tasks in these states need to be sent to the
// agent even if nothing else has changed.
if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
// this update should not trigger action at agent
tasksMap[v.Task.ID] = v.Task
continue

View file

@ -44,7 +44,8 @@ func (rn *registeredNode) checkSessionID(sessionID string) error {
type nodeStore struct {
periodChooser *periodChooser
gracePeriodMultiplier time.Duration
gracePeriodMultiplierNormal time.Duration
gracePeriodMultiplierUnknown time.Duration
rateLimitPeriod time.Duration
nodes map[string]*registeredNode
mu sync.RWMutex
@ -54,7 +55,8 @@ func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, rateLi
return &nodeStore{
nodes: make(map[string]*registeredNode),
periodChooser: newPeriodChooser(hbPeriod, hbEpsilon),
gracePeriodMultiplier: time.Duration(graceMultiplier),
gracePeriodMultiplierNormal: time.Duration(graceMultiplier),
gracePeriodMultiplierUnknown: time.Duration(graceMultiplier) * 2,
rateLimitPeriod: rateLimitPeriod,
}
}
@ -62,7 +64,8 @@ func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, rateLi
func (s *nodeStore) updatePeriod(hbPeriod, hbEpsilon time.Duration, gracePeriodMultiplier int) {
s.mu.Lock()
s.periodChooser = newPeriodChooser(hbPeriod, hbEpsilon)
s.gracePeriodMultiplier = time.Duration(gracePeriodMultiplier)
s.gracePeriodMultiplierNormal = time.Duration(gracePeriodMultiplier)
s.gracePeriodMultiplierUnknown = s.gracePeriodMultiplierNormal * 2
s.mu.Unlock()
}
@ -79,7 +82,7 @@ func (s *nodeStore) AddUnknown(n *api.Node, expireFunc func()) error {
Node: n,
}
s.nodes[n.ID] = rn
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplier, expireFunc)
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplierUnknown, expireFunc)
return nil
}
@ -124,7 +127,7 @@ func (s *nodeStore) Add(n *api.Node, expireFunc func()) *registeredNode {
Disconnect: make(chan struct{}),
}
s.nodes[n.ID] = rn
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplier, expireFunc)
rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplierNormal, expireFunc)
return rn
}
@ -154,7 +157,7 @@ func (s *nodeStore) Heartbeat(id, sid string) (time.Duration, error) {
return 0, err
}
period := s.periodChooser.Choose() // base period for node
grace := period * time.Duration(s.gracePeriodMultiplier)
grace := period * time.Duration(s.gracePeriodMultiplierNormal)
rn.mu.Lock()
rn.Heartbeat.Update(grace)
rn.Heartbeat.Beat()

View file

@ -17,6 +17,7 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/allocator"
"github.com/docker/swarmkit/manager/controlapi"
"github.com/docker/swarmkit/manager/controlapi/hackpicker"
"github.com/docker/swarmkit/manager/dispatcher"
"github.com/docker/swarmkit/manager/health"
"github.com/docker/swarmkit/manager/keymanager"
@ -89,9 +90,11 @@ type Manager struct {
server *grpc.Server
localserver *grpc.Server
RaftNode *raft.Node
connSelector *raftpicker.ConnSelector
mu sync.Mutex
started chan struct{}
stopped chan struct{}
}
@ -220,6 +223,7 @@ func New(config *Config) (*Manager, error) {
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
RaftNode: RaftNode,
started: make(chan struct{}),
stopped: make(chan struct{}),
}
@ -428,11 +432,23 @@ func (m *Manager) Run(parent context.Context) error {
}()
proxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(2 * time.Second),
grpc.WithTimeout(5 * time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}
cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs
// We need special connSelector for controlapi because it provides automatic
// leader tracking.
// Other APIs are using connSelector which errors out on leader change, but
// allows to react quickly to reelections.
controlAPIProxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}
controlAPIConnSelector := hackpicker.NewConnSelector(m.RaftNode, controlAPIProxyOpts...)
authorize := func(ctx context.Context, roles []string) error {
// Authorize the remote roles, ensure they can only be forwarded by managers
@ -463,7 +479,7 @@ func (m *Manager) Run(parent context.Context) error {
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, controlAPIConnSelector, m.RaftNode, forwardAsOwnRequest)
// Everything registered on m.server should be an authenticated
// wrapper, or a proxy wrapping an authenticated wrapper!
@ -506,6 +522,8 @@ func (m *Manager) Run(parent context.Context) error {
return fmt.Errorf("can't initialize raft node: %v", err)
}
close(m.started)
go func() {
err := m.RaftNode.Run(ctx)
if err != nil {
@ -560,12 +578,15 @@ func (m *Manager) Run(parent context.Context) error {
func (m *Manager) Stop(ctx context.Context) {
log.G(ctx).Info("Stopping manager")
// It's not safe to start shutting down while the manager is still
// starting up.
<-m.started
// the mutex stops us from trying to stop while we're alrady stopping, or
// from returning before we've finished stopping.
m.mu.Lock()
defer m.mu.Unlock()
select {
// check to see that we've already stopped
case <-m.stopped:
return
@ -600,6 +621,9 @@ func (m *Manager) Stop(ctx context.Context) {
m.keyManager.Stop()
}
if m.connSelector != nil {
m.connSelector.Stop()
}
m.RaftNode.Shutdown()
// some time after this point, Run will receive an error from one of these
m.server.Stop()

View file

@ -2,115 +2,45 @@ package raftpicker
import (
"sync"
"time"
"github.com/Sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/transport"
)
// picker always picks address of cluster leader.
type picker struct {
mu sync.Mutex
addr string
raft AddrSelector
conn *grpc.Conn
cc *grpc.ClientConn
// Interface is interface to replace implementation with controlapi/hackpicker.
// TODO: it should be done cooler.
type Interface interface {
Conn() (*grpc.ClientConn, error)
Reset() error
}
// Init does initial processing for the Picker, e.g., initiate some connections.
func (p *picker) Init(cc *grpc.ClientConn) error {
p.cc = cc
return nil
}
func (p *picker) initConn() error {
if p.conn == nil {
conn, err := grpc.NewConn(p.cc)
if err != nil {
return err
}
p.conn = conn
}
return nil
}
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
// or some error happens.
func (p *picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Lock()
if err := p.initConn(); err != nil {
p.mu.Unlock()
return nil, err
}
p.mu.Unlock()
addr, err := p.raft.LeaderAddr()
if err != nil {
return nil, err
}
p.mu.Lock()
if p.addr != addr {
p.addr = addr
p.conn.NotifyReset()
}
p.mu.Unlock()
return p.conn.Wait(ctx)
}
// PickAddr picks a peer address for connecting. This will be called repeated for
// connecting/reconnecting.
func (p *picker) PickAddr() (string, error) {
addr, err := p.raft.LeaderAddr()
if err != nil {
return "", err
}
p.mu.Lock()
p.addr = addr
p.mu.Unlock()
return addr, nil
}
// State returns the connectivity state of the underlying connections.
func (p *picker) State() (grpc.ConnectivityState, error) {
return p.conn.State(), nil
}
// WaitForStateChange blocks until the state changes to something other than
// the sourceState. It returns the new state or error.
func (p *picker) WaitForStateChange(ctx context.Context, sourceState grpc.ConnectivityState) (grpc.ConnectivityState, error) {
return p.conn.WaitForStateChange(ctx, sourceState)
}
// Reset the current connection and force a reconnect to another address.
func (p *picker) Reset() error {
p.conn.NotifyReset()
return nil
}
// Close closes all the Conn's owned by this Picker.
func (p *picker) Close() error {
return p.conn.Close()
}
// ConnSelector is struct for obtaining connection with raftpicker.
// ConnSelector is struct for obtaining connection connected to cluster leader.
type ConnSelector struct {
mu sync.Mutex
cc *grpc.ClientConn
cluster RaftCluster
opts []grpc.DialOption
cc *grpc.ClientConn
addr string
stop chan struct{}
}
// NewConnSelector returns new ConnSelector with cluster and grpc.DialOpts which
// will be used for Dial on first call of Conn.
// will be used for connection create.
func NewConnSelector(cluster RaftCluster, opts ...grpc.DialOption) *ConnSelector {
return &ConnSelector{
cs := &ConnSelector{
cluster: cluster,
opts: opts,
stop: make(chan struct{}),
}
go cs.updateLoop()
return cs
}
// Conn returns *grpc.ClientConn with picker which picks raft cluster leader.
// Internal connection estabilished lazily on this call.
// Conn returns *grpc.ClientConn which connected to cluster leader.
// It can return error if cluster wasn't ready at the moment of initial call.
func (c *ConnSelector) Conn() (*grpc.ClientConn, error) {
c.mu.Lock()
@ -122,12 +52,76 @@ func (c *ConnSelector) Conn() (*grpc.ClientConn, error) {
if err != nil {
return nil, err
}
picker := &picker{raft: c.cluster, addr: addr}
opts := append(c.opts, grpc.WithPicker(picker))
cc, err := grpc.Dial(addr, opts...)
cc, err := grpc.Dial(addr, c.opts...)
if err != nil {
return nil, err
}
c.cc = cc
return c.cc, nil
c.addr = addr
return cc, nil
}
// Reset recreates underlying connection.
func (c *ConnSelector) Reset() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.cc != nil {
c.cc.Close()
c.cc = nil
}
addr, err := c.cluster.LeaderAddr()
if err != nil {
logrus.WithError(err).Errorf("error obtaining leader address")
return err
}
cc, err := grpc.Dial(addr, c.opts...)
if err != nil {
logrus.WithError(err).Errorf("error reestabilishing connection to leader")
return err
}
c.cc = cc
c.addr = addr
return nil
}
// Stop cancels updating connection loop.
func (c *ConnSelector) Stop() {
close(c.stop)
}
func (c *ConnSelector) updateConn() error {
addr, err := c.cluster.LeaderAddr()
if err != nil {
return err
}
c.mu.Lock()
defer c.mu.Unlock()
if c.addr != addr {
if c.cc != nil {
c.cc.Close()
c.cc = nil
}
conn, err := grpc.Dial(addr, c.opts...)
if err != nil {
return err
}
c.cc = conn
c.addr = addr
}
return nil
}
func (c *ConnSelector) updateLoop() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.updateConn(); err != nil {
logrus.WithError(err).Errorf("error reestabilishing connection to leader")
}
case <-c.stop:
return
}
}
}

View file

@ -61,8 +61,8 @@ func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
tasksByNode := make(map[string]map[string]*api.Task)
for _, t := range tasks {
// Ignore all tasks that have not reached ALLOCATED
// state.
if t.Status.State < api.TaskStateAllocated {
// state and tasks that no longer consume resources.
if t.Status.State < api.TaskStateAllocated || t.Status.State > api.TaskStateRunning {
continue
}
@ -109,8 +109,31 @@ func (s *Scheduler) Run(ctx context.Context) error {
// Queue all unassigned tasks before processing changes.
s.tick(ctx)
const (
// commitDebounceGap is the amount of time to wait between
// commit events to debounce them.
commitDebounceGap = 50 * time.Millisecond
// maxLatency is a time limit on the debouncing.
maxLatency = time.Second
)
var (
debouncingStarted time.Time
commitDebounceTimer *time.Timer
commitDebounceTimeout <-chan time.Time
)
pendingChanges := 0
schedule := func() {
if len(s.preassignedTasks) > 0 {
s.processPreassignedTasks(ctx)
}
if pendingChanges > 0 {
s.tick(ctx)
pendingChanges = 0
}
}
// Watch for changes.
for {
select {
@ -131,15 +154,25 @@ func (s *Scheduler) Run(ctx context.Context) error {
case state.EventDeleteNode:
s.nodeHeap.remove(v.Node.ID)
case state.EventCommit:
if len(s.preassignedTasks) > 0 {
s.processPreassignedTasks(ctx)
if commitDebounceTimer != nil {
if time.Since(debouncingStarted) > maxLatency {
commitDebounceTimer.Stop()
commitDebounceTimer = nil
commitDebounceTimeout = nil
schedule()
} else {
commitDebounceTimer.Reset(commitDebounceGap)
}
if pendingChanges > 0 {
s.tick(ctx)
pendingChanges = 0
} else {
commitDebounceTimer = time.NewTimer(commitDebounceGap)
commitDebounceTimeout = commitDebounceTimer.C
debouncingStarted = time.Now()
}
}
case <-commitDebounceTimeout:
schedule()
commitDebounceTimer = nil
commitDebounceTimeout = nil
case <-s.stopChan:
return nil
}

View file

@ -115,7 +115,7 @@ func (c *Cluster) RemoveMember(id uint64) error {
// ReplaceMemberConnection replaces the member's GRPC connection and GRPC
// client.
func (c *Cluster) ReplaceMemberConnection(id uint64, newConn *Member) error {
func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -124,6 +124,12 @@ func (c *Cluster) ReplaceMemberConnection(id uint64, newConn *Member) error {
return ErrIDNotFound
}
if oldConn.Conn != oldMember.Conn {
// The connection was already replaced. Don't do it again.
newConn.Conn.Close()
return nil
}
oldMember.Conn.Close()
newMember := *oldMember

View file

@ -95,8 +95,8 @@ type Node struct {
wait *wait
wal *wal.WAL
snapshotter *snap.Snapshotter
wasLeader bool
restored bool
signalledLeadership uint32
isMember uint32
joinAddr string
@ -104,10 +104,6 @@ type Node struct {
// shutting down the node.
waitProp sync.WaitGroup
// forceNewCluster is a special flag used to recover from disaster
// scenario by pointing to an existing or backed up data directory.
forceNewCluster bool
confState raftpb.ConfState
appliedIndex uint64
snapshotIndex uint64
@ -192,7 +188,6 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
MaxInflightMsgs: cfg.MaxInflightMsgs,
Logger: cfg.Logger,
},
forceNewCluster: opts.ForceNewCluster,
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
removeRaftCh: make(chan struct{}),
@ -329,6 +324,8 @@ func (n *Node) Run(ctx context.Context) error {
close(n.doneCh)
}()
wasLeader := false
for {
select {
case <-n.ticker.C():
@ -358,7 +355,7 @@ func (n *Node) Run(ctx context.Context) error {
// saveToStorage.
if !raft.IsEmptySnap(rd.Snapshot) {
// Load the snapshot data into the store
if err := n.restoreFromSnapshot(rd.Snapshot.Data, n.forceNewCluster); err != nil {
if err := n.restoreFromSnapshot(rd.Snapshot.Data, false); err != nil {
n.Config.Logger.Error(err)
}
n.appliedIndex = rd.Snapshot.Metadata.Index
@ -387,12 +384,23 @@ func (n *Node) Run(ctx context.Context) error {
// if that happens we will apply them as any
// follower would.
if rd.SoftState != nil {
if n.wasLeader && rd.SoftState.RaftState != raft.StateLeader {
n.wasLeader = false
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
n.wait.cancelAll()
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
n.leadershipBroadcast.Write(IsFollower)
} else if !n.wasLeader && rd.SoftState.RaftState == raft.StateLeader {
n.wasLeader = true
}
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
wasLeader = true
}
}
if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
// If all the entries in the log have become
// committed, broadcast our leadership status.
if n.caughtUp() {
atomic.StoreUint32(&n.signalledLeadership, 1)
n.leadershipBroadcast.Write(IsLeader)
}
}
@ -451,6 +459,17 @@ func (n *Node) Shutdown() {
}
}
// isShutdown indicates if node was shut down.
// This method should be called under n.stopMu to avoid races with n.stop().
func (n *Node) isShutdown() bool {
select {
case <-n.Ctx.Done():
return true
default:
return false
}
}
func (n *Node) stop() {
n.stopMu.Lock()
defer n.stopMu.Unlock()
@ -493,6 +512,19 @@ func (n *Node) Leader() uint64 {
return n.Node.Status().Lead
}
// ReadyForProposals returns true if the node has broadcasted a message
// saying that it has become the leader. This means it is ready to accept
// proposals.
func (n *Node) ReadyForProposals() bool {
return atomic.LoadUint32(&n.signalledLeadership) == 1
}
func (n *Node) caughtUp() bool {
// obnoxious function that always returns a nil error
lastIndex, _ := n.raftStore.LastIndex()
return n.appliedIndex >= lastIndex
}
// Join asks to a member of the raft to propose
// a configuration change and add us as a member thus
// beginning the log replication process. This method
@ -706,12 +738,24 @@ func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if msg == nil || msg.Message == nil {
return nil, grpc.Errorf(codes.InvalidArgument, "no message provided")
}
// Don't process the message if this comes from
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
return nil, ErrMemberRemoved
}
if msg.Message.Type == raftpb.MsgProp {
// We don't accepted forwarded proposals. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted")
}
// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()
@ -763,7 +807,10 @@ func (n *Node) ResolveAddress(ctx context.Context, msg *api.ResolveAddressReques
func (n *Node) LeaderAddr() (string, error) {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if n.isShutdown() {
return "", fmt.Errorf("raft node is shut down")
}
ctx, cancel := context.WithTimeout(n.Ctx, 10*time.Second)
defer cancel()
if err := WaitForLeader(ctx, n); err != nil {
return "", ErrNoClusterLeader
@ -966,6 +1013,14 @@ func (n *Node) send(messages []raftpb.Message) error {
continue
}
if m.Type == raftpb.MsgProp {
// We don't forward proposals to the leader. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
continue
}
n.asyncTasks.Add(1)
go n.sendToMember(members, m)
}
@ -1047,7 +1102,7 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
if err != nil {
n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, conn.Addr, err)
} else {
n.cluster.ReplaceMemberConnection(m.To, newConn)
n.cluster.ReplaceMemberConnection(m.To, conn, newConn)
}
} else if m.Type == raftpb.MsgSnap {
n.ReportSnapshot(m.To, raft.SnapshotFinish)
@ -1077,7 +1132,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
ch := n.wait.register(r.ID, cb)
// Do this check after calling register to avoid a race.
if !n.IsLeader() {
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}

View file

@ -10,6 +10,7 @@ import (
"sort"
"strings"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
}
n.wal, err = wal.Create(n.walDir(), metadata)
if err != nil {
return raft.Peer{}, fmt.Errorf("create wal error: %v", err)
return raft.Peer{}, fmt.Errorf("create WAL error: %v", err)
}
n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
@ -127,7 +128,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
repaired := false
for {
if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil {
return fmt.Errorf("open wal error: %v", err)
return fmt.Errorf("open WAL error: %v", err)
}
if metadata, st, ents, err = n.wal.ReadAll(); err != nil {
if err := n.wal.Close(); err != nil {
@ -135,7 +136,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
}
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
return fmt.Errorf("read WAL error (%v) and cannot be repaired", err)
}
if !wal.Repair(n.walDir()) {
return fmt.Errorf("WAL error (%v) cannot be repaired", err)
@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
var raftNode api.RaftMember
if err := raftNode.Unmarshal(metadata); err != nil {
return fmt.Errorf("error unmarshalling wal metadata: %v", err)
return fmt.Errorf("error unmarshalling WAL metadata: %v", err)
}
n.Config.ID = raftNode.RaftID
@ -274,25 +275,103 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e
// This means that if the current snapshot doesn't appear in the
// directory for some strange reason, we won't delete anything, which
// is the safe behavior.
curSnapshotIdx := -1
var (
afterCurSnapshot bool
removeErr error
oldestSnapshot string
)
for i, snapFile := range snapshots {
if afterCurSnapshot {
if uint64(len(snapshots)-i) <= keepOldSnapshots {
return removeErr
}
if curSnapshotIdx >= 0 && i > curSnapshotIdx {
if uint64(i-curSnapshotIdx) > keepOldSnapshots {
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
if err != nil && removeErr == nil {
removeErr = err
}
continue
}
} else if snapFile == curSnapshot {
afterCurSnapshot = true
curSnapshotIdx = i
}
oldestSnapshot = snapFile
}
if removeErr != nil {
return removeErr
}
// Remove any WAL files that only contain data from before the oldest
// remaining snapshot.
if oldestSnapshot == "" {
return nil
}
// Parse index out of oldest snapshot's filename
var snapTerm, snapIndex uint64
_, err = fmt.Sscanf(oldestSnapshot, "%016x-%016x.snap", &snapTerm, &snapIndex)
if err != nil {
return fmt.Errorf("malformed snapshot filename %s: %v", oldestSnapshot, err)
}
// List the WALs
dirents, err = ioutil.ReadDir(n.walDir())
if err != nil {
return err
}
var wals []string
for _, dirent := range dirents {
if strings.HasSuffix(dirent.Name(), ".wal") {
wals = append(wals, dirent.Name())
}
}
return removeErr
// Sort WAL filenames in lexical order
sort.Sort(sort.StringSlice(wals))
found := false
deleteUntil := -1
for i, walName := range wals {
var walSeq, walIndex uint64
_, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex)
if err != nil {
return fmt.Errorf("could not parse WAL name %s: %v", walName, err)
}
if walIndex >= snapIndex {
deleteUntil = i - 1
found = true
break
}
}
// If all WAL files started with indices below the oldest snapshot's
// index, we can delete all but the newest WAL file.
if !found && len(wals) != 0 {
deleteUntil = len(wals) - 1
}
for i := 0; i < deleteUntil; i++ {
walPath := filepath.Join(n.walDir(), wals[i])
l, err := fileutil.NewLock(walPath)
if err != nil {
continue
}
err = l.TryLock()
if err != nil {
return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
}
err = os.Remove(walPath)
l.Unlock()
l.Destroy()
if err != nil {
return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
}
}
return nil
}
func (n *Node) doSnapshot(raftConfig *api.RaftConfig) {

View file

@ -15,6 +15,10 @@ import (
var errRemotesUnavailable = fmt.Errorf("no remote hosts provided")
// DefaultObservationWeight provides a weight to use for positive observations
// that will balance well under repeated observations.
const DefaultObservationWeight = 10
// Remotes keeps track of remote addresses by weight, informed by
// observations.
type Remotes interface {
@ -49,7 +53,7 @@ func NewRemotes(peers ...api.Peer) Remotes {
}
for _, peer := range peers {
mwr.Observe(peer, 1)
mwr.Observe(peer, DefaultObservationWeight)
}
return mwr
@ -96,7 +100,7 @@ func (mwr *remotesWeightedRandom) Select(excludes ...string) (api.Peer, error) {
// bias to zero-weighted remotes have same probability. otherwise, we
// always select first entry when all are zero.
const bias = 0.1
const bias = 0.001
// clear out workspace
mwr.cdf = mwr.cdf[:0]
@ -165,7 +169,7 @@ const (
// See
// https://en.wikipedia.org/wiki/Exponential_smoothing#Basic_exponential_smoothing
// for details.
remoteWeightSmoothingFactor = 0.7
remoteWeightSmoothingFactor = 0.5
remoteWeightMax = 1 << 8
)
@ -228,7 +232,7 @@ func (p *Picker) Init(cc *grpc.ClientConn) error {
peer := p.peer
p.mu.Unlock()
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
c, err := grpc.NewConn(cc)
if err != nil {
return err
@ -248,7 +252,7 @@ func (p *Picker) Pick(ctx context.Context) (transport.ClientTransport, error) {
p.mu.Unlock()
transport, err := p.conn.Wait(ctx)
if err != nil {
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}
return transport, err
@ -261,7 +265,7 @@ func (p *Picker) PickAddr() (string, error) {
peer := p.peer
p.mu.Unlock()
p.r.ObserveIfExists(peer, -1) // downweight the current addr
p.r.ObserveIfExists(peer, -DefaultObservationWeight) // downweight the current addr
var err error
peer, err = p.r.Select()
@ -299,15 +303,15 @@ func (p *Picker) WaitForStateChange(ctx context.Context, sourceState grpc.Connec
// TODO(stevvooe): This is questionable, but we'll see how it works.
switch state {
case grpc.Idle:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Connecting:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.Ready:
p.r.ObserveIfExists(peer, 1)
p.r.ObserveIfExists(peer, DefaultObservationWeight)
case grpc.TransientFailure:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
case grpc.Shutdown:
p.r.ObserveIfExists(peer, -1)
p.r.ObserveIfExists(peer, -DefaultObservationWeight)
}
return state, err