|
@@ -196,9 +196,44 @@ func needsSplitting(m *raftpb.Message) bool {
|
|
}
|
|
}
|
|
|
|
|
|
func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
|
|
func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
|
|
- ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
|
|
|
|
|
|
+ // These lines used to be in the code, but they've been removed. I'm
|
|
|
|
+ // leaving them in in a comment just in case they cause some unforeseen
|
|
|
|
+ // breakage later, to show why they were removed.
|
|
|
|
+ //
|
|
|
|
+ // ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
|
|
|
|
+ // defer cancel()
|
|
|
|
+ //
|
|
|
|
+ // Basically, these lines created a timeout that applied not to each chunk
|
|
|
|
+ // of a streaming message, but to the whole streaming process. With a
|
|
|
|
+ // sufficiently large raft log, the bandwidth on some connections can not
|
|
|
|
+ // physically be enough to fit within the default 2 second timeout.
|
|
|
|
+ // Further, it seems that because of some gRPC magic, the timeout was
|
|
|
|
+ // getting propagated to the stream *server*, meaning it wasn't even the
|
|
|
|
+ // sender timing out, it was the receiver.
|
|
|
|
+ //
|
|
|
|
+ // It should be fine to remove this timeout. The whole purpose of this
|
|
|
|
+ // method is to send very large raft messages that could take several
|
|
|
|
+ // seconds to send.
|
|
|
|
+
|
|
|
|
+ ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
defer cancel()
|
|
|
|
|
|
|
|
+ // This is a bootleg watchdog timer. If the timer elapses without something
|
|
|
|
+ // being written to the bump channel, it will cancel the context.
|
|
|
|
+ //
|
|
|
|
+ // We use this because the operations on this stream *must* either time out
|
|
|
|
+ // or succeed for raft to function correctly. We can't just time out the
|
|
|
|
+ // whole operation, because of the reasons stated above. But we also only
|
|
|
|
+ // set the context once, when we create the stream, and so can't set an
|
|
|
|
+ // individual timeout for each stream operation.
|
|
|
|
+ //
|
|
|
|
+ // By doing it as this watchdog-type structure, we can time out individual
|
|
|
|
+ // operations by canceling the context on our own terms.
|
|
|
|
+ t := time.AfterFunc(p.tr.config.SendTimeout, cancel)
|
|
|
|
+ defer t.Stop()
|
|
|
|
+
|
|
|
|
+ bump := func() { t.Reset(p.tr.config.SendTimeout) }
|
|
|
|
+
|
|
var err error
|
|
var err error
|
|
var stream api.Raft_StreamRaftMessageClient
|
|
var stream api.Raft_StreamRaftMessageClient
|
|
stream, err = api.NewRaftClient(p.conn()).StreamRaftMessage(ctx)
|
|
stream, err = api.NewRaftClient(p.conn()).StreamRaftMessage(ctx)
|
|
@@ -222,6 +257,9 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
|
|
stream.CloseAndRecv()
|
|
stream.CloseAndRecv()
|
|
break
|
|
break
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // If the send succeeds, bump the watchdog timer.
|
|
|
|
+ bump()
|
|
}
|
|
}
|
|
|
|
|
|
// Finished sending all the messages.
|
|
// Finished sending all the messages.
|