Browse Source

Eliminate the lost wakeups problem

Eric Zhang 4 years ago
parent
commit
56184b569b
2 changed files with 77 additions and 6 deletions
  1. 5 6
      rustpad-server/src/rustpad.rs
  2. 72 0
      rustpad-server/tests/stress.rs

+ 5 - 6
rustpad-server/src/rustpad.rs

@@ -2,7 +2,6 @@
 
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicU64, Ordering};
-use std::time::Duration;
 
 use anyhow::{bail, Context, Result};
 use futures::prelude::*;
@@ -11,7 +10,6 @@ use operational_transform::OperationSeq;
 use parking_lot::{RwLock, RwLockUpgradableReadGuard};
 use serde::{Deserialize, Serialize};
 use tokio::sync::{broadcast, Notify};
-use tokio::time;
 use warp::ws::{Message, WebSocket};
 
 /// The main object representing a collaborative session.
@@ -130,15 +128,16 @@ impl Rustpad {
         let mut revision: usize = 0;
 
         loop {
+            // In order to avoid the "lost wakeup" problem, we first request a
+            // notification, **then** check the current state for new revisions.
+            // This is the same approach that `tokio::sync::watch` takes.
+            let notified = self.notify.notified();
             if self.revision() > revision {
                 revision = self.send_history(revision, &mut socket).await?
             }
 
-            let sleep = time::sleep(Duration::from_millis(500));
-            tokio::pin!(sleep);
             tokio::select! {
-                _ = &mut sleep => {}
-                _ = self.notify.notified() => {}
+                _ = notified => {}
                 update = update_rx.recv() => {
                     socket.send(update?.into()).await?;
                 }

+ 72 - 0
rustpad-server/tests/stress.rs

@@ -0,0 +1,72 @@
+//! Stress tests for liveness and consistency properties.
+
+use std::time::Duration;
+
+use anyhow::{anyhow, Result};
+use common::*;
+use log::info;
+use operational_transform::OperationSeq;
+use rustpad_server::server;
+use serde_json::{json, Value};
+use tokio::time::Instant;
+
+pub mod common;
+
+#[tokio::test]
+async fn test_lost_wakeups() -> Result<()> {
+    pretty_env_logger::try_init().ok();
+    let filter = server();
+
+    expect_text(&filter, "stress", "").await;
+
+    let mut client = connect(&filter, "stress").await?;
+    let msg = client.recv().await?;
+    assert_eq!(msg, json!({ "Identity": 0 }));
+
+    let mut client2 = connect(&filter, "stress").await?;
+    let msg = client2.recv().await?;
+    assert_eq!(msg, json!({ "Identity": 1 }));
+
+    let mut revision = 0;
+    for i in 0..100 {
+        let num_edits = i % 5 + 1;
+        for _ in 0..num_edits {
+            let mut operation = OperationSeq::default();
+            operation.retain(revision);
+            operation.insert("a");
+            let msg = json!({
+                "Edit": {
+                    "revision": revision,
+                    "operation": operation
+                }
+            });
+            client.send(&msg).await;
+            revision += 1;
+        }
+
+        let start = Instant::now();
+
+        let num_ops = |msg: &Value| -> Option<usize> {
+            Some(msg.get("History")?.get("operations")?.as_array()?.len())
+        };
+
+        let mut total = 0;
+        while total < num_edits {
+            let msg = client.recv().await?;
+            total += num_ops(&msg).ok_or(anyhow!("missing json key"))?;
+        }
+
+        let mut total2 = 0;
+        while total2 < num_edits {
+            let msg = client2.recv().await?;
+            total2 += num_ops(&msg).ok_or(anyhow!("missing json key"))?;
+        }
+
+        info!("took {} ms", start.elapsed().as_millis());
+        assert!(start.elapsed() <= Duration::from_millis(200));
+    }
+
+    expect_text(&filter, "stress", &"a".repeat(revision as usize)).await;
+
+    Ok(())
+}