lib.rs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. //! Server backend for the Rustpad collaborative text editor.
  2. #![forbid(unsafe_code)]
  3. #![warn(missing_docs)]
  4. use std::sync::Arc;
  5. use std::time::{Duration, SystemTime};
  6. use dashmap::DashMap;
  7. use log::info;
  8. use rustpad::Rustpad;
  9. use serde::Serialize;
  10. use tokio::time::{self, Instant};
  11. use warp::{filters::BoxedFilter, ws::Ws, Filter, Reply};
  12. mod ot;
  13. mod rustpad;
  14. /// An entry stored in the global server map.
  15. ///
  16. /// Each entry corresponds to a single document. This is garbage collected by a
  17. /// background task after one day of inactivity, to avoid server memory usage
  18. /// growing without bound.
  19. struct Document {
  20. last_accessed: Instant,
  21. rustpad: Arc<Rustpad>,
  22. }
  23. impl Default for Document {
  24. fn default() -> Self {
  25. Self {
  26. last_accessed: Instant::now(),
  27. rustpad: Default::default(),
  28. }
  29. }
  30. }
  31. /// Statistics about the server, returned from an API endpoint.
  32. #[derive(Serialize)]
  33. struct Stats {
  34. /// System time when the server started, in seconds since Unix epoch.
  35. start_time: u64,
  36. /// Number of documents currently tracked by the server.
  37. num_documents: usize,
  38. }
  39. /// Server configuration.
  40. #[derive(Debug)]
  41. pub struct ServerConfig {
  42. /// Number of days to clean up documents after inactivity.
  43. pub expiry_days: u32,
  44. }
  45. impl Default for ServerConfig {
  46. fn default() -> Self {
  47. Self { expiry_days: 1 }
  48. }
  49. }
  50. /// A combined filter handling all server routes.
  51. pub fn server(config: ServerConfig) -> BoxedFilter<(impl Reply,)> {
  52. warp::path("api")
  53. .and(backend(config))
  54. .or(frontend())
  55. .boxed()
  56. }
  57. /// Construct routes for static files from React.
  58. fn frontend() -> BoxedFilter<(impl Reply,)> {
  59. warp::fs::dir("build").boxed()
  60. }
  61. /// Construct backend routes, including WebSocket handlers.
  62. fn backend(config: ServerConfig) -> BoxedFilter<(impl Reply,)> {
  63. let state: Arc<DashMap<String, Document>> = Default::default();
  64. tokio::spawn(cleaner(Arc::clone(&state), config.expiry_days));
  65. let state_filter = warp::any().map(move || Arc::clone(&state));
  66. let socket = warp::path("socket")
  67. .and(warp::path::param())
  68. .and(warp::path::end())
  69. .and(warp::ws())
  70. .and(state_filter.clone())
  71. .map(
  72. |id: String, ws: Ws, state: Arc<DashMap<String, Document>>| {
  73. let mut entry = state.entry(id).or_default();
  74. let value = entry.value_mut();
  75. value.last_accessed = Instant::now();
  76. let rustpad = Arc::clone(&value.rustpad);
  77. ws.on_upgrade(|socket| async move { rustpad.on_connection(socket).await })
  78. },
  79. );
  80. let text = warp::path("text")
  81. .and(warp::path::param())
  82. .and(warp::path::end())
  83. .and(state_filter.clone())
  84. .map(|id: String, state: Arc<DashMap<String, Document>>| {
  85. state
  86. .get(&id)
  87. .map(|value| value.rustpad.text())
  88. .unwrap_or_default()
  89. });
  90. let start_time = SystemTime::now()
  91. .duration_since(SystemTime::UNIX_EPOCH)
  92. .expect("SystemTime returned before UNIX_EPOCH")
  93. .as_secs();
  94. let stats = warp::path("stats")
  95. .and(warp::path::end())
  96. .and(state_filter.clone())
  97. .map(move |state: Arc<DashMap<String, Document>>| {
  98. let num_documents = state.len();
  99. warp::reply::json(&Stats {
  100. start_time,
  101. num_documents,
  102. })
  103. });
  104. socket.or(text).or(stats).boxed()
  105. }
  106. const HOUR: Duration = Duration::from_secs(3600);
  107. // Reclaims memory for documents.
  108. async fn cleaner(state: Arc<DashMap<String, Document>>, expiry_days: u32) {
  109. loop {
  110. time::sleep(HOUR).await;
  111. let mut keys = Vec::new();
  112. for entry in &*state {
  113. if entry.last_accessed.elapsed() > HOUR * 24 * expiry_days {
  114. keys.push(entry.key().clone());
  115. }
  116. }
  117. info!("cleaner removing keys: {:?}", keys);
  118. for key in keys {
  119. state.remove(&key);
  120. }
  121. }
  122. }