raft.go

package raft

import (
    "fmt"
    "math"
    "math/rand"
    "net"
    "sync"
    "sync/atomic"
    "time"

    "github.com/Sirupsen/logrus"
    "github.com/coreos/etcd/pkg/idutil"
    "github.com/coreos/etcd/raft"
    "github.com/coreos/etcd/raft/raftpb"
    "github.com/docker/docker/pkg/signal"
    "github.com/docker/go-events"
    "github.com/docker/swarmkit/api"
    "github.com/docker/swarmkit/ca"
    "github.com/docker/swarmkit/log"
    "github.com/docker/swarmkit/manager/raftselector"
    "github.com/docker/swarmkit/manager/state"
    "github.com/docker/swarmkit/manager/state/raft/membership"
    "github.com/docker/swarmkit/manager/state/raft/storage"
    "github.com/docker/swarmkit/manager/state/raft/transport"
    "github.com/docker/swarmkit/manager/state/store"
    "github.com/docker/swarmkit/watch"
    "github.com/gogo/protobuf/proto"
    "github.com/pivotal-golang/clock"
    "github.com/pkg/errors"
    "golang.org/x/net/context"
    "golang.org/x/time/rate"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/peer"
)

var (
    // ErrNoRaftMember is returned when the node is not yet part of a raft cluster
    ErrNoRaftMember = errors.New("raft: node is not yet part of a raft cluster")
    // ErrConfChangeRefused is returned when there is an issue with the configuration change
    ErrConfChangeRefused = errors.New("raft: propose configuration change refused")
    // ErrApplyNotSpecified is returned during the creation of a raft node when no apply method was provided
    ErrApplyNotSpecified = errors.New("raft: apply method was not specified")
    // ErrSetHardState is returned when the node fails to set the hard state
    ErrSetHardState = errors.New("raft: failed to set the hard state for log append entry")
    // ErrStopped is returned when an operation was submitted but the node was stopped in the meantime
    ErrStopped = errors.New("raft: failed to process the request: node is stopped")
    // ErrLostLeadership is returned when an operation was submitted but the node lost leader status before it became committed
    ErrLostLeadership = errors.New("raft: failed to process the request: node lost leader status")
    // ErrRequestTooLarge is returned when a raft internal message is too large to be sent
    ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent")
    // ErrCannotRemoveMember is returned when we try to remove a member from the cluster but this would result in a loss of quorum
    ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum")
    // ErrNoClusterLeader is returned when the cluster has no elected leader
    ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
    // ErrMemberUnknown is sent in response to a message from an
    // unrecognized peer.
    ErrMemberUnknown = errors.New("raft: member unknown")

    // work around lint
    lostQuorumMessage = "The swarm does not have a leader. It's possible that too few managers are online. Make sure more than half of the managers are online."
    errLostQuorum     = errors.New(lostQuorumMessage)
)

// LeadershipState indicates whether the node is a leader or follower.
type LeadershipState int

const (
    // IsLeader indicates that the node is a raft leader.
    IsLeader LeadershipState = iota
    // IsFollower indicates that the node is a raft follower.
    IsFollower

    // lostQuorumTimeout is the number of ticks that can elapse with no
    // leader before LeaderConn starts returning an error right away.
    lostQuorumTimeout = 10
)

// EncryptionKeys are the current and, if necessary, pending DEKs with which to
// encrypt raft data
type EncryptionKeys struct {
    CurrentDEK []byte
    PendingDEK []byte
}

// EncryptionKeyRotator is an interface to find out if any keys need rotating.
type EncryptionKeyRotator interface {
    GetKeys() EncryptionKeys
    UpdateKeys(EncryptionKeys) error
    NeedsRotation() bool
    RotationNotify() chan struct{}
}
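
// The type below is an illustrative sketch added for this rewrite, not part
// of the original swarmkit code: a minimal, non-persistent implementation of
// EncryptionKeyRotator such as a test might use. Real rotators persist keys;
// every field and method body here is an assumption for the example.
type staticKeyRotator struct {
    mu     sync.Mutex
    keys   EncryptionKeys
    notify chan struct{}
}

// newStaticKeyRotator builds a rotator seeded with a current DEK (hypothetical helper).
func newStaticKeyRotator(current []byte) *staticKeyRotator {
    return &staticKeyRotator{
        keys:   EncryptionKeys{CurrentDEK: current},
        notify: make(chan struct{}),
    }
}

func (r *staticKeyRotator) GetKeys() EncryptionKeys {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.keys
}

func (r *staticKeyRotator) UpdateKeys(k EncryptionKeys) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.keys = k
    return nil
}

// NeedsRotation reports whether a pending DEK is waiting to replace the current one.
func (r *staticKeyRotator) NeedsRotation() bool {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.keys.PendingDEK != nil
}

func (r *staticKeyRotator) RotationNotify() chan struct{} {
    return r.notify
}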

// Node represents a raft node and the configuration needed to run it.
type Node struct {
    raftNode  raft.Node
    cluster   *membership.Cluster
    transport *transport.Transport

    raftStore   *raft.MemoryStorage
    memoryStore *store.MemoryStore
    Config      *raft.Config
    opts        NodeOptions
    reqIDGen    *idutil.Generator
    wait        *wait

    campaignWhenAble    bool
    signalledLeadership uint32
    isMember            uint32
    bootstrapMembers    []*api.RaftMember

    // waitProp waits for all the proposals to be terminated before
    // shutting down the node.
    waitProp sync.WaitGroup

    confState       raftpb.ConfState
    appliedIndex    uint64
    snapshotMeta    raftpb.SnapshotMetadata
    writtenWALIndex uint64

    ticker clock.Ticker
    doneCh chan struct{}
    // RemovedFromRaft notifies about node deletion from raft cluster
    RemovedFromRaft chan struct{}
    cancelFunc      func()
    // removeRaftCh notifies about node deletion from raft cluster
    removeRaftCh        chan struct{}
    removeRaftOnce      sync.Once
    leadershipBroadcast *watch.Queue

    // used to coordinate shutdown
    // Lock should be used only in stop(), all other functions should use RLock.
    stopMu sync.RWMutex
    // used for membership management checks
    membershipLock sync.Mutex
    // synchronizes access to n.opts.Addr, and makes sure the address is not
    // updated concurrently with JoinAndStart.
    addrLock sync.Mutex

    snapshotInProgress chan raftpb.SnapshotMetadata
    asyncTasks         sync.WaitGroup

    // stopped chan is used for notifying grpc handlers that the raft node is
    // going to stop.
    stopped chan struct{}

    raftLogger          *storage.EncryptedRaftLogger
    keyRotator          EncryptionKeyRotator
    rotationQueued      bool
    clearData           bool
    waitForAppliedIndex uint64
    ticksWithNoLeader   uint32
}

// NodeOptions provides node-level options.
type NodeOptions struct {
    // ID is the node's ID, from its certificate's CN field.
    ID string
    // Addr is the address of this node's listener
    Addr string
    // ForceNewCluster defines if we have to force a new cluster
    // because we are recovering from a backup data directory.
    ForceNewCluster bool
    // JoinAddr is the cluster to join. May be an empty string to create
    // a standalone cluster.
    JoinAddr string
    // Config is the raft config.
    Config *raft.Config
    // StateDir is the directory to store durable state.
    StateDir string
    // TickInterval is the time interval between raft ticks.
    TickInterval time.Duration
    // ClockSource is a Clock interface to use as a time base.
    // Leave this nil except for tests that are designed not to run in real
    // time.
    ClockSource clock.Clock
    // SendTimeout is the timeout for sending messages to other raft
    // nodes. Leave this as 0 to get the default value.
    SendTimeout    time.Duration
    TLSCredentials credentials.TransportCredentials
    KeyRotator     EncryptionKeyRotator
    // DisableStackDump prevents Run from dumping goroutine stacks when the
    // store becomes stuck.
    DisableStackDump bool
}

func init() {
    rand.Seed(time.Now().UnixNano())
}

// NewNode generates a new Raft node
func NewNode(opts NodeOptions) *Node {
    cfg := opts.Config
    if cfg == nil {
        cfg = DefaultNodeConfig()
    }
    if opts.TickInterval == 0 {
        opts.TickInterval = time.Second
    }
    if opts.SendTimeout == 0 {
        opts.SendTimeout = 2 * time.Second
    }

    raftStore := raft.NewMemoryStorage()

    n := &Node{
        cluster:   membership.NewCluster(),
        raftStore: raftStore,
        opts:      opts,
        Config: &raft.Config{
            ElectionTick:    cfg.ElectionTick,
            HeartbeatTick:   cfg.HeartbeatTick,
            Storage:         raftStore,
            MaxSizePerMsg:   cfg.MaxSizePerMsg,
            MaxInflightMsgs: cfg.MaxInflightMsgs,
            Logger:          cfg.Logger,
            CheckQuorum:     cfg.CheckQuorum,
        },
        doneCh:              make(chan struct{}),
        RemovedFromRaft:     make(chan struct{}),
        stopped:             make(chan struct{}),
        leadershipBroadcast: watch.NewQueue(),
        keyRotator:          opts.KeyRotator,
    }
    n.memoryStore = store.NewMemoryStore(n)

    if opts.ClockSource == nil {
        n.ticker = clock.NewClock().NewTicker(opts.TickInterval)
    } else {
        n.ticker = opts.ClockSource.NewTicker(opts.TickInterval)
    }

    n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
    n.wait = newWait()

    n.cancelFunc = func(n *Node) func() {
        var cancelOnce sync.Once
        return func() {
            cancelOnce.Do(func() {
                close(n.stopped)
            })
        }
    }(n)

    return n
}
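
// exampleStartNode is an illustrative sketch added for this rewrite, not part
// of the original file: it shows how a caller such as the swarmkit manager
// typically wires a Node together with NewNode, JoinAndStart and Run. The
// node ID, address, state directory and the caller-supplied credentials and
// rotator values are assumptions for the example.
func exampleStartNode(ctx context.Context, creds credentials.TransportCredentials, rotator EncryptionKeyRotator) error {
    n := NewNode(NodeOptions{
        ID:             "node-id-from-certificate", // hypothetical CN value
        Addr:           "10.0.0.1:2377",            // hypothetical listen address
        StateDir:       "/var/lib/example/raft",    // hypothetical state directory
        TLSCredentials: creds,
        KeyRotator:     rotator,
    })

    // JoinAndStart either bootstraps a new single-node cluster (no JoinAddr)
    // or joins an existing one, and Run then drives the raft state machine.
    if err := n.JoinAndStart(ctx); err != nil {
        return err
    }
    go func() {
        if err := n.Run(ctx); err != nil {
            log.G(ctx).WithError(err).Error("raft node stopped with an error")
        }
    }()
    return nil
}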

// IsIDRemoved reports if member with id was removed from cluster.
// Part of transport.Raft interface.
func (n *Node) IsIDRemoved(id uint64) bool {
    return n.cluster.IsIDRemoved(id)
}

// NodeRemoved signals that node was removed from cluster and should stop.
// Part of transport.Raft interface.
func (n *Node) NodeRemoved() {
    n.removeRaftOnce.Do(func() {
        atomic.StoreUint32(&n.isMember, 0)
        close(n.RemovedFromRaft)
    })
}

// ReportSnapshot reports snapshot status to underlying raft node.
// Part of transport.Raft interface.
func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
    n.raftNode.ReportSnapshot(id, status)
}

// ReportUnreachable reports to underlying raft node that member with id is
// unreachable.
// Part of transport.Raft interface.
func (n *Node) ReportUnreachable(id uint64) {
    n.raftNode.ReportUnreachable(id)
}

// SetAddr provides the raft node's address. This can be used in cases where
// opts.Addr was not provided to NewNode, for example when a port was not bound
// until after the raft node was created.
func (n *Node) SetAddr(ctx context.Context, addr string) error {
    n.addrLock.Lock()
    defer n.addrLock.Unlock()

    n.opts.Addr = addr

    if !n.IsMember() {
        return nil
    }

    newRaftMember := &api.RaftMember{
        RaftID: n.Config.ID,
        NodeID: n.opts.ID,
        Addr:   addr,
    }
    if err := n.cluster.UpdateMember(n.Config.ID, newRaftMember); err != nil {
        return err
    }

    // If the raft node is running, submit a configuration change
    // with the new address.

    // TODO(aaronl): Currently, this node must be the leader to
    // submit this configuration change. This works for the initial
    // use cases (single-node cluster late binding ports, or calling
    // SetAddr before joining a cluster). In the future, we may want
    // to support having a follower proactively change its remote
    // address.

    leadershipCh, cancelWatch := n.SubscribeLeadership()
    defer cancelWatch()

    ctx, cancelCtx := n.WithContext(ctx)
    defer cancelCtx()

    isLeader := atomic.LoadUint32(&n.signalledLeadership) == 1
    for !isLeader {
        select {
        case leadershipChange := <-leadershipCh:
            if leadershipChange == IsLeader {
                isLeader = true
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return n.updateNodeBlocking(ctx, n.Config.ID, addr)
}

// WithContext returns a context that is cancelled when the parent context is
// cancelled or the node is stopped.
func (n *Node) WithContext(ctx context.Context) (context.Context, context.CancelFunc) {
    ctx, cancel := context.WithCancel(ctx)

    go func() {
        select {
        case <-ctx.Done():
        case <-n.stopped:
            cancel()
        }
    }()
    return ctx, cancel
}

func (n *Node) initTransport() {
    transportConfig := &transport.Config{
        HeartbeatInterval: time.Duration(n.Config.ElectionTick) * n.opts.TickInterval,
        SendTimeout:       n.opts.SendTimeout,
        Credentials:       n.opts.TLSCredentials,
        Raft:              n,
    }
    n.transport = transport.New(transportConfig)
}

// JoinAndStart joins and starts the raft server
func (n *Node) JoinAndStart(ctx context.Context) (err error) {
    ctx, cancel := n.WithContext(ctx)
    defer func() {
        cancel()
        if err != nil {
            n.stopMu.Lock()
            // to shutdown transport
            close(n.stopped)
            n.stopMu.Unlock()
            n.done()
        } else {
            atomic.StoreUint32(&n.isMember, 1)
        }
    }()

    loadAndStartErr := n.loadAndStart(ctx, n.opts.ForceNewCluster)
    if loadAndStartErr != nil && loadAndStartErr != storage.ErrNoWAL {
        return loadAndStartErr
    }

    snapshot, err := n.raftStore.Snapshot()
    // Snapshot never returns an error
    if err != nil {
        panic("could not get snapshot of raft store")
    }

    n.confState = snapshot.Metadata.ConfState
    n.appliedIndex = snapshot.Metadata.Index
    n.snapshotMeta = snapshot.Metadata
    n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error

    n.addrLock.Lock()
    defer n.addrLock.Unlock()

    // override the module field entirely, since etcd/raft is not exactly a submodule
    n.Config.Logger = log.G(ctx).WithField("module", "raft")

    // restore from snapshot
    if loadAndStartErr == nil {
        if n.opts.JoinAddr != "" {
            log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
        }
        n.campaignWhenAble = true
        n.initTransport()
        n.raftNode = raft.RestartNode(n.Config)
        return nil
    }

    // first member of cluster
    if n.opts.JoinAddr == "" {
        // First member in the cluster, self-assign ID
        n.Config.ID = uint64(rand.Int63()) + 1
        peer, err := n.newRaftLogs(n.opts.ID)
        if err != nil {
            return err
        }
        n.campaignWhenAble = true
        n.initTransport()
        n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
        return nil
    }

    // join to existing cluster
    if n.opts.Addr == "" {
        return errors.New("attempted to join raft cluster without knowing own address")
    }

    conn, err := dial(n.opts.JoinAddr, "tcp", n.opts.TLSCredentials, 10*time.Second)
    if err != nil {
        return err
    }
    defer conn.Close()
    client := api.NewRaftMembershipClient(conn)

    joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
    defer joinCancel()
    resp, err := client.Join(joinCtx, &api.JoinRequest{
        Addr: n.opts.Addr,
    })
    if err != nil {
        return err
    }

    n.Config.ID = resp.RaftID

    if _, err := n.newRaftLogs(n.opts.ID); err != nil {
        return err
    }
    n.bootstrapMembers = resp.Members

    n.initTransport()
    n.raftNode = raft.StartNode(n.Config, nil)

    return nil
}

// DefaultNodeConfig returns the default config for a
// raft node that can be modified and customized
func DefaultNodeConfig() *raft.Config {
    return &raft.Config{
        HeartbeatTick:   1,
        ElectionTick:    3,
        MaxSizePerMsg:   math.MaxUint16,
        MaxInflightMsgs: 256,
        Logger:          log.L,
        CheckQuorum:     true,
    }
}

// DefaultRaftConfig returns a default api.RaftConfig.
func DefaultRaftConfig() api.RaftConfig {
    return api.RaftConfig{
        KeepOldSnapshots:           0,
        SnapshotInterval:           10000,
        LogEntriesForSlowFollowers: 500,
        ElectionTick:               3,
        HeartbeatTick:              1,
    }
}

// MemoryStore returns the memory store that is kept in sync with the raft log.
func (n *Node) MemoryStore() *store.MemoryStore {
    return n.memoryStore
}

func (n *Node) done() {
    n.cluster.Clear()

    n.ticker.Stop()
    n.leadershipBroadcast.Close()
    n.cluster.PeersBroadcast.Close()
    n.memoryStore.Close()
    if n.transport != nil {
        n.transport.Stop()
    }

    close(n.doneCh)
}

// ClearData tells the raft node to delete its WALs, snapshots, and keys on
// shutdown.
func (n *Node) ClearData() {
    n.clearData = true
}

// Run is the main loop for a Raft node. It goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
    ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
    ctx, cancel := context.WithCancel(ctx)

    for _, node := range n.bootstrapMembers {
        if err := n.registerNode(node); err != nil {
            log.G(ctx).WithError(err).Errorf("failed to register member %x", node.RaftID)
        }
    }

    defer func() {
        cancel()
        n.stop(ctx)
        if n.clearData {
            // Delete WAL and snapshots, since they are no longer
            // usable.
            if err := n.raftLogger.Clear(ctx); err != nil {
                log.G(ctx).WithError(err).Error("failed to move wal after node removal")
            }
            // clear out the DEKs
            if err := n.keyRotator.UpdateKeys(EncryptionKeys{}); err != nil {
                log.G(ctx).WithError(err).Error("could not remove DEKs")
            }
        }
        n.done()
    }()

    wasLeader := false
    transferLeadershipLimit := rate.NewLimiter(rate.Every(time.Minute), 1)

    for {
        select {
        case <-n.ticker.C():
            n.raftNode.Tick()

            if n.leader() == raft.None {
                atomic.AddUint32(&n.ticksWithNoLeader, 1)
            } else {
                atomic.StoreUint32(&n.ticksWithNoLeader, 0)
            }
        case rd := <-n.raftNode.Ready():
            raftConfig := n.getCurrentRaftConfig()

            // Save entries to storage
            if err := n.saveToStorage(ctx, &raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil {
                return errors.Wrap(err, "failed to save entries to storage")
            }

            if wasLeader &&
                (rd.SoftState == nil || rd.SoftState.RaftState == raft.StateLeader) &&
                n.memoryStore.Wedged() &&
                transferLeadershipLimit.Allow() {
                if !n.opts.DisableStackDump {
                    signal.DumpStacks("")
                }
                transferee, err := n.transport.LongestActive()
                if err != nil {
                    log.G(ctx).WithError(err).Error("failed to get longest-active member")
                } else {
                    log.G(ctx).Error("data store lock held too long - transferring leadership")
                    n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
                }
            }

            for _, msg := range rd.Messages {
                // Send raft messages to peers
                if err := n.transport.Send(msg); err != nil {
                    log.G(ctx).WithError(err).Error("failed to send message to member")
                }
            }

            // Apply snapshot to memory store. The snapshot
            // was applied to the raft store in
            // saveToStorage.
            if !raft.IsEmptySnap(rd.Snapshot) {
                // Load the snapshot data into the store
                if err := n.restoreFromSnapshot(ctx, rd.Snapshot.Data); err != nil {
                    log.G(ctx).WithError(err).Error("failed to restore cluster from snapshot")
                }
                n.appliedIndex = rd.Snapshot.Metadata.Index
                n.snapshotMeta = rd.Snapshot.Metadata
                n.confState = rd.Snapshot.Metadata.ConfState
            }

            // If we cease to be the leader, we must cancel any
            // proposals that are currently waiting for a quorum to
            // acknowledge them. It is still possible for these to
            // become committed, but if that happens we will apply
            // them as any follower would.

            // It is important that we cancel these proposals before
            // calling processCommitted, so processCommitted does
            // not deadlock.

            if rd.SoftState != nil {
                if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
                    wasLeader = false

                    if atomic.LoadUint32(&n.signalledLeadership) == 1 {
                        atomic.StoreUint32(&n.signalledLeadership, 0)
                        n.leadershipBroadcast.Publish(IsFollower)
                    }

                    // It is important that we set n.signalledLeadership to 0
                    // before calling n.wait.cancelAll. When a new raft
                    // request is registered, it checks n.signalledLeadership
                    // afterwards, and cancels the registration if it is 0.
                    // If cancelAll was called first, this call might run
                    // before the new request registers, but
                    // signalledLeadership would be set after the check.
                    // Setting signalledLeadership before calling cancelAll
                    // ensures that if a new request is registered during
                    // this transition, it will either be cancelled by
                    // cancelAll, or by its own check of signalledLeadership.
                    n.wait.cancelAll()
                } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
                    wasLeader = true
                }
            }

            // Process committed entries
            for _, entry := range rd.CommittedEntries {
                if err := n.processCommitted(ctx, entry); err != nil {
                    log.G(ctx).WithError(err).Error("failed to process committed entries")
                }
            }

            // in case the previous attempt to update the key failed
            n.maybeMarkRotationFinished(ctx)

            // Trigger a snapshot every once in a while
            if n.snapshotInProgress == nil &&
                (n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 &&
                    n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) {
                n.doSnapshot(ctx, raftConfig)
            }

            if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
                // If all the entries in the log have become
                // committed, broadcast our leadership status.
                if n.caughtUp() {
                    atomic.StoreUint32(&n.signalledLeadership, 1)
                    n.leadershipBroadcast.Publish(IsLeader)
                }
            }

            // Advance the state machine
            n.raftNode.Advance()

            // On the first startup, or if we are the only
            // registered member after restoring from the state,
            // campaign to be the leader.
            if n.campaignWhenAble {
                members := n.cluster.Members()
                if len(members) >= 1 {
                    n.campaignWhenAble = false
                }
                if len(members) == 1 && members[n.Config.ID] != nil {
                    n.raftNode.Campaign(ctx)
                }
            }

        case snapshotMeta := <-n.snapshotInProgress:
            raftConfig := n.getCurrentRaftConfig()

            if snapshotMeta.Index > n.snapshotMeta.Index {
                n.snapshotMeta = snapshotMeta
                if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil {
                    log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
                }
            }
            n.snapshotInProgress = nil
            n.maybeMarkRotationFinished(ctx)
            if n.rotationQueued && n.needsSnapshot(ctx) {
                // a key rotation took place while the snapshot was in
                // progress - we have to take another snapshot and encrypt with the new key
                n.rotationQueued = false
                n.doSnapshot(ctx, raftConfig)
            }
        case <-n.keyRotator.RotationNotify():
            // There are 2 separate checks: rotationQueued, and n.needsSnapshot().
            // We set rotationQueued so that when we are notified of a rotation, we try to
            // do a snapshot as soon as possible. However, if there is an error while doing
            // the snapshot, we don't want to hammer the node attempting to do snapshots over
            // and over. So if doing a snapshot fails, wait until the next entry comes in to
            // try again.
            switch {
            case n.snapshotInProgress != nil:
                n.rotationQueued = true
            case n.needsSnapshot(ctx):
                n.doSnapshot(ctx, n.getCurrentRaftConfig())
            }
        case <-ctx.Done():
            return nil
        }
    }
}

func (n *Node) restoreFromSnapshot(ctx context.Context, data []byte) error {
    snapCluster, err := n.clusterSnapshot(data)
    if err != nil {
        return err
    }

    oldMembers := n.cluster.Members()

    for _, member := range snapCluster.Members {
        delete(oldMembers, member.RaftID)
    }

    for _, removedMember := range snapCluster.Removed {
        n.cluster.RemoveMember(removedMember)
        n.transport.RemovePeer(removedMember)
        delete(oldMembers, removedMember)
    }

    for id, member := range oldMembers {
        n.cluster.ClearMember(id)
        if err := n.transport.RemovePeer(member.RaftID); err != nil {
            log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", member.RaftID)
        }
    }
    for _, node := range snapCluster.Members {
        if err := n.registerNode(&api.RaftMember{RaftID: node.RaftID, NodeID: node.NodeID, Addr: node.Addr}); err != nil {
            log.G(ctx).WithError(err).Error("failed to register node from snapshot")
        }
    }
    return nil
}

func (n *Node) needsSnapshot(ctx context.Context) bool {
    if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
        keys := n.keyRotator.GetKeys()
        if keys.PendingDEK != nil {
            n.raftLogger.RotateEncryptionKey(keys.PendingDEK)
            // we want to wait for the last index written with the old DEK to be committed, else a snapshot taken
            // may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
            // written with the new key to supersede any WAL written with an old DEK.
            n.waitForAppliedIndex = n.writtenWALIndex
            // if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current
            // snapshot index, because the rotation cannot be completed until the next snapshot
            if n.waitForAppliedIndex <= n.snapshotMeta.Index {
                n.waitForAppliedIndex = n.snapshotMeta.Index + 1
            }
            log.G(ctx).Debugf(
                "beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex)
        }
    }

    result := n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
    if result {
        log.G(ctx).Debugf(
            "a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered",
            n.waitForAppliedIndex, n.appliedIndex)
    }
    return result
}

func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
    if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotMeta.Index {
        // this means we tried to rotate - so finish the rotation
        if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
            log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
        } else {
            log.G(ctx).Debugf(
                "a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key",
                n.snapshotMeta.Index, n.waitForAppliedIndex)
            n.waitForAppliedIndex = 0

            if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil {
                log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK")
            }
        }
    }
}

func (n *Node) getCurrentRaftConfig() api.RaftConfig {
    raftConfig := DefaultRaftConfig()
    n.memoryStore.View(func(readTx store.ReadTx) {
        clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
        if err == nil && len(clusters) == 1 {
            raftConfig = clusters[0].Spec.Raft
        }
    })
    return raftConfig
}

// Cancel interrupts all ongoing proposals, and prevents new ones from
// starting. This is useful for the shutdown sequence because it allows
// the manager to shut down raft-dependent services that might otherwise
// block on shutdown if quorum isn't met. Then the raft node can be completely
// shut down once no more code is using it.
func (n *Node) Cancel() {
    n.cancelFunc()
}

// Done returns a channel that is closed when the raft node is fully stopped.
func (n *Node) Done() <-chan struct{} {
    return n.doneCh
}

func (n *Node) stop(ctx context.Context) {
    n.stopMu.Lock()
    defer n.stopMu.Unlock()

    n.Cancel()
    n.waitProp.Wait()
    n.asyncTasks.Wait()

    n.raftNode.Stop()
    n.ticker.Stop()
    n.raftLogger.Close(ctx)
    atomic.StoreUint32(&n.isMember, 0)
    // TODO(stevvooe): Handle ctx.Done()
}

// isLeader checks if we are the leader or not, without the protection of lock
func (n *Node) isLeader() bool {
    if !n.IsMember() {
        return false
    }
    if n.Status().Lead == n.Config.ID {
        return true
    }
    return false
}

// IsLeader checks if we are the leader or not, with the protection of lock
func (n *Node) IsLeader() bool {
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    return n.isLeader()
}

// leader returns the id of the leader, without the protection of lock and
// membership check, so those are the caller's responsibility.
func (n *Node) leader() uint64 {
    return n.Status().Lead
}

// Leader returns the id of the leader, with the protection of lock
func (n *Node) Leader() (uint64, error) {
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    if !n.IsMember() {
        return raft.None, ErrNoRaftMember
    }
    leader := n.leader()
    if leader == raft.None {
        return raft.None, ErrNoClusterLeader
    }

    return leader, nil
}

// ReadyForProposals returns true if the node has broadcasted a message
// saying that it has become the leader. This means it is ready to accept
// proposals.
func (n *Node) ReadyForProposals() bool {
    return atomic.LoadUint32(&n.signalledLeadership) == 1
}

func (n *Node) caughtUp() bool {
    // obnoxious function that always returns a nil error
    lastIndex, _ := n.raftStore.LastIndex()
    return n.appliedIndex >= lastIndex
}

// Join asks a member of the raft cluster to propose a configuration change
// adding us as a member, thus beginning the log replication process. This
// method is called from an aspiring member to an existing member.
func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinResponse, error) {
    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return nil, err
    }

    fields := logrus.Fields{
        "node.id": nodeInfo.NodeID,
        "method":  "(*Node).Join",
        "raft_id": fmt.Sprintf("%x", n.Config.ID),
    }
    if nodeInfo.ForwardedBy != nil {
        fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
    }
    log := log.G(ctx).WithFields(fields)
    log.Debug("")

    // can't stop the raft node while an async RPC is in progress
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    n.membershipLock.Lock()
    defer n.membershipLock.Unlock()

    if !n.IsMember() {
        return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrNoRaftMember.Error())
    }

    if !n.isLeader() {
        return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
    }

    // A single manager must not be able to join the raft cluster twice. If
    // it did, that would cause the quorum to be computed incorrectly. This
    // could happen if the WAL was deleted from an active manager.
    for _, m := range n.cluster.Members() {
        if m.NodeID == nodeInfo.NodeID {
            return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
        }
    }

    // Find a unique ID for the joining member.
    var raftID uint64
    for {
        raftID = uint64(rand.Int63()) + 1
        if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
            break
        }
    }

    remoteAddr := req.Addr

    // If the joining node sent an address like 0.0.0.0:4242, automatically
    // determine its actual address based on the GRPC connection. This
    // avoids the need for a prospective member to know its own address.
    requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
    if err != nil {
        return nil, grpc.Errorf(codes.InvalidArgument, "invalid address %s in raft join request", remoteAddr)
    }

    requestIP := net.ParseIP(requestHost)
    if requestIP != nil && requestIP.IsUnspecified() {
        remoteHost, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
        if err != nil {
            return nil, err
        }
        remoteAddr = net.JoinHostPort(remoteHost, requestPort)
    }

    // We do not bother submitting a configuration change for the
    // new member if we can't contact it back using its address
    if err := n.checkHealth(ctx, remoteAddr, 5*time.Second); err != nil {
        return nil, err
    }

    err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
    if err != nil {
        log.WithError(err).Errorf("failed to add member %x", raftID)
        return nil, err
    }

    var nodes []*api.RaftMember
    for _, node := range n.cluster.Members() {
        nodes = append(nodes, &api.RaftMember{
            RaftID: node.RaftID,
            NodeID: node.NodeID,
            Addr:   node.Addr,
        })
    }
    log.Debugf("node joined")

    return &api.JoinResponse{Members: nodes, RaftID: raftID}, nil
}

// checkHealth tries to contact an aspiring member through its advertised address
// and checks if its raft server is running.
func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
    conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
    if err != nil {
        return err
    }
    defer conn.Close()

    if timeout != 0 {
        tctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()
        ctx = tctx
    }

    healthClient := api.NewHealthClient(conn)
    resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
    if err != nil {
        return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address")
    }
    if resp.Status != api.HealthCheckResponse_SERVING {
        return fmt.Errorf("health check returned status %s", resp.Status.String())
    }

    return nil
}

// addMember submits a configuration change to add a new member on the raft cluster.
func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
    node := api.RaftMember{
        RaftID: raftID,
        NodeID: nodeID,
        Addr:   addr,
    }

    meta, err := node.Marshal()
    if err != nil {
        return err
    }

    cc := raftpb.ConfChange{
        Type:    raftpb.ConfChangeAddNode,
        NodeID:  raftID,
        Context: meta,
    }

    // Wait for a raft round to process the configuration change
    return n.configure(ctx, cc)
}

// updateNodeBlocking runs a synchronous job to update the node address in the whole cluster.
func (n *Node) updateNodeBlocking(ctx context.Context, id uint64, addr string) error {
    m := n.cluster.GetMember(id)
    if m == nil {
        return errors.Errorf("member %x is not found for update", id)
    }
    node := api.RaftMember{
        RaftID: m.RaftID,
        NodeID: m.NodeID,
        Addr:   addr,
    }

    meta, err := node.Marshal()
    if err != nil {
        return err
    }

    cc := raftpb.ConfChange{
        Type:    raftpb.ConfChangeUpdateNode,
        NodeID:  id,
        Context: meta,
    }

    // Wait for a raft round to process the configuration change
    return n.configure(ctx, cc)
}

// UpdateNode submits a configuration change to change a member's address.
func (n *Node) UpdateNode(id uint64, addr string) {
    ctx, cancel := n.WithContext(context.Background())
    defer cancel()
    // spawn updating info in raft in background to unblock transport
    go func() {
        if err := n.updateNodeBlocking(ctx, id, addr); err != nil {
            log.G(ctx).WithFields(logrus.Fields{"raft_id": n.Config.ID, "update_id": id}).WithError(err).Error("failed to update member address in cluster")
        }
    }()
}

// Leave asks a member of the raft cluster to remove us from the raft
// cluster. This method is called from a member that is willing to give up
// its raft membership to an active member of the raft cluster.
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
    if req.Node == nil {
        return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
    }

    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return nil, err
    }

    ctx, cancel := n.WithContext(ctx)
    defer cancel()

    fields := logrus.Fields{
        "node.id": nodeInfo.NodeID,
        "method":  "(*Node).Leave",
        "raft_id": fmt.Sprintf("%x", n.Config.ID),
    }
    if nodeInfo.ForwardedBy != nil {
        fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
    }
    log.G(ctx).WithFields(fields).Debug("")

    if err := n.removeMember(ctx, req.Node.RaftID); err != nil {
        return nil, err
    }

    return &api.LeaveResponse{}, nil
}

// CanRemoveMember checks if a member can be removed from
// the context of the current node.
func (n *Node) CanRemoveMember(id uint64) bool {
    members := n.cluster.Members()
    nreachable := 0 // reachable managers after removal

    for _, m := range members {
        if m.RaftID == id {
            continue
        }

        // Local node from where the remove is issued
        if m.RaftID == n.Config.ID {
            nreachable++
            continue
        }

        if n.transport.Active(m.RaftID) {
            nreachable++
        }
    }

    nquorum := (len(members)-1)/2 + 1
    if nreachable < nquorum {
        return false
    }

    return true
}
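
// quorumAfterRemoval is an illustrative helper added for this rewrite, not
// part of the original file: it restates the arithmetic CanRemoveMember uses.
// For example, with 5 members, removing one leaves 4, so (5-1)/2+1 = 3
// reachable managers must remain for the removal to be allowed.
func quorumAfterRemoval(memberCount int) int {
    return (memberCount-1)/2 + 1
}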

func (n *Node) removeMember(ctx context.Context, id uint64) error {
    // can't stop the raft node while an async RPC is in progress
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    if !n.IsMember() {
        return ErrNoRaftMember
    }

    if !n.isLeader() {
        return ErrLostLeadership
    }

    n.membershipLock.Lock()
    defer n.membershipLock.Unlock()
    if !n.CanRemoveMember(id) {
        return ErrCannotRemoveMember
    }

    cc := raftpb.ConfChange{
        ID:      id,
        Type:    raftpb.ConfChangeRemoveNode,
        NodeID:  id,
        Context: []byte(""),
    }
    return n.configure(ctx, cc)
}

// TransferLeadership attempts to transfer leadership to a different node,
// and waits for the transfer to happen.
func (n *Node) TransferLeadership(ctx context.Context) error {
    ctx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout())
    defer cancelTransfer()

    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    if !n.IsMember() {
        return ErrNoRaftMember
    }

    if !n.isLeader() {
        return ErrLostLeadership
    }

    transferee, err := n.transport.LongestActive()
    if err != nil {
        return errors.Wrap(err, "failed to get longest-active member")
    }
    start := time.Now()
    n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
    ticker := time.NewTicker(n.opts.TickInterval / 10)
    defer ticker.Stop()
    var leader uint64
    for {
        leader = n.leader()
        if leader != raft.None && leader != n.Config.ID {
            break
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
    log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, leader, time.Since(start))
    return nil
}

// RemoveMember submits a configuration change to remove a member from the raft cluster
// after checking if the operation would not result in a loss of quorum.
func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
    ctx, cancel := n.WithContext(ctx)
    defer cancel()
    return n.removeMember(ctx, id)
}

// processRaftMessageLogger is used to lazily create a logger for
// ProcessRaftMessage. Usually nothing will be logged, so it is useful to avoid
// formatting strings and allocating a logger when it won't be used.
func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaftMessageRequest) *logrus.Entry {
    fields := logrus.Fields{
        "method": "(*Node).ProcessRaftMessage",
    }

    if n.IsMember() {
        fields["raft_id"] = fmt.Sprintf("%x", n.Config.ID)
    }

    if msg != nil && msg.Message != nil {
        fields["from"] = fmt.Sprintf("%x", msg.Message.From)
    }

    return log.G(ctx).WithFields(fields)
}

func (n *Node) reportNewAddress(ctx context.Context, id uint64) error {
    // too early
    if !n.IsMember() {
        return nil
    }
    p, ok := peer.FromContext(ctx)
    if !ok {
        return nil
    }
    oldAddr, err := n.transport.PeerAddr(id)
    if err != nil {
        return err
    }
    if oldAddr == "" {
        // Don't know the address of the peer yet, so can't report an
        // update.
        return nil
    }
    newHost, _, err := net.SplitHostPort(p.Addr.String())
    if err != nil {
        return err
    }
    _, officialPort, err := net.SplitHostPort(oldAddr)
    if err != nil {
        return err
    }
    newAddr := net.JoinHostPort(newHost, officialPort)
    if err := n.transport.UpdatePeerAddr(id, newAddr); err != nil {
        return err
    }
    return nil
}

// ProcessRaftMessage calls 'Step' which advances the
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
    if msg == nil || msg.Message == nil {
        n.processRaftMessageLogger(ctx, msg).Debug("received empty message")
        return &api.ProcessRaftMessageResponse{}, nil
    }

    // Don't process the message if this comes from
    // a node in the remove set
    if n.cluster.IsIDRemoved(msg.Message.From) {
        n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member")
        return nil, grpc.Errorf(codes.NotFound, "%s", membership.ErrMemberRemoved.Error())
    }

    ctx, cancel := n.WithContext(ctx)
    defer cancel()

    // TODO(aaronl): Address changes are temporarily disabled.
    // See https://github.com/docker/docker/issues/30455.
    // This should be reenabled in the future with additional
    // safeguards (perhaps storing multiple addresses per node).
    //if err := n.reportNewAddress(ctx, msg.Message.From); err != nil {
    //	log.G(ctx).WithError(err).Errorf("failed to report new address of %x to transport", msg.Message.From)
    //}

    // Reject vote requests from unreachable peers
    if msg.Message.Type == raftpb.MsgVote {
        member := n.cluster.GetMember(msg.Message.From)
        if member == nil {
            n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member")
            return &api.ProcessRaftMessageResponse{}, nil
        }

        if err := n.transport.HealthCheck(ctx, msg.Message.From); err != nil {
            n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
            return &api.ProcessRaftMessageResponse{}, nil
        }
    }

    if msg.Message.Type == raftpb.MsgProp {
        // We don't accept forwarded proposals. Our
        // current architecture depends on only the leader
        // making proposals, so in-flight proposals can be
        // guaranteed not to conflict.
        n.processRaftMessageLogger(ctx, msg).Debug("dropped forwarded proposal")
        return &api.ProcessRaftMessageResponse{}, nil
    }

    // can't stop the raft node while an async RPC is in progress
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    if n.IsMember() {
        if msg.Message.To != n.Config.ID {
            n.processRaftMessageLogger(ctx, msg).Errorf("received message intended for raft_id %x", msg.Message.To)
            return &api.ProcessRaftMessageResponse{}, nil
        }

        if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
            n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
        }
    }

    return &api.ProcessRaftMessageResponse{}, nil
}

// ResolveAddress returns the address where the member with the given raft ID
// can be reached.
func (n *Node) ResolveAddress(ctx context.Context, msg *api.ResolveAddressRequest) (*api.ResolveAddressResponse, error) {
    if !n.IsMember() {
        return nil, ErrNoRaftMember
    }

    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return nil, err
    }

    fields := logrus.Fields{
        "node.id": nodeInfo.NodeID,
        "method":  "(*Node).ResolveAddress",
        "raft_id": fmt.Sprintf("%x", n.Config.ID),
    }
    if nodeInfo.ForwardedBy != nil {
        fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
    }
    log.G(ctx).WithFields(fields).Debug("")

    member := n.cluster.GetMember(msg.RaftID)
    if member == nil {
        return nil, grpc.Errorf(codes.NotFound, "member %x not found", msg.RaftID)
    }
    return &api.ResolveAddressResponse{Addr: member.Addr}, nil
}

func (n *Node) getLeaderConn() (*grpc.ClientConn, error) {
    leader, err := n.Leader()
    if err != nil {
        return nil, err
    }

    if leader == n.Config.ID {
        return nil, raftselector.ErrIsLeader
    }
    conn, err := n.transport.PeerConn(leader)
    if err != nil {
        return nil, errors.Wrap(err, "failed to get connection to leader")
    }
    return conn, nil
}

// LeaderConn returns the current connection to the cluster leader, or
// raftselector.ErrIsLeader if the current machine is the leader.
func (n *Node) LeaderConn(ctx context.Context) (*grpc.ClientConn, error) {
    cc, err := n.getLeaderConn()
    if err == nil {
        return cc, nil
    }
    if err == raftselector.ErrIsLeader {
        return nil, err
    }
    if atomic.LoadUint32(&n.ticksWithNoLeader) > lostQuorumTimeout {
        return nil, errLostQuorum
    }
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            cc, err := n.getLeaderConn()
            if err == nil {
                return cc, nil
            }
            if err == raftselector.ErrIsLeader {
                return nil, err
            }
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
}
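
// exampleForwardToLeader is an illustrative sketch added for this rewrite,
// not part of the original file: it shows how an RPC handler typically uses
// LeaderConn. The RaftMembership client is just one example of a service
// reachable over the returned connection.
func exampleForwardToLeader(ctx context.Context, n *Node) (api.RaftMembershipClient, error) {
    conn, err := n.LeaderConn(ctx)
    if err == raftselector.ErrIsLeader {
        // We are the leader ourselves; the caller should handle the
        // request locally instead of forwarding it.
        return nil, err
    }
    if err != nil {
        return nil, err
    }
    return api.NewRaftMembershipClient(conn), nil
}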

// registerNode registers a new node on the cluster memberlist
func (n *Node) registerNode(node *api.RaftMember) error {
    if n.cluster.IsIDRemoved(node.RaftID) {
        return nil
    }

    member := &membership.Member{}

    existingMember := n.cluster.GetMember(node.RaftID)
    if existingMember != nil {
        // Member already exists

        // If the address is different from what we thought it was,
        // update it. This can happen if we just joined a cluster
        // and are adding ourself now with the remotely-reachable
        // address.
        if existingMember.Addr != node.Addr {
            if node.RaftID != n.Config.ID {
                if err := n.transport.UpdatePeer(node.RaftID, node.Addr); err != nil {
                    return err
                }
            }
            member.RaftMember = node
            n.cluster.AddMember(member)
        }

        return nil
    }

    // Avoid opening a connection to the local node
    if node.RaftID != n.Config.ID {
        if err := n.transport.AddPeer(node.RaftID, node.Addr); err != nil {
            return err
        }
    }

    member.RaftMember = node
    err := n.cluster.AddMember(member)
    if err != nil {
        if rerr := n.transport.RemovePeer(node.RaftID); rerr != nil {
            return errors.Wrapf(rerr, "failed to remove peer after error %v", err)
        }
        return err
    }

    return nil
}
  1252. // ProposeValue calls Propose on the raft and waits
  1253. // on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []api.StoreAction, cb func()) error {
	ctx, cancel := n.WithContext(ctx)
	defer cancel()
	_, err := n.processInternalRaftRequest(ctx, &api.InternalRaftRequest{Action: storeAction}, cb)
	if err != nil {
		return err
	}
	return nil
}

// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if !n.IsMember() {
		return nil
	}

	status := n.Status()
	return &api.Version{Index: status.Commit}
}

// ChangesBetween returns the changes starting after "from", up to and
// including "to". If these changes are not available because the log
// has been compacted, an error will be returned.
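//
// For example, ChangesBetween(api.Version{Index: 5}, api.Version{Index: 8})
// returns the changes recorded at raft indices 6, 7 and 8.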
func (n *Node) ChangesBetween(from, to api.Version) ([]state.Change, error) {
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if from.Index > to.Index {
		return nil, errors.New("versions are out of order")
	}

	if !n.IsMember() {
		return nil, ErrNoRaftMember
	}

	// never returns error
	last, _ := n.raftStore.LastIndex()

	if to.Index > last {
		return nil, errors.New("last version is out of bounds")
	}

	pbs, err := n.raftStore.Entries(from.Index+1, to.Index+1, math.MaxUint64)
	if err != nil {
		return nil, err
	}

	var changes []state.Change
	for _, pb := range pbs {
		if pb.Type != raftpb.EntryNormal || pb.Data == nil {
			continue
		}
		r := &api.InternalRaftRequest{}
		err := proto.Unmarshal(pb.Data, r)
		if err != nil {
			return nil, errors.Wrap(err, "error unmarshalling internal raft request")
		}

		if r.Action != nil {
			changes = append(changes, state.Change{StoreActions: r.Action, Version: api.Version{Index: pb.Index}})
		}
	}

	return changes, nil
}

// SubscribePeers subscribes to peer updates in the cluster. It always sends
// the full list of peers.
func (n *Node) SubscribePeers() (q chan events.Event, cancel func()) {
	return n.cluster.PeersBroadcast.Watch()
}

// GetMemberlist returns the current list of raft members in the cluster.
func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
	memberlist := make(map[uint64]*api.RaftMember)
	members := n.cluster.Members()
	leaderID, err := n.Leader()
	if err != nil {
		leaderID = raft.None
	}

	for id, member := range members {
		reachability := api.RaftMemberStatus_REACHABLE
		leader := false

		if member.RaftID != n.Config.ID {
			if !n.transport.Active(member.RaftID) {
				reachability = api.RaftMemberStatus_UNREACHABLE
			}
		}

		if member.RaftID == leaderID {
			leader = true
		}

		memberlist[id] = &api.RaftMember{
			RaftID: member.RaftID,
			NodeID: member.NodeID,
			Addr:   member.Addr,
			Status: api.RaftMemberStatus{
				Leader:       leader,
				Reachability: reachability,
			},
		}
	}

	return memberlist
}

// Status returns the status of the underlying etcd raft.Node.
func (n *Node) Status() raft.Status {
	return n.raftNode.Status()
}

// GetMemberByNodeID returns member information based
// on its generic Node ID.
func (n *Node) GetMemberByNodeID(nodeID string) *membership.Member {
	members := n.cluster.Members()
	for _, member := range members {
		if member.NodeID == nodeID {
			return member
		}
	}
	return nil
}

// IsMember checks if the raft node has effectively joined
// a cluster of existing members.
func (n *Node) IsMember() bool {
	return atomic.LoadUint32(&n.isMember) == 1
}

// saveToStorage saves the hard state, any new log entries, and the snapshot
// (if one is present) to the raft logger and the in-memory raft store.
func (n *Node) saveToStorage(
	ctx context.Context,
	raftConfig *api.RaftConfig,
	hardState raftpb.HardState,
	entries []raftpb.Entry,
	snapshot raftpb.Snapshot,
) (err error) {
	if !raft.IsEmptySnap(snapshot) {
		if err := n.raftLogger.SaveSnapshot(snapshot); err != nil {
			return errors.Wrap(err, "failed to save snapshot")
		}
		if err := n.raftLogger.GC(snapshot.Metadata.Index, snapshot.Metadata.Term, raftConfig.KeepOldSnapshots); err != nil {
			log.G(ctx).WithError(err).Error("unable to clean old snapshots and WALs")
		}
		if err = n.raftStore.ApplySnapshot(snapshot); err != nil {
			return errors.Wrap(err, "failed to apply snapshot on raft node")
		}
	}

	if err := n.raftLogger.SaveEntries(hardState, entries); err != nil {
		return errors.Wrap(err, "failed to save raft log entries")
	}

	if len(entries) > 0 {
		lastIndex := entries[len(entries)-1].Index
		if lastIndex > n.writtenWALIndex {
			n.writtenWALIndex = lastIndex
		}
	}

	if err = n.raftStore.Append(entries); err != nil {
		return errors.Wrap(err, "failed to append raft log entries")
	}

	return nil
}

// processInternalRaftRequest sends a message to nodes participating
// in the raft to apply a log entry and then waits for it to be applied
// on this node. It blocks until the update is performed, an error occurs,
// or the raft node finalizes all pending proposals on node shutdown.
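//
// The sequence, as implemented below, is: register a wait on the request ID,
// verify this node still believes it is the leader, marshal and size-check
// the request, call Propose, and then block until the wait is triggered by
// the applied entry or cancelled.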
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
	n.stopMu.RLock()
	if !n.IsMember() {
		n.stopMu.RUnlock()
		return nil, ErrStopped
	}
	n.waitProp.Add(1)
	defer n.waitProp.Done()
	n.stopMu.RUnlock()

	r.ID = n.reqIDGen.Next()

	// This must be derived from the context which is cancelled by stop()
	// to avoid a deadlock on shutdown.
	waitCtx, cancel := context.WithCancel(ctx)

	ch := n.wait.register(r.ID, cb, cancel)

	// Do this check after calling register to avoid a race.
	if atomic.LoadUint32(&n.signalledLeadership) != 1 {
		n.wait.cancel(r.ID)
		return nil, ErrLostLeadership
	}

	data, err := r.Marshal()
	if err != nil {
		n.wait.cancel(r.ID)
		return nil, err
	}

	if len(data) > store.MaxTransactionBytes {
		n.wait.cancel(r.ID)
		return nil, ErrRequestTooLarge
	}

	err = n.raftNode.Propose(waitCtx, data)
	if err != nil {
		n.wait.cancel(r.ID)
		return nil, err
	}

	select {
	case x, ok := <-ch:
		if !ok {
			return nil, ErrLostLeadership
		}
		return x.(proto.Message), nil
	case <-waitCtx.Done():
		n.wait.cancel(r.ID)
		// if channel is closed, wait item was canceled, otherwise it was triggered
		x, ok := <-ch
		if !ok {
			return nil, ErrLostLeadership
		}
		return x.(proto.Message), nil
	case <-ctx.Done():
		n.wait.cancel(r.ID)
		// if channel is closed, wait item was canceled, otherwise it was triggered
		x, ok := <-ch
		if !ok {
			return nil, ctx.Err()
		}
		return x.(proto.Message), nil
	}
}

// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It will block
// until the change is performed or there is an error.
func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
	cc.ID = n.reqIDGen.Next()

	ctx, cancel := context.WithCancel(ctx)
	ch := n.wait.register(cc.ID, nil, cancel)

	if err := n.raftNode.ProposeConfChange(ctx, cc); err != nil {
		n.wait.cancel(cc.ID)
		return err
	}

	select {
	case x := <-ch:
		if err, ok := x.(error); ok {
			return err
		}
		if x != nil {
			log.G(ctx).Panic("raft: configuration change error, return type should always be error")
		}
		return nil
	case <-ctx.Done():
		n.wait.cancel(cc.ID)
		return ctx.Err()
	}
}

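// processCommitted applies a committed raft entry to this node: normal
// entries are applied via processEntry, configuration-change entries update
// the cluster membership, and the applied index is recorded.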
func (n *Node) processCommitted(ctx context.Context, entry raftpb.Entry) error {
	// Process a normal entry
	if entry.Type == raftpb.EntryNormal && entry.Data != nil {
		if err := n.processEntry(ctx, entry); err != nil {
			return err
		}
	}

	// Process a configuration change (add/remove node)
	if entry.Type == raftpb.EntryConfChange {
		n.processConfChange(ctx, entry)
	}

	n.appliedIndex = entry.Index
	return nil
}

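// processEntry unmarshals a normal raft entry and either hands it to the
// in-flight proposal waiting on its ID or, if no such proposal exists,
// applies the contained store actions directly to the memory store.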
func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
	r := &api.InternalRaftRequest{}
	err := proto.Unmarshal(entry.Data, r)
	if err != nil {
		return err
	}

	if !n.wait.trigger(r.ID, r) {
		// There was no wait on this ID, meaning we don't have a
		// transaction in progress that would be committed to the
		// memory store by the "trigger" call. Either a different node
		// wrote this to raft, or we wrote it before losing the leader
		// position and cancelling the transaction. Create a new
		// transaction to commit the data.

		// It should not be possible for processInternalRaftRequest
		// to be running in this situation, but out of caution we
		// cancel any current invocations to avoid a deadlock.
		n.wait.cancelAll()

		err := n.memoryStore.ApplyStoreActions(r.Action)
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to apply actions from raft")
		}
	}
	return nil
}

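// processConfChange unmarshals and validates a configuration-change entry,
// applies it to the cluster membership and the underlying raft node, and
// triggers the wait registered for the change, if any.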
func (n *Node) processConfChange(ctx context.Context, entry raftpb.Entry) {
	var (
		err error
		cc  raftpb.ConfChange
	)

	if err := proto.Unmarshal(entry.Data, &cc); err != nil {
		n.wait.trigger(cc.ID, err)
	}

	if err := n.cluster.ValidateConfigurationChange(cc); err != nil {
		n.wait.trigger(cc.ID, err)
	}

	switch cc.Type {
	case raftpb.ConfChangeAddNode:
		err = n.applyAddNode(cc)
	case raftpb.ConfChangeUpdateNode:
		err = n.applyUpdateNode(ctx, cc)
	case raftpb.ConfChangeRemoveNode:
		err = n.applyRemoveNode(ctx, cc)
	}

	if err != nil {
		n.wait.trigger(cc.ID, err)
	}

	n.confState = *n.raftNode.ApplyConfChange(cc)
	n.wait.trigger(cc.ID, nil)
}

// applyAddNode is called when we receive a ConfChange from a member in the
// raft cluster; it adds a new node to the existing raft cluster.
func (n *Node) applyAddNode(cc raftpb.ConfChange) error {
	member := &api.RaftMember{}
	err := proto.Unmarshal(cc.Context, member)
	if err != nil {
		return err
	}

	// ID must be non zero
	if member.RaftID == 0 {
		return nil
	}

	if err = n.registerNode(member); err != nil {
		return err
	}
	return nil
}

// applyUpdateNode is called when we receive a ConfChange from a member in the
// raft cluster which updates the address of an existing node.
func (n *Node) applyUpdateNode(ctx context.Context, cc raftpb.ConfChange) error {
	newMember := &api.RaftMember{}
	err := proto.Unmarshal(cc.Context, newMember)
	if err != nil {
		return err
	}

	if newMember.RaftID == n.Config.ID {
		return nil
	}
	if err := n.transport.UpdatePeer(newMember.RaftID, newMember.Addr); err != nil {
		return err
	}

	return n.cluster.UpdateMember(newMember.RaftID, newMember)
}

// applyRemoveNode is called when we receive a ConfChange from a member in the
// raft cluster; it removes a node from the existing raft cluster.
func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err error) {
	// If the node being removed is the current leader and we are a
	// follower, campaign to become the leader before applying the removal.
	if cc.NodeID == n.leader() && !n.isLeader() {
		if err = n.raftNode.Campaign(ctx); err != nil {
			return err
		}
	}

	if cc.NodeID == n.Config.ID {
		// wait for the commit ack to be sent before closing connection
		n.asyncTasks.Wait()

		n.NodeRemoved()
	} else if err := n.transport.RemovePeer(cc.NodeID); err != nil {
		return err
	}

	return n.cluster.RemoveMember(cc.NodeID)
}

// SubscribeLeadership returns a channel to which events about leadership
// changes will be sent in the form of raft.LeadershipState. It also returns
// a cancel func, which should be called when the listener is no longer
// interested in events.
func (n *Node) SubscribeLeadership() (q chan events.Event, cancel func()) {
	return n.leadershipBroadcast.Watch()
}

// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
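//
// For example, given ids = [1, 2, 3] and self = 2, this returns two
// ConfChangeRemoveNode entries (for IDs 1 and 3) at indices index+1 and
// index+2; if self were 4 instead, a third entry adding node 4 would be
// appended as well.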
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	var ents []raftpb.Entry
	next := index + 1
	found := false
	for _, id := range ids {
		if id == self {
			found = true
			continue
		}
		cc := &raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: id,
		}
		data, err := cc.Marshal()
		if err != nil {
			log.L.WithError(err).Panic("marshal configuration change should never fail")
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  data,
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
		next++
	}
	if !found {
		node := &api.RaftMember{RaftID: self}
		meta, err := node.Marshal()
		if err != nil {
			log.L.WithError(err).Panic("marshal member should never fail")
		}
		cc := &raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  self,
			Context: meta,
		}
		data, err := cc.Marshal()
		if err != nil {
			log.L.WithError(err).Panic("marshal configuration change should never fail")
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  data,
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
	}
	return ents
}

// getIDs returns the set of IDs included in the given snapshot and the
// entries. The given snapshot/entries can contain two kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
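// ConfChangeUpdateNode entries do not affect the set. For example, a snapshot
// containing nodes {1, 2} followed by entries that remove 2 and add 3 yields
// the set {1, 3}.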
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	ids := make(map[uint64]struct{})
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Nodes {
			ids[id] = struct{}{}
		}
	}
	for _, e := range ents {
		if e.Type != raftpb.EntryConfChange {
			continue
		}
		if snap != nil && e.Index < snap.Metadata.Index {
			continue
		}
		var cc raftpb.ConfChange
		if err := cc.Unmarshal(e.Data); err != nil {
			log.L.WithError(err).Panic("unmarshal configuration change should never fail")
		}
		switch cc.Type {
		case raftpb.ConfChangeAddNode:
			ids[cc.NodeID] = struct{}{}
		case raftpb.ConfChangeRemoveNode:
			delete(ids, cc.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			log.L.Panic("ConfChange Type should be either ConfChangeAddNode, or ConfChangeRemoveNode, or ConfChangeUpdateNode!")
		}
	}
	var sids []uint64
	for id := range ids {
		sids = append(sids, id)
	}
	return sids
}

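// reqTimeout returns the timeout used for proposals and forwarded requests:
// a 5-second base plus twice the election timeout (ElectionTick ticks at the
// configured tick interval). As an illustration only (assuming a 1-second
// tick interval and an ElectionTick of 10, which are not defined in this
// file), that would be 5s + 2*10*1s = 25s.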
func (n *Node) reqTimeout() time.Duration {
	return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval
}