storage.go

package raft

import (
	"fmt"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/encryption"
	"github.com/docker/swarmkit/manager/state/raft/membership"
	"github.com/docker/swarmkit/manager/state/raft/storage"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)
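// readFromDisk loads the most recent snapshot and the WAL that follows it,
// decrypting them with the current DEK. If a key rotation was left pending,
// it attempts to read with the pending DEK first, falling back to the
// current DEK if decryption fails, and finalizes the rotation once the
// pending key is confirmed to work.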
func (n *Node) readFromDisk(ctx context.Context) (*raftpb.Snapshot, storage.WALData, error) {
	keys := n.keyRotator.GetKeys()

	n.raftLogger = &storage.EncryptedRaftLogger{
		StateDir:      n.opts.StateDir,
		EncryptionKey: keys.CurrentDEK,
	}
	if keys.PendingDEK != nil {
		n.raftLogger.EncryptionKey = keys.PendingDEK
	}

	snap, walData, err := n.raftLogger.BootstrapFromDisk(ctx)

	if keys.PendingDEK != nil {
		switch errors.Cause(err).(type) {
		case nil:
			if err = n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: keys.PendingDEK}); err != nil {
				err = errors.Wrap(err, "previous key rotation was successful, but unable to mark rotation as complete")
			}
		case encryption.ErrCannotDecrypt:
			snap, walData, err = n.raftLogger.BootstrapFromDisk(ctx, keys.CurrentDEK)
		}
	}

	if err != nil {
		return nil, storage.WALData{}, err
	}
	return snap, walData, nil
}
// loadAndStart bootstraps a node's raft store from the raft logs and snapshots on disk.
func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
	snapshot, waldata, err := n.readFromDisk(ctx)
	if err != nil {
		return err
	}

	// Read logs to fully catch up store
	var raftNode api.RaftMember
	if err := raftNode.Unmarshal(waldata.Metadata); err != nil {
		return errors.Wrap(err, "failed to unmarshal WAL metadata")
	}
	n.Config.ID = raftNode.RaftID
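
	// If a snapshot exists, restore the memory store from it and seed the
	// cluster membership from the snapshot's member and removed lists.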
	if snapshot != nil {
		snapCluster, err := n.clusterSnapshot(snapshot.Data)
		if err != nil {
			return err
		}
		var bootstrapMembers []*api.RaftMember
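		// When forcing a new cluster, bootstrap with only the local member
		// and mark every other member from the snapshot as removed.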
		if forceNewCluster {
			for _, m := range snapCluster.Members {
				if m.RaftID != n.Config.ID {
					n.cluster.RemoveMember(m.RaftID)
					continue
				}
				bootstrapMembers = append(bootstrapMembers, m)
			}
		} else {
			bootstrapMembers = snapCluster.Members
		}
		n.bootstrapMembers = bootstrapMembers
		for _, removedMember := range snapCluster.Removed {
			n.cluster.RemoveMember(removedMember)
		}
	}

	ents, st := waldata.Entries, waldata.HardState

	// All members that are no longer part of the cluster must be added to
	// the removed list right away, so that we don't try to connect to them
	// before processing the configuration change entries, which could make
	// us get stuck.
	for _, ent := range ents {
		if ent.Index <= st.Commit && ent.Type == raftpb.EntryConfChange {
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ent.Data); err != nil {
				return errors.Wrap(err, "failed to unmarshal config change")
			}
			if cc.Type == raftpb.ConfChangeRemoveNode {
				n.cluster.RemoveMember(cc.NodeID)
			}
		}
	}

	if forceNewCluster {
		// discard the previously uncommitted entries
		for i, ent := range ents {
			if ent.Index > st.Commit {
				log.G(ctx).Infof("discarding %d uncommitted WAL entries", len(ents)-i)
				ents = ents[:i]
				break
			}
		}

		// force append the configuration change entries
		toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), n.Config.ID, st.Term, st.Commit)

		// All members that are being removed as part of the
		// force-new-cluster process must be added to the
		// removed list right away, so that we don't try to
		// connect to them before processing the configuration
		// change entries, which could make us get stuck.
		for _, ccEnt := range toAppEnts {
			if ccEnt.Type == raftpb.EntryConfChange {
				var cc raftpb.ConfChange
				if err := cc.Unmarshal(ccEnt.Data); err != nil {
					return errors.Wrap(err, "error unmarshalling force-new-cluster config change")
				}
				if cc.Type == raftpb.ConfChangeRemoveNode {
					n.cluster.RemoveMember(cc.NodeID)
				}
			}
		}
		ents = append(ents, toAppEnts...)

		// force commit newly appended entries
		err := n.raftLogger.SaveEntries(st, toAppEnts)
		if err != nil {
			log.G(ctx).WithError(err).Fatal("failed to save WAL while forcing new cluster")
		}

		if len(toAppEnts) != 0 {
			st.Commit = toAppEnts[len(toAppEnts)-1].Index
		}
	}

	if snapshot != nil {
		if err := n.raftStore.ApplySnapshot(*snapshot); err != nil {
			return err
		}
	}
	if err := n.raftStore.SetHardState(st); err != nil {
		return err
	}
	return n.raftStore.Append(ents)
}
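// newRaftLogs bootstraps a fresh WAL for this node: it marshals the node's
// raft member information into the WAL metadata, registers the node in the
// cluster membership list, and returns the raft.Peer used to start the
// cluster.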
func (n *Node) newRaftLogs(nodeID string) (raft.Peer, error) {
	raftNode := &api.RaftMember{
		RaftID: n.Config.ID,
		NodeID: nodeID,
		Addr:   n.opts.Addr,
	}
	metadata, err := raftNode.Marshal()
	if err != nil {
		return raft.Peer{}, errors.Wrap(err, "error marshalling raft node")
	}
	if err := n.raftLogger.BootstrapNew(metadata); err != nil {
		return raft.Peer{}, err
	}
	n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
	return raft.Peer{ID: n.Config.ID, Context: metadata}, nil
}
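// doSnapshot takes a point-in-time snapshot of the memory store and the
// current cluster membership, persists it to disk asynchronously, and
// compacts the raft log once the snapshot has been safely saved.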
func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
	snapshot := api.Snapshot{Version: api.Snapshot_V0}
	for _, member := range n.cluster.Members() {
		snapshot.Membership.Members = append(snapshot.Membership.Members,
			&api.RaftMember{
				NodeID: member.NodeID,
				RaftID: member.RaftID,
				Addr:   member.Addr,
			})
	}
	snapshot.Membership.Removed = n.cluster.Removed()

	viewStarted := make(chan struct{})
	n.asyncTasks.Add(1)
	n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot
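	// Serialize and persist the snapshot in a goroutine so that disk I/O does
	// not block the caller; appliedIndex and snapshotMeta are passed as
	// arguments to capture their values as of this moment.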
	go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) {
		defer func() {
			n.asyncTasks.Done()
			n.snapshotInProgress <- snapshotMeta
		}()
		var err error
		n.memoryStore.View(func(tx store.ReadTx) {
			close(viewStarted)

			var storeSnapshot *api.StoreSnapshot
			storeSnapshot, err = n.memoryStore.Save(tx)
			snapshot.Store = *storeSnapshot
		})
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to read snapshot from store")
			return
		}

		d, err := snapshot.Marshal()
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to marshal snapshot")
			return
		}

		snap, err := n.raftStore.CreateSnapshot(appliedIndex, &n.confState, d)
		if err == nil {
			if err := n.raftLogger.SaveSnapshot(snap); err != nil {
				log.G(ctx).WithError(err).Error("failed to save snapshot")
				return
			}
			snapshotMeta = snap.Metadata
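
			// Compact the in-memory raft log, keeping LogEntriesForSlowFollowers
			// entries behind the applied index so slow followers can catch up
			// from the log rather than needing a full snapshot.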
			if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
				err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)
				if err != nil && err != raft.ErrCompacted {
					log.G(ctx).WithError(err).Error("failed to compact snapshot")
				}
			}
		} else if err != raft.ErrSnapOutOfDate {
			log.G(ctx).WithError(err).Error("failed to create snapshot")
		}
	}(n.appliedIndex, n.snapshotMeta)

	// Wait for the goroutine to establish a read transaction, to make
	// sure it sees the state as of this moment.
	<-viewStarted
}
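// clusterSnapshot unmarshals a raft snapshot, restores the memory store from
// its contents, and returns the cluster membership it carries.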
func (n *Node) clusterSnapshot(data []byte) (api.ClusterSnapshot, error) {
	var snapshot api.Snapshot
	if err := snapshot.Unmarshal(data); err != nil {
		return snapshot.Membership, err
	}
	if snapshot.Version != api.Snapshot_V0 {
		return snapshot.Membership, fmt.Errorf("unrecognized snapshot version %d", snapshot.Version)
	}

	if err := n.memoryStore.Restore(&snapshot.Store); err != nil {
		return snapshot.Membership, err
	}

	return snapshot.Membership, nil
}