Add SQLite document persistence through SQLx

This commit is contained in:
Eric Zhang 2021-10-19 23:35:51 -04:00
parent e00307ef03
commit 57b2e0ffd2
8 changed files with 785 additions and 57 deletions

445
Cargo.lock generated
View file

@ -1,5 +1,18 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom 0.2.3",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.18"
@ -15,6 +28,15 @@ version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b"
[[package]]
name = "atoi"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5"
dependencies = [
"num-traits",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -87,6 +109,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]]
name = "cc"
version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"
[[package]]
name = "cfg-if"
version = "0.1.10"
@ -118,6 +146,60 @@ dependencies = [
"libc",
]
[[package]]
name = "cpufeatures"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469"
dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10c2722795460108a7872e1cd933a85d6ec38abc4baecad51028f702da28889f"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403"
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if 1.0.0",
"lazy_static",
]
[[package]]
name = "dashmap"
version = "4.0.2"
@ -143,6 +225,12 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "env_logger"
version = "0.7.1"
@ -214,6 +302,17 @@ dependencies = [
"futures-util",
]
[[package]]
name = "futures-intrusive"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e"
dependencies = [
"futures-core",
"lock_api",
"parking_lot",
]
[[package]]
name = "futures-io"
version = "0.3.15"
@ -319,9 +418,21 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.9.1"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
]
[[package]]
name = "hashlink"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
dependencies = [
"hashbrown",
]
[[package]]
name = "headers"
@ -348,6 +459,15 @@ dependencies = [
"http",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.18"
@ -357,6 +477,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "http"
version = "0.2.4"
@ -437,9 +563,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "1.6.2"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
dependencies = [
"autocfg",
"hashbrown",
@ -463,6 +589,15 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "itertools"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "0.4.7"
@ -486,9 +621,20 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.95"
version = "0.2.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "789da6d93f1b866ffe175afc5322a4d76c038605a1c3319bb57b06967ca98a36"
checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce"
[[package]]
name = "libsqlite3-sys"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "lock_api"
@ -536,6 +682,12 @@ dependencies = [
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c64630dcdd71f1a64c435f54885086a0de5d6a12d104d69b165fb7d5286d677"
[[package]]
name = "mio"
version = "0.7.11"
@ -576,6 +728,17 @@ dependencies = [
"twoway",
]
[[package]]
name = "nom"
version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffd9d26838a953b4af82cbeb9f1592c6798916983959be223a7124e992742c1"
dependencies = [
"memchr",
"minimal-lexical",
"version_check",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -585,6 +748,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
@ -597,9 +769,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.7.2"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "opaque-debug"
@ -680,6 +852,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -848,6 +1026,34 @@ dependencies = [
"winapi",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]]
name = "rustls"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [
"base64",
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustpad-server"
version = "0.1.0"
@ -863,6 +1069,8 @@ dependencies = [
"pretty_env_logger",
"serde",
"serde_json",
"sqlx",
"tempfile",
"tokio",
"tokio-stream",
"warp",
@ -906,6 +1114,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "serde"
version = "1.0.126"
@ -957,7 +1175,20 @@ checksum = "8c4cfa741c5832d0ef7fab46cabed29c2aae926db0b11bb2069edd8db5e64e16"
dependencies = [
"block-buffer",
"cfg-if 1.0.0",
"cpufeatures",
"cpufeatures 0.1.4",
"digest",
"opaque-debug",
]
[[package]]
name = "sha2"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa"
dependencies = [
"block-buffer",
"cfg-if 1.0.0",
"cpufeatures 0.2.1",
"digest",
"opaque-debug",
]
@ -993,6 +1224,118 @@ dependencies = [
"winapi",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlformat"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4b7922be017ee70900be125523f38bdd644f4f06a1b16e8fa5a8ee8c34bffd4"
dependencies = [
"itertools",
"nom",
"unicode_categories",
]
[[package]]
name = "sqlx"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7911b0031a0247af40095838002999c7a52fba29d9739e93326e71a5a1bc9d43"
dependencies = [
"sqlx-core",
"sqlx-macros",
]
[[package]]
name = "sqlx-core"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aec89bfaca8f7737439bad16d52b07f1ccd0730520d3bf6ae9d069fe4b641fb1"
dependencies = [
"ahash",
"atoi",
"bitflags",
"byteorder",
"bytes",
"crc",
"crossbeam-channel",
"crossbeam-queue",
"crossbeam-utils",
"either",
"futures-channel",
"futures-core",
"futures-intrusive",
"futures-util",
"hashlink",
"hex",
"indexmap",
"itoa",
"libc",
"libsqlite3-sys",
"log",
"memchr",
"once_cell",
"parking_lot",
"percent-encoding",
"rustls",
"sha2",
"smallvec",
"sqlformat",
"sqlx-rt",
"stringprep",
"thiserror",
"tokio-stream",
"url",
"webpki",
"webpki-roots",
"whoami",
]
[[package]]
name = "sqlx-macros"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "584866c833511b1a152e87a7ee20dee2739746f60c858b3c5209150bc4b466f5"
dependencies = [
"dotenv",
"either",
"heck",
"once_cell",
"proc-macro2",
"quote",
"sha2",
"sqlx-core",
"sqlx-rt",
"syn",
"url",
]
[[package]]
name = "sqlx-rt"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d1bd069de53442e7a320f525a6d4deb8bb0621ac7a55f7eccbc2b58b57f43d0"
dependencies = [
"once_cell",
"tokio",
"tokio-rustls",
]
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "syn"
version = "1.0.72"
@ -1027,6 +1370,26 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.1.43"
@ -1083,6 +1446,17 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "tokio-stream"
version = "0.1.6"
@ -1215,12 +1589,30 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-xid"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "unicode_categories"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.2.2"
@ -1239,6 +1631,12 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.3"
@ -1396,6 +1794,35 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
dependencies = [
"webpki",
]
[[package]]
name = "whoami"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "483a59fee1a93fec90eb08bc2eb4315ef10f4ebc478b3a5fadc969819cb66117"
dependencies = [
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "winapi"
version = "0.3.9"

View file

@ -16,6 +16,10 @@ parking_lot = "0.11.1"
pretty_env_logger = "0.4.0"
serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"
sqlx = { version = "0.5.9", features = ["runtime-tokio-rustls", "sqlite"] }
tokio = { version = "1.6.1", features = ["full", "test-util"] }
tokio-stream = "0.1.6"
warp = "0.3.1"
[dev-dependencies]
tempfile = "3.2.0"

View file

@ -0,0 +1,5 @@
CREATE TABLE document(
id TEXT PRIMARY KEY,
text TEXT NOT NULL,
language TEXT
)

View file

@ -0,0 +1,78 @@
//! Backend SQLite database handlers for persisting documents.
use std::str::FromStr;
use anyhow::{bail, Result};
use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
/// Represents a document persisted in database storage.
#[derive(sqlx::FromRow)]
pub struct PersistedDocument {
/// Text content of the document.
pub text: String,
/// Language of the document for editor syntax highlighting.
pub language: Option<String>,
}
/// A driver for database operations wrapping a pool connection.
#[derive(Clone, Debug)]
pub struct Database {
pool: SqlitePool,
}
impl Database {
/// Construct a new database from Postgres connection URI.
pub async fn new(uri: &str) -> Result<Self> {
{
// Create database file if missing, and run migrations.
let mut conn = SqliteConnectOptions::from_str(uri)?
.create_if_missing(true)
.connect()
.await?;
sqlx::migrate!().run(&mut conn).await?;
}
Ok(Database {
pool: SqlitePool::connect(uri).await?,
})
}
/// Load the text of a document from the database.
pub async fn load(&self, document_id: &str) -> Result<PersistedDocument> {
sqlx::query_as(r#"SELECT text, language FROM document WHERE id = $1"#)
.bind(document_id)
.fetch_one(&self.pool)
.await
.map_err(|e| e.into())
}
/// Store the text of a document in the database.
pub async fn store(&self, document_id: &str, document: &PersistedDocument) -> Result<()> {
let result = sqlx::query(
r#"
INSERT OR REPLACE INTO
document (id, text, language)
VALUES
($1, $2, $3)"#,
)
.bind(document_id)
.bind(&document.text)
.bind(&document.language)
.execute(&self.pool)
.await?;
if result.rows_affected() != 1 {
bail!(
"expected store() to receive 1 row affected, but it affected {} rows instead",
result.rows_affected(),
);
}
Ok(())
}
/// Count the number of documents in the database.
pub async fn count(&self) -> Result<usize> {
let row: (i64,) = sqlx::query_as("SELECT count(*) FROM document")
.fetch_one(&self.pool)
.await?;
Ok(row.0 as usize)
}
}

View file

@ -7,13 +7,15 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use dashmap::DashMap;
use log::info;
use log::{error, info};
use serde::Serialize;
use tokio::time::{self, Instant};
use warp::{filters::BoxedFilter, ws::Ws, Filter, Reply};
use warp::{filters::BoxedFilter, ws::Ws, Filter, Rejection, Reply};
use rustpad::Rustpad;
pub use crate::database::Database;
use crate::rustpad::Rustpad;
mod database;
mod ot;
mod rustpad;
@ -27,15 +29,35 @@ struct Document {
rustpad: Arc<Rustpad>,
}
impl Default for Document {
fn default() -> Self {
impl Document {
fn new(rustpad: Arc<Rustpad>) -> Self {
Self {
last_accessed: Instant::now(),
rustpad: Default::default(),
rustpad,
}
}
}
impl Drop for Document {
fn drop(&mut self) {
self.rustpad.kill();
}
}
#[derive(Debug)]
struct CustomReject(anyhow::Error);
impl warp::reject::Reject for CustomReject {}
/// The shared state of the server, accessible from within request handlers.
#[derive(Clone)]
struct ServerState {
/// Concurrent map storing in-memory documents.
documents: Arc<DashMap<String, Document>>,
/// Connection to the database pool, if persistence is enabled.
database: Option<Database>,
}
/// Statistics about the server, returned from an API endpoint.
#[derive(Serialize)]
struct Stats {
@ -43,18 +65,25 @@ struct Stats {
start_time: u64,
/// Number of documents currently tracked by the server.
num_documents: usize,
/// Number of documents persisted in the database.
database_size: usize,
}
/// Server configuration.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ServerConfig {
/// Number of days to clean up documents after inactivity.
pub expiry_days: u32,
/// Database object, for persistence if desired.
pub database: Option<Database>,
}
impl Default for ServerConfig {
fn default() -> Self {
Self { expiry_days: 1 }
Self {
expiry_days: 1,
database: None,
}
}
}
@ -73,70 +102,126 @@ fn frontend() -> BoxedFilter<(impl Reply,)> {
/// Construct backend routes, including WebSocket handlers.
fn backend(config: ServerConfig) -> BoxedFilter<(impl Reply,)> {
let state: Arc<DashMap<String, Document>> = Default::default();
tokio::spawn(cleaner(Arc::clone(&state), config.expiry_days));
let state = ServerState {
documents: Default::default(),
database: config.database,
};
tokio::spawn(cleaner(state.clone(), config.expiry_days));
let state_filter = warp::any().map(move || Arc::clone(&state));
let state_filter = warp::any().map(move || state.clone());
let socket = warp::path("socket")
.and(warp::path::param())
.and(warp::path::end())
let socket = warp::path!("socket" / String)
.and(warp::ws())
.and(state_filter.clone())
.map(
|id: String, ws: Ws, state: Arc<DashMap<String, Document>>| {
let mut entry = state.entry(id).or_default();
let value = entry.value_mut();
value.last_accessed = Instant::now();
let rustpad = Arc::clone(&value.rustpad);
ws.on_upgrade(|socket| async move { rustpad.on_connection(socket).await })
},
);
.and_then(socket_handler);
let text = warp::path("text")
.and(warp::path::param())
.and(warp::path::end())
let text = warp::path!("text" / String)
.and(state_filter.clone())
.map(|id: String, state: Arc<DashMap<String, Document>>| {
state
.get(&id)
.map(|value| value.rustpad.text())
.unwrap_or_default()
});
.and_then(text_handler);
let start_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("SystemTime returned before UNIX_EPOCH")
.as_secs();
let stats = warp::path("stats")
.and(warp::path::end())
let stats = warp::path!("stats")
.and(warp::any().map(move || start_time))
.and(state_filter.clone())
.map(move |state: Arc<DashMap<String, Document>>| {
let num_documents = state.len();
warp::reply::json(&Stats {
start_time,
num_documents,
})
});
.and_then(stats_handler);
socket.or(text).or(stats).boxed()
}
/// Handler for the `/api/socket/{id}` endpoint.
async fn socket_handler(id: String, ws: Ws, state: ServerState) -> Result<impl Reply, Rejection> {
use dashmap::mapref::entry::Entry;
let mut entry = match state.documents.entry(id.clone()) {
Entry::Occupied(e) => e.into_ref(),
Entry::Vacant(e) => {
let rustpad = Arc::new(match &state.database {
Some(db) => db.load(&id).await.map(Rustpad::from).unwrap_or_default(),
None => Rustpad::default(),
});
if let Some(db) = &state.database {
tokio::spawn(persister(id, Arc::clone(&rustpad), db.clone()));
}
e.insert(Document::new(rustpad))
}
};
let value = entry.value_mut();
value.last_accessed = Instant::now();
let rustpad = Arc::clone(&value.rustpad);
Ok(ws.on_upgrade(|socket| async move { rustpad.on_connection(socket).await }))
}
/// Handler for the `/api/text/{id}` endpoint.
async fn text_handler(id: String, state: ServerState) -> Result<impl Reply, Rejection> {
Ok(match state.documents.get(&id) {
Some(value) => value.rustpad.text(),
None => {
if let Some(db) = &state.database {
db.load(&id)
.await
.map(|document| document.text)
.unwrap_or_default()
} else {
String::new()
}
}
})
}
/// Handler for the `/api/stats` endpoint.
async fn stats_handler(start_time: u64, state: ServerState) -> Result<impl Reply, Rejection> {
let num_documents = state.documents.len();
let database_size = match state.database {
None => 0,
Some(db) => match db.count().await {
Ok(size) => size,
Err(e) => return Err(warp::reject::custom(CustomReject(e))),
},
};
Ok(warp::reply::json(&Stats {
start_time,
num_documents,
database_size,
}))
}
const HOUR: Duration = Duration::from_secs(3600);
// Reclaims memory for documents.
async fn cleaner(state: Arc<DashMap<String, Document>>, expiry_days: u32) {
/// Reclaims memory for documents.
async fn cleaner(state: ServerState, expiry_days: u32) {
loop {
time::sleep(HOUR).await;
let mut keys = Vec::new();
for entry in &*state {
for entry in &*state.documents {
if entry.last_accessed.elapsed() > HOUR * 24 * expiry_days {
keys.push(entry.key().clone());
}
}
info!("cleaner removing keys: {:?}", keys);
for key in keys {
state.remove(&key);
state.documents.remove(&key);
}
}
}
const PERSIST_INTERVAL: Duration = Duration::from_secs(3);
/// Persists changed documents after a fixed time interval.
async fn persister(id: String, rustpad: Arc<Rustpad>, db: Database) {
let mut last_revision = 0;
while !rustpad.killed() {
time::sleep(PERSIST_INTERVAL).await;
let revision = rustpad.revision();
if revision > last_revision {
info!("persisting revision {} for id = {}", revision, id);
if let Err(e) = db.store(&id, &rustpad.snapshot()).await {
error!("when persisting document {}: {}", id, e);
}
last_revision = revision;
}
}
}

View file

@ -1,4 +1,4 @@
use rustpad_server::{server, ServerConfig};
use rustpad_server::{server, Database, ServerConfig};
#[tokio::main]
async fn main() {
@ -15,6 +15,14 @@ async fn main() {
.unwrap_or_else(|_| String::from("1"))
.parse()
.expect("Unable to parse EXPIRY_DAYS"),
database: match std::env::var("SQLITE_URI") {
Ok(uri) => Some(
Database::new(&uri)
.await
.expect("Unable to connect to SQLITE_URI"),
),
Err(_) => None,
},
};
warp::serve(server(config)).run(([0, 0, 0, 0], port)).await;

View file

@ -1,7 +1,7 @@
//! Eventually consistent server-side logic for Rustpad.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use anyhow::{bail, Context, Result};
use futures::prelude::*;
@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, Notify};
use warp::ws::{Message, WebSocket};
use crate::ot::transform_index;
use crate::{database::PersistedDocument, ot::transform_index};
/// The main object representing a collaborative session.
pub struct Rustpad {
@ -24,6 +24,8 @@ pub struct Rustpad {
notify: Notify,
/// Used to inform all clients of metadata updates.
update: broadcast::Sender<ServerMsg>,
/// Set to true when the document is destroyed.
killed: AtomicBool,
}
/// Shared state involving multiple users, protected by a lock.
@ -103,10 +105,30 @@ impl Default for Rustpad {
count: Default::default(),
notify: Default::default(),
update: tx,
killed: AtomicBool::new(false),
}
}
}
impl From<PersistedDocument> for Rustpad {
fn from(document: PersistedDocument) -> Self {
let mut operation = OperationSeq::default();
operation.insert(&document.text);
let rustpad = Self::default();
{
let mut state = rustpad.state.write();
state.text = document.text;
state.language = document.language;
state.operations.push(UserOperation {
id: u64::MAX,
operation,
})
}
rustpad
}
}
impl Rustpad {
/// Handle a connection from a WebSocket.
pub async fn on_connection(&self, socket: WebSocket) {
@ -129,12 +151,32 @@ impl Rustpad {
state.text.clone()
}
/// Returns a snapshot of the current document for persistence.
pub fn snapshot(&self) -> PersistedDocument {
let state = self.state.read();
PersistedDocument {
text: state.text.clone(),
language: state.language.clone(),
}
}
/// Returns the current revision.
pub fn revision(&self) -> usize {
let state = self.state.read();
state.operations.len()
}
/// Kill this object immediately, dropping all current connections.
pub fn kill(&self) {
self.killed.store(true, Ordering::Relaxed);
self.notify.notify_waiters();
}
/// Returns if this Rustpad object has been killed.
pub fn killed(&self) -> bool {
self.killed.load(Ordering::Relaxed)
}
async fn handle_connection(&self, id: u64, mut socket: WebSocket) -> Result<()> {
let mut update_rx = self.update.subscribe();
@ -145,6 +187,9 @@ impl Rustpad {
// notification, **then** check the current state for new revisions.
// This is the same approach that `tokio::sync::watch` takes.
let notified = self.notify.notified();
if self.killed() {
break;
}
if self.revision() > revision {
revision = self.send_history(revision, &mut socket).await?
}

View file

@ -0,0 +1,76 @@
//! Tests to ensure that documents are persisted with SQLite.
use std::time::Duration;
use anyhow::Result;
use common::*;
use operational_transform::OperationSeq;
use rustpad_server::{server, Database, ServerConfig};
use serde_json::json;
use tempfile::NamedTempFile;
use tokio::time;
pub mod common;
fn temp_sqlite_uri() -> Result<String> {
Ok(format!(
"sqlite://{}",
NamedTempFile::new()?
.into_temp_path()
.as_os_str()
.to_str()
.expect("failed to get name of tempfile as &str")
))
}
#[tokio::test]
async fn test_persist() -> Result<()> {
pretty_env_logger::try_init().ok();
let filter = server(ServerConfig {
expiry_days: 2,
database: Some(Database::new(&temp_sqlite_uri()?).await?),
..ServerConfig::default()
});
expect_text(&filter, "persist", "").await;
let mut client = connect(&filter, "persist").await?;
let msg = client.recv().await?;
assert_eq!(msg, json!({ "Identity": 0 }));
let mut operation = OperationSeq::default();
operation.insert("hello");
let msg = json!({
"Edit": {
"revision": 0,
"operation": operation
}
});
client.send(&msg).await;
let msg = client.recv().await?;
msg.get("History")
.expect("should receive history operation");
expect_text(&filter, "persist", "hello").await;
let hour = Duration::from_secs(3600);
time::pause();
time::advance(47 * hour).await;
expect_text(&filter, "persist", "hello").await;
// Give SQLite some time to actually update the database.
time::resume();
time::sleep(Duration::from_millis(50)).await;
time::pause();
time::advance(3 * hour).await;
expect_text(&filter, "persist", "hello").await;
for _ in 0..50 {
time::advance(10000 * hour).await;
expect_text(&filter, "persist", "hello").await;
}
Ok(())
}