rustpad.rs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. //! Asynchronous systems logic for Rustpad
  2. use std::sync::atomic::{AtomicU64, Ordering};
  3. use std::time::Duration;
  4. use futures::prelude::*;
  5. use log::{error, info};
  6. use operational_transform::OperationSeq;
  7. use parking_lot::RwLock;
  8. use serde::{Deserialize, Serialize};
  9. use tokio::{sync::Notify, time};
  10. use warp::ws::{Message, WebSocket};
  11. /// The main object for a collaborative session
  12. #[derive(Default)]
  13. pub struct Rustpad {
  14. state: RwLock<State>,
  15. count: AtomicU64,
  16. notify: Notify,
  17. }
  18. /// Shared state involving multiple users, protected by a lock
  19. #[derive(Default)]
  20. struct State {
  21. messages: Vec<(u64, String)>,
  22. }
  23. /// A message received from the client over WebSocket
  24. #[derive(Clone, Debug, Serialize, Deserialize)]
  25. enum ClientMsg {
  26. Edit { revision: usize, op: OperationSeq },
  27. }
  28. /// A message sent to the client over WebSocket
  29. #[derive(Clone, Debug, Serialize, Deserialize)]
  30. enum ServerMsg {
  31. History {
  32. revision: usize,
  33. ops: Vec<OperationSeq>,
  34. },
  35. }
  36. impl Rustpad {
  37. /// Construct a new, empty Rustpad object
  38. pub fn new() -> Self {
  39. Default::default()
  40. }
  41. /// Handle a connection from a WebSocket
  42. pub async fn on_connection(&self, mut socket: WebSocket) {
  43. let id = self.count.fetch_add(1, Ordering::Relaxed);
  44. info!("connection! id = {}", id);
  45. let mut revision: usize = 0;
  46. loop {
  47. if self.num_messages() > revision {
  48. match self.send_messages(revision, &mut socket).await {
  49. Ok(new_revision) => revision = new_revision,
  50. Err(e) => {
  51. error!("websocket error: {}", e);
  52. break;
  53. }
  54. }
  55. }
  56. let sleep = time::sleep(Duration::from_millis(500));
  57. tokio::pin!(sleep);
  58. tokio::select! {
  59. _ = &mut sleep => {}
  60. _ = self.notify.notified() => {}
  61. result = socket.next() => {
  62. match result {
  63. None => break,
  64. Some(Ok(message)) => {
  65. self.handle_message(id, message).await
  66. }
  67. Some(Err(e)) => {
  68. error!("websocket error: {}", e);
  69. break;
  70. }
  71. }
  72. }
  73. }
  74. }
  75. info!("disconnection, id = {}", id);
  76. }
  77. fn num_messages(&self) -> usize {
  78. let state = self.state.read();
  79. state.messages.len()
  80. }
  81. async fn send_messages(
  82. &self,
  83. revision: usize,
  84. socket: &mut WebSocket,
  85. ) -> Result<usize, warp::Error> {
  86. let messages = {
  87. let state = self.state.read();
  88. let len = state.messages.len();
  89. if revision < len {
  90. state.messages[revision..].to_owned()
  91. } else {
  92. Vec::new()
  93. }
  94. };
  95. if !messages.is_empty() {
  96. let serialized = serde_json::to_string(&messages)
  97. .expect("serde serialization failed for messages vec");
  98. socket.send(Message::text(&serialized)).await?;
  99. }
  100. Ok(revision + messages.len())
  101. }
  102. async fn handle_message(&self, id: u64, message: Message) {
  103. let text = match message.to_str() {
  104. Ok(text) => String::from(text),
  105. Err(()) => return, // Ignore non-text messages
  106. };
  107. let mut state = self.state.write();
  108. state.messages.push((id, text));
  109. self.notify.notify_waiters();
  110. }
  111. }