Implement proper transformation for cursors

This commit is contained in:
Eric Zhang 2021-06-05 00:29:25 -05:00
parent 211e567275
commit 64d0b632ac
7 changed files with 114 additions and 6 deletions

View file

@ -11,6 +11,7 @@ use rustpad::Rustpad;
use tokio::time::{self, Instant};
use warp::{filters::BoxedFilter, ws::Ws, Filter, Reply};
mod ot;
mod rustpad;
/// An entry stored in the global server map.

23
rustpad-server/src/ot.rs Normal file
View file

@ -0,0 +1,23 @@
//! Helper methods for working with operational transformation.
use operational_transform::{Operation, OperationSeq};
/// Return the new index of a position in the string.
pub fn transform_index(operation: &OperationSeq, position: u32) -> u32 {
let mut index = position as i32;
let mut new_index = index;
for op in operation.ops() {
match op {
&Operation::Retain(n) => index -= n as i32,
Operation::Insert(s) => new_index += s.len() as i32,
&Operation::Delete(n) => {
new_index -= std::cmp::min(index, n as i32);
index -= n as i32;
}
}
if index < 0 {
break;
}
}
new_index as u32
}

View file

@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, Notify};
use warp::ws::{Message, WebSocket};
use crate::ot::transform_index;
/// The main object representing a collaborative session.
pub struct Rustpad {
/// State modified by critical sections of the code.
@ -136,8 +138,7 @@ impl Rustpad {
async fn handle_connection(&self, id: u64, mut socket: WebSocket) -> Result<()> {
let mut update_rx = self.update.subscribe();
self.send_initial(id, &mut socket).await?;
let mut revision: usize = 0;
let mut revision: usize = self.send_initial(id, &mut socket).await?;
loop {
// In order to avoid the "lost wakeup" problem, we first request a
@ -167,11 +168,17 @@ impl Rustpad {
Ok(())
}
async fn send_initial(&self, id: u64, socket: &mut WebSocket) -> Result<()> {
async fn send_initial(&self, id: u64, socket: &mut WebSocket) -> Result<usize> {
socket.send(ServerMsg::Identity(id).into()).await?;
let mut messages = Vec::new();
{
let revision = {
let state = self.state.read();
if !state.operations.is_empty() {
messages.push(ServerMsg::History {
start: 0,
operations: state.operations.clone(),
});
}
if let Some(language) = &state.language {
messages.push(ServerMsg::Language(language.clone()));
}
@ -187,11 +194,12 @@ impl Rustpad {
data: data.clone(),
});
}
state.operations.len()
};
for msg in messages {
socket.send(msg.into()).await?;
}
Ok(())
Ok(revision)
}
async fn send_history(&self, start: usize, socket: &mut WebSocket) -> Result<usize> {
@ -271,6 +279,15 @@ impl Rustpad {
}
let new_text = operation.apply(&state.text)?;
let mut state = RwLockUpgradableReadGuard::upgrade(state);
for (_, data) in state.cursors.iter_mut() {
for cursor in data.cursors.iter_mut() {
*cursor = transform_index(&operation, *cursor);
}
for (start, end) in data.selections.iter_mut() {
*start = transform_index(&operation, *start);
*end = transform_index(&operation, *end);
}
}
state.operations.push(UserOperation { id, operation });
state.text = new_text;
Ok(())

View file

@ -155,9 +155,28 @@ async fn test_cursors() -> Result<()> {
client.send(&json!({ "Invalid": "please close" })).await;
client.recv_closed().await?;
let msg = json!({
"Edit": {
"revision": 0,
"operation": ["a"]
}
});
client2.send(&msg).await;
let mut client3 = connect(&filter, "foobar").await?;
assert_eq!(client3.recv().await?, json!({ "Identity": 2 }));
assert_eq!(client3.recv().await?, cursors2_resp);
client3.recv().await?;
let transformed_cursors2_resp = json!({
"UserCursor": {
"id": 1,
"data": {
"cursors": [11],
"selections": []
}
}
});
assert_eq!(client3.recv().await?, transformed_cursors2_resp);
Ok(())
}

View file

@ -133,6 +133,27 @@ impl OpSeq {
self.0.target_len()
}
/// Return the new index of a position in the string.
pub fn transform_index(&self, position: u32) -> u32 {
let mut index = position as i32;
let mut new_index = index;
for op in self.0.ops() {
use operational_transform::Operation::*;
match op {
&Retain(n) => index -= n as i32,
Insert(s) => new_index += s.len() as i32,
&Delete(n) => {
new_index -= std::cmp::min(index, n as i32);
index -= n as i32;
}
}
if index < 0 {
break;
}
}
new_index as u32
}
/// Attempts to deserialize an `OpSeq` from a JSON string.
pub fn from_str(s: &str) -> Option<OpSeq> {
serde_json::from_str(s).ok()

View file

@ -50,3 +50,16 @@ fn invert_operations() {
let p = o.invert(s);
assert_eq!(p.apply(&o.apply(s).unwrap()).unwrap(), s);
}
#[wasm_bindgen_test]
fn transform_index() {
let mut o = OpSeq::default();
o.retain(3);
o.insert("def");
o.retain(3);
o.insert("abc");
assert_eq!(o.transform_index(2), 2);
assert_eq!(o.transform_index(3), 6);
assert_eq!(o.transform_index(5), 8);
assert_eq!(o.transform_index(7), 13);
}

View file

@ -230,6 +230,7 @@ class Rustpad {
} else {
this.buffer = this.buffer.compose(operation);
}
this.transformCursors(operation);
}
private sendOperation(operation: OpSeq) {
@ -306,6 +307,19 @@ class Rustpad {
this.lastValue = this.model.getValue();
this.ignoreChanges = false;
this.transformCursors(operation);
}
private transformCursors(operation: OpSeq) {
for (const data of Object.values(this.userCursors)) {
data.cursors = data.cursors.map((c) => operation.transform_index(c));
data.selections = data.selections.map(([s, e]) => [
operation.transform_index(s),
operation.transform_index(e),
]);
}
this.updateCursors();
}
private updateCursors() {