//! Server backend for the Rustpad collaborative text editor. #![forbid(unsafe_code)] #![warn(missing_docs)] use std::sync::Arc; use std::time::{Duration, SystemTime}; use dashmap::DashMap; use log::{error, info}; use serde::Serialize; use tokio::time::{self, Instant}; use warp::{filters::BoxedFilter, ws::Ws, Filter, Rejection, Reply}; use crate::{database::Database, rustpad::Rustpad}; pub mod database; mod ot; mod rustpad; /// An entry stored in the global server map. /// /// Each entry corresponds to a single document. This is garbage collected by a /// background task after one day of inactivity, to avoid server memory usage /// growing without bound. struct Document { last_accessed: Instant, rustpad: Arc, } impl Document { fn new(rustpad: Arc) -> Self { Self { last_accessed: Instant::now(), rustpad, } } } impl Drop for Document { fn drop(&mut self) { self.rustpad.kill(); } } #[derive(Debug)] struct CustomReject(anyhow::Error); impl warp::reject::Reject for CustomReject {} /// The shared state of the server, accessible from within request handlers. #[derive(Clone)] struct ServerState { /// Concurrent map storing in-memory documents. documents: Arc>, /// Connection to the database pool, if persistence is enabled. database: Option, } /// Statistics about the server, returned from an API endpoint. #[derive(Serialize)] struct Stats { /// System time when the server started, in seconds since Unix epoch. start_time: u64, /// Number of documents currently tracked by the server. num_documents: usize, /// Number of documents persisted in the database. database_size: usize, } /// Server configuration. #[derive(Clone, Debug)] pub struct ServerConfig { /// Number of days to clean up documents after inactivity. pub expiry_days: u32, /// Database object, for persistence if desired. pub database: Option, } impl Default for ServerConfig { fn default() -> Self { Self { expiry_days: 1, database: None, } } } /// A combined filter handling all server routes. pub fn server(config: ServerConfig) -> BoxedFilter<(impl Reply,)> { warp::path("api") .and(backend(config)) .or(frontend()) .boxed() } /// Construct routes for static files from React. fn frontend() -> BoxedFilter<(impl Reply,)> { warp::fs::dir("dist").boxed() } /// Construct backend routes, including WebSocket handlers. fn backend(config: ServerConfig) -> BoxedFilter<(impl Reply,)> { let state = ServerState { documents: Default::default(), database: config.database, }; tokio::spawn(cleaner(state.clone(), config.expiry_days)); let state_filter = warp::any().map(move || state.clone()); let socket = warp::path!("socket" / String) .and(warp::ws()) .and(state_filter.clone()) .and_then(socket_handler); let text = warp::path!("text" / String) .and(state_filter.clone()) .and_then(text_handler); let start_time = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("SystemTime returned before UNIX_EPOCH") .as_secs(); let stats = warp::path!("stats") .and(warp::any().map(move || start_time)) .and(state_filter) .and_then(stats_handler); socket.or(text).or(stats).boxed() } /// Handler for the `/api/socket/{id}` endpoint. async fn socket_handler(id: String, ws: Ws, state: ServerState) -> Result { use dashmap::mapref::entry::Entry; let mut entry = match state.documents.entry(id.clone()) { Entry::Occupied(e) => e.into_ref(), Entry::Vacant(e) => { let rustpad = Arc::new(match &state.database { Some(db) => db.load(&id).await.map(Rustpad::from).unwrap_or_default(), None => Rustpad::default(), }); if let Some(db) = &state.database { tokio::spawn(persister(id, Arc::clone(&rustpad), db.clone())); } e.insert(Document::new(rustpad)) } }; let value = entry.value_mut(); value.last_accessed = Instant::now(); let rustpad = Arc::clone(&value.rustpad); Ok(ws.on_upgrade(|socket| async move { rustpad.on_connection(socket).await })) } /// Handler for the `/api/text/{id}` endpoint. async fn text_handler(id: String, state: ServerState) -> Result { Ok(match state.documents.get(&id) { Some(value) => value.rustpad.text(), None => { if let Some(db) = &state.database { db.load(&id) .await .map(|document| document.text) .unwrap_or_default() } else { String::new() } } }) } /// Handler for the `/api/stats` endpoint. async fn stats_handler(start_time: u64, state: ServerState) -> Result { let num_documents = state.documents.len(); let database_size = match state.database { None => 0, Some(db) => match db.count().await { Ok(size) => size, Err(e) => return Err(warp::reject::custom(CustomReject(e))), }, }; Ok(warp::reply::json(&Stats { start_time, num_documents, database_size, })) } const HOUR: Duration = Duration::from_secs(3600); /// Reclaims memory for documents. async fn cleaner(state: ServerState, expiry_days: u32) { loop { time::sleep(HOUR).await; let mut keys = Vec::new(); for entry in &*state.documents { if entry.last_accessed.elapsed() > HOUR * 24 * expiry_days { keys.push(entry.key().clone()); } } info!("cleaner removing keys: {:?}", keys); for key in keys { state.documents.remove(&key); } } } const PERSIST_INTERVAL: Duration = Duration::from_secs(3); /// Persists changed documents after a fixed time interval. async fn persister(id: String, rustpad: Arc, db: Database) { let mut last_revision = 0; while !rustpad.killed() { time::sleep(PERSIST_INTERVAL).await; let revision = rustpad.revision(); if revision > last_revision { info!("persisting revision {} for id = {}", revision, id); if let Err(e) = db.store(&id, &rustpad.snapshot()).await { error!("when persisting document {}: {}", id, e); } last_revision = revision; } } }