Enable state persistence through SQLite (#28)

* Add Docker build CI to pull requests

* Add SQLite document persistence through SQLx

* Update README to describe configuration variables

* Minor changes to README wording

* Update image size estimate listed in README

* Update frontend dependencies

* Add direct database tests and restructure code

* Clarify use of `SQLITE_URI` in Docker contexts
This commit is contained in:
Eric Zhang 2021-10-21 00:40:46 -04:00 committed by GitHub
parent 8f762fa085
commit c98a6d7c4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 2526 additions and 2457 deletions

View file

@ -4,6 +4,9 @@ on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
docker:
@ -26,7 +29,7 @@ jobs:
id: docker_build
uses: docker/build-push-action@v2
with:
push: true
push: ${{ github.event_name == 'push' }}
build-args: GITHUB_SHA
tags: ekzhang/rustpad:latest

445
Cargo.lock generated
View file

@ -1,5 +1,18 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom 0.2.3",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.18"
@ -15,6 +28,15 @@ version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b"
[[package]]
name = "atoi"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5"
dependencies = [
"num-traits",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -87,6 +109,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]]
name = "cc"
version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"
[[package]]
name = "cfg-if"
version = "0.1.10"
@ -118,6 +146,60 @@ dependencies = [
"libc",
]
[[package]]
name = "cpufeatures"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469"
dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10c2722795460108a7872e1cd933a85d6ec38abc4baecad51028f702da28889f"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403"
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if 1.0.0",
"lazy_static",
]
[[package]]
name = "dashmap"
version = "4.0.2"
@ -143,6 +225,12 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "env_logger"
version = "0.7.1"
@ -214,6 +302,17 @@ dependencies = [
"futures-util",
]
[[package]]
name = "futures-intrusive"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e"
dependencies = [
"futures-core",
"lock_api",
"parking_lot",
]
[[package]]
name = "futures-io"
version = "0.3.15"
@ -319,9 +418,21 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.9.1"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
]
[[package]]
name = "hashlink"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
dependencies = [
"hashbrown",
]
[[package]]
name = "headers"
@ -348,6 +459,15 @@ dependencies = [
"http",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.18"
@ -357,6 +477,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "http"
version = "0.2.4"
@ -437,9 +563,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "1.6.2"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
dependencies = [
"autocfg",
"hashbrown",
@ -463,6 +589,15 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "itertools"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "0.4.7"
@ -486,9 +621,20 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.95"
version = "0.2.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "789da6d93f1b866ffe175afc5322a4d76c038605a1c3319bb57b06967ca98a36"
checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce"
[[package]]
name = "libsqlite3-sys"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290b64917f8b0cb885d9de0f9959fe1f775d7fa12f1da2db9001c1c8ab60f89d"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "lock_api"
@ -536,6 +682,12 @@ dependencies = [
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c64630dcdd71f1a64c435f54885086a0de5d6a12d104d69b165fb7d5286d677"
[[package]]
name = "mio"
version = "0.7.11"
@ -576,6 +728,17 @@ dependencies = [
"twoway",
]
[[package]]
name = "nom"
version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffd9d26838a953b4af82cbeb9f1592c6798916983959be223a7124e992742c1"
dependencies = [
"memchr",
"minimal-lexical",
"version_check",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -585,6 +748,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
@ -597,9 +769,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.7.2"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "opaque-debug"
@ -680,6 +852,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -848,6 +1026,34 @@ dependencies = [
"winapi",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]]
name = "rustls"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [
"base64",
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustpad-server"
version = "0.1.0"
@ -863,6 +1069,8 @@ dependencies = [
"pretty_env_logger",
"serde",
"serde_json",
"sqlx",
"tempfile",
"tokio",
"tokio-stream",
"warp",
@ -906,6 +1114,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "serde"
version = "1.0.126"
@ -957,7 +1175,20 @@ checksum = "8c4cfa741c5832d0ef7fab46cabed29c2aae926db0b11bb2069edd8db5e64e16"
dependencies = [
"block-buffer",
"cfg-if 1.0.0",
"cpufeatures",
"cpufeatures 0.1.4",
"digest",
"opaque-debug",
]
[[package]]
name = "sha2"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa"
dependencies = [
"block-buffer",
"cfg-if 1.0.0",
"cpufeatures 0.2.1",
"digest",
"opaque-debug",
]
@ -993,6 +1224,118 @@ dependencies = [
"winapi",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "sqlformat"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4b7922be017ee70900be125523f38bdd644f4f06a1b16e8fa5a8ee8c34bffd4"
dependencies = [
"itertools",
"nom",
"unicode_categories",
]
[[package]]
name = "sqlx"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7911b0031a0247af40095838002999c7a52fba29d9739e93326e71a5a1bc9d43"
dependencies = [
"sqlx-core",
"sqlx-macros",
]
[[package]]
name = "sqlx-core"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aec89bfaca8f7737439bad16d52b07f1ccd0730520d3bf6ae9d069fe4b641fb1"
dependencies = [
"ahash",
"atoi",
"bitflags",
"byteorder",
"bytes",
"crc",
"crossbeam-channel",
"crossbeam-queue",
"crossbeam-utils",
"either",
"futures-channel",
"futures-core",
"futures-intrusive",
"futures-util",
"hashlink",
"hex",
"indexmap",
"itoa",
"libc",
"libsqlite3-sys",
"log",
"memchr",
"once_cell",
"parking_lot",
"percent-encoding",
"rustls",
"sha2",
"smallvec",
"sqlformat",
"sqlx-rt",
"stringprep",
"thiserror",
"tokio-stream",
"url",
"webpki",
"webpki-roots",
"whoami",
]
[[package]]
name = "sqlx-macros"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "584866c833511b1a152e87a7ee20dee2739746f60c858b3c5209150bc4b466f5"
dependencies = [
"dotenv",
"either",
"heck",
"once_cell",
"proc-macro2",
"quote",
"sha2",
"sqlx-core",
"sqlx-rt",
"syn",
"url",
]
[[package]]
name = "sqlx-rt"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d1bd069de53442e7a320f525a6d4deb8bb0621ac7a55f7eccbc2b58b57f43d0"
dependencies = [
"once_cell",
"tokio",
"tokio-rustls",
]
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "syn"
version = "1.0.72"
@ -1027,6 +1370,26 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.1.43"
@ -1083,6 +1446,17 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "tokio-stream"
version = "0.1.6"
@ -1215,12 +1589,30 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-xid"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "unicode_categories"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.2.2"
@ -1239,6 +1631,12 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.3"
@ -1396,6 +1794,35 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
dependencies = [
"webpki",
]
[[package]]
name = "whoami"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "483a59fee1a93fec90eb08bc2eb4315ef10f4ebc478b3a5fadc969819cb66117"
dependencies = [
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "winapi"
version = "0.3.9"

View file

@ -72,9 +72,29 @@ For the WebAssembly component, you can run tests in a headless browser with
wasm-pack test rustpad-wasm --chrome --headless
```
## Configuration
Although the default behavior of Rustpad is to store documents solely in memory
and collect garbage after 24 hours of inactivity, this can be configured by
setting the appropriate variables. The application server looks for the
following environment variables on startup:
- `EXPIRY_DAYS`: An integer corresponding to the number of days that inactive
documents are kept in memory before being garbage collected by the server
(default 1 day).
- `SQLITE_URI`: A SQLite connection string used for persistence. If provided,
Rustpad will snapshot document contents to a local file, which enables them to
be retained between server restarts and after their in-memory data structures
expire. (When deploying a Docker container, this should point to the path of a
mounted volume.)
- `PORT`: Which local port to listen for HTTP connections on (defaults to 3030).
- `RUST_LOG`: Directives that control application logging, see the
[env_logger](https://docs.rs/env_logger/#enabling-logging) docs for more
information.
## Deployment
Rustpad is distributed as a single 4 MB Docker image, which is built
Rustpad is distributed as a single 5 MB Docker image, which is built
automatically from the `Dockerfile` in this repository. You can pull the latest
version of this image from Docker Hub.

4057
package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -10,28 +10,28 @@
"format": "prettier --write ."
},
"dependencies": {
"@chakra-ui/react": "^1.6.7",
"@emotion/react": "^11.4.1",
"@chakra-ui/react": "^1.6.10",
"@emotion/react": "^11.5.0",
"@emotion/styled": "^11.3.0",
"@monaco-editor/react": "^4.2.2",
"@monaco-editor/react": "^4.3.1",
"framer-motion": "^4.1.17",
"lodash.debounce": "^4.0.8",
"react": "^17.0.2",
"react-dom": "^17.0.2",
"react-icons": "^4.2.0",
"react-icons": "^4.3.1",
"react-scripts": "4.0.3",
"rustpad-wasm": "file:./rustpad-wasm/pkg",
"use-local-storage-state": "^11.0.0"
},
"devDependencies": {
"@types/lodash.debounce": "^4.0.6",
"@types/react": "^17.0.21",
"@types/react-dom": "^17.0.9",
"monaco-editor": "^0.27.0",
"@types/react": "^17.0.30",
"@types/react-dom": "^17.0.10",
"monaco-editor": "^0.29.1",
"prettier": "2.4.1",
"raw.macro": "^0.4.2",
"react-app-rewired": "^2.1.8",
"typescript": "^4.4.3",
"typescript": "~4.4.4",
"wasm-loader": "^1.3.0"
},
"eslintConfig": {

View file

@ -16,6 +16,10 @@ parking_lot = "0.11.1"
pretty_env_logger = "0.4.0"
serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"
sqlx = { version = "0.5.9", features = ["runtime-tokio-rustls", "sqlite"] }
tokio = { version = "1.6.1", features = ["full", "test-util"] }
tokio-stream = "0.1.6"
warp = "0.3.1"
[dev-dependencies]
tempfile = "3.2.0"

View file

@ -0,0 +1,5 @@
CREATE TABLE document(
id TEXT PRIMARY KEY,
text TEXT NOT NULL,
language TEXT
)

View file

@ -0,0 +1,78 @@
//! Backend SQLite database handlers for persisting documents.
use std::str::FromStr;
use anyhow::{bail, Result};
use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
/// Represents a document persisted in database storage.
#[derive(sqlx::FromRow, PartialEq, Eq, Clone, Debug)]
pub struct PersistedDocument {
/// Text content of the document.
pub text: String,
/// Language of the document for editor syntax highlighting.
pub language: Option<String>,
}
/// A driver for database operations wrapping a pool connection.
#[derive(Clone, Debug)]
pub struct Database {
pool: SqlitePool,
}
impl Database {
/// Construct a new database from Postgres connection URI.
pub async fn new(uri: &str) -> Result<Self> {
{
// Create database file if missing, and run migrations.
let mut conn = SqliteConnectOptions::from_str(uri)?
.create_if_missing(true)
.connect()
.await?;
sqlx::migrate!().run(&mut conn).await?;
}
Ok(Database {
pool: SqlitePool::connect(uri).await?,
})
}
/// Load the text of a document from the database.
pub async fn load(&self, document_id: &str) -> Result<PersistedDocument> {
sqlx::query_as(r#"SELECT text, language FROM document WHERE id = $1"#)
.bind(document_id)
.fetch_one(&self.pool)
.await
.map_err(|e| e.into())
}
/// Store the text of a document in the database.
pub async fn store(&self, document_id: &str, document: &PersistedDocument) -> Result<()> {
let result = sqlx::query(
r#"
INSERT OR REPLACE INTO
document (id, text, language)
VALUES
($1, $2, $3)"#,
)
.bind(document_id)
.bind(&document.text)
.bind(&document.language)
.execute(&self.pool)
.await?;
if result.rows_affected() != 1 {
bail!(
"expected store() to receive 1 row affected, but it affected {} rows instead",
result.rows_affected(),
);
}
Ok(())
}
/// Count the number of documents in the database.
pub async fn count(&self) -> Result<usize> {
let row: (i64,) = sqlx::query_as("SELECT count(*) FROM document")
.fetch_one(&self.pool)
.await?;
Ok(row.0 as usize)
}
}

View file

@ -7,13 +7,14 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use dashmap::DashMap;
use log::info;
use log::{error, info};
use serde::Serialize;
use tokio::time::{self, Instant};
use warp::{filters::BoxedFilter, ws::Ws, Filter, Reply};
use warp::{filters::BoxedFilter, ws::Ws, Filter, Rejection, Reply};
use rustpad::Rustpad;
use crate::{database::Database, rustpad::Rustpad};
pub mod database;
mod ot;
mod rustpad;
@ -27,15 +28,35 @@ struct Document {
rustpad: Arc<Rustpad>,
}
impl Default for Document {
fn default() -> Self {
impl Document {
fn new(rustpad: Arc<Rustpad>) -> Self {
Self {
last_accessed: Instant::now(),
rustpad: Default::default(),
rustpad,
}
}
}
impl Drop for Document {
fn drop(&mut self) {
self.rustpad.kill();
}
}
#[derive(Debug)]
struct CustomReject(anyhow::Error);
impl warp::reject::Reject for CustomReject {}
/// The shared state of the server, accessible from within request handlers.
#[derive(Clone)]
struct ServerState {
/// Concurrent map storing in-memory documents.
documents: Arc<DashMap<String, Document>>,
/// Connection to the database pool, if persistence is enabled.
database: Option<Database>,
}
/// Statistics about the server, returned from an API endpoint.
#[derive(Serialize)]
struct Stats {
@ -43,18 +64,25 @@ struct Stats {
start_time: u64,
/// Number of documents currently tracked by the server.
num_documents: usize,
/// Number of documents persisted in the database.
database_size: usize,
}
/// Server configuration.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct ServerConfig {
/// Number of days to clean up documents after inactivity.
pub expiry_days: u32,
/// Database object, for persistence if desired.
pub database: Option<Database>,
}
impl Default for ServerConfig {
fn default() -> Self {
Self { expiry_days: 1 }
Self {
expiry_days: 1,
database: None,
}
}
}
@ -73,70 +101,126 @@ fn frontend() -> BoxedFilter<(impl Reply,)> {
/// Construct backend routes, including WebSocket handlers.
fn backend(config: ServerConfig) -> BoxedFilter<(impl Reply,)> {
let state: Arc<DashMap<String, Document>> = Default::default();
tokio::spawn(cleaner(Arc::clone(&state), config.expiry_days));
let state = ServerState {
documents: Default::default(),
database: config.database,
};
tokio::spawn(cleaner(state.clone(), config.expiry_days));
let state_filter = warp::any().map(move || Arc::clone(&state));
let state_filter = warp::any().map(move || state.clone());
let socket = warp::path("socket")
.and(warp::path::param())
.and(warp::path::end())
let socket = warp::path!("socket" / String)
.and(warp::ws())
.and(state_filter.clone())
.map(
|id: String, ws: Ws, state: Arc<DashMap<String, Document>>| {
let mut entry = state.entry(id).or_default();
let value = entry.value_mut();
value.last_accessed = Instant::now();
let rustpad = Arc::clone(&value.rustpad);
ws.on_upgrade(|socket| async move { rustpad.on_connection(socket).await })
},
);
.and_then(socket_handler);
let text = warp::path("text")
.and(warp::path::param())
.and(warp::path::end())
let text = warp::path!("text" / String)
.and(state_filter.clone())
.map(|id: String, state: Arc<DashMap<String, Document>>| {
state
.get(&id)
.map(|value| value.rustpad.text())
.unwrap_or_default()
});
.and_then(text_handler);
let start_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("SystemTime returned before UNIX_EPOCH")
.as_secs();
let stats = warp::path("stats")
.and(warp::path::end())
let stats = warp::path!("stats")
.and(warp::any().map(move || start_time))
.and(state_filter.clone())
.map(move |state: Arc<DashMap<String, Document>>| {
let num_documents = state.len();
warp::reply::json(&Stats {
start_time,
num_documents,
})
});
.and_then(stats_handler);
socket.or(text).or(stats).boxed()
}
/// Handler for the `/api/socket/{id}` endpoint.
async fn socket_handler(id: String, ws: Ws, state: ServerState) -> Result<impl Reply, Rejection> {
use dashmap::mapref::entry::Entry;
let mut entry = match state.documents.entry(id.clone()) {
Entry::Occupied(e) => e.into_ref(),
Entry::Vacant(e) => {
let rustpad = Arc::new(match &state.database {
Some(db) => db.load(&id).await.map(Rustpad::from).unwrap_or_default(),
None => Rustpad::default(),
});
if let Some(db) = &state.database {
tokio::spawn(persister(id, Arc::clone(&rustpad), db.clone()));
}
e.insert(Document::new(rustpad))
}
};
let value = entry.value_mut();
value.last_accessed = Instant::now();
let rustpad = Arc::clone(&value.rustpad);
Ok(ws.on_upgrade(|socket| async move { rustpad.on_connection(socket).await }))
}
/// Handler for the `/api/text/{id}` endpoint.
async fn text_handler(id: String, state: ServerState) -> Result<impl Reply, Rejection> {
Ok(match state.documents.get(&id) {
Some(value) => value.rustpad.text(),
None => {
if let Some(db) = &state.database {
db.load(&id)
.await
.map(|document| document.text)
.unwrap_or_default()
} else {
String::new()
}
}
})
}
/// Handler for the `/api/stats` endpoint.
async fn stats_handler(start_time: u64, state: ServerState) -> Result<impl Reply, Rejection> {
let num_documents = state.documents.len();
let database_size = match state.database {
None => 0,
Some(db) => match db.count().await {
Ok(size) => size,
Err(e) => return Err(warp::reject::custom(CustomReject(e))),
},
};
Ok(warp::reply::json(&Stats {
start_time,
num_documents,
database_size,
}))
}
const HOUR: Duration = Duration::from_secs(3600);
// Reclaims memory for documents.
async fn cleaner(state: Arc<DashMap<String, Document>>, expiry_days: u32) {
/// Reclaims memory for documents.
async fn cleaner(state: ServerState, expiry_days: u32) {
loop {
time::sleep(HOUR).await;
let mut keys = Vec::new();
for entry in &*state {
for entry in &*state.documents {
if entry.last_accessed.elapsed() > HOUR * 24 * expiry_days {
keys.push(entry.key().clone());
}
}
info!("cleaner removing keys: {:?}", keys);
for key in keys {
state.remove(&key);
state.documents.remove(&key);
}
}
}
const PERSIST_INTERVAL: Duration = Duration::from_secs(3);
/// Persists changed documents after a fixed time interval.
async fn persister(id: String, rustpad: Arc<Rustpad>, db: Database) {
let mut last_revision = 0;
while !rustpad.killed() {
time::sleep(PERSIST_INTERVAL).await;
let revision = rustpad.revision();
if revision > last_revision {
info!("persisting revision {} for id = {}", revision, id);
if let Err(e) = db.store(&id, &rustpad.snapshot()).await {
error!("when persisting document {}: {}", id, e);
}
last_revision = revision;
}
}
}

View file

@ -1,4 +1,4 @@
use rustpad_server::{server, ServerConfig};
use rustpad_server::{server, database::Database, ServerConfig};
#[tokio::main]
async fn main() {
@ -15,6 +15,14 @@ async fn main() {
.unwrap_or_else(|_| String::from("1"))
.parse()
.expect("Unable to parse EXPIRY_DAYS"),
database: match std::env::var("SQLITE_URI") {
Ok(uri) => Some(
Database::new(&uri)
.await
.expect("Unable to connect to SQLITE_URI"),
),
Err(_) => None,
},
};
warp::serve(server(config)).run(([0, 0, 0, 0], port)).await;

View file

@ -1,7 +1,7 @@
//! Eventually consistent server-side logic for Rustpad.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use anyhow::{bail, Context, Result};
use futures::prelude::*;
@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, Notify};
use warp::ws::{Message, WebSocket};
use crate::ot::transform_index;
use crate::{database::PersistedDocument, ot::transform_index};
/// The main object representing a collaborative session.
pub struct Rustpad {
@ -24,6 +24,8 @@ pub struct Rustpad {
notify: Notify,
/// Used to inform all clients of metadata updates.
update: broadcast::Sender<ServerMsg>,
/// Set to true when the document is destroyed.
killed: AtomicBool,
}
/// Shared state involving multiple users, protected by a lock.
@ -103,10 +105,30 @@ impl Default for Rustpad {
count: Default::default(),
notify: Default::default(),
update: tx,
killed: AtomicBool::new(false),
}
}
}
impl From<PersistedDocument> for Rustpad {
fn from(document: PersistedDocument) -> Self {
let mut operation = OperationSeq::default();
operation.insert(&document.text);
let rustpad = Self::default();
{
let mut state = rustpad.state.write();
state.text = document.text;
state.language = document.language;
state.operations.push(UserOperation {
id: u64::MAX,
operation,
})
}
rustpad
}
}
impl Rustpad {
/// Handle a connection from a WebSocket.
pub async fn on_connection(&self, socket: WebSocket) {
@ -129,12 +151,32 @@ impl Rustpad {
state.text.clone()
}
/// Returns a snapshot of the current document for persistence.
pub fn snapshot(&self) -> PersistedDocument {
let state = self.state.read();
PersistedDocument {
text: state.text.clone(),
language: state.language.clone(),
}
}
/// Returns the current revision.
pub fn revision(&self) -> usize {
let state = self.state.read();
state.operations.len()
}
/// Kill this object immediately, dropping all current connections.
pub fn kill(&self) {
self.killed.store(true, Ordering::Relaxed);
self.notify.notify_waiters();
}
/// Returns if this Rustpad object has been killed.
pub fn killed(&self) -> bool {
self.killed.load(Ordering::Relaxed)
}
async fn handle_connection(&self, id: u64, mut socket: WebSocket) -> Result<()> {
let mut update_rx = self.update.subscribe();
@ -145,6 +187,9 @@ impl Rustpad {
// notification, **then** check the current state for new revisions.
// This is the same approach that `tokio::sync::watch` takes.
let notified = self.notify.notified();
if self.killed() {
break;
}
if self.revision() > revision {
revision = self.send_history(revision, &mut socket).await?
}

View file

@ -13,7 +13,7 @@ impl JsonSocket {
pub async fn recv(&mut self) -> Result<Value> {
let msg = self.0.recv().await?;
let msg = msg.to_str().map_err(|_| anyhow!("non-string message"))?;
Ok(serde_json::from_str(&msg)?)
Ok(serde_json::from_str(msg)?)
}
pub async fn recv_closed(&mut self) -> Result<()> {

View file

@ -0,0 +1,112 @@
//! Tests to ensure that documents are persisted with SQLite.
use std::time::Duration;
use anyhow::Result;
use common::*;
use operational_transform::OperationSeq;
use rustpad_server::{
database::{Database, PersistedDocument},
server, ServerConfig,
};
use serde_json::json;
use tempfile::NamedTempFile;
use tokio::time;
pub mod common;
fn temp_sqlite_uri() -> Result<String> {
Ok(format!(
"sqlite://{}",
NamedTempFile::new()?
.into_temp_path()
.as_os_str()
.to_str()
.expect("failed to get name of tempfile as &str")
))
}
#[tokio::test]
async fn test_database() -> Result<()> {
pretty_env_logger::try_init().ok();
let database = Database::new(&temp_sqlite_uri()?).await?;
assert!(database.load("hello").await.is_err());
assert!(database.load("world").await.is_err());
let doc1 = PersistedDocument {
text: "Hello Text".into(),
language: None,
};
assert!(database.store("hello", &doc1).await.is_ok());
assert_eq!(database.load("hello").await?, doc1);
assert!(database.load("world").await.is_err());
let doc2 = PersistedDocument {
text: "print('World Text :)')".into(),
language: Some("python".into()),
};
assert!(database.store("world", &doc2).await.is_ok());
assert_eq!(database.load("hello").await?, doc1);
assert_eq!(database.load("world").await?, doc2);
assert!(database.store("hello", &doc2).await.is_ok());
assert_eq!(database.load("hello").await?, doc2);
Ok(())
}
#[tokio::test]
async fn test_persist() -> Result<()> {
pretty_env_logger::try_init().ok();
let filter = server(ServerConfig {
expiry_days: 2,
database: Some(Database::new(&temp_sqlite_uri()?).await?),
..ServerConfig::default()
});
expect_text(&filter, "persist", "").await;
let mut client = connect(&filter, "persist").await?;
let msg = client.recv().await?;
assert_eq!(msg, json!({ "Identity": 0 }));
let mut operation = OperationSeq::default();
operation.insert("hello");
let msg = json!({
"Edit": {
"revision": 0,
"operation": operation
}
});
client.send(&msg).await;
let msg = client.recv().await?;
msg.get("History")
.expect("should receive history operation");
expect_text(&filter, "persist", "hello").await;
let hour = Duration::from_secs(3600);
time::pause();
time::advance(47 * hour).await;
expect_text(&filter, "persist", "hello").await;
// Give SQLite some time to actually update the database.
time::resume();
time::sleep(Duration::from_millis(50)).await;
time::pause();
time::advance(3 * hour).await;
expect_text(&filter, "persist", "hello").await;
for _ in 0..50 {
time::advance(10000 * hour).await;
expect_text(&filter, "persist", "hello").await;
}
Ok(())
}

View file

@ -53,13 +53,13 @@ async fn test_lost_wakeups() -> Result<()> {
let mut total = 0;
while total < num_edits {
let msg = client.recv().await?;
total += num_ops(&msg).ok_or(anyhow!("missing json key"))?;
total += num_ops(&msg).ok_or_else(|| anyhow!("missing json key"))?;
}
let mut total2 = 0;
while total2 < num_edits {
let msg = client2.recv().await?;
total2 += num_ops(&msg).ok_or(anyhow!("missing json key"))?;
total2 += num_ops(&msg).ok_or_else(|| anyhow!("missing json key"))?;
}
info!("took {} ms", start.elapsed().as_millis());