Browse Source

Implement a basic chat application

Eric Zhang 4 years ago
parent
commit
4c73d191cf
7 changed files with 349 additions and 19 deletions
  1. 2 0
      .env
  2. 2 0
      .gitignore
  3. 150 0
      Cargo.lock
  4. 8 2
      Cargo.toml
  5. 51 12
      app/App.tsx
  6. 13 5
      src/main.rs
  7. 123 0
      src/server.rs

+ 2 - 0
.env

@@ -0,0 +1,2 @@
+# Environment variables for development
+RUST_LOG=info

+ 2 - 0
.gitignore

@@ -5,3 +5,5 @@ node_modules
 dist
 dist-ssr
 *.local
+
+.vscode

+ 150 - 0
Cargo.lock

@@ -1,5 +1,25 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
+[[package]]
+name = "aho-corasick"
+version = "0.7.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "atty"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
+dependencies = [
+ "hermit-abi",
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "autocfg"
 version = "1.0.1"
@@ -85,6 +105,19 @@ version = "0.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
 
+[[package]]
+name = "env_logger"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
+dependencies = [
+ "atty",
+ "humantime",
+ "log",
+ "regex",
+ "termcolor",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -109,6 +142,7 @@ checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27"
 dependencies = [
  "futures-channel",
  "futures-core",
+ "futures-executor",
  "futures-io",
  "futures-sink",
  "futures-task",
@@ -131,12 +165,36 @@ version = "0.3.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1"
 
+[[package]]
+name = "futures-executor"
+version = "0.3.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
 [[package]]
 name = "futures-io"
 version = "0.3.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1"
 
+[[package]]
+name = "futures-macro"
+version = "0.3.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121"
+dependencies = [
+ "autocfg",
+ "proc-macro-hack",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "futures-sink"
 version = "0.3.15"
@@ -156,11 +214,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967"
 dependencies = [
  "autocfg",
+ "futures-channel",
  "futures-core",
+ "futures-io",
+ "futures-macro",
  "futures-sink",
  "futures-task",
+ "memchr",
  "pin-project-lite",
  "pin-utils",
+ "proc-macro-hack",
+ "proc-macro-nested",
  "slab",
 ]
 
@@ -289,6 +353,15 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440"
 
+[[package]]
+name = "humantime"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
+dependencies = [
+ "quick-error",
+]
+
 [[package]]
 name = "hyper"
 version = "0.14.8"
@@ -565,6 +638,28 @@ version = "0.2.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
 
+[[package]]
+name = "pretty_env_logger"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d"
+dependencies = [
+ "env_logger",
+ "log",
+]
+
+[[package]]
+name = "proc-macro-hack"
+version = "0.5.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
+
+[[package]]
+name = "proc-macro-nested"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.27"
@@ -679,6 +774,23 @@ dependencies = [
  "bitflags",
 ]
 
+[[package]]
+name = "regex"
+version = "1.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.6.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
+
 [[package]]
 name = "remove_dir_all"
 version = "0.5.3"
@@ -693,8 +805,14 @@ name = "rustpad"
 version = "0.1.0"
 dependencies = [
  "dotenv",
+ "futures",
+ "log",
  "operational-transform",
+ "pretty_env_logger",
+ "serde",
+ "serde_json",
  "tokio",
+ "tokio-stream",
  "warp",
 ]
 
@@ -727,6 +845,20 @@ name = "serde"
 version = "1.0.126"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
+dependencies = [
+ "serde_derive",
+]
+
+[[package]]
+name = "serde_derive"
+version = "1.0.126"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
 
 [[package]]
 name = "serde_json"
@@ -820,6 +952,15 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "termcolor"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
+dependencies = [
+ "winapi-util",
+]
+
 [[package]]
 name = "time"
 version = "0.1.43"
@@ -1105,6 +1246,15 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
 
+[[package]]
+name = "winapi-util"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
+dependencies = [
+ "winapi",
+]
+
 [[package]]
 name = "winapi-x86_64-pc-windows-gnu"
 version = "0.4.0"

+ 8 - 2
Cargo.toml

@@ -7,7 +7,13 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+dotenv = "0.15.0"
+futures = "0.3.15"
+log = "0.4.14"
 operational-transform = "0.6.0"
-warp = "0.3.1"
+pretty_env_logger = "0.4.0"
+serde = { version = "1.0.126", features = ["derive"] }
+serde_json = "1.0.64"
 tokio = { version = "1.6.1", features = ["full"] }
-dotenv = "0.15.0"
+tokio-stream = "0.1.6"
+warp = "0.3.1"

+ 51 - 12
app/App.tsx

@@ -1,22 +1,61 @@
-import { useState } from "react";
+import { useEffect, useState } from "react";
 
 function App() {
-  const [count, setCount] = useState(0);
+  const [input, setInput] = useState("");
+  const [socket, setSocket] = useState<WebSocket>();
+  const [messages, setMessages] = useState<[number, string][]>([]);
+
+  useEffect(() => {
+    const uri =
+      (location.origin.startsWith("https") ? "wss://" : "ws://") +
+      location.host +
+      "/api/socket";
+    const ws = new WebSocket(uri);
+    console.log("connecting...");
+    ws.onopen = () => {
+      console.log("connected!");
+      setSocket(ws);
+    };
+    ws.onmessage = ({ data }) => {
+      console.log("message:", data);
+      setMessages((messages) => [...messages, ...JSON.parse(data)]);
+    };
+    ws.onclose = () => {
+      console.log("disconnected!");
+      setSocket(undefined);
+    };
+    return () => ws.close();
+  }, []);
 
   return (
     <div className="container">
       <div className="row">
         <div className="one-half column" style={{ marginTop: "25%" }}>
-          <h4>Skeleton + React + Vite</h4>
-          <p>
-            This index.html page is a placeholder with the CSS, font and
-            favicon. It's just waiting for you to add some content! If you need
-            some help hit up the{" "}
-            <a href="http://www.getskeleton.com">Skeleton documentation</a>.
-          </p>
-          <button onClick={() => setCount((count) => count + 1)}>
-            count is: {count}
-          </button>
+          <h4>Chat Application</h4>
+          <p>Let's send some messages!</p>
+          <ul>
+            {messages.map(([sender, message], key) => (
+              <li key={key}>
+                <strong>User #{sender}:</strong> {message}
+              </li>
+            ))}
+          </ul>
+          <form
+            onSubmit={(event) => {
+              event.preventDefault();
+              socket?.send(input);
+              setInput("");
+            }}
+          >
+            <input
+              className="u-full-width"
+              required
+              placeholder="Hello!"
+              value={input}
+              onChange={(event) => setInput(event.target.value)}
+            />
+            <input className="button-primary" type="submit" />
+          </form>
         </div>
       </div>
     </div>

+ 13 - 5
src/main.rs

@@ -1,17 +1,25 @@
+//! Server backend for the Rustpad collaborative text editor
+
+#![forbid(unsafe_code)]
+#![warn(missing_docs)]
+
 use warp::{filters::BoxedFilter, Filter, Reply};
 
+mod server;
+
+/// Construct routes for static files from React
 fn frontend() -> BoxedFilter<(impl Reply,)> {
     warp::fs::dir("dist")
         .or(warp::get().and(warp::fs::file("dist/index.html")))
         .boxed()
 }
 
+/// Construct backend routes, including WebSocket handlers
 fn backend() -> BoxedFilter<(impl Reply,)> {
-    warp::path!("hello" / String)
-        .map(|name| format!("Hello, {}!", name))
-        .boxed()
+    server::routes()
 }
 
+/// A combined filter handling all server routes
 fn server() -> BoxedFilter<(impl Reply,)> {
     warp::path("api").and(backend()).or(frontend()).boxed()
 }
@@ -19,12 +27,12 @@ fn server() -> BoxedFilter<(impl Reply,)> {
 #[tokio::main]
 async fn main() {
     dotenv::dotenv().ok();
+    pretty_env_logger::init();
 
     let port = std::env::var("PORT")
-        .unwrap_or("3030".to_string())
+        .unwrap_or_else(|_| String::from("3030"))
         .parse()
         .expect("Unable to parse PORT");
 
-    println!("Server listening on http://localhost:{}", port);
     warp::serve(server()).run(([0, 0, 0, 0], port)).await;
 }

+ 123 - 0
src/server.rs

@@ -0,0 +1,123 @@
+//! Server routes for Rustpad
+
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::prelude::*;
+use log::{error, info};
+use tokio::{
+    sync::{Notify, RwLock},
+    time,
+};
+use warp::{
+    filters::BoxedFilter,
+    ws::{Message, WebSocket, Ws},
+    Filter, Reply,
+};
+
+/// Construct a set of routes for the server
+pub fn routes() -> BoxedFilter<(impl Reply,)> {
+    let rustpad = Arc::new(Rustpad::default());
+    let rustpad = warp::any().map(move || Arc::clone(&rustpad));
+
+    let socket = warp::path("socket")
+        .and(warp::path::end())
+        .and(warp::ws())
+        .and(rustpad)
+        .map(|ws: Ws, rustpad: Arc<Rustpad>| {
+            ws.on_upgrade(move |socket| async move { rustpad.on_connection(socket).await })
+        });
+
+    socket.boxed()
+}
+
+/// The main object for a collaborative session
+#[derive(Default)]
+struct Rustpad {
+    state: RwLock<State>,
+    count: AtomicU64,
+    notify: Notify,
+}
+
+/// Shared state involving multiple users, protected by a lock
+#[derive(Default)]
+struct State {
+    messages: Vec<(u64, String)>,
+}
+
+impl Rustpad {
+    async fn on_connection(&self, mut socket: WebSocket) {
+        let id = self.count.fetch_add(1, Ordering::Relaxed);
+        info!("connection! id = {}", id);
+
+        let mut revision: usize = 0;
+
+        loop {
+            if self.num_messages().await > revision {
+                match self.send_messages(revision, &mut socket).await {
+                    Ok(new_revision) => revision = new_revision,
+                    Err(e) => {
+                        error!("websocket error: {}", e);
+                        break;
+                    }
+                }
+            }
+
+            let sleep = time::sleep(Duration::from_millis(500));
+            tokio::pin!(sleep);
+            tokio::select! {
+                _ = &mut sleep => {}
+                _ = self.notify.notified() => {}
+                result = socket.next() => {
+                    match result {
+                        None => break,
+                        Some(Ok(message)) => {
+                            self.handle_message(id, message).await
+                        }
+                        Some(Err(e)) => {
+                            error!("websocket error: {}", e);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        info!("disconnection, id = {}", id);
+    }
+
+    async fn num_messages(&self) -> usize {
+        let state = self.state.read().await;
+        state.messages.len()
+    }
+
+    async fn send_messages(
+        &self,
+        revision: usize,
+        socket: &mut WebSocket,
+    ) -> Result<usize, warp::Error> {
+        let state = self.state.read().await;
+        let len = state.messages.len();
+        if revision < len {
+            let messages = serde_json::to_string(&state.messages[revision..])
+                .expect("serde serialization failed for messages");
+            drop(state);
+            socket.send(Message::text(&messages)).await?;
+            Ok(len)
+        } else {
+            Ok(revision)
+        }
+    }
+
+    async fn handle_message(&self, id: u64, message: Message) {
+        let text = match message.to_str() {
+            Ok(text) => String::from(text),
+            Err(()) => return, // Ignore non-text messages
+        };
+
+        let mut state = self.state.write().await;
+        state.messages.push((id, text));
+        self.notify.notify_waiters();
+    }
+}