lib.rs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. //! Server backend for the Rustpad collaborative text editor.
  2. #![forbid(unsafe_code)]
  3. #![warn(missing_docs)]
  4. use std::sync::Arc;
  5. use std::time::{Duration, SystemTime};
  6. use dashmap::DashMap;
  7. use log::info;
  8. use rustpad::Rustpad;
  9. use serde::Serialize;
  10. use tokio::time::{self, Instant};
  11. use warp::{filters::BoxedFilter, ws::Ws, Filter, Reply};
  12. mod ot;
  13. mod rustpad;
  14. /// An entry stored in the global server map.
  15. ///
  16. /// Each entry corresponds to a single document. This is garbage collected by a
  17. /// background task after one day of inactivity, to avoid server memory usage
  18. /// growing without bound.
  19. struct Document {
  20. last_accessed: Instant,
  21. rustpad: Arc<Rustpad>,
  22. }
  23. impl Default for Document {
  24. fn default() -> Self {
  25. Self {
  26. last_accessed: Instant::now(),
  27. rustpad: Default::default(),
  28. }
  29. }
  30. }
  31. /// Statistics about the server, returned from an API endpoint.
  32. #[derive(Serialize)]
  33. struct Stats {
  34. /// System time when the server started, in seconds since Unix epoch.
  35. start_time: u64,
  36. /// Number of documents currently tracked by the server.
  37. num_documents: usize,
  38. }
  39. /// Data that will be used to configure the server.
  40. #[derive(Debug)]
  41. pub struct ServerData {
  42. /// Number of days to clean up documents after inactivity.
  43. pub expiry_days: u32,
  44. }
  45. impl Default for ServerData {
  46. fn default() -> Self {
  47. Self { expiry_days: 1 }
  48. }
  49. }
  50. /// A combined filter handling all server routes.
  51. pub fn server(data: ServerData) -> BoxedFilter<(impl Reply,)> {
  52. warp::path("api").and(backend(data)).or(frontend()).boxed()
  53. }
  54. /// Construct routes for static files from React.
  55. fn frontend() -> BoxedFilter<(impl Reply,)> {
  56. warp::fs::dir("build").boxed()
  57. }
  58. /// Construct backend routes, including WebSocket handlers.
  59. fn backend(data: ServerData) -> BoxedFilter<(impl Reply,)> {
  60. let state: Arc<DashMap<String, Document>> = Default::default();
  61. tokio::spawn(cleaner(Arc::clone(&state), data.expiry_days));
  62. let state_filter = warp::any().map(move || Arc::clone(&state));
  63. let socket = warp::path("socket")
  64. .and(warp::path::param())
  65. .and(warp::path::end())
  66. .and(warp::ws())
  67. .and(state_filter.clone())
  68. .map(
  69. |id: String, ws: Ws, state: Arc<DashMap<String, Document>>| {
  70. let mut entry = state.entry(id).or_default();
  71. let value = entry.value_mut();
  72. value.last_accessed = Instant::now();
  73. let rustpad = Arc::clone(&value.rustpad);
  74. ws.on_upgrade(|socket| async move { rustpad.on_connection(socket).await })
  75. },
  76. );
  77. let text = warp::path("text")
  78. .and(warp::path::param())
  79. .and(warp::path::end())
  80. .and(state_filter.clone())
  81. .map(|id: String, state: Arc<DashMap<String, Document>>| {
  82. state
  83. .get(&id)
  84. .map(|value| value.rustpad.text())
  85. .unwrap_or_default()
  86. });
  87. let start_time = SystemTime::now()
  88. .duration_since(SystemTime::UNIX_EPOCH)
  89. .expect("SystemTime returned before UNIX_EPOCH")
  90. .as_secs();
  91. let stats = warp::path("stats")
  92. .and(warp::path::end())
  93. .and(state_filter.clone())
  94. .map(move |state: Arc<DashMap<String, Document>>| {
  95. let num_documents = state.len();
  96. warp::reply::json(&Stats {
  97. start_time,
  98. num_documents,
  99. })
  100. });
  101. socket.or(text).or(stats).boxed()
  102. }
  103. const HOUR: Duration = Duration::from_secs(3600);
  104. // Reclaims memory for documents.
  105. async fn cleaner(state: Arc<DashMap<String, Document>>, expiry_days: u32) {
  106. loop {
  107. time::sleep(HOUR).await;
  108. let mut keys = Vec::new();
  109. for entry in &*state {
  110. if entry.last_accessed.elapsed() > HOUR * 24 * expiry_days {
  111. keys.push(entry.key().clone());
  112. }
  113. }
  114. info!("cleaner removing keys: {:?}", keys);
  115. for key in keys {
  116. state.remove(&key);
  117. }
  118. }
  119. }