- package raft
- import (
- "fmt"
- "math"
- "math/rand"
- "net"
- "sync"
- "sync/atomic"
- "time"
- "github.com/Sirupsen/logrus"
- "github.com/coreos/etcd/pkg/idutil"
- "github.com/coreos/etcd/raft"
- "github.com/coreos/etcd/raft/raftpb"
- "github.com/docker/docker/pkg/signal"
- "github.com/docker/go-events"
- "github.com/docker/swarmkit/api"
- "github.com/docker/swarmkit/ca"
- "github.com/docker/swarmkit/log"
- "github.com/docker/swarmkit/manager/raftselector"
- "github.com/docker/swarmkit/manager/state"
- "github.com/docker/swarmkit/manager/state/raft/membership"
- "github.com/docker/swarmkit/manager/state/raft/storage"
- "github.com/docker/swarmkit/manager/state/raft/transport"
- "github.com/docker/swarmkit/manager/state/store"
- "github.com/docker/swarmkit/watch"
- "github.com/gogo/protobuf/proto"
- "github.com/pivotal-golang/clock"
- "github.com/pkg/errors"
- "golang.org/x/net/context"
- "golang.org/x/time/rate"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/peer"
- )
- var (
- // ErrNoRaftMember is thrown when the node is not yet part of a raft cluster
- ErrNoRaftMember = errors.New("raft: node is not yet part of a raft cluster")
- // ErrConfChangeRefused is returned when there is an issue with the configuration change
- ErrConfChangeRefused = errors.New("raft: propose configuration change refused")
- // ErrApplyNotSpecified is returned during the creation of a raft node when no apply method was provided
- ErrApplyNotSpecified = errors.New("raft: apply method was not specified")
- // ErrSetHardState is returned when the node fails to set the hard state
- ErrSetHardState = errors.New("raft: failed to set the hard state for log append entry")
- // ErrStopped is returned when an operation was submitted but the node was stopped in the meantime
- ErrStopped = errors.New("raft: failed to process the request: node is stopped")
- // ErrLostLeadership is returned when an operation was submitted but the node lost leader status before it became committed
- ErrLostLeadership = errors.New("raft: failed to process the request: node lost leader status")
- // ErrRequestTooLarge is returned when a raft internal message is too large to be sent
- ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent")
- // ErrCannotRemoveMember is thrown when we try to remove a member from the cluster but this would result in a loss of quorum
- ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum")
- // ErrNoClusterLeader is thrown when the cluster has no elected leader
- ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
- // ErrMemberUnknown is sent in response to a message from an
- // unrecognized peer.
- ErrMemberUnknown = errors.New("raft: member unknown")
- // work around lint
- lostQuorumMessage = "The swarm does not have a leader. It's possible that too few managers are online. Make sure more than half of the managers are online."
- errLostQuorum = errors.New(lostQuorumMessage)
- )
- // LeadershipState indicates whether the node is a leader or follower.
- type LeadershipState int
- const (
- // IsLeader indicates that the node is a raft leader.
- IsLeader LeadershipState = iota
- // IsFollower indicates that the node is a raft follower.
- IsFollower
- // lostQuorumTimeout is the number of ticks that can elapse with no
- // leader before LeaderConn starts returning an error right away.
- lostQuorumTimeout = 10
- )
- // EncryptionKeys are the current and, if necessary, pending DEKs with which to
- // encrypt raft data
- type EncryptionKeys struct {
- CurrentDEK []byte
- PendingDEK []byte
- }
- // EncryptionKeyRotator is an interface to find out if any keys need rotating.
- type EncryptionKeyRotator interface {
- GetKeys() EncryptionKeys
- UpdateKeys(EncryptionKeys) error
- NeedsRotation() bool
- RotationNotify() chan struct{}
- }
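- // A minimal EncryptionKeyRotator sketch (illustrative, not part of the
- // original file): it serves a fixed DEK and never requests rotation, which
- // is roughly what a test double or a single-key deployment needs.
- //
- //	type staticKeyRotator struct {
- //		keys   EncryptionKeys
- //		notify chan struct{} // never signalled, so no rotation triggers
- //	}
- //
- //	func (r *staticKeyRotator) GetKeys() EncryptionKeys           { return r.keys }
- //	func (r *staticKeyRotator) UpdateKeys(k EncryptionKeys) error { r.keys = k; return nil }
- //	func (r *staticKeyRotator) NeedsRotation() bool               { return false }
- //	func (r *staticKeyRotator) RotationNotify() chan struct{}     { return r.notify }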
- // Node represents a raft node, bundling the underlying raft state machine
- // with its configuration and supporting state.
- type Node struct {
- raftNode raft.Node
- cluster *membership.Cluster
- transport *transport.Transport
- raftStore *raft.MemoryStorage
- memoryStore *store.MemoryStore
- Config *raft.Config
- opts NodeOptions
- reqIDGen *idutil.Generator
- wait *wait
- campaignWhenAble bool
- signalledLeadership uint32
- isMember uint32
- bootstrapMembers []*api.RaftMember
- // waitProp waits for all the proposals to be terminated before
- // shutting down the node.
- waitProp sync.WaitGroup
- confState raftpb.ConfState
- appliedIndex uint64
- snapshotMeta raftpb.SnapshotMetadata
- writtenWALIndex uint64
- ticker clock.Ticker
- doneCh chan struct{}
- // RemovedFromRaft notifies about node deletion from raft cluster
- RemovedFromRaft chan struct{}
- cancelFunc func()
- // removeRaftCh notifies about node deletion from raft cluster
- removeRaftCh chan struct{}
- removeRaftOnce sync.Once
- leadershipBroadcast *watch.Queue
- // used to coordinate shutdown
- // Lock should be used only in stop(), all other functions should use RLock.
- stopMu sync.RWMutex
- // used for membership management checks
- membershipLock sync.Mutex
- // synchronizes access to n.opts.Addr, and makes sure the address is not
- // updated concurrently with JoinAndStart.
- addrLock sync.Mutex
- snapshotInProgress chan raftpb.SnapshotMetadata
- asyncTasks sync.WaitGroup
- // stopped chan is used for notifying grpc handlers that the raft node is
- // going to stop.
- stopped chan struct{}
- raftLogger *storage.EncryptedRaftLogger
- keyRotator EncryptionKeyRotator
- rotationQueued bool
- clearData bool
- waitForAppliedIndex uint64
- ticksWithNoLeader uint32
- }
- // NodeOptions provides node-level options.
- type NodeOptions struct {
- // ID is the node's ID, from its certificate's CN field.
- ID string
- // Addr is the address of this node's listener
- Addr string
- // ForceNewCluster defines if we have to force a new cluster
- // because we are recovering from a backup data directory.
- ForceNewCluster bool
- // JoinAddr is the cluster to join. May be an empty string to create
- // a standalone cluster.
- JoinAddr string
- // Config is the raft config.
- Config *raft.Config
- // StateDir is the directory to store durable state.
- StateDir string
- // TickInterval is the time interval between raft ticks.
- TickInterval time.Duration
- // ClockSource is a Clock interface to use as a time base.
- // Leave this nil except for tests that are designed not to run in real
- // time.
- ClockSource clock.Clock
- // SendTimeout is the timeout for sending messages to other raft
- // nodes. Leave this as 0 to get the default value.
- SendTimeout time.Duration
- TLSCredentials credentials.TransportCredentials
- KeyRotator EncryptionKeyRotator
- // DisableStackDump prevents Run from dumping goroutine stacks when the
- // store becomes stuck.
- DisableStackDump bool
- }
- func init() {
- rand.Seed(time.Now().UnixNano())
- }
- // NewNode generates a new Raft node
- func NewNode(opts NodeOptions) *Node {
- cfg := opts.Config
- if cfg == nil {
- cfg = DefaultNodeConfig()
- }
- if opts.TickInterval == 0 {
- opts.TickInterval = time.Second
- }
- if opts.SendTimeout == 0 {
- opts.SendTimeout = 2 * time.Second
- }
- raftStore := raft.NewMemoryStorage()
- n := &Node{
- cluster: membership.NewCluster(),
- raftStore: raftStore,
- opts: opts,
- Config: &raft.Config{
- ElectionTick: cfg.ElectionTick,
- HeartbeatTick: cfg.HeartbeatTick,
- Storage: raftStore,
- MaxSizePerMsg: cfg.MaxSizePerMsg,
- MaxInflightMsgs: cfg.MaxInflightMsgs,
- Logger: cfg.Logger,
- CheckQuorum: cfg.CheckQuorum,
- },
- doneCh: make(chan struct{}),
- RemovedFromRaft: make(chan struct{}),
- stopped: make(chan struct{}),
- leadershipBroadcast: watch.NewQueue(),
- keyRotator: opts.KeyRotator,
- }
- n.memoryStore = store.NewMemoryStore(n)
- if opts.ClockSource == nil {
- n.ticker = clock.NewClock().NewTicker(opts.TickInterval)
- } else {
- n.ticker = opts.ClockSource.NewTicker(opts.TickInterval)
- }
- n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
- n.wait = newWait()
- n.cancelFunc = func(n *Node) func() {
- var cancelOnce sync.Once
- return func() {
- cancelOnce.Do(func() {
- close(n.stopped)
- })
- }
- }(n)
- return n
- }
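- // Illustrative lifecycle sketch (hypothetical caller code, not part of the
- // original file): construct the node, bootstrap or join a cluster, then run
- // the main loop.
- //
- //	n := NewNode(NodeOptions{
- //		ID:             "node-1",          // CN from the node certificate
- //		Addr:           "10.0.0.1:4242",
- //		StateDir:       "/var/lib/swarm/raft",
- //		TLSCredentials: creds,             // a credentials.TransportCredentials
- //	})
- //	if err := n.JoinAndStart(ctx); err != nil {
- //		return err
- //	}
- //	go n.Run(ctx) // runs until ctx is cancelled or the node is removed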
- // IsIDRemoved reports if member with id was removed from cluster.
- // Part of transport.Raft interface.
- func (n *Node) IsIDRemoved(id uint64) bool {
- return n.cluster.IsIDRemoved(id)
- }
- // NodeRemoved signals that node was removed from cluster and should stop.
- // Part of transport.Raft interface.
- func (n *Node) NodeRemoved() {
- n.removeRaftOnce.Do(func() {
- atomic.StoreUint32(&n.isMember, 0)
- close(n.RemovedFromRaft)
- })
- }
- // ReportSnapshot reports snapshot status to underlying raft node.
- // Part of transport.Raft interface.
- func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
- n.raftNode.ReportSnapshot(id, status)
- }
- // ReportUnreachable reports to underlying raft node that member with id is
- // unreachable.
- // Part of transport.Raft interface.
- func (n *Node) ReportUnreachable(id uint64) {
- n.raftNode.ReportUnreachable(id)
- }
- // SetAddr provides the raft node's address. This can be used in cases where
- // opts.Addr was not provided to NewNode, for example when a port was not bound
- // until after the raft node was created.
- func (n *Node) SetAddr(ctx context.Context, addr string) error {
- n.addrLock.Lock()
- defer n.addrLock.Unlock()
- n.opts.Addr = addr
- if !n.IsMember() {
- return nil
- }
- newRaftMember := &api.RaftMember{
- RaftID: n.Config.ID,
- NodeID: n.opts.ID,
- Addr: addr,
- }
- if err := n.cluster.UpdateMember(n.Config.ID, newRaftMember); err != nil {
- return err
- }
- // If the raft node is running, submit a configuration change
- // with the new address.
- // TODO(aaronl): Currently, this node must be the leader to
- // submit this configuration change. This works for the initial
- // use cases (single-node cluster late binding ports, or calling
- // SetAddr before joining a cluster). In the future, we may want
- // to support having a follower proactively change its remote
- // address.
- leadershipCh, cancelWatch := n.SubscribeLeadership()
- defer cancelWatch()
- ctx, cancelCtx := n.WithContext(ctx)
- defer cancelCtx()
- isLeader := atomic.LoadUint32(&n.signalledLeadership) == 1
- for !isLeader {
- select {
- case leadershipChange := <-leadershipCh:
- if leadershipChange == IsLeader {
- isLeader = true
- }
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- return n.updateNodeBlocking(ctx, n.Config.ID, addr)
- }
- // WithContext returns a context which is cancelled when the parent context
- // is cancelled or the node is stopped.
- func (n *Node) WithContext(ctx context.Context) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(ctx)
- go func() {
- select {
- case <-ctx.Done():
- case <-n.stopped:
- cancel()
- }
- }()
- return ctx, cancel
- }
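- // Usage sketch (illustrative): long-running handlers derive their context
- // through WithContext so they unblock when the node stops, not only when the
- // caller's context is cancelled:
- //
- //	ctx, cancel := n.WithContext(ctx)
- //	defer cancel()
- //	// blocking calls below now also abort on node shutdown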
- func (n *Node) initTransport() {
- transportConfig := &transport.Config{
- HeartbeatInterval: time.Duration(n.Config.ElectionTick) * n.opts.TickInterval,
- SendTimeout: n.opts.SendTimeout,
- Credentials: n.opts.TLSCredentials,
- Raft: n,
- }
- n.transport = transport.New(transportConfig)
- }
- // JoinAndStart joins and starts the raft server
- func (n *Node) JoinAndStart(ctx context.Context) (err error) {
- ctx, cancel := n.WithContext(ctx)
- defer func() {
- cancel()
- if err != nil {
- n.stopMu.Lock()
- // to shutdown transport
- close(n.stopped)
- n.stopMu.Unlock()
- n.done()
- } else {
- atomic.StoreUint32(&n.isMember, 1)
- }
- }()
- loadAndStartErr := n.loadAndStart(ctx, n.opts.ForceNewCluster)
- if loadAndStartErr != nil && loadAndStartErr != storage.ErrNoWAL {
- return loadAndStartErr
- }
- snapshot, err := n.raftStore.Snapshot()
- // Snapshot never returns an error
- if err != nil {
- panic("could not get snapshot of raft store")
- }
- n.confState = snapshot.Metadata.ConfState
- n.appliedIndex = snapshot.Metadata.Index
- n.snapshotMeta = snapshot.Metadata
- n.writtenWALIndex, _ = n.raftStore.LastIndex() // LastIndex always returns a nil error
- n.addrLock.Lock()
- defer n.addrLock.Unlock()
- // override the module field entirely, since etcd/raft is not exactly a submodule
- n.Config.Logger = log.G(ctx).WithField("module", "raft")
- // restore from snapshot
- if loadAndStartErr == nil {
- if n.opts.JoinAddr != "" {
- log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
- }
- n.campaignWhenAble = true
- n.initTransport()
- n.raftNode = raft.RestartNode(n.Config)
- return nil
- }
- // first member of cluster
- if n.opts.JoinAddr == "" {
- // First member in the cluster, self-assign ID
- n.Config.ID = uint64(rand.Int63()) + 1
- peer, err := n.newRaftLogs(n.opts.ID)
- if err != nil {
- return err
- }
- n.campaignWhenAble = true
- n.initTransport()
- n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
- return nil
- }
- // join to existing cluster
- if n.opts.Addr == "" {
- return errors.New("attempted to join raft cluster without knowing own address")
- }
- conn, err := dial(n.opts.JoinAddr, "tcp", n.opts.TLSCredentials, 10*time.Second)
- if err != nil {
- return err
- }
- defer conn.Close()
- client := api.NewRaftMembershipClient(conn)
- joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
- defer joinCancel()
- resp, err := client.Join(joinCtx, &api.JoinRequest{
- Addr: n.opts.Addr,
- })
- if err != nil {
- return err
- }
- n.Config.ID = resp.RaftID
- if _, err := n.newRaftLogs(n.opts.ID); err != nil {
- return err
- }
- n.bootstrapMembers = resp.Members
- n.initTransport()
- n.raftNode = raft.StartNode(n.Config, nil)
- return nil
- }
- // DefaultNodeConfig returns the default config for a
- // raft node that can be modified and customized
- func DefaultNodeConfig() *raft.Config {
- return &raft.Config{
- HeartbeatTick: 1,
- ElectionTick: 3,
- MaxSizePerMsg: math.MaxUint16,
- MaxInflightMsgs: 256,
- Logger: log.L,
- CheckQuorum: true,
- }
- }
- // DefaultRaftConfig returns a default api.RaftConfig.
- func DefaultRaftConfig() api.RaftConfig {
- return api.RaftConfig{
- KeepOldSnapshots: 0,
- SnapshotInterval: 10000,
- LogEntriesForSlowFollowers: 500,
- ElectionTick: 3,
- HeartbeatTick: 1,
- }
- }
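- // A worked example of the defaults above (added commentary): with the 1s
- // default NodeOptions.TickInterval, HeartbeatTick: 1 makes the leader
- // heartbeat every second, and ElectionTick: 3 makes a follower start an
- // election after roughly 3 leaderless seconds. etcd/raft requires
- // ElectionTick > HeartbeatTick, which these defaults satisfy.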
- // MemoryStore returns the memory store that is kept in sync with the raft log.
- func (n *Node) MemoryStore() *store.MemoryStore {
- return n.memoryStore
- }
- func (n *Node) done() {
- n.cluster.Clear()
- n.ticker.Stop()
- n.leadershipBroadcast.Close()
- n.cluster.PeersBroadcast.Close()
- n.memoryStore.Close()
- if n.transport != nil {
- n.transport.Stop()
- }
- close(n.doneCh)
- }
- // ClearData tells the raft node to delete its WALs, snapshots, and keys on
- // shutdown.
- func (n *Node) ClearData() {
- n.clearData = true
- }
- // Run is the main loop for a Raft node, it goes along the state machine,
- // acting on the messages received from other Raft nodes in the cluster.
- //
- // Before running the main loop, it first starts the raft node based on saved
- // cluster state. If no saved state exists, it starts a single-node cluster.
- func (n *Node) Run(ctx context.Context) error {
- ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
- ctx, cancel := context.WithCancel(ctx)
- for _, node := range n.bootstrapMembers {
- if err := n.registerNode(node); err != nil {
- log.G(ctx).WithError(err).Errorf("failed to register member %x", node.RaftID)
- }
- }
- defer func() {
- cancel()
- n.stop(ctx)
- if n.clearData {
- // Delete WAL and snapshots, since they are no longer
- // usable.
- if err := n.raftLogger.Clear(ctx); err != nil {
- log.G(ctx).WithError(err).Error("failed to move wal after node removal")
- }
- // clear out the DEKs
- if err := n.keyRotator.UpdateKeys(EncryptionKeys{}); err != nil {
- log.G(ctx).WithError(err).Error("could not remove DEKs")
- }
- }
- n.done()
- }()
- wasLeader := false
- transferLeadershipLimit := rate.NewLimiter(rate.Every(time.Minute), 1)
- for {
- select {
- case <-n.ticker.C():
- n.raftNode.Tick()
- if n.leader() == raft.None {
- atomic.AddUint32(&n.ticksWithNoLeader, 1)
- } else {
- atomic.StoreUint32(&n.ticksWithNoLeader, 0)
- }
- case rd := <-n.raftNode.Ready():
- raftConfig := n.getCurrentRaftConfig()
- // Save entries to storage
- if err := n.saveToStorage(ctx, &raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil {
- return errors.Wrap(err, "failed to save entries to storage")
- }
- if wasLeader &&
- (rd.SoftState == nil || rd.SoftState.RaftState == raft.StateLeader) &&
- n.memoryStore.Wedged() &&
- transferLeadershipLimit.Allow() {
- if !n.opts.DisableStackDump {
- signal.DumpStacks("")
- }
- transferee, err := n.transport.LongestActive()
- if err != nil {
- log.G(ctx).WithError(err).Error("failed to get longest-active member")
- } else {
- log.G(ctx).Error("data store lock held too long - transferring leadership")
- n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
- }
- }
- for _, msg := range rd.Messages {
- // Send raft messages to peers
- if err := n.transport.Send(msg); err != nil {
- log.G(ctx).WithError(err).Error("failed to send message to member")
- }
- }
- // Apply snapshot to memory store. The snapshot
- // was applied to the raft store in
- // saveToStorage.
- if !raft.IsEmptySnap(rd.Snapshot) {
- // Load the snapshot data into the store
- if err := n.restoreFromSnapshot(ctx, rd.Snapshot.Data); err != nil {
- log.G(ctx).WithError(err).Error("failed to restore cluster from snapshot")
- }
- n.appliedIndex = rd.Snapshot.Metadata.Index
- n.snapshotMeta = rd.Snapshot.Metadata
- n.confState = rd.Snapshot.Metadata.ConfState
- }
- // If we cease to be the leader, we must cancel any
- // proposals that are currently waiting for a quorum to
- // acknowledge them. It is still possible for these to
- // become committed, but if that happens we will apply
- // them as any follower would.
- // It is important that we cancel these proposals before
- // calling processCommitted, so processCommitted does
- // not deadlock.
- if rd.SoftState != nil {
- if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
- wasLeader = false
- if atomic.LoadUint32(&n.signalledLeadership) == 1 {
- atomic.StoreUint32(&n.signalledLeadership, 0)
- n.leadershipBroadcast.Publish(IsFollower)
- }
- // It is important that we set n.signalledLeadership to 0
- // before calling n.wait.cancelAll. When a new raft
- // request is registered, it checks n.signalledLeadership
- // afterwards, and cancels the registration if it is 0.
- // If cancelAll was called first, this call might run
- // before the new request registers, but
- // signalledLeadership would be set after the check.
- // Setting signalledLeadership before calling cancelAll
- // ensures that if a new request is registered during
- // this transition, it will either be cancelled by
- // cancelAll, or by its own check of signalledLeadership.
- n.wait.cancelAll()
- } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
- wasLeader = true
- }
- }
- // Process committed entries
- for _, entry := range rd.CommittedEntries {
- if err := n.processCommitted(ctx, entry); err != nil {
- log.G(ctx).WithError(err).Error("failed to process committed entries")
- }
- }
- // in case the previous attempt to update the key failed
- n.maybeMarkRotationFinished(ctx)
- // Trigger a snapshot every once in a while
- if n.snapshotInProgress == nil &&
- (n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 &&
- n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) {
- n.doSnapshot(ctx, raftConfig)
- }
- if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
- // If all the entries in the log have become
- // committed, broadcast our leadership status.
- if n.caughtUp() {
- atomic.StoreUint32(&n.signalledLeadership, 1)
- n.leadershipBroadcast.Publish(IsLeader)
- }
- }
- // Advance the state machine
- n.raftNode.Advance()
- // On the first startup, or if we are the only
- // registered member after restoring from the state,
- // campaign to be the leader.
- if n.campaignWhenAble {
- members := n.cluster.Members()
- if len(members) >= 1 {
- n.campaignWhenAble = false
- }
- if len(members) == 1 && members[n.Config.ID] != nil {
- n.raftNode.Campaign(ctx)
- }
- }
- case snapshotMeta := <-n.snapshotInProgress:
- raftConfig := n.getCurrentRaftConfig()
- if snapshotMeta.Index > n.snapshotMeta.Index {
- n.snapshotMeta = snapshotMeta
- if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil {
- log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
- }
- }
- n.snapshotInProgress = nil
- n.maybeMarkRotationFinished(ctx)
- if n.rotationQueued && n.needsSnapshot(ctx) {
- // a key rotation took place while the snapshot was in progress - we have
- // to take another snapshot and encrypt with the new key
- n.rotationQueued = false
- n.doSnapshot(ctx, raftConfig)
- }
- case <-n.keyRotator.RotationNotify():
- // There are 2 separate checks: rotationQueued, and n.needsSnapshot().
- // We set rotationQueued so that when we are notified of a rotation, we try to
- // do a snapshot as soon as possible. However, if there is an error while doing
- // the snapshot, we don't want to hammer the node attempting to do snapshots over
- // and over. So if doing a snapshot fails, wait until the next entry comes in to
- // try again.
- switch {
- case n.snapshotInProgress != nil:
- n.rotationQueued = true
- case n.needsSnapshot(ctx):
- n.doSnapshot(ctx, n.getCurrentRaftConfig())
- }
- case <-ctx.Done():
- return nil
- }
- }
- }
- func (n *Node) restoreFromSnapshot(ctx context.Context, data []byte) error {
- snapCluster, err := n.clusterSnapshot(data)
- if err != nil {
- return err
- }
- oldMembers := n.cluster.Members()
- for _, member := range snapCluster.Members {
- delete(oldMembers, member.RaftID)
- }
- for _, removedMember := range snapCluster.Removed {
- n.cluster.RemoveMember(removedMember)
- n.transport.RemovePeer(removedMember)
- delete(oldMembers, removedMember)
- }
- for id, member := range oldMembers {
- n.cluster.ClearMember(id)
- if err := n.transport.RemovePeer(member.RaftID); err != nil {
- log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", member.RaftID)
- }
- }
- for _, node := range snapCluster.Members {
- if err := n.registerNode(&api.RaftMember{RaftID: node.RaftID, NodeID: node.NodeID, Addr: node.Addr}); err != nil {
- log.G(ctx).WithError(err).Error("failed to register node from snapshot")
- }
- }
- return nil
- }
- func (n *Node) needsSnapshot(ctx context.Context) bool {
- if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
- keys := n.keyRotator.GetKeys()
- if keys.PendingDEK != nil {
- n.raftLogger.RotateEncryptionKey(keys.PendingDEK)
- // we want to wait for the last index written with the old DEK to be committed, else a snapshot taken
- // may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
- // written with the new key to supersede any WAL written with an old DEK.
- n.waitForAppliedIndex = n.writtenWALIndex
- // if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current
- // snapshot index, because the rotation cannot be completed until the next snapshot
- if n.waitForAppliedIndex <= n.snapshotMeta.Index {
- n.waitForAppliedIndex = n.snapshotMeta.Index + 1
- }
- log.G(ctx).Debugf(
- "beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex)
- }
- }
- result := n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
- if result {
- log.G(ctx).Debugf(
- "a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered",
- n.waitForAppliedIndex, n.appliedIndex)
- }
- return result
- }
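- // Worked example of the bookkeeping above (added commentary): if the last
- // WAL index written with the old DEK is 100 but a snapshot already exists at
- // index 120, waitForAppliedIndex starts at 100 and is bumped to 121
- // (snapshot index + 1). needsSnapshot then reports true only once the
- // applied index reaches 121, so the next snapshot, taken under the new key,
- // covers everything written with the old one.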
- func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
- if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotMeta.Index {
- // this means we tried to rotate - so finish the rotation
- if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
- log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
- } else {
- log.G(ctx).Debugf(
- "a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key",
- n.snapshotMeta.Index, n.waitForAppliedIndex)
- n.waitForAppliedIndex = 0
- if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil {
- log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK")
- }
- }
- }
- }
- func (n *Node) getCurrentRaftConfig() api.RaftConfig {
- raftConfig := DefaultRaftConfig()
- n.memoryStore.View(func(readTx store.ReadTx) {
- clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
- if err == nil && len(clusters) == 1 {
- raftConfig = clusters[0].Spec.Raft
- }
- })
- return raftConfig
- }
- // Cancel interrupts all ongoing proposals, and prevents new ones from
- // starting. This is useful for the shutdown sequence because it allows
- // the manager to shut down raft-dependent services that might otherwise
- // block on shutdown if quorum isn't met. Then the raft node can be completely
- // shut down once no more code is using it.
- func (n *Node) Cancel() {
- n.cancelFunc()
- }
- // Done returns channel which is closed when raft node is fully stopped.
- func (n *Node) Done() <-chan struct{} {
- return n.doneCh
- }
- func (n *Node) stop(ctx context.Context) {
- n.stopMu.Lock()
- defer n.stopMu.Unlock()
- n.Cancel()
- n.waitProp.Wait()
- n.asyncTasks.Wait()
- n.raftNode.Stop()
- n.ticker.Stop()
- n.raftLogger.Close(ctx)
- atomic.StoreUint32(&n.isMember, 0)
- // TODO(stevvooe): Handle ctx.Done()
- }
- // isLeader checks if we are the leader or not, without the protection of lock
- func (n *Node) isLeader() bool {
- if !n.IsMember() {
- return false
- }
- if n.Status().Lead == n.Config.ID {
- return true
- }
- return false
- }
- // IsLeader checks if we are the leader or not, with the protection of lock
- func (n *Node) IsLeader() bool {
- n.stopMu.RLock()
- defer n.stopMu.RUnlock()
- return n.isLeader()
- }
- // leader returns the id of the leader, without the protection of lock or
- // membership check; those are the caller's responsibility.
- func (n *Node) leader() uint64 {
- return n.Status().Lead
- }
- // Leader returns the id of the leader, with the protection of lock
- func (n *Node) Leader() (uint64, error) {
- n.stopMu.RLock()
- defer n.stopMu.RUnlock()
- if !n.IsMember() {
- return raft.None, ErrNoRaftMember
- }
- leader := n.leader()
- if leader == raft.None {
- return raft.None, ErrNoClusterLeader
- }
- return leader, nil
- }
- // ReadyForProposals returns true if the node has broadcasted a message
- // saying that it has become the leader. This means it is ready to accept
- // proposals.
- func (n *Node) ReadyForProposals() bool {
- return atomic.LoadUint32(&n.signalledLeadership) == 1
- }
- func (n *Node) caughtUp() bool {
- // obnoxious function that always returns a nil error
- lastIndex, _ := n.raftStore.LastIndex()
- return n.appliedIndex >= lastIndex
- }
- // Join asks a member of the raft cluster to propose
- // a configuration change and add us as a member, thus
- // beginning the log replication process. This method
- // is called by an aspiring member against an existing member.
- func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinResponse, error) {
- nodeInfo, err := ca.RemoteNode(ctx)
- if err != nil {
- return nil, err
- }
- fields := logrus.Fields{
- "node.id": nodeInfo.NodeID,
- "method": "(*Node).Join",
- "raft_id": fmt.Sprintf("%x", n.Config.ID),
- }
- if nodeInfo.ForwardedBy != nil {
- fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
- }
- log := log.G(ctx).WithFields(fields)
- log.Debug("")
- // can't stop the raft node while an async RPC is in progress
- n.stopMu.RLock()
- defer n.stopMu.RUnlock()
- n.membershipLock.Lock()
- defer n.membershipLock.Unlock()
- if !n.IsMember() {
- return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrNoRaftMember.Error())
- }
- if !n.isLeader() {
- return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
- }
- // A single manager must not be able to join the raft cluster twice. If
- // it did, that would cause the quorum to be computed incorrectly. This
- // could happen if the WAL was deleted from an active manager.
- for _, m := range n.cluster.Members() {
- if m.NodeID == nodeInfo.NodeID {
- return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
- }
- }
- // Find a unique ID for the joining member.
- var raftID uint64
- for {
- raftID = uint64(rand.Int63()) + 1
- if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
- break
- }
- }
- remoteAddr := req.Addr
- // If the joining node sent an address like 0.0.0.0:4242, automatically
- // determine its actual address based on the GRPC connection. This
- // avoids the need for a prospective member to know its own address.
- requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
- if err != nil {
- return nil, grpc.Errorf(codes.InvalidArgument, "invalid address %s in raft join request", remoteAddr)
- }
- requestIP := net.ParseIP(requestHost)
- if requestIP != nil && requestIP.IsUnspecified() {
- remoteHost, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
- if err != nil {
- return nil, err
- }
- remoteAddr = net.JoinHostPort(remoteHost, requestPort)
- }
- // We do not bother submitting a configuration change for the
- // new member if we can't contact it back using its address
- if err := n.checkHealth(ctx, remoteAddr, 5*time.Second); err != nil {
- return nil, err
- }
- err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
- if err != nil {
- log.WithError(err).Errorf("failed to add member %x", raftID)
- return nil, err
- }
- var nodes []*api.RaftMember
- for _, node := range n.cluster.Members() {
- nodes = append(nodes, &api.RaftMember{
- RaftID: node.RaftID,
- NodeID: node.NodeID,
- Addr: node.Addr,
- })
- }
- log.Debugf("node joined")
- return &api.JoinResponse{Members: nodes, RaftID: raftID}, nil
- }
- // checkHealth tries to contact an aspiring member through its advertised address
- // and checks if its raft server is running.
- func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
- conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
- if err != nil {
- return err
- }
- defer conn.Close()
- if timeout != 0 {
- tctx, cancel := context.WithTimeout(ctx, timeout)
- defer cancel()
- ctx = tctx
- }
- healthClient := api.NewHealthClient(conn)
- resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
- if err != nil {
- return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address")
- }
- if resp.Status != api.HealthCheckResponse_SERVING {
- return fmt.Errorf("health check returned status %s", resp.Status.String())
- }
- return nil
- }
- // addMember submits a configuration change to add a new member on the raft cluster.
- func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
- node := api.RaftMember{
- RaftID: raftID,
- NodeID: nodeID,
- Addr: addr,
- }
- meta, err := node.Marshal()
- if err != nil {
- return err
- }
- cc := raftpb.ConfChange{
- Type: raftpb.ConfChangeAddNode,
- NodeID: raftID,
- Context: meta,
- }
- // Wait for a raft round to process the configuration change
- return n.configure(ctx, cc)
- }
- // updateNodeBlocking runs a synchronous job to update the node address in the whole cluster.
- func (n *Node) updateNodeBlocking(ctx context.Context, id uint64, addr string) error {
- m := n.cluster.GetMember(id)
- if m == nil {
- return errors.Errorf("member %x is not found for update", id)
- }
- node := api.RaftMember{
- RaftID: m.RaftID,
- NodeID: m.NodeID,
- Addr: addr,
- }
- meta, err := node.Marshal()
- if err != nil {
- return err
- }
- cc := raftpb.ConfChange{
- Type: raftpb.ConfChangeUpdateNode,
- NodeID: id,
- Context: meta,
- }
- // Wait for a raft round to process the configuration change
- return n.configure(ctx, cc)
- }
- // UpdateNode submits a configuration change to change a member's address.
- func (n *Node) UpdateNode(id uint64, addr string) {
- ctx, cancel := n.WithContext(context.Background())
- defer cancel()
- // spawn updating info in raft in background to unblock transport
- go func() {
- if err := n.updateNodeBlocking(ctx, id, addr); err != nil {
- log.G(ctx).WithFields(logrus.Fields{"raft_id": n.Config.ID, "update_id": id}).WithError(err).Error("failed to update member address in cluster")
- }
- }()
- }
- // Leave asks a member of the raft cluster to remove
- // us from the raft cluster. This method is called by
- // a member that wishes to give up its raft membership,
- // against an active member of the cluster.
- func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
- if req.Node == nil {
- return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
- }
- nodeInfo, err := ca.RemoteNode(ctx)
- if err != nil {
- return nil, err
- }
- ctx, cancel := n.WithContext(ctx)
- defer cancel()
- fields := logrus.Fields{
- "node.id": nodeInfo.NodeID,
- "method": "(*Node).Leave",
- "raft_id": fmt.Sprintf("%x", n.Config.ID),
- }
- if nodeInfo.ForwardedBy != nil {
- fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
- }
- log.G(ctx).WithFields(fields).Debug("")
- if err := n.removeMember(ctx, req.Node.RaftID); err != nil {
- return nil, err
- }
- return &api.LeaveResponse{}, nil
- }
- // CanRemoveMember checks, from the perspective of the current node, whether
- // a member can be removed without causing a loss of quorum.
- func (n *Node) CanRemoveMember(id uint64) bool {
- members := n.cluster.Members()
- nreachable := 0 // reachable managers after removal
- for _, m := range members {
- if m.RaftID == id {
- continue
- }
- // Local node from where the remove is issued
- if m.RaftID == n.Config.ID {
- nreachable++
- continue
- }
- if n.transport.Active(m.RaftID) {
- nreachable++
- }
- }
- nquorum := (len(members)-1)/2 + 1
- if nreachable < nquorum {
- return false
- }
- return true
- }
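- // Worked example (added commentary): in a 5-manager cluster, removing one
- // member leaves 4, and nquorum = (5-1)/2 + 1 = 3. If, excluding the member
- // being removed, only the local node and one peer are reachable, then
- // nreachable (2) < nquorum (3) and the removal is refused.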
- func (n *Node) removeMember(ctx context.Context, id uint64) error {
- // can't stop the raft node while an async RPC is in progress
- n.stopMu.RLock()
- defer n.stopMu.RUnlock()
- if !n.IsMember() {
- return ErrNoRaftMember
- }
- if !n.isLeader() {
- return ErrLostLeadership
- }
- n.membershipLock.Lock()
- defer n.membershipLock.Unlock()
- if !n.CanRemoveMember(id) {
- return ErrCannotRemoveMember
- }
- cc := raftpb.ConfChange{
- ID: id,
- Type: raftpb.ConfChangeRemoveNode,
- NodeID: id,
- Context: []byte(""),
- }
- return n.configure(ctx, cc)
- }
- // TransferLeadership attempts to transfer leadership to a different node,
- // and waits for the transfer to complete.
- func (n *Node) TransferLeadership(ctx context.Context) error {
- ctx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout())
- defer cancelTransfer()
- n.stopMu.RLock()
- defer n.stopMu.RUnlock()
- if !n.IsMember() {
- return ErrNoRaftMember
- }
- if !n.isLeader() {
- return ErrLostLeadership
- }
- transferee, err := n.transport.LongestActive()
- if err != nil {
- return errors.Wrap(err, "failed to get longest-active member")
- }
- start := time.Now()
- n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
- ticker := time.NewTicker(n.opts.TickInterval / 10)
- defer ticker.Stop()
- var leader uint64
- for {
- leader = n.leader()
- if leader != raft.None && leader != n.Config.ID {
- break
- }
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-ticker.C:
- }
- }
- log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, leader, time.Since(start))
- return nil
- }
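- // Illustrative call site (hypothetical): an orderly shutdown of a leader can
- // hand off leadership first, treating failure as advisory:
- //
- //	if n.IsLeader() {
- //		if err := n.TransferLeadership(ctx); err != nil {
- //			log.G(ctx).WithError(err).Warn("leadership transfer failed, stopping anyway")
- //		}
- //	}
- //	n.Cancel()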
- // RemoveMember submits a configuration change to remove a member from the raft cluster
- // after checking if the operation would not result in a loss of quorum.
- func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
- ctx, cancel := n.WithContext(ctx)
- defer cancel()
- return n.removeMember(ctx, id)
- }
- // processRaftMessageLogger is used to lazily create a logger for
- // ProcessRaftMessage. Usually nothing will be logged, so it is useful to avoid
- // formatting strings and allocating a logger when it won't be used.
- func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaftMessageRequest) *logrus.Entry {
- fields := logrus.Fields{
- "method": "(*Node).ProcessRaftMessage",
- }
- if n.IsMember() {
- fields["raft_id"] = fmt.Sprintf("%x", n.Config.ID)
- }
- if msg != nil && msg.Message != nil {
- fields["from"] = fmt.Sprintf("%x", msg.Message.From)
- }
- return log.G(ctx).WithFields(fields)
- }
- func (n *Node) reportNewAddress(ctx context.Context, id uint64) error {
- // too early
- if !n.IsMember() {
- return nil
- }
- p, ok := peer.FromContext(ctx)
- if !ok {
- return nil
- }
- oldAddr, err := n.transport.PeerAddr(id)
- if err != nil {
- return err
- }
- if oldAddr == "" {
- // Don't know the address of the peer yet, so can't report an
- // update.
- return nil
- }
- newHost, _, err := net.SplitHostPort(p.Addr.String())
- if err != nil {
- return err
- }
- _, officialPort, err := net.SplitHostPort(oldAddr)
- if err != nil {
- return err
- }
- newAddr := net.JoinHostPort(newHost, officialPort)
- if err := n.transport.UpdatePeerAddr(id, newAddr); err != nil {
- return err
- }
- return nil
- }
- // ProcessRaftMessage calls 'Step' which advances the
- // raft state machine with the provided message on the
- // receiving node
- func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
- if msg == nil || msg.Message == nil {
- n.processRaftMessageLogger(ctx, msg).Debug("received empty message")
- return &api.ProcessRaftMessageResponse{}, nil
- }
- // Don't process the message if this comes from
- // a node in the remove set
- if n.cluster.IsIDRemoved(msg.Message.From) {
- n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member")
- return nil, grpc.Errorf(codes.NotFound, "%s", membership.ErrMemberRemoved.Error())
- }
- ctx, cancel := n.WithContext(ctx)
- defer cancel()
- // TODO(aaronl): Address changes are temporarily disabled.
- // See https://github.com/docker/docker/issues/30455.
- // This should be re-enabled in the future with additional
- // safeguards (perhaps storing multiple addresses per node).
- //if err := n.reportNewAddress(ctx, msg.Message.From); err != nil {
- // log.G(ctx).WithError(err).Errorf("failed to report new address of %x to transport", msg.Message.From)
- //}
- // Reject vote requests from unreachable peers
- if msg.Message.Type == raftpb.MsgVote {
- member := n.cluster.GetMember(msg.Message.From)
- if member == nil {
- n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member")
- return &api.ProcessRaftMessageResponse{}, nil
- }
- if err := n.transport.HealthCheck(ctx, msg.Message.From); err != nil {
- n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
- return &api.ProcessRaftMessageResponse{}, nil
- }
- }
- if msg.Message.Type == raftpb.MsgProp {
- // We don't accept forwarded proposals. Our
- // current architecture depends on only the leader
- // making proposals, so in-flight proposals can be
- // guaranteed not to conflict.
- n.processRaftMessageLogger(ctx, msg).Debug("dropped forwarded proposal")
- return &api.ProcessRaftMessageResponse{}, nil
- }
- // can't stop the raft node while an async RPC is in progress
- n.stopMu.RLock()
- defer n.stopMu.RUnlock()
- if n.IsMember() {
- if msg.Message.To != n.Config.ID {
- n.processRaftMessageLogger(ctx, msg).Errorf("received message intended for raft_id %x", msg.Message.To)
- return &api.ProcessRaftMessageResponse{}, nil
- }
- if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
- n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
- }
- }
- return &api.ProcessRaftMessageResponse{}, nil
- }
- // ResolveAddress returns the address of the raft member with the given ID.
- func (n *Node) ResolveAddress(ctx context.Context, msg *api.ResolveAddressRequest) (*api.ResolveAddressResponse, error) {
- if !n.IsMember() {
- return nil, ErrNoRaftMember
- }
- nodeInfo, err := ca.RemoteNode(ctx)
- if err != nil {
- return nil, err
- }
- fields := logrus.Fields{
- "node.id": nodeInfo.NodeID,
- "method": "(*Node).ResolveAddress",
- "raft_id": fmt.Sprintf("%x", n.Config.ID),
- }
- if nodeInfo.ForwardedBy != nil {
- fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
- }
- log.G(ctx).WithFields(fields).Debug("")
- member := n.cluster.GetMember(msg.RaftID)
- if member == nil {
- return nil, grpc.Errorf(codes.NotFound, "member %x not found", msg.RaftID)
- }
- return &api.ResolveAddressResponse{Addr: member.Addr}, nil
- }
- func (n *Node) getLeaderConn() (*grpc.ClientConn, error) {
- leader, err := n.Leader()
- if err != nil {
- return nil, err
- }
- if leader == n.Config.ID {
- return nil, raftselector.ErrIsLeader
- }
- conn, err := n.transport.PeerConn(leader)
- if err != nil {
- return nil, errors.Wrap(err, "failed to get connection to leader")
- }
- return conn, nil
- }
- // LeaderConn returns the current connection to the cluster leader, or
- // raftselector.ErrIsLeader if the current node is itself the leader.
- func (n *Node) LeaderConn(ctx context.Context) (*grpc.ClientConn, error) {
- cc, err := n.getLeaderConn()
- if err == nil {
- return cc, nil
- }
- if err == raftselector.ErrIsLeader {
- return nil, err
- }
- if atomic.LoadUint32(&n.ticksWithNoLeader) > lostQuorumTimeout {
- return nil, errLostQuorum
- }
- ticker := time.NewTicker(1 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- cc, err := n.getLeaderConn()
- if err == nil {
- return cc, nil
- }
- if err == raftselector.ErrIsLeader {
- return nil, err
- }
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- }
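- // Caller pattern (illustrative): with the default 1s tick interval, the
- // lostQuorumTimeout of 10 ticks means calls made after ~10 leaderless
- // seconds fail immediately rather than polling:
- //
- //	conn, err := n.LeaderConn(ctx)
- //	switch err {
- //	case nil:
- //		// forward the request over conn
- //	case raftselector.ErrIsLeader:
- //		// this node is the leader; handle the request locally
- //	default:
- //		// no leader reachable (possibly errLostQuorum); surface the error
- //	}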
- // registerNode registers a new node on the cluster memberlist
- func (n *Node) registerNode(node *api.RaftMember) error {
- if n.cluster.IsIDRemoved(node.RaftID) {
- return nil
- }
- member := &membership.Member{}
- existingMember := n.cluster.GetMember(node.RaftID)
- if existingMember != nil {
- // Member already exists
- // If the address is different from what we thought it was,
- // update it. This can happen if we just joined a cluster
- // and are adding ourselves now with the remotely-reachable
- // address.
- if existingMember.Addr != node.Addr {
- if node.RaftID != n.Config.ID {
- if err := n.transport.UpdatePeer(node.RaftID, node.Addr); err != nil {
- return err
- }
- }
- member.RaftMember = node
- n.cluster.AddMember(member)
- }
- return nil
- }
- // Avoid opening a connection to the local node
- if node.RaftID != n.Config.ID {
- if err := n.transport.AddPeer(node.RaftID, node.Addr); err != nil {
- return err
- }
- }
- member.RaftMember = node
- err := n.cluster.AddMember(member)
- if err != nil {
- if rerr := n.transport.RemovePeer(node.RaftID); rerr != nil {
- return errors.Wrapf(rerr, "failed to remove peer after error %v", err)
- }
- return err
- }
- return nil
- }
- // ProposeValue calls Propose on the raft and waits
- // on the commit log action before returning a result
- func (n *Node) ProposeValue(ctx context.Context, storeAction []api.StoreAction, cb func()) error {
- ctx, cancel := n.WithContext(ctx)
- defer cancel()
- _, err := n.processInternalRaftRequest(ctx, &api.InternalRaftRequest{Action: storeAction}, cb)
- if err != nil {
- return err
- }
- return nil
- }
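- // Usage sketch (illustrative; the surrounding wiring is hypothetical): the
- // memory store replicates its transactions by handing the resulting store
- // actions to ProposeValue; the callback runs once the entry is committed:
- //
- //	actions := []api.StoreAction{ /* built from a store transaction */ }
- //	if err := n.ProposeValue(ctx, actions, func() {
- //		// runs after the raft entry is committed and applied locally
- //	}); err != nil {
- //		// e.g. ErrLostLeadership or ErrRequestTooLarge
- //	}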
- // GetVersion returns the sequence information for the current raft round.
- func (n *Node) GetVersion() *api.Version {
- n.stopMu.RLock()
- defer n.stopMu.RUnlock()
- if !n.IsMember() {
- return nil
- }
- status := n.Status()
- return &api.Version{Index: status.Commit}
- }
- // ChangesBetween returns the changes starting after "from", up to and
- // including "to". If these changes are not available because the log
- // has been compacted, an error will be returned.
- func (n *Node) ChangesBetween(from, to api.Version) ([]state.Change, error) {
- n.stopMu.RLock()
- defer n.stopMu.RUnlock()
- if from.Index > to.Index {
- return nil, errors.New("versions are out of order")
- }
- if !n.IsMember() {
- return nil, ErrNoRaftMember
- }
- // never returns error
- last, _ := n.raftStore.LastIndex()
- if to.Index > last {
- return nil, errors.New("last version is out of bounds")
- }
- pbs, err := n.raftStore.Entries(from.Index+1, to.Index+1, math.MaxUint64)
- if err != nil {
- return nil, err
- }
- var changes []state.Change
- for _, pb := range pbs {
- if pb.Type != raftpb.EntryNormal || pb.Data == nil {
- continue
- }
- r := &api.InternalRaftRequest{}
- err := proto.Unmarshal(pb.Data, r)
- if err != nil {
- return nil, errors.Wrap(err, "error unmarshalling internal raft request")
- }
- if r.Action != nil {
- changes = append(changes, state.Change{StoreActions: r.Action, Version: api.Version{Index: pb.Index}})
- }
- }
- return changes, nil
- }
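- // Usage sketch (illustrative): a watcher resuming from an older version can
- // backfill missed updates, falling back to a full resync if the log has been
- // compacted past "from":
- //
- //	changes, err := n.ChangesBetween(lastSeen, current)
- //	if err != nil {
- //		// versions out of range or log compacted: resync from a snapshot
- //	}
- //	for _, c := range changes {
- //		// apply c.StoreActions, then record c.Version
- //	}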
- // SubscribePeers subscribes to peer updates in the cluster. It always sends
- // the full list of peers.
- func (n *Node) SubscribePeers() (q chan events.Event, cancel func()) {
- return n.cluster.PeersBroadcast.Watch()
- }
- // GetMemberlist returns the current list of raft members in the cluster.
- func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
- memberlist := make(map[uint64]*api.RaftMember)
- members := n.cluster.Members()
- leaderID, err := n.Leader()
- if err != nil {
- leaderID = raft.None
- }
- for id, member := range members {
- reachability := api.RaftMemberStatus_REACHABLE
- leader := false
- if member.RaftID != n.Config.ID {
- if !n.transport.Active(member.RaftID) {
- reachability = api.RaftMemberStatus_UNREACHABLE
- }
- }
- if member.RaftID == leaderID {
- leader = true
- }
- memberlist[id] = &api.RaftMember{
- RaftID: member.RaftID,
- NodeID: member.NodeID,
- Addr: member.Addr,
- Status: api.RaftMemberStatus{
- Leader: leader,
- Reachability: reachability,
- },
- }
- }
- return memberlist
- }
- // Status returns the status of the underlying etcd/raft node.
- func (n *Node) Status() raft.Status {
- return n.raftNode.Status()
- }
- // GetMemberByNodeID returns member information based
- // on its generic Node ID.
- func (n *Node) GetMemberByNodeID(nodeID string) *membership.Member {
- members := n.cluster.Members()
- for _, member := range members {
- if member.NodeID == nodeID {
- return member
- }
- }
- return nil
- }
- // IsMember checks if the raft node has effectively joined
- // a cluster of existing members.
- func (n *Node) IsMember() bool {
- return atomic.LoadUint32(&n.isMember) == 1
- }
- // saveToStorage saves the hard state, log entries, and snapshot (if any) to
- // the raft logger and the in-memory raft store.
- func (n *Node) saveToStorage(
- ctx context.Context,
- raftConfig *api.RaftConfig,
- hardState raftpb.HardState,
- entries []raftpb.Entry,
- snapshot raftpb.Snapshot,
- ) (err error) {
- if !raft.IsEmptySnap(snapshot) {
- if err := n.raftLogger.SaveSnapshot(snapshot); err != nil {
- return errors.Wrap(err, "failed to save snapshot")
- }
- if err := n.raftLogger.GC(snapshot.Metadata.Index, snapshot.Metadata.Term, raftConfig.KeepOldSnapshots); err != nil {
- log.G(ctx).WithError(err).Error("unable to clean old snapshots and WALs")
- }
- if err = n.raftStore.ApplySnapshot(snapshot); err != nil {
- return errors.Wrap(err, "failed to apply snapshot on raft node")
- }
- }
- if err := n.raftLogger.SaveEntries(hardState, entries); err != nil {
- return errors.Wrap(err, "failed to save raft log entries")
- }
- if len(entries) > 0 {
- lastIndex := entries[len(entries)-1].Index
- if lastIndex > n.writtenWALIndex {
- n.writtenWALIndex = lastIndex
- }
- }
- if err = n.raftStore.Append(entries); err != nil {
- return errors.Wrap(err, "failed to append raft log entries")
- }
- return nil
- }
- // processInternalRaftRequest sends a message to nodes participating
- // in the raft to apply a log entry and then waits for it to be applied
- // on the server. It will block until the update is performed, there is
- // an error or until the raft node finalizes all the proposals on node
- // shutdown.
- func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
- n.stopMu.RLock()
- if !n.IsMember() {
- n.stopMu.RUnlock()
- return nil, ErrStopped
- }
- n.waitProp.Add(1)
- defer n.waitProp.Done()
- n.stopMu.RUnlock()
- r.ID = n.reqIDGen.Next()
- // This must be derived from the context which is cancelled by stop()
- // to avoid a deadlock on shutdown.
- waitCtx, cancel := context.WithCancel(ctx)
- ch := n.wait.register(r.ID, cb, cancel)
- // Do this check after calling register to avoid a race.
- if atomic.LoadUint32(&n.signalledLeadership) != 1 {
- n.wait.cancel(r.ID)
- return nil, ErrLostLeadership
- }
- data, err := r.Marshal()
- if err != nil {
- n.wait.cancel(r.ID)
- return nil, err
- }
- if len(data) > store.MaxTransactionBytes {
- n.wait.cancel(r.ID)
- return nil, ErrRequestTooLarge
- }
- err = n.raftNode.Propose(waitCtx, data)
- if err != nil {
- n.wait.cancel(r.ID)
- return nil, err
- }
- select {
- case x, ok := <-ch:
- if !ok {
- return nil, ErrLostLeadership
- }
- return x.(proto.Message), nil
- case <-waitCtx.Done():
- n.wait.cancel(r.ID)
- // if channel is closed, wait item was canceled, otherwise it was triggered
- x, ok := <-ch
- if !ok {
- return nil, ErrLostLeadership
- }
- return x.(proto.Message), nil
- case <-ctx.Done():
- n.wait.cancel(r.ID)
- // if channel is closed, wait item was canceled, otherwise it was triggered
- x, ok := <-ch
- if !ok {
- return nil, ctx.Err()
- }
- return x.(proto.Message), nil
- }
- }
// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It will block
// until the change is performed or there is an error.
func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
	cc.ID = n.reqIDGen.Next()

	ctx, cancel := context.WithCancel(ctx)
	ch := n.wait.register(cc.ID, nil, cancel)

	if err := n.raftNode.ProposeConfChange(ctx, cc); err != nil {
		n.wait.cancel(cc.ID)
		return err
	}

	select {
	case x := <-ch:
		if err, ok := x.(error); ok {
			return err
		}
		if x != nil {
			log.G(ctx).Panic("raft: configuration change error; return type should always be error")
		}
		return nil
	case <-ctx.Done():
		n.wait.cancel(cc.ID)
		return ctx.Err()
	}
}
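
// Illustrative sketch (not part of the original source): building an
// add-node configuration change and funneling it through configure. The
// addMember helper is hypothetical; the Context payload format matches
// what applyAddNode unmarshals below:
//
//	func (n *Node) addMember(ctx context.Context, member *api.RaftMember) error {
//		meta, err := member.Marshal()
//		if err != nil {
//			return err
//		}
//		cc := raftpb.ConfChange{
//			Type:    raftpb.ConfChangeAddNode,
//			NodeID:  member.RaftID,
//			Context: meta, // unmarshaled back into an api.RaftMember by applyAddNode
//		}
//		// configure assigns cc.ID, proposes the change, and blocks until
//		// processConfChange triggers the registered wait.
//		return n.configure(ctx, cc)
//	}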
// processCommitted dispatches a committed raft entry to the appropriate
// handler and records the applied index.
func (n *Node) processCommitted(ctx context.Context, entry raftpb.Entry) error {
	// Process a normal entry.
	if entry.Type == raftpb.EntryNormal && entry.Data != nil {
		if err := n.processEntry(ctx, entry); err != nil {
			return err
		}
	}

	// Process a configuration change (add/remove node).
	if entry.Type == raftpb.EntryConfChange {
		n.processConfChange(ctx, entry)
	}

	n.appliedIndex = entry.Index
	return nil
}
// processEntry applies a committed normal entry. If a local proposal is
// waiting on this entry, triggering the wait commits the data; otherwise
// the store actions are applied directly.
func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
	r := &api.InternalRaftRequest{}
	err := proto.Unmarshal(entry.Data, r)
	if err != nil {
		return err
	}

	if !n.wait.trigger(r.ID, r) {
		// There was no wait on this ID, meaning we don't have a
		// transaction in progress that would be committed to the
		// memory store by the "trigger" call. Either a different node
		// wrote this to raft, or we wrote it before losing the leader
		// position and cancelling the transaction. Create a new
		// transaction to commit the data.

		// It should not be possible for processInternalRaftRequest
		// to be running in this situation, but out of caution we
		// cancel any current invocations to avoid a deadlock.
		n.wait.cancelAll()

		err := n.memoryStore.ApplyStoreActions(r.Action)
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to apply actions from raft")
		}
	}
	return nil
}
// processConfChange applies a committed configuration change entry,
// validating it against the current cluster state before updating
// membership and the raft node's configuration state. Each error path
// returns early so an invalid change is never applied and never
// reported as successful.
func (n *Node) processConfChange(ctx context.Context, entry raftpb.Entry) {
	var (
		err error
		cc  raftpb.ConfChange
	)

	if err := proto.Unmarshal(entry.Data, &cc); err != nil {
		n.wait.trigger(cc.ID, err)
		return
	}

	if err := n.cluster.ValidateConfigurationChange(cc); err != nil {
		n.wait.trigger(cc.ID, err)
		return
	}

	switch cc.Type {
	case raftpb.ConfChangeAddNode:
		err = n.applyAddNode(cc)
	case raftpb.ConfChangeUpdateNode:
		err = n.applyUpdateNode(ctx, cc)
	case raftpb.ConfChangeRemoveNode:
		err = n.applyRemoveNode(ctx, cc)
	}

	if err != nil {
		n.wait.trigger(cc.ID, err)
		return
	}

	n.confState = *n.raftNode.ApplyConfChange(cc)
	n.wait.trigger(cc.ID, nil)
}
// applyAddNode is called when we receive a ConfChange from a member of
// the raft cluster. It adds the new node to the existing raft cluster.
func (n *Node) applyAddNode(cc raftpb.ConfChange) error {
	member := &api.RaftMember{}
	err := proto.Unmarshal(cc.Context, member)
	if err != nil {
		return err
	}

	// ID must be non-zero.
	if member.RaftID == 0 {
		return nil
	}

	return n.registerNode(member)
}
// applyUpdateNode is called when we receive a ConfChange from a member of
// the raft cluster that updates the address of an existing node.
func (n *Node) applyUpdateNode(ctx context.Context, cc raftpb.ConfChange) error {
	newMember := &api.RaftMember{}
	err := proto.Unmarshal(cc.Context, newMember)
	if err != nil {
		return err
	}

	// The local node never needs to update a transport connection to
	// itself.
	if newMember.RaftID == n.Config.ID {
		return nil
	}

	if err := n.transport.UpdatePeer(newMember.RaftID, newMember.Addr); err != nil {
		return err
	}

	return n.cluster.UpdateMember(newMember.RaftID, newMember)
}
// applyRemoveNode is called when we receive a ConfChange from a member of
// the raft cluster. It removes a node from the existing raft cluster.
func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err error) {
	// If the node being removed is the leader and this node is a
	// follower, campaign to become the leader so the cluster is not
	// left leaderless once the removal takes effect.
	if cc.NodeID == n.leader() && !n.isLeader() {
		if err = n.raftNode.Campaign(ctx); err != nil {
			return err
		}
	}

	if cc.NodeID == n.Config.ID {
		// Wait for the commit ack to be sent before closing the
		// connection.
		n.asyncTasks.Wait()

		n.NodeRemoved()
	} else if err := n.transport.RemovePeer(cc.NodeID); err != nil {
		return err
	}

	return n.cluster.RemoveMember(cc.NodeID)
}
// SubscribeLeadership returns a channel on which leadership change events
// are sent as raft.LeadershipState values, along with a cancel function
// that should be called once the listener is no longer interested in
// events.
func (n *Node) SubscribeLeadership() (q chan events.Event, cancel func()) {
	return n.leadershipBroadcast.Watch()
}
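
// Illustrative sketch (not part of the original source): consuming
// leadership events. The IsLeader comparison assumes this package's
// LeadershipState constants; the surrounding loop and ctx are the
// caller's own:
//
//	ch, cancel := n.SubscribeLeadership()
//	defer cancel()
//	for {
//		select {
//		case ev := <-ch:
//			if ev.(LeadershipState) == IsLeader {
//				// start leader-only work
//			} else {
//				// stop leader-only work
//			}
//		case <-ctx.Done():
//			return
//		}
//	}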
// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	var ents []raftpb.Entry
	next := index + 1
	found := false
	for _, id := range ids {
		if id == self {
			found = true
			continue
		}
		cc := &raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: id,
		}
		data, err := cc.Marshal()
		if err != nil {
			log.L.WithError(err).Panic("marshal configuration change should never fail")
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  data,
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
		next++
	}

	if !found {
		node := &api.RaftMember{RaftID: self}
		meta, err := node.Marshal()
		if err != nil {
			log.L.WithError(err).Panic("marshal member should never fail")
		}
		cc := &raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  self,
			Context: meta,
		}
		data, err := cc.Marshal()
		if err != nil {
			log.L.WithError(err).Panic("marshal configuration change should never fail")
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  data,
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
	}

	return ents
}
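
// Illustrative sketch (not part of the original source): the typical use
// of createConfigChangeEnts when forcing a restored WAL to describe a
// single-member cluster. readSnapAndEnts and appendEnts are hypothetical
// helpers standing in for the restore plumbing:
//
//	snapshot, ents := readSnapAndEnts()
//	ids := getIDs(snapshot, ents) // members known to the log so far
//	// Remove every member except selfID, adding selfID if absent.
//	ccEnts := createConfigChangeEnts(ids, selfID, term, index)
//	appendEnts(ccEnts)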
// getIDs returns the set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain three kinds of
// ID-related entries:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
// - ConfChangeUpdateNode, which does not affect the set.
// The returned IDs are in no particular order.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	ids := make(map[uint64]struct{})
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Nodes {
			ids[id] = struct{}{}
		}
	}
	for _, e := range ents {
		if e.Type != raftpb.EntryConfChange {
			continue
		}
		if snap != nil && e.Index < snap.Metadata.Index {
			continue
		}
		var cc raftpb.ConfChange
		if err := cc.Unmarshal(e.Data); err != nil {
			log.L.WithError(err).Panic("unmarshal configuration change should never fail")
		}
		switch cc.Type {
		case raftpb.ConfChangeAddNode:
			ids[cc.NodeID] = struct{}{}
		case raftpb.ConfChangeRemoveNode:
			delete(ids, cc.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			log.L.Panic("ConfChange type should be either ConfChangeAddNode, ConfChangeRemoveNode, or ConfChangeUpdateNode")
		}
	}
	var sids []uint64
	for id := range ids {
		sids = append(sids, id)
	}
	return sids
}
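
// Illustrative example (not part of the original source): if the
// snapshot's ConfState lists members {1, 2} and the entries after it add
// node 3 and then remove node 2, getIDs returns {1, 3}. Map iteration
// determines the order, so callers must not rely on it.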
// reqTimeout returns how long to wait for a proposal to be applied: a
// fixed five-second floor plus twice the election timeout.
func (n *Node) reqTimeout() time.Duration {
	return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval
}
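
// Illustrative arithmetic (not part of the original source): assuming,
// for example, ElectionTick = 10 and TickInterval = 1s (these values are
// assumptions, not taken from this file), reqTimeout yields
// 5s + 2*10*1s = 25s.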