package raft

import (
	"fmt"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/encryption"
	"github.com/docker/swarmkit/manager/state/raft/membership"
	"github.com/docker/swarmkit/manager/state/raft/storage"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)
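
// readFromDisk loads the latest snapshot and WAL data from disk, trying the
// pending DEK first when a key rotation is in progress and falling back to
// the current DEK if the existing logs cannot be decrypted with it.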
func (n *Node) readFromDisk(ctx context.Context) (*raftpb.Snapshot, storage.WALData, error) {
	keys := n.keyRotator.GetKeys()

	n.raftLogger = &storage.EncryptedRaftLogger{
		StateDir:      n.opts.StateDir,
		EncryptionKey: keys.CurrentDEK,
	}
	if keys.PendingDEK != nil {
		n.raftLogger.EncryptionKey = keys.PendingDEK
	}

	snap, walData, err := n.raftLogger.BootstrapFromDisk(ctx)

	if keys.PendingDEK != nil {
		switch errors.Cause(err).(type) {
		case nil:
			// The logs decrypted with the pending key, so the previous
			// rotation already took effect; promote the pending key.
			if err = n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: keys.PendingDEK}); err != nil {
				err = errors.Wrap(err, "previous key rotation was successful, but unable to mark rotation as complete")
			}
		case encryption.ErrCannotDecrypt:
			// The logs were not written with the pending key; retry,
			// allowing decryption with the current DEK.
			snap, walData, err = n.raftLogger.BootstrapFromDisk(ctx, keys.CurrentDEK)
		}
	}

	if err != nil {
		return nil, storage.WALData{}, err
	}
	return snap, walData, nil
}

// loadAndStart bootstraps a node's raft store from the raft logs and snapshots on disk.
func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
	snapshot, waldata, err := n.readFromDisk(ctx)
	if err != nil {
		return err
	}

	// Read logs to fully catch up store
	var raftNode api.RaftMember
	if err := raftNode.Unmarshal(waldata.Metadata); err != nil {
		return errors.Wrap(err, "failed to unmarshal WAL metadata")
	}
	n.Config.ID = raftNode.RaftID

	if snapshot != nil {
		snapCluster, err := n.clusterSnapshot(snapshot.Data)
		if err != nil {
			return err
		}
		var bootstrapMembers []*api.RaftMember
		if forceNewCluster {
			for _, m := range snapCluster.Members {
				if m.RaftID != n.Config.ID {
					n.cluster.RemoveMember(m.RaftID)
					continue
				}
				bootstrapMembers = append(bootstrapMembers, m)
			}
		} else {
			bootstrapMembers = snapCluster.Members
		}
		n.bootstrapMembers = bootstrapMembers
		for _, removedMember := range snapCluster.Removed {
			n.cluster.RemoveMember(removedMember)
		}
	}
	ents, st := waldata.Entries, waldata.HardState

	// All members that are no longer part of the cluster must be added to
	// the removed list right away, so that we don't try to connect to them
	// before processing the configuration change entries, which could make
	// us get stuck.
	for _, ent := range ents {
		if ent.Index <= st.Commit && ent.Type == raftpb.EntryConfChange {
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ent.Data); err != nil {
				return errors.Wrap(err, "failed to unmarshal config change")
			}
			if cc.Type == raftpb.ConfChangeRemoveNode {
				n.cluster.RemoveMember(cc.NodeID)
			}
		}
	}
	if forceNewCluster {
		// discard the previously uncommitted entries
		for i, ent := range ents {
			if ent.Index > st.Commit {
				log.G(ctx).Infof("discarding %d uncommitted WAL entries", len(ents)-i)
				ents = ents[:i]
				break
			}
		}

		// force append the configuration change entries
		toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), n.Config.ID, st.Term, st.Commit)

		// All members that are being removed as part of the
		// force-new-cluster process must be added to the
		// removed list right away, so that we don't try to
		// connect to them before processing the configuration
		// change entries, which could make us get stuck.
		for _, ccEnt := range toAppEnts {
			if ccEnt.Type == raftpb.EntryConfChange {
				var cc raftpb.ConfChange
				if err := cc.Unmarshal(ccEnt.Data); err != nil {
					return errors.Wrap(err, "error unmarshalling force-new-cluster config change")
				}
				if cc.Type == raftpb.ConfChangeRemoveNode {
					n.cluster.RemoveMember(cc.NodeID)
				}
			}
		}
		ents = append(ents, toAppEnts...)

		// force commit newly appended entries
		err := n.raftLogger.SaveEntries(st, toAppEnts)
		if err != nil {
			log.G(ctx).WithError(err).Fatal("failed to save WAL while forcing new cluster")
		}

		if len(toAppEnts) != 0 {
			st.Commit = toAppEnts[len(toAppEnts)-1].Index
		}
	}
	if snapshot != nil {
		if err := n.raftStore.ApplySnapshot(*snapshot); err != nil {
			return err
		}
	}
	if err := n.raftStore.SetHardState(st); err != nil {
		return err
	}
	return n.raftStore.Append(ents)
}
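
// newRaftLogs bootstraps a fresh raft log for this node, seeding it with the
// node's own RaftMember metadata, registers the node in the cluster
// membership, and returns the corresponding local raft peer.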
func (n *Node) newRaftLogs(nodeID string) (raft.Peer, error) {
	raftNode := &api.RaftMember{
		RaftID: n.Config.ID,
		NodeID: nodeID,
		Addr:   n.opts.Addr,
	}
	metadata, err := raftNode.Marshal()
	if err != nil {
		return raft.Peer{}, errors.Wrap(err, "error marshalling raft node")
	}
	if err := n.raftLogger.BootstrapNew(metadata); err != nil {
		return raft.Peer{}, err
	}
	n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
	return raft.Peer{ID: n.Config.ID, Context: metadata}, nil
}
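
// doSnapshot serializes the current cluster membership and memory store into
// a raft snapshot, saves it, and compacts older log entries, doing the bulk
// of the work in a background goroutine.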
func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
	snapshot := api.Snapshot{Version: api.Snapshot_V0}
	for _, member := range n.cluster.Members() {
		snapshot.Membership.Members = append(snapshot.Membership.Members,
			&api.RaftMember{
				NodeID: member.NodeID,
				RaftID: member.RaftID,
				Addr:   member.Addr,
			})
	}
	snapshot.Membership.Removed = n.cluster.Removed()

	viewStarted := make(chan struct{})

	n.asyncTasks.Add(1)
	n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot
	go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) {
		defer func() {
			n.asyncTasks.Done()
			n.snapshotInProgress <- snapshotMeta
		}()
		var err error
		n.memoryStore.View(func(tx store.ReadTx) {
			close(viewStarted)

			var storeSnapshot *api.StoreSnapshot
			storeSnapshot, err = n.memoryStore.Save(tx)
			snapshot.Store = *storeSnapshot
		})
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to read snapshot from store")
			return
		}
		d, err := snapshot.Marshal()
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to marshal snapshot")
			return
		}
		snap, err := n.raftStore.CreateSnapshot(appliedIndex, &n.confState, d)
		if err == nil {
			if err := n.raftLogger.SaveSnapshot(snap); err != nil {
				log.G(ctx).WithError(err).Error("failed to save snapshot")
				return
			}
			snapshotMeta = snap.Metadata

			if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
				err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)
				if err != nil && err != raft.ErrCompacted {
					log.G(ctx).WithError(err).Error("failed to compact snapshot")
				}
			}
		} else if err != raft.ErrSnapOutOfDate {
			log.G(ctx).WithError(err).Error("failed to create snapshot")
		}
	}(n.appliedIndex, n.snapshotMeta)

	// Wait for the goroutine to establish a read transaction, to make
	// sure it sees the state as of this moment.
	<-viewStarted
}
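
// clusterSnapshot restores the memory store from the given snapshot data and
// returns the membership information embedded in the snapshot.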
func (n *Node) clusterSnapshot(data []byte) (api.ClusterSnapshot, error) {
	var snapshot api.Snapshot
	if err := snapshot.Unmarshal(data); err != nil {
		return snapshot.Membership, err
	}
	if snapshot.Version != api.Snapshot_V0 {
		return snapshot.Membership, fmt.Errorf("unrecognized snapshot version %d", snapshot.Version)
	}

	if err := n.memoryStore.Restore(&snapshot.Store); err != nil {
		return snapshot.Membership, err
	}

	return snapshot.Membership, nil
}