Explorar o código

combine sketches across workers for more precise bloom filters

Mikkel Denker hai 8 meses
pai
achega
037ec2cc9d

+ 16 - 0
crates/core/src/entrypoint/ampc/shortest_path/coordinator.rs

@@ -28,6 +28,7 @@ use crate::ampc::{Coordinator, DefaultDhtTable, DhtConn};
 use crate::config::ShortestPathCoordinatorConfig;
 use crate::distributed::cluster::Cluster;
 use crate::distributed::member::{Member, Service, ShardId};
+use crate::hyperloglog::HyperLogLog;
 use crate::webpage::url_ext::UrlExt;
 use crate::{webgraph, Result};
 
@@ -174,6 +175,21 @@ pub fn run(config: ShortestPathCoordinatorConfig) -> Result<()> {
         .build()?
         .block_on(setup_gossip(tokio_conf))?;
 
+    let sketch = cluster
+        .workers
+        .iter()
+        .map(|worker| worker.get_node_sketch())
+        .fold(HyperLogLog::default(), |mut acc, sketch| {
+            acc.merge(&sketch);
+            acc
+        });
+
+    let num_nodes = sketch.size() as u64;
+
+    for worker in cluster.workers.iter() {
+        worker.update_changed_nodes_precision(num_nodes);
+    }
+
     let jobs: Vec<_> = cluster
         .workers
         .iter()

+ 21 - 1
crates/core/src/entrypoint/ampc/shortest_path/worker.rs

@@ -75,6 +75,11 @@ impl ShortestPathWorker {
     pub fn nodes_sketch(&self) -> &HyperLogLog<4096> {
         &self.nodes_sketch
     }
+
+    pub fn update_changed_nodes_precision(&self, num_nodes: u64) {
+        let mut changed_nodes = self.changed_nodes().lock().unwrap();
+        *changed_nodes = U64BloomFilter::new(num_nodes, 0.01);
+    }
 }
 
 #[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)]
@@ -88,6 +93,17 @@ impl Message<ShortestPathWorker> for GetNodeSketch {
     }
 }
 
+#[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)]
+pub struct UpdateChangedNodesPrecision(u64);
+
+impl Message<ShortestPathWorker> for UpdateChangedNodesPrecision {
+    type Response = ();
+
+    fn handle(self, worker: &ShortestPathWorker) -> Self::Response {
+        worker.update_changed_nodes_precision(self.0);
+    }
+}
+
 #[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)]
 pub struct BatchId2Node(Vec<webgraph::NodeID>);
 
@@ -109,7 +125,7 @@ impl Message<ShortestPathWorker> for BatchId2Node {
     }
 }
 
-impl_worker!(ShortestPathJob, RemoteShortestPathWorker => ShortestPathWorker, [BatchId2Node, GetNodeSketch]);
+impl_worker!(ShortestPathJob, RemoteShortestPathWorker => ShortestPathWorker, [BatchId2Node, GetNodeSketch, UpdateChangedNodesPrecision]);
 
 #[derive(Clone)]
 pub struct RemoteShortestPathWorker {
@@ -143,6 +159,10 @@ impl RemoteShortestPathWorker {
     pub fn get_node_sketch(&self) -> HyperLogLog<4096> {
         self.send(GetNodeSketch)
     }
+
+    pub fn update_changed_nodes_precision(&self, num_nodes: u64) {
+        self.send(UpdateChangedNodesPrecision(num_nodes));
+    }
 }
 
 impl RemoteWorker for RemoteShortestPathWorker {