Clean up old documents after 24 hours

This commit is contained in:
Eric Zhang 2021-06-03 14:16:29 -05:00
parent 3b14671206
commit cdde359332
6 changed files with 171 additions and 69 deletions

View file

@ -25,9 +25,9 @@ editor that powers VS Code.
Architecturally, client-side code communicates via WebSocket with a central
server that stores in-memory data structures. This makes the editor very fast,
allows us to avoid provisioning a database, and makes testing our code much
easier. The tradeoff is that user documents are transient and lost between
server restarts.
allows us to avoid provisioning a database, and makes testing much easier. The
tradeoff is that documents are transient and lost between server restarts, or
after 24 hours of inactivity.
## Development setup

View file

@ -15,6 +15,6 @@ parking_lot = "0.11.1"
pretty_env_logger = "0.4.0"
serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"
tokio = { version = "1.6.1", features = ["full"] }
tokio = { version = "1.6.1", features = ["full", "test-util"] }
tokio-stream = "0.1.6"
warp = "0.3.1"

View file

@ -3,14 +3,35 @@
#![forbid(unsafe_code)]
#![warn(missing_docs)]
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use dashmap::DashMap;
use log::info;
use rustpad::Rustpad;
use tokio::time::{self, Instant};
use warp::{filters::BoxedFilter, ws::Ws, Filter, Reply};
mod rustpad;
/// An entry stored in the global server map.
///
/// Each entry corresponds to a single document. This is garbage collected by a
/// background task after one day of inactivity, to avoid server memory usage
/// growing without bound.
struct Document {
/// When this document was last touched. The WebSocket route resets this to
/// `Instant::now()` on every new connection; the plain-text route only reads
/// the document and leaves the timestamp unchanged. The background cleaner
/// compares its age against `DAY`.
last_accessed: Instant,
/// Shared handle to the document's collaborative state. The WebSocket route
/// clones this `Arc` and hands it to the connection handler.
rustpad: Arc<Rustpad>,
}
impl Default for Document {
fn default() -> Self {
Self {
last_accessed: Instant::now(),
rustpad: Default::default(),
}
}
}
/// A combined filter handling all server routes.
pub fn server() -> BoxedFilter<(impl Reply,)> {
warp::path("api").and(backend()).or(frontend()).boxed()
@ -25,18 +46,22 @@ fn frontend() -> BoxedFilter<(impl Reply,)> {
/// Construct backend routes, including WebSocket handlers.
fn backend() -> BoxedFilter<(impl Reply,)> {
let rustpad_map: Arc<DashMap<String, Arc<Rustpad>>> = Default::default();
let rustpad_map = warp::any().map(move || Arc::clone(&rustpad_map));
let state: Arc<DashMap<String, Document>> = Default::default();
tokio::spawn(cleaner(Arc::clone(&state)));
let state_filter = warp::any().map(move || Arc::clone(&state));
let socket = warp::path("socket")
.and(warp::path::param())
.and(warp::path::end())
.and(warp::ws())
.and(rustpad_map.clone())
.and(state_filter.clone())
.map(
|id: String, ws: Ws, rustpad_map: Arc<DashMap<String, Arc<Rustpad>>>| {
let rustpad = rustpad_map.entry(id).or_default();
let rustpad = Arc::clone(rustpad.value());
|id: String, ws: Ws, state: Arc<DashMap<String, Document>>| {
let mut entry = state.entry(id).or_default();
let value = entry.value_mut();
value.last_accessed = Instant::now();
let rustpad = Arc::clone(&value.rustpad);
ws.on_upgrade(move |socket| async move { rustpad.on_connection(socket).await })
},
);
@ -44,15 +69,33 @@ fn backend() -> BoxedFilter<(impl Reply,)> {
let text = warp::path("text")
.and(warp::path::param())
.and(warp::path::end())
.and(rustpad_map.clone())
.map(
|id: String, rustpad_map: Arc<DashMap<String, Arc<Rustpad>>>| {
rustpad_map
.get(&id)
.map(|rustpad| rustpad.text())
.unwrap_or_default()
},
);
.and(state_filter.clone())
.map(|id: String, state: Arc<DashMap<String, Document>>| {
state
.get(&id)
.map(|value| value.rustpad.text())
.unwrap_or_default()
});
socket.or(text).boxed()
}
/// One hour; the interval the cleaner task sleeps between sweeps.
const HOUR: Duration = Duration::from_secs(60 * 60);

/// One day; a document whose last access is older than this is evicted.
const DAY: Duration = Duration::from_secs(24 * 60 * 60);
/// Reclaims memory for documents after a day of inactivity.
///
/// Runs forever. After each `HOUR`-long sleep it sweeps `state` once and evicts
/// every entry whose `last_accessed` timestamp is more than `DAY` old, logging
/// the keys that were dropped.
async fn cleaner(state: Arc<DashMap<String, Document>>) {
    loop {
        time::sleep(HOUR).await;
        let mut keys = Vec::new();
        // Test and evict in a single `retain` pass. Collecting keys first and
        // removing them afterwards would delete a document that was accessed
        // again between the scan and the removal.
        state.retain(|key, document| {
            let expired = document.last_accessed.elapsed() > DAY;
            if expired {
                keys.push(key.clone());
            }
            !expired
        });
        info!("cleaner removing keys: {:?}", keys);
    }
}

View file

@ -0,0 +1,49 @@
//! Tests to ensure that documents are garbage collected.
use std::time::Duration;
use anyhow::Result;
use common::*;
use operational_transform::OperationSeq;
use rustpad_server::server;
use serde_json::json;
use tokio::time;
pub mod common;
/// A document stays readable for 23 hours after its last connection and is
/// gone (reads as empty text) once more than a day has passed.
#[tokio::test]
async fn test_cleanup() -> Result<()> {
    pretty_env_logger::try_init().ok();
    let filter = server();

    // A document nobody has touched reads as empty.
    expect_text(&filter, "old", "").await;

    // Connect and receive the identity handshake.
    let mut client = connect(&filter, "old").await?;
    let identity = client.recv().await?;
    assert_eq!(identity, json!({ "Identity": 0 }));

    // Insert "hello" at revision 0.
    let mut insert_hello = OperationSeq::default();
    insert_hello.insert("hello");
    let edit = json!({
        "Edit": {
            "revision": 0,
            "operation": insert_hello
        }
    });
    client.send(&edit).await;

    let reply = client.recv().await?;
    reply
        .get("History")
        .expect("should receive history operation");
    expect_text(&filter, "old", "hello").await;

    // Freeze the clock and step through time manually.
    let one_hour = Duration::from_secs(3600);
    time::pause();

    // Less than a day old: still present.
    time::advance(one_hour * 23).await;
    expect_text(&filter, "old", "hello").await;

    // 26 hours in total: the document has been collected.
    time::advance(one_hour * 3).await;
    expect_text(&filter, "old", "").await;

    Ok(())
}

View file

@ -0,0 +1,44 @@
use anyhow::{anyhow, Result};
use serde_json::Value;
use warp::{filters::BoxedFilter, test::WsClient, Reply};
/// A test WebSocket client that sends and receives JSON messages.
pub struct JsonSocket(WsClient);

impl JsonSocket {
    /// Serializes `msg` to its JSON text form and sends it as a text frame.
    pub async fn send(&mut self, msg: &Value) {
        let payload = msg.to_string();
        self.0.send_text(payload).await
    }

    /// Receives the next message and parses it as JSON.
    ///
    /// Fails if receiving fails, if the message is not text, or if the text is
    /// not valid JSON.
    pub async fn recv(&mut self) -> Result<Value> {
        let frame = self.0.recv().await?;
        let text = match frame.to_str() {
            Ok(text) => text,
            Err(_) => return Err(anyhow!("non-string message")),
        };
        let value: Value = serde_json::from_str(text)?;
        Ok(value)
    }

    /// Waits for the underlying client to report that the socket has closed.
    pub async fn recv_closed(&mut self) -> Result<()> {
        self.0.recv_closed().await?;
        Ok(())
    }
}
/// Connect a new test client WebSocket.
pub async fn connect(
filter: &BoxedFilter<(impl Reply + 'static,)>,
id: &str,
) -> Result<JsonSocket> {
let client = warp::test::ws()
.path(&format!("/api/socket/{}", id))
.handshake(filter.clone())
.await?;
Ok(JsonSocket(client))
}
/// Requests `/api/text/<id>` and asserts that the response has status 200 and
/// a body equal to `text`.
pub async fn expect_text(filter: &BoxedFilter<(impl Reply + 'static,)>, id: &str, text: &str) {
    let route = format!("/api/text/{}", id);
    let response = warp::test::request().path(&route).reply(filter).await;
    assert_eq!(response.status(), 200);
    assert_eq!(response.body(), text);
}

View file

@ -1,59 +1,25 @@
//! Basic tests for real-time collaboration.
use std::time::Duration;
use anyhow::{anyhow, Result};
use anyhow::Result;
use common::*;
use log::info;
use operational_transform::OperationSeq;
use rustpad_server::server;
use serde_json::{json, Value};
use serde_json::json;
use tokio::time;
use warp::{filters::BoxedFilter, test::WsClient, Reply};
/// A test WebSocket client that sends and receives JSON messages.
struct JsonSocket(WsClient);
impl JsonSocket {
async fn send(&mut self, msg: &Value) {
self.0.send_text(msg.to_string()).await
}
async fn recv(&mut self) -> Result<Value> {
let msg = self.0.recv().await?;
let msg = msg.to_str().map_err(|_| anyhow!("non-string message"))?;
Ok(serde_json::from_str(&msg)?)
}
async fn recv_closed(&mut self) -> Result<()> {
self.0.recv_closed().await.map_err(|e| e.into())
}
}
/// Connect a new test client WebSocket.
async fn connect(filter: &BoxedFilter<(impl Reply + 'static,)>) -> Result<JsonSocket> {
let client = warp::test::ws()
.path("/api/socket/foobar")
.handshake(filter.clone())
.await?;
Ok(JsonSocket(client))
}
/// Check the text route.
async fn expect_text(filter: &BoxedFilter<(impl Reply + 'static,)>, text: &str) {
let resp = warp::test::request()
.path("/api/text/foobar")
.reply(filter)
.await;
assert_eq!(resp.status(), 200);
assert_eq!(resp.body(), text);
}
pub mod common;
#[tokio::test]
async fn test_single_operation() -> Result<()> {
pretty_env_logger::try_init().ok();
let filter = server();
expect_text(&filter, "").await;
expect_text(&filter, "foobar", "").await;
let mut client = connect(&filter).await?;
let mut client = connect(&filter, "foobar").await?;
let msg = client.recv().await?;
assert_eq!(msg, json!({ "Identity": 0 }));
@ -81,7 +47,7 @@ async fn test_single_operation() -> Result<()> {
})
);
expect_text(&filter, "hello").await;
expect_text(&filter, "foobar", "hello").await;
Ok(())
}
@ -90,9 +56,9 @@ async fn test_invalid_operation() -> Result<()> {
pretty_env_logger::try_init().ok();
let filter = server();
expect_text(&filter, "").await;
expect_text(&filter, "foobar", "").await;
let mut client = connect(&filter).await?;
let mut client = connect(&filter, "foobar").await?;
let msg = client.recv().await?;
assert_eq!(msg, json!({ "Identity": 0 }));
@ -117,7 +83,7 @@ async fn test_concurrent_transform() -> Result<()> {
let filter = server();
// Connect the first client
let mut client = connect(&filter).await?;
let mut client = connect(&filter, "foobar").await?;
let msg = client.recv().await?;
assert_eq!(msg, json!({ "Identity": 0 }));
@ -173,10 +139,10 @@ async fn test_concurrent_transform() -> Result<()> {
}
})
);
expect_text(&filter, "henlo").await;
expect_text(&filter, "foobar", "henlo").await;
// Connect the second client
let mut client2 = connect(&filter).await?;
let mut client2 = connect(&filter, "foobar").await?;
let msg = client2.recv().await?;
assert_eq!(msg, json!({ "Identity": 1 }));
@ -226,6 +192,6 @@ async fn test_concurrent_transform() -> Result<()> {
let msg = client2.recv().await?;
assert_eq!(msg, transformed_op);
expect_text(&filter, "~rust~henlo").await;
expect_text(&filter, "foobar", "~rust~henlo").await;
Ok(())
}