
Improve code docs (#246)

* document all entrypoints

* document ampc framework

* document ranking pipeline

* document the different searchers

* document generic search query flow

* document main crawler elements
Mikkel Denker, 8 months ago
parent
commit
7633b61ef3
52 files changed, with 315 additions and 190 deletions
  1. crates/core/src/ampc/coordinator.rs (+2 -0)
  2. crates/core/src/ampc/dht/mod.rs (+1 -1)
  3. crates/core/src/ampc/dht/store.rs (+1 -0)
  4. crates/core/src/ampc/finisher.rs (+2 -0)
  5. crates/core/src/ampc/mapper.rs (+1 -0)
  6. crates/core/src/ampc/mod.rs (+27 -0)
  7. crates/core/src/ampc/setup.rs (+12 -0)
  8. crates/core/src/ampc/worker.rs (+2 -0)
  9. crates/core/src/crawler/mod.rs (+11 -2)
  10. crates/core/src/crawler/planner.rs (+3 -0)
  11. crates/core/src/crawler/robot_client.rs (+1 -0)
  12. crates/core/src/crawler/wander_prioritiser.rs (+0 -0)
  13. crates/core/src/crawler/warc_writer.rs (+2 -2)
  14. crates/core/src/crawler/worker.rs (+5 -4)
  15. crates/core/src/entrypoint/configure.rs (+6 -5)
  16. crates/core/src/entrypoint/search_server.rs (+1 -1)
  17. crates/core/src/generic_query/get_homepage.rs (+1 -1)
  18. crates/core/src/generic_query/get_site_urls.rs (+1 -1)
  19. crates/core/src/generic_query/get_webpage.rs (+1 -1)
  20. crates/core/src/generic_query/mod.rs (+23 -1)
  21. crates/core/src/generic_query/size.rs (+1 -1)
  22. crates/core/src/generic_query/top_key_phrases.rs (+1 -1)
  23. crates/core/src/index.rs (+8 -0)
  24. crates/core/src/inverted_index/search.rs (+3 -1)
  25. crates/core/src/live_index/crawler/crawlable_site.rs (+1 -1)
  26. crates/core/src/main.rs (+24 -36)
  27. crates/core/src/ranking/computer/mod.rs (+13 -0)
  28. crates/core/src/ranking/mod.rs (+6 -0)
  29. crates/core/src/ranking/models/cross_encoder.rs (+4 -1)
  30. crates/core/src/ranking/models/lambdamart.rs (+4 -1)
  31. crates/core/src/ranking/models/linear.rs (+1 -1)
  32. crates/core/src/ranking/pipeline/mod.rs (+4 -4)
  33. crates/core/src/ranking/pipeline/modifiers/mod.rs (+22 -2)
  34. crates/core/src/ranking/pipeline/scorers/lambdamart.rs (+2 -2)
  35. crates/core/src/ranking/pipeline/scorers/mod.rs (+22 -4)
  36. crates/core/src/ranking/pipeline/scorers/reranker.rs (+1 -1)
  37. crates/core/src/ranking/pipeline/stages/precision.rs (+5 -0)
  38. crates/core/src/ranking/pipeline/stages/recall.rs (+4 -0)
  39. crates/core/src/searcher/distributed.rs (+7 -7)
  40. crates/core/src/searcher/local/guard.rs (+0 -33)
  41. crates/core/src/searcher/local/inner.rs (+21 -23)
  42. crates/core/src/searcher/local/mod.rs (+12 -34)
  43. crates/core/src/searcher/mod.rs (+6 -0)
  44. crates/core/src/webgraph/query/backlink.rs (+5 -5)
  45. crates/core/src/webgraph/query/between.rs (+1 -1)
  46. crates/core/src/webgraph/query/degree.rs (+1 -1)
  47. crates/core/src/webgraph/query/forwardlink.rs (+4 -4)
  48. crates/core/src/webgraph/query/group_by.rs (+2 -2)
  49. crates/core/src/webgraph/query/id2node.rs (+1 -1)
  50. crates/core/src/webgraph/query/mod.rs (+21 -1)
  51. crates/core/src/webgraph/remote.rs (+2 -2)
  52. crates/core/src/webgraph/store.rs (+3 -1)

+ 2 - 0
crates/core/src/ampc/coordinator.rs

@@ -24,6 +24,8 @@ use super::{DhtConn, Finisher, Job, JobScheduled, RemoteWorker, Setup, Worker, W
 use crate::{distributed::retry_strategy::ExponentialBackoff, Result};
 use anyhow::anyhow;
 
+/// A coordinator is responsible for scheduling jobs on workers and coordinating
+/// between rounds of computation.
 pub struct Coordinator<J>
 where
     J: Job,

+ 1 - 1
crates/core/src/ampc/dht/mod.rs

@@ -21,7 +21,7 @@
 //! with multiple shards. Each shard cluster
 //! is a Raft cluster, and each key is then routed to the correct
 //! cluster based on hash(key) % number_of_shards. The keys
-//! are currently *not* rebalanced if the number of shards change, so
+//! are currently *not* re-balanced if the number of shards change, so
 //! if an entire shard becomes unavailable or a new shard is added, all
 //! keys in the entire DHT is essentially lost as the
 //! keys might hash incorrectly.
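
The routing rule described in that doc comment is easy to picture with a small, self-contained sketch (the function and key names are illustrative, not the crate's API): a key always lands on shard hash(key) % number_of_shards, so changing the shard count silently remaps existing keys.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch of the routing rule from the doc comment above.
fn shard_for<K: Hash>(key: &K, number_of_shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % number_of_shards
}

fn main() {
    let key = "example.com";
    println!("3 shards -> shard {}", shard_for(&key, 3));
    // With 4 shards the same key will often land on a different shard,
    // which is why keys are effectively lost when the shard count changes.
    println!("4 shards -> shard {}", shard_for(&key, 4));
}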

+ 1 - 0
crates/core/src/ampc/dht/store.rs

@@ -13,6 +13,7 @@
 //
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>
+
 use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::io::Cursor;

+ 2 - 0
crates/core/src/ampc/finisher.rs

@@ -16,6 +16,8 @@
 
 use super::prelude::Job;
 
+/// A finisher is responsible for determining if the computation is finished
+/// or if another round of computation is needed.
 pub trait Finisher {
     type Job: Job;
 

+ 1 - 0
crates/core/src/ampc/mapper.rs

@@ -16,6 +16,7 @@
 
 use super::{prelude::Job, DhtConn};
 
+/// A mapper is the specific computation to be run on the graph.
 pub trait Mapper: bincode::Encode + bincode::Decode + Send + Sync + Clone {
     type Job: Job<Mapper = Self>;
 

+ 27 - 0
crates/core/src/ampc/mod.rs

@@ -14,6 +14,33 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! # Framework for Adaptive Massively Parallel Computation (AMPC).
+//!
+//! AMPC is a system for implementing large-scale distributed graph algorithms efficiently.
+//! It provides a framework for parallel computation across clusters of machines.
+//!
+//! While similar in concept to MapReduce, AMPC uses a distributed hash table (DHT) as its
+//! underlying data structure rather than the traditional map and reduce phases. This key
+//! architectural difference enables more flexible and efficient computation patterns.
+//!
+//! The main advantage over MapReduce is that workers can dynamically access any keys in
+//! the DHT during computation. This is in contrast to MapReduce where the keyspace must
+//! be statically partitioned between reducers before computation begins. The dynamic
+//! access pattern allows for more natural expression of graph algorithms in a distributed
+//! setting.
+//!
+//! This is roughly inspired by
+//! [Massively Parallel Graph Computation: From Theory to Practice](https://research.google/blog/massively-parallel-graph-computation-from-theory-to-practice/)
+//!
+//! ## Key concepts
+//!
+//! * **DHT**: A distributed hash table is used to store the result of the computation for
+//!     each round.
+//! * **Worker**: A worker owns a subset of the overall graph and is responsible for
+//!     executing mappers on its portion of the graph and sending results to the DHT.
+//! * **Mapper**: A mapper is the specific computation to be run on the graph.
+//! * **Coordinator**: The coordinator is responsible for scheduling the jobs on the workers.
+
 use self::{job::Job, worker::WorkerRef};
 use crate::distributed::sonic;
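
To make the pieces named in this module doc concrete, here is a deliberately simplified, single-process sketch of the round structure: a mapper runs on each worker's part of the graph and writes into a shared table, and a finisher decides whether another round is needed. Every name below is a stand-in; the real framework distributes the DHT and the workers across machines.

use std::collections::BTreeMap;

// Stand-in for the distributed hash table that stores each round's results.
type Dht = BTreeMap<String, u64>;

// A mapper is the computation run on each worker's part of the graph.
trait Mapper {
    fn map(&self, local_graph: &[u64], dht: &mut Dht);
}

// A finisher decides whether another round is needed.
trait Finisher {
    fn is_finished(&self, dht: &Dht) -> bool;
}

// The "coordinator": schedules the mapper on every worker until finished.
fn run<M: Mapper, F: Finisher>(mapper: &M, finisher: &F, workers: &[Vec<u64>], dht: &mut Dht) {
    while !finisher.is_finished(dht) {
        for local_graph in workers {
            mapper.map(local_graph, dht);
        }
    }
}

struct MaxNode;
impl Mapper for MaxNode {
    fn map(&self, local_graph: &[u64], dht: &mut Dht) {
        // Each worker merges the largest node id of its part into the DHT.
        let local_max = local_graph.iter().copied().max().unwrap_or(0);
        let entry = dht.entry("max_node".into()).or_insert(0);
        *entry = (*entry).max(local_max);
    }
}

struct Done;
impl Finisher for Done {
    fn is_finished(&self, dht: &Dht) -> bool {
        dht.contains_key("max_node") // one round is enough for this toy computation
    }
}

fn main() {
    let workers = vec![vec![1, 5, 3], vec![9, 2]];
    let mut dht = Dht::new(); // `Setup::init_dht` in the real framework
    run(&MaxNode, &Done, &workers, &mut dht);
    println!("max node id = {}", dht["max_node"]);
}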
 

+ 12 - 0
crates/core/src/ampc/setup.rs

@@ -16,12 +16,24 @@
 
 use super::DhtConn;
 
+/// A setup is responsible for initializing the DHT before each round of computation.
 pub trait Setup {
     type DhtTables;
 
+    /// Setup initial state of the DHT.
     fn init_dht(&self) -> DhtConn<Self::DhtTables>;
+
+    /// Setup state for a new round.
+    ///
+    /// This is called once for each round of computation.
+    /// The first round will run `setup_first_round` first
+    /// but will still call `setup_round` after that.
     #[allow(unused_variables)] // reason = "dht might be used by implementors"
     fn setup_round(&self, dht: &Self::DhtTables) {}
+
+    /// Setup state for the first round.
+    ///
+    /// This is called once before the first round of computation.
     fn setup_first_round(&self, dht: &Self::DhtTables) {
         self.setup_round(dht);
     }
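
The call order spelled out in these doc comments can be illustrated with a hypothetical driver. The trait below is a simplified stand-in for the crate's `Setup` (the DHT tables are reduced to a plain vector), and the driver itself is purely illustrative.

// Illustrates: `init_dht` once, `setup_first_round` before round 1,
// and `setup_round` at the start of every round (including the first).
trait Setup {
    fn init_dht(&self) -> Vec<u64>;
    fn setup_round(&self, _dht: &[u64]) {}
    fn setup_first_round(&self, dht: &[u64]) {
        self.setup_round(dht);
    }
}

fn drive<S: Setup>(setup: &S, rounds: usize) {
    let dht = setup.init_dht();
    for round in 0..rounds {
        if round == 0 {
            setup.setup_first_round(&dht);
        }
        setup.setup_round(&dht);
        // ... run the mappers for this round here ...
    }
}

struct NoopSetup;
impl Setup for NoopSetup {
    fn init_dht(&self) -> Vec<u64> {
        Vec::new()
    }
}

fn main() {
    drive(&NoopSetup, 3);
}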

+ 2 - 0
crates/core/src/ampc/worker.rs

@@ -23,6 +23,8 @@ use crate::Result;
 use anyhow::anyhow;
 use tokio::net::ToSocketAddrs;
 
+/// A worker is responsible for executing a mapper on its portion of the graph and
+/// sending results to the DHT.
 pub trait Worker: Send + Sync {
     type Remote: RemoteWorker<Job = Self::Job>;
 

+ 11 - 2
crates/core/src/crawler/mod.rs

@@ -14,6 +14,15 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! # Crawler
+//!
+//! The crawler is responsible for fetching webpages and storing them in WARC files
+//! for later processing.
+//!
+//! Before starting a crawl, a plan needs to be created. This plan is then used by
+//! the crawler coordinator to assign sites to crawl to different workers.
+//! A site is only assigned to one worker at a time for politeness.
+
 use std::{collections::VecDeque, future::Future, net::SocketAddr, sync::Arc};
 
 type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
@@ -35,7 +44,7 @@ pub use router::Router;
 mod file_queue;
 pub mod planner;
 pub mod robot_client;
-mod wander_prirotiser;
+mod wander_prioritiser;
 mod warc_writer;
 mod worker;
 
@@ -304,7 +313,7 @@ impl Crawler {
     }
 }
 
-pub trait DatumStream: Send + Sync {
+pub trait DatumSink: Send + Sync {
     fn write(&self, crawl_datum: CrawlDatum) -> impl Future<Output = Result<()>> + Send;
     fn finish(&self) -> impl Future<Output = Result<()>> + Send;
 }
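
The `DatumSink` trait renamed above is the hand-off point between the crawler and whatever stores the fetched pages: the `WarcWriter` in production, or the in-memory impl in `live_index/crawler/crawlable_site.rs` further down. A minimal, self-contained sketch of the same shape, assuming tokio with the rt, macros and sync features; the types below are simplified stand-ins, not the crate's:

#[derive(Debug)]
struct Datum {
    url: String,
    body: String,
}

// Same shape as `DatumSink`: async write of one datum, async finish.
trait Sink: Send + Sync {
    fn write(&self, datum: Datum) -> impl std::future::Future<Output = ()> + Send;
    fn finish(&self) -> impl std::future::Future<Output = ()> + Send;
}

// An in-memory sink, analogous to the `tokio::sync::Mutex<Vec<CrawlDatum>>` impl below.
struct MemorySink(tokio::sync::Mutex<Vec<Datum>>);

impl Sink for MemorySink {
    async fn write(&self, datum: Datum) {
        self.0.lock().await.push(datum);
    }
    async fn finish(&self) {}
}

#[tokio::main]
async fn main() {
    let sink = MemorySink(tokio::sync::Mutex::new(Vec::new()));
    let datum = Datum { url: "https://example.com/".into(), body: "<html></html>".into() };
    sink.write(datum).await;
    sink.finish().await;
    println!("buffered {} datums", sink.0.lock().await.len());
}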

+ 3 - 0
crates/core/src/crawler/planner.rs

@@ -13,6 +13,7 @@
 //
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
 use anyhow::{anyhow, Result};
 use futures::stream::FuturesOrdered;
 use futures::StreamExt;
@@ -71,6 +72,7 @@ impl From<StoredUrl> for Url {
     }
 }
 
+/// Store urls in groups on disk based on their harmonic rank.
 struct UrlGrouper {
     groups: Vec<speedy_kv::Db<StoredUrl, ()>>,
     folder: std::path::PathBuf,
@@ -169,6 +171,7 @@ struct Budget {
     remaining_schedulable: u64,
 }
 
+/// Create a crawl plan based on the harmonic rank of the hosts.
 pub struct CrawlPlanner {
     host_centrality: Arc<speedy_kv::Db<NodeID, f64>>,
     host_centrality_rank: Arc<speedy_kv::Db<NodeID, u64>>,

+ 1 - 0
crates/core/src/crawler/robot_client.rs

@@ -45,6 +45,7 @@ pub(super) fn reqwest_client(config: &CrawlerConfig) -> Result<reqwest::Client>
         .map_err(|e| Error::from(anyhow!(e)))
 }
 
+/// Reqwest client that respects robots.txt for each request.
 #[derive(Clone)]
 pub struct RobotClient {
     robots_txt_manager: RobotsTxtManager,

+ 0 - 0
crates/core/src/crawler/wander_prirotiser.rs → crates/core/src/crawler/wander_prioritiser.rs


+ 2 - 2
crates/core/src/crawler/warc_writer.rs

@@ -21,7 +21,7 @@ use crate::{
     warc,
 };
 
-use super::{CrawlDatum, DatumStream, Error, Result};
+use super::{CrawlDatum, DatumSink, Error, Result};
 use anyhow::anyhow;
 
 /// The WarcWriter is responsible for storing the crawl datums
@@ -30,7 +30,7 @@ pub struct WarcWriter {
     tx: tokio::sync::mpsc::Sender<WarcWriterMessage>,
 }
 
-impl DatumStream for WarcWriter {
+impl DatumSink for WarcWriter {
     async fn write(&self, crawl_datum: CrawlDatum) -> Result<()> {
         self.tx
             .send(WarcWriterMessage::Crawl(crawl_datum))

+ 5 - 4
crates/core/src/crawler/worker.rs

@@ -39,8 +39,8 @@ use crate::{
 };
 
 use super::{
-    encoded_body, robot_client::RobotClient, wander_prirotiser::WanderPrioritiser, CrawlDatum,
-    DatumStream, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob,
+    encoded_body, robot_client::RobotClient, wander_prioritiser::WanderPrioritiser, CrawlDatum,
+    DatumSink, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob,
     MAX_CONTENT_LENGTH, MAX_OUTGOING_URLS_PER_PAGE,
 };
 
@@ -126,7 +126,8 @@ impl WorkerThread {
     }
 }
 
-pub struct JobExecutor<S: DatumStream> {
+/// JobExecutor receives a job from the coordinator and crawls the urls in the job.
+pub struct JobExecutor<S: DatumSink> {
     writer: Arc<S>,
     client: RobotClient,
     has_gotten_429_response: bool,
@@ -144,7 +145,7 @@ pub struct JobExecutor<S: DatumStream> {
     job: WorkerJob,
 }
 
-impl<S: DatumStream> JobExecutor<S> {
+impl<S: DatumSink> JobExecutor<S> {
     pub fn new(
         job: WorkerJob,
         config: Arc<CrawlerConfig>,

+ 6 - 5
crates/core/src/entrypoint/configure.rs

@@ -17,7 +17,7 @@
 use tokio::fs::File;
 use tokio::io;
 use tokio_stream::StreamExt;
-use tracing::{debug, info};
+use tracing::info;
 
 use crate::config::{
     defaults, IndexerConfig, IndexerDualEncoderConfig, IndexerGraphConfig, LocalConfig,
@@ -73,7 +73,7 @@ fn download_files() {
 }
 
 fn build_spellchecker() -> Result<()> {
-    debug!("Building spellchecker");
+    info!("Building spellchecker");
     let spellchecker_path = Path::new(DATA_PATH).join("web_spell");
 
     if !spellchecker_path.exists() {
@@ -97,7 +97,7 @@ fn build_spellchecker() -> Result<()> {
 }
 
 fn create_webgraph() -> Result<()> {
-    debug!("Creating webgraph");
+    info!("Creating webgraph");
     let out_path = Path::new(DATA_PATH).join("webgraph");
 
     if out_path.exists() {
@@ -128,7 +128,7 @@ fn create_webgraph() -> Result<()> {
 }
 
 fn calculate_centrality() {
-    debug!("Calculating centrality");
+    info!("Calculating centrality");
     let webgraph_path = Path::new(DATA_PATH).join("webgraph");
     let out_path = Path::new(DATA_PATH).join("centrality");
 
@@ -144,7 +144,7 @@ fn calculate_centrality() {
 }
 
 fn create_inverted_index() -> Result<()> {
-    debug!("Creating inverted index");
+    info!("Creating inverted index");
     let out_path = Path::new(DATA_PATH).join("index");
 
     if out_path.exists() {
@@ -209,6 +209,7 @@ fn create_inverted_index() -> Result<()> {
 }
 
 fn create_entity_index() -> Result<()> {
+    info!("Creating entity index");
     let out_path = Path::new(DATA_PATH).join("entity");
     if out_path.exists() {
         std::fs::remove_dir_all(&out_path)?;

+ 1 - 1
crates/core/src/entrypoint/search_server.rs

@@ -137,7 +137,7 @@ impl_search!([
 ]);
 
 pub struct SearchService {
-    local_searcher: LocalSearcher<Arc<RwLock<Index>>>,
+    local_searcher: LocalSearcher,
     // dropping the handle leaves the cluster
     #[allow(unused)]
     cluster_handle: Arc<Cluster>,

+ 1 - 1
crates/core/src/generic_query/get_homepage.rs

@@ -72,7 +72,7 @@ impl GenericQuery for GetHomepageQuery {
         FirstDocCollector::with_shard_id(ctx.shard_id)
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         FirstDocCollector::without_shard_id()
     }
 

+ 1 - 1
crates/core/src/generic_query/get_site_urls.rs

@@ -65,7 +65,7 @@ impl GenericQuery for GetSiteUrlsQuery {
             .disable_offset()
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         Self::Collector::new()
             .with_limit(self.limit as usize)
             .with_offset(self.offset.unwrap_or(0) as usize)

+ 1 - 1
crates/core/src/generic_query/get_webpage.rs

@@ -68,7 +68,7 @@ impl GenericQuery for GetWebpageQuery {
         FirstDocCollector::with_shard_id(ctx.shard_id)
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         FirstDocCollector::without_shard_id()
     }
 

+ 23 - 1
crates/core/src/generic_query/mod.rs

@@ -14,6 +14,26 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! # Main flow
+//! ```md
+//! `coordinator`  <------>  `searcher`
+//! -----------------------------------
+//! send query to searcher
+//!                        search index
+//!                        collect fruits
+//!                        send fruits to coordinator
+//! merge fruits
+//! filter fruits
+//!     for each shard
+//! send fruits to searchers
+//!                        construct intermediate output
+//!                           from fruits
+//!                        send intermediate output to coordinator
+//! merge intermediate outputs
+//! return final output
+//! ---------------------------------------------------
+//! ```
+
 use crate::{inverted_index::ShardId, search_ctx, Result};
 
 pub mod top_key_phrases;
@@ -34,6 +54,8 @@ pub use get_site_urls::GetSiteUrlsQuery;
 pub mod collector;
 pub use collector::Collector;
 
+/// A generic query that can be executed on a searcher
+/// against an index.
 pub trait GenericQuery: Send + Sync + bincode::Encode + bincode::Decode + Clone {
     type Collector: Collector;
     type TantivyQuery: tantivy::query::Query;
@@ -42,7 +64,7 @@ pub trait GenericQuery: Send + Sync + bincode::Encode + bincode::Decode + Clone
 
     fn tantivy_query(&self, ctx: &search_ctx::Ctx) -> Self::TantivyQuery;
     fn collector(&self, ctx: &search_ctx::Ctx) -> Self::Collector;
-    fn remote_collector(&self) -> Self::Collector;
+    fn coordinator_collector(&self) -> Self::Collector;
 
     fn filter_fruit_shards(
         &self,
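
A single-process model of the exchange diagrammed above makes the two round trips concrete: searchers return shard-tagged "fruits", the coordinator merges and filters them per shard, and the searchers then build the pieces of the final output. Everything below is an illustrative stand-in, not the crate's `GenericQuery`/`Collector` machinery:

type ShardId = u32;

#[derive(Clone)]
struct Fruit {
    shard: ShardId,
    doc_ids: Vec<u64>,
}

// First round trip: each searcher runs the query against its own shard.
fn search_shard(shard: ShardId) -> Fruit {
    Fruit { shard, doc_ids: vec![u64::from(shard) * 10, u64::from(shard) * 10 + 1] }
}

// Coordinator: after merging all fruits, hand each searcher only its own part.
fn filter_for_shard(fruits: &[Fruit], shard: ShardId) -> Vec<u64> {
    fruits.iter().filter(|f| f.shard == shard).flat_map(|f| f.doc_ids.clone()).collect()
}

// Second round trip: each searcher turns its fruits into an intermediate output.
fn retrieve(shard: ShardId, doc_ids: Vec<u64>) -> Vec<String> {
    doc_ids.into_iter().map(|id| format!("doc {id} (shard {shard})")).collect()
}

fn main() {
    let shards: Vec<ShardId> = vec![0, 1];
    // Merged on the coordinator (the real code uses `coordinator_collector().merge_fruits`).
    let fruits: Vec<Fruit> = shards.iter().map(|&s| search_shard(s)).collect();
    // Final output = merged intermediate outputs from every shard.
    let output: Vec<String> = shards
        .iter()
        .flat_map(|&s| retrieve(s, filter_for_shard(&fruits, s)))
        .collect();
    println!("{output:?}");
}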

+ 1 - 1
crates/core/src/generic_query/size.rs

@@ -70,7 +70,7 @@ impl GenericQuery for SizeQuery {
         SizeCollector::new().with_shard_id(ctx.shard_id)
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         SizeCollector::new()
     }
 

+ 1 - 1
crates/core/src/generic_query/top_key_phrases.rs

@@ -47,7 +47,7 @@ impl GenericQuery for TopKeyPhrasesQuery {
         TopKeyPhrasesCollector::new(self.top_n).with_shard_id(ctx.shard_id)
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopKeyPhrasesCollector::new(self.top_n)
     }
 

+ 8 - 0
crates/core/src/index.rs

@@ -51,6 +51,14 @@ impl Index {
         })
     }
 
+    pub fn inverted_index(&self) -> &InvertedIndex {
+        &self.inverted_index
+    }
+
+    pub fn region_count(&self) -> &Mutex<RegionCount> {
+        &self.region_count
+    }
+
     pub fn path(&self) -> PathBuf {
         PathBuf::from(&self.path)
     }

+ 3 - 1
crates/core/src/inverted_index/search.rs

@@ -341,7 +341,9 @@ impl InvertedIndex {
             From<<Q::Collector as generic_query::Collector>::Fruit>,
     {
         let fruit = self.search_initial_generic(query)?;
-        let mut fruit = query.remote_collector().merge_fruits(vec![fruit.into()])?;
+        let mut fruit = query
+            .coordinator_collector()
+            .merge_fruits(vec![fruit.into()])?;
 
         if let Some(shard_id) = self.shard_id {
             fruit = query.filter_fruit_shards(shard_id, fruit);

+ 1 - 1
crates/core/src/live_index/crawler/crawlable_site.rs

@@ -136,7 +136,7 @@ impl CrawlableSite {
     }
 }
 
-impl crawler::DatumStream for tokio::sync::Mutex<Vec<crawler::CrawlDatum>> {
+impl crawler::DatumSink for tokio::sync::Mutex<Vec<crawler::CrawlDatum>> {
     async fn write(&self, crawl_datum: crawler::CrawlDatum) -> Result<(), crawler::Error> {
         self.lock().await.push(crawl_datum);
         Ok(())

+ 24 - 36
crates/core/src/main.rs

@@ -66,20 +66,14 @@ enum Commands {
     },
 
     /// Deploy the search server.
-    SearchServer {
-        config_path: String,
-    },
+    SearchServer { config_path: String },
 
     /// Deploy the entity search server.
-    EntitySearchServer {
-        config_path: String,
-    },
+    EntitySearchServer { config_path: String },
 
     /// Deploy the json http api. The api interacts with
     /// the search servers, webgraph servers etc. to provide the necesarry functionality.
-    Api {
-        config_path: String,
-    },
+    Api { config_path: String },
 
     /// Deploy the crawler.
     Crawler {
@@ -102,23 +96,19 @@ enum Commands {
         ml: bool,
     },
 
-    // Commands for the live index.
+    /// Commands for the live index.
     LiveIndex {
         #[clap(subcommand)]
         options: LiveIndex,
     },
 
-    // Build spell correction model.
-    WebSpell {
-        config_path: String,
-    },
+    /// Build spell correction model.
+    WebSpell { config_path: String },
 
-    // Compute statistics for sites.
-    SiteStats {
-        config_path: String,
-    },
+    /// Compute statistics for sites.
+    SiteStats { config_path: String },
 
-    // Commands to compute distributed graph algorithms.
+    /// Commands to compute distributed graph algorithms.
     Ampc {
         #[clap(subcommand)]
         options: AmpcOptions,
@@ -155,20 +145,22 @@ enum AmpcOptions {
 
 #[derive(Subcommand)]
 enum AdminOptions {
-    Init {
-        host: SocketAddr,
-    },
+    /// Create the admin config file. Run this before any other admin commands so the client knows where to connect.
+    Init { host: SocketAddr },
+
+    /// Print the reachable cluster members and which service they are running.
     Status,
-    TopKeyphrases {
-        top: usize,
-    },
 
+    /// Export the top most common phrases in the index.
+    TopKeyphrases { top: usize },
+
+    /// Get statistics about the index.
     #[clap(subcommand)]
-    Index(AdminIndexOptions),
+    IndexStats(AdminIndexStatsOptions),
 }
 
 #[derive(Subcommand)]
-enum AdminIndexOptions {
+enum AdminIndexStatsOptions {
     /// Get the size of the index
     Size,
 }
@@ -251,9 +243,7 @@ enum WebgraphOptions {
 #[derive(Subcommand)]
 enum IndexingOptions {
     /// Create the search index.
-    Search {
-        config_path: String,
-    },
+    Search { config_path: String },
 
     /// Merge multiple search indexes into a single index.
     MergeSearch {
@@ -267,10 +257,8 @@ enum IndexingOptions {
         output_path: String,
     },
 
-    // Create an index of canonical urls.
-    Canonical {
-        config_path: String,
-    },
+    /// Create an index of canonical urls.
+    Canonical { config_path: String },
 }
 
 fn load_toml_config<T: DeserializeOwned, P: AsRef<Path>>(path: P) -> T {
@@ -511,8 +499,8 @@ fn main() -> Result<()> {
                     .block_on(entrypoint::admin::top_keyphrases(top))?;
             }
 
-            AdminOptions::Index(index_options) => match index_options {
-                AdminIndexOptions::Size => {
+            AdminOptions::IndexStats(index_options) => match index_options {
+                AdminIndexStatsOptions::Size => {
                     tokio::runtime::Builder::new_current_thread()
                         .enable_all()
                         .build()?

+ 13 - 0
crates/core/src/ranking/computer/mod.rs

@@ -14,6 +14,19 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! The ranking computer is responsible for computing the core ranking signals for
+//! each potential page in the result set. This module handles the initial ranking phase
+//! that runs independently on each search node in the distributed search cluster.
+//!
+//! The computer evaluates a set of core ranking signals for each candidate page,
+//! including text-based relevance scores like BM25 and authority scores (harmonic centrality).
+//! These signals are combined using a linear model to produce an initial ranking score.
+//! The top pages are passed to the coordinator node for the final ranking phase.
+//!
+//! The core signals computed here are designed to be fast to calculate while still
+//! providing strong relevance signals. More expensive ranking features are deferred
+//! to the final ranking phase on the coordinator.
+
 use crate::query::optic::AsSearchableRule;
 use crate::query::{Query, MAX_TERMS_FOR_NGRAM_LOOKUPS};
 use crate::ranking::bm25f::MultiBm25FWeight;
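
The two-phase split described in this doc comment (cheap signals plus a linear model on every shard, expensive reranking only for the survivors on the coordinator) boils down to a score-sort-truncate step. A self-contained sketch with made-up signal names and coefficients, not the crate's actual model:

// Hypothetical candidate with a few cheap, per-shard signals.
struct Candidate {
    url: &'static str,
    bm25_title: f64,
    bm25_body: f64,
    harmonic_centrality: f64,
}

// Linear model combining the core signals into one initial score
// (coefficients are illustrative).
fn linear_score(c: &Candidate) -> f64 {
    2.0 * c.bm25_title + 1.0 * c.bm25_body + 5.0 * c.harmonic_centrality
}

fn main() {
    let mut candidates = vec![
        Candidate { url: "https://a.example", bm25_title: 1.2, bm25_body: 0.8, harmonic_centrality: 0.3 },
        Candidate { url: "https://b.example", bm25_title: 0.4, bm25_body: 1.5, harmonic_centrality: 0.9 },
        Candidate { url: "https://c.example", bm25_title: 0.1, bm25_body: 0.2, harmonic_centrality: 0.1 },
    ];
    candidates.sort_by(|a, b| linear_score(b).partial_cmp(&linear_score(a)).unwrap());
    // Only the best pages are sent to the coordinator for the expensive stages.
    for c in candidates.iter().take(2) {
        println!("{} -> {:.2}", c.url, linear_score(c));
    }
}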

+ 6 - 0
crates/core/src/ranking/mod.rs

@@ -14,6 +14,12 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! The ranking module is responsible for ranking pages based on their relevance to a query.
+//!
+//! The core ranking signals are computed by the `computer` module, which runs independently
+//! on each search shard in the search cluster. Increasingly complex stages
+//! run in the ranking pipeline on the coordinator node to produce the final ranking.
+
 pub mod bitvec_similarity;
 pub mod bm25;
 pub mod bm25f;

+ 4 - 1
crates/core/src/ranking/models/cross_encoder.rs

@@ -1,5 +1,5 @@
 // Stract is an open source web search engine.
-// Copyright (C) 2023 Stract ApS
+// Copyright (C) 2024 Stract ApS
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Affero General Public License as
@@ -29,6 +29,9 @@ use crate::models::bert::BertModel;
 
 const TRUNCATE_INPUT: usize = 128;
 
+/// A cross-encoder model for ranking pages.
+///
+/// Takes a query and a page body as input and returns a score for the page.
 pub struct CrossEncoderModel {
     tokenizer: tokenizers::Tokenizer,
     encoder: BertModel,

+ 4 - 1
crates/core/src/ranking/models/lambdamart.rs

@@ -1,5 +1,5 @@
 // Stract is an open source web search engine.
-// Copyright (C) 2023 Stract ApS
+// Copyright (C) 2024 Stract ApS
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Affero General Public License as
@@ -240,6 +240,9 @@ impl Header {
     }
 }
 
+/// A LambdaMART model for ranking pages.
+///
+/// Designed for efficient inference of lightgbm compatible models.
 pub struct LambdaMART {
     trees: Vec<Tree>,
 }

+ 1 - 1
crates/core/src/ranking/models/linear.rs

@@ -1,5 +1,5 @@
 // Stract is an open source web search engine.
-// Copyright (C) 2023 Stract ApS
+// Copyright (C) 2024 Stract ApS
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Affero General Public License as

+ 4 - 4
crates/core/src/ranking/pipeline/mod.rs

@@ -68,10 +68,10 @@ impl<T> StageOrModifier<T>
 where
     T: RankableWebpage + Send + Sync,
 {
-    fn top_n(&self) -> Top {
+    fn top(&self) -> Top {
         match self {
-            StageOrModifier::Stage(stage) => stage.top_n(),
-            StageOrModifier::Modifier(modifier) => modifier.top_n(),
+            StageOrModifier::Stage(stage) => stage.top(),
+            StageOrModifier::Modifier(modifier) => modifier.top(),
         }
     }
 
@@ -139,7 +139,7 @@ where
         let coefficients = query.signal_coefficients();
 
         for stage_or_modifier in self.stages_or_modifiers.iter() {
-            let webpages = if let Top::Limit(top_n) = stage_or_modifier.top_n() {
+            let webpages = if let Top::Limit(top_n) = stage_or_modifier.top() {
                 if query.offset() > top_n {
                     continue;
                 }

+ 22 - 2
crates/core/src/ranking/pipeline/modifiers/mod.rs

@@ -14,28 +14,48 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! Modifiers are used to modify the ranking of pages.
+//!
+//! Each page is ranked by a linear combination of the signals like
+//! `score = boost * (signal_1 * weight_1 + signal_2 * weight_2 + ...)`
+//!
+//! Modifiers can either modify the multiplicative boost factor for
+//! each page or override the ranking entirely (if we want to rank
+//! for something other than the score).
+
 mod inbound_similarity;
 
 use super::{RankableWebpage, Top};
 pub use inbound_similarity::InboundSimilarity;
 
+/// A modifier that gives full control over the ranking.
 pub trait FullModifier: Send + Sync {
     type Webpage: RankableWebpage;
+    /// Modify the boost factor for each page.
     fn update_boosts(&self, webpages: &mut [Self::Webpage]);
 
+    /// Override ranking of the pages.
     fn rank(&self, webpages: &mut [Self::Webpage]) {
         webpages.sort_by(|a, b| b.score().partial_cmp(&a.score()).unwrap());
     }
 
-    fn top_n(&self) -> Top {
+    /// The number of pages to return from this part of the pipeline.
+    fn top(&self) -> Top {
         Top::Unlimited
     }
 }
 
+/// A modifier that modifies the multiplicative boost factor for each page.
+///
+/// This is the most common type of modifier.
 pub trait Modifier: Send + Sync {
     type Webpage: RankableWebpage;
+    /// Modify the boost factor for a page.
+    ///
+    /// The new boost factor will be multiplied with the page's current boost factor.
     fn boost(&self, webpage: &Self::Webpage) -> f64;
 
+    /// The number of pages to return from this part of the pipeline.
     fn top(&self) -> Top {
         Top::Unlimited
     }
@@ -54,7 +74,7 @@ where
         }
     }
 
-    fn top_n(&self) -> Top {
+    fn top(&self) -> Top {
         Modifier::top(self)
     }
 }
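
The scoring formula in this module doc, score = boost * (signal_1 * weight_1 + signal_2 * weight_2 + ...), makes the role of a boost-style modifier easy to show in isolation. A self-contained sketch with illustrative types (not the crate's `Modifier` trait, which works on `RankableWebpage`s):

struct Page {
    host: &'static str,
    base_score: f64, // signal_1 * weight_1 + signal_2 * weight_2 + ...
    boost: f64,
}

trait Modifier {
    /// Factor multiplied onto the page's current boost.
    fn boost(&self, page: &Page) -> f64;
}

// Example modifier: prefer results from a given host.
struct PreferHost(&'static str);

impl Modifier for PreferHost {
    fn boost(&self, page: &Page) -> f64 {
        if page.host == self.0 { 1.5 } else { 1.0 }
    }
}

fn apply<M: Modifier>(modifier: &M, pages: &mut [Page]) {
    for page in pages.iter_mut() {
        page.boost *= modifier.boost(page); // final score = boost * base_score
    }
    pages.sort_by(|a, b| {
        (b.boost * b.base_score).partial_cmp(&(a.boost * a.base_score)).unwrap()
    });
}

fn main() {
    let mut pages = vec![
        Page { host: "docs.example", base_score: 1.0, boost: 1.0 },
        Page { host: "blog.example", base_score: 1.2, boost: 1.0 },
    ];
    apply(&PreferHost("docs.example"), &mut pages);
    println!("best result comes from {}", pages[0].host);
}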

+ 2 - 2
crates/core/src/ranking/pipeline/scorers/lambdamart.rs

@@ -36,7 +36,7 @@ impl RankingStage for Arc<models::LambdaMART> {
         )
     }
 
-    fn top_n(&self) -> Top {
+    fn top(&self) -> Top {
         Top::Limit(20)
     }
 }
@@ -59,7 +59,7 @@ impl RankingStage for PrecisionLambda {
         )
     }
 
-    fn top_n(&self) -> Top {
+    fn top(&self) -> Top {
         Top::Limit(20)
     }
 }

+ 22 - 4
crates/core/src/ranking/pipeline/scorers/mod.rs

@@ -14,6 +14,10 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! Scorers are used to compute the ranking signals in the ranking pipeline.
+//!
+//! Each scorer computes a single signal which is then used to rank the pages.
+
 pub mod embedding;
 pub mod inbound_similarity;
 pub mod lambdamart;
@@ -26,14 +30,23 @@ use crate::ranking::{SignalCalculation, SignalCoefficients, SignalEnum};
 
 use super::{RankableWebpage, Top};
 
+/// A ranking stage that computes some signals for each page.
+///
+/// This trait is implemented for all scorers.
+/// Most of the time you will want to implement the [`RankingStage`] trait instead,
+/// but this trait gives you more control over the ranking pipeline.
 pub trait FullRankingStage: Send + Sync {
     type Webpage: RankableWebpage;
 
+    /// Compute the signal for each page.
     fn compute(&self, webpages: &mut [Self::Webpage]);
-    fn top_n(&self) -> Top {
+
+    /// The number of pages to return from this part of the pipeline.
+    fn top(&self) -> Top {
         Top::Unlimited
     }
 
+    /// Update the score for each page.
     fn update_scores(&self, webpages: &mut [Self::Webpage], coefficients: &SignalCoefficients) {
         for webpage in webpages.iter_mut() {
             webpage.set_raw_score(webpage.signals().iter().fold(0.0, |acc, (signal, calc)| {
@@ -42,16 +55,21 @@ pub trait FullRankingStage: Send + Sync {
         }
     }
 
+    /// Rank the pages by their score.
     fn rank(&self, webpages: &mut [Self::Webpage]) {
         webpages.sort_by(|a, b| b.score().partial_cmp(&a.score()).unwrap());
     }
 }
 
+/// A ranking stage that computes a single signal for each page.
 pub trait RankingStage: Send + Sync {
     type Webpage: RankableWebpage;
 
+    /// Compute the signal for a single page.
     fn compute(&self, webpage: &Self::Webpage) -> (SignalEnum, SignalCalculation);
-    fn top_n(&self) -> Top {
+
+    /// The number of pages to return from this part of the pipeline.
+    fn top(&self) -> Top {
         Top::Unlimited
     }
 }
@@ -69,7 +87,7 @@ where
         }
     }
 
-    fn top_n(&self) -> Top {
-        self.top_n()
+    fn top(&self) -> Top {
+        self.top()
     }
 }
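
Taken together, the traits documented above describe a pipeline in which every stage computes a signal for each page, re-ranks, and forwards at most `top()` pages to the next stage. A self-contained sketch of that shape (the trait and types are simplified stand-ins, not the crate's `FullRankingStage`/`RankingStage`):

#[derive(Debug)]
struct Page {
    id: u64,
    score: f64,
}

trait Stage {
    /// Add this stage's signal to the page's score.
    fn compute(&self, page: &mut Page);
    /// How many pages continue to the next stage (None = unlimited).
    fn top(&self) -> Option<usize> {
        None
    }
}

struct Freshness;
impl Stage for Freshness {
    fn compute(&self, page: &mut Page) {
        page.score += (page.id % 3) as f64; // stand-in signal
    }
    fn top(&self) -> Option<usize> {
        Some(2) // only the best two pages reach the next stage
    }
}

fn run_pipeline(stages: &[Box<dyn Stage>], mut pages: Vec<Page>) -> Vec<Page> {
    for stage in stages {
        for page in pages.iter_mut() {
            stage.compute(page);
        }
        pages.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
        if let Some(n) = stage.top() {
            pages.truncate(n);
        }
    }
    pages
}

fn main() {
    let stages: Vec<Box<dyn Stage>> = vec![Box::new(Freshness)];
    let pages = (0..5).map(|id| Page { id, score: 0.0 }).collect();
    println!("{:?}", run_pipeline(&stages, pages));
}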

+ 1 - 1
crates/core/src/ranking/pipeline/scorers/reranker.rs

@@ -68,7 +68,7 @@ impl<M: CrossEncoder> FullRankingStage for ReRanker<M> {
         self.crossencoder_score_webpages(webpages);
     }
 
-    fn top_n(&self) -> Top {
+    fn top(&self) -> Top {
         Top::Limit(20)
     }
 }

+ 5 - 0
crates/core/src/ranking/pipeline/stages/precision.rs

@@ -14,6 +14,11 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! The precision stage of the ranking pipeline.
+//!
+//! This stage focusses on refining the first page of results
+//! from the recall stage.
+
 use std::sync::Arc;
 
 use crate::{

+ 4 - 0
crates/core/src/ranking/pipeline/stages/recall.rs

@@ -14,6 +14,10 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! The recall stage of the ranking pipeline.
+//!
+//! This stage focusses on getting the best pages into the precision stage.
+
 use std::sync::Arc;
 
 use crate::{

+ 7 - 7
crates/core/src/searcher/distributed.rs

@@ -31,7 +31,6 @@ use crate::{
         entity_search_server, live_index::LiveIndexService, search_server::{self, RetrieveReq, SearchService}
     },
     generic_query::{self, Collector},
-    index::Index,
     inverted_index::{RetrievedWebpage, ShardId, WebpagePointer},
     ranking::pipeline::{PrecisionRankingWebpage, RecallRankingWebpage},
     Result,
@@ -44,7 +43,7 @@ use futures::{future::join_all, stream::FuturesUnordered, StreamExt};
 use itertools::Itertools;
 use std::future::Future;
 use thiserror::Error;
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::Mutex;
 
 use super::{InitialWebsiteResult, LocalSearcher, SearchQuery};
 
@@ -284,6 +283,7 @@ impl ReusableClientManager for LiveIndexService {
     }
 }
 
+/// A searcher that runs the search on a remote cluster.
 pub struct DistributedSearcher {
     client: Mutex<ReusableShardedClient<SearchService>>,
 }
@@ -414,7 +414,7 @@ impl SearchClient for DistributedSearcher {
         >: From<<Q as sonic::service::Message<SearchService>>::Response>,
         <<Q::Collector as generic_query::Collector>::Child as tantivy::collector::SegmentCollector>::Fruit:
             From<<Q::Collector as generic_query::Collector>::Fruit> {
-        let collector = query.remote_collector();
+        let collector = query.coordinator_collector();
 
         let res = self.conn().await
             .send(query, &AllShardsSelector, &RandomReplicaSelector)
@@ -518,7 +518,7 @@ impl SearchClient for DistributedSearcher {
             queries
                 .iter()
                 .zip_eq(fruits.into_iter())
-                .map(|(query, shard_fruits)| query.remote_collector().merge_fruits(shard_fruits))
+                .map(|(query, shard_fruits)| query.coordinator_collector().merge_fruits(shard_fruits))
                 .collect::<Result<Vec<_>, _>>()
     }
     
@@ -584,9 +584,9 @@ impl SearchClient for DistributedSearcher {
 }
 
 /// This should only be used for testing and benchmarks.
-pub struct LocalSearchClient(LocalSearcher<Arc<RwLock<Index>>>);
-impl From<LocalSearcher<Arc<RwLock<Index>>>> for LocalSearchClient {
-    fn from(searcher: LocalSearcher<Arc<RwLock<Index>>>) -> Self {
+pub struct LocalSearchClient(LocalSearcher);
+impl From<LocalSearcher> for LocalSearchClient {
+    fn from(searcher: LocalSearcher) -> Self {
         Self(searcher)
     }
 }

+ 0 - 33
crates/core/src/searcher/local/guard.rs

@@ -1,33 +0,0 @@
-// Stract is an open source web search engine.
-// Copyright (C) 2024 Stract ApS
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program.  If not, see <https://www.gnu.org/licenses/>.
-
-use tokio::sync::OwnedRwLockReadGuard;
-
-use crate::index::Index;
-use crate::inverted_index::InvertedIndex;
-
-pub trait ReadGuard: Send + Sync {
-    fn search_index(&self) -> &Index;
-    fn inverted_index(&self) -> &InvertedIndex {
-        &self.search_index().inverted_index
-    }
-}
-
-impl ReadGuard for OwnedRwLockReadGuard<Index> {
-    fn search_index(&self) -> &Index {
-        self
-    }
-}

+ 21 - 23
crates/core/src/searcher/local/inner.rs

@@ -16,32 +16,31 @@
 
 use crate::{
     generic_query::{self, GenericQuery},
+    index::Index,
     inverted_index,
     ranking::{LocalRanker, SignalComputer},
     searcher::InitialWebsiteResult,
     Result,
 };
 use std::sync::Arc;
+use tokio::sync::{OwnedRwLockReadGuard, RwLock};
 
 use crate::{
     config::CollectorConfig, models::dual_encoder::DualEncoder, query::Query,
     ranking::models::linear::LinearRegression, search_ctx::Ctx, searcher::SearchQuery,
 };
 
-use super::{InvertedIndexResult, ReadGuard, SearchableIndex};
+use super::InvertedIndexResult;
 
-pub struct InnerLocalSearcher<I: SearchableIndex> {
-    index: I,
+pub struct InnerLocalSearcher {
+    index: Arc<RwLock<Index>>,
     linear_regression: Option<Arc<LinearRegression>>,
     dual_encoder: Option<Arc<DualEncoder>>,
     collector_config: CollectorConfig,
 }
 
-impl<I> InnerLocalSearcher<I>
-where
-    I: SearchableIndex,
-{
-    pub fn new(index: I) -> Self {
+impl InnerLocalSearcher {
+    pub fn new(index: Arc<RwLock<Index>>) -> Self {
         Self {
             index,
             linear_regression: None,
@@ -50,8 +49,8 @@ where
         }
     }
 
-    pub async fn guard(&self) -> I::ReadGuard {
-        self.index.read_guard().await
+    pub async fn guard(&self) -> OwnedRwLockReadGuard<Index> {
+        self.index.clone().read_owned().await
     }
 
     pub fn set_linear_model(&mut self, model: LinearRegression) {
@@ -66,19 +65,19 @@ where
         self.collector_config = config;
     }
 
-    fn parse_query<G: ReadGuard>(
+    fn parse_query(
         &self,
         ctx: &Ctx,
-        guard: &G,
+        guard: &OwnedRwLockReadGuard<Index>,
         query: &SearchQuery,
     ) -> Result<Query> {
         Query::parse(ctx, query, guard.inverted_index())
     }
 
-    fn ranker<G: ReadGuard>(
+    fn ranker(
         &self,
         query: &Query,
-        guard: &G,
+        guard: &OwnedRwLockReadGuard<Index>,
         de_rank_similar: bool,
         computer: SignalComputer,
     ) -> Result<LocalRanker> {
@@ -99,10 +98,10 @@ where
             .with_offset(query.offset()))
     }
 
-    fn search_inverted_index<G: ReadGuard>(
+    fn search_inverted_index(
         &self,
         ctx: &Ctx,
-        guard: &G,
+        guard: &OwnedRwLockReadGuard<Index>,
         query: &SearchQuery,
         de_rank_similar: bool,
     ) -> Result<InvertedIndexResult> {
@@ -112,8 +111,7 @@ where
 
         computer.set_region_count(
             guard
-                .search_index()
-                .region_count
+                .region_count()
                 .lock()
                 .unwrap_or_else(|e| e.into_inner())
                 .clone(),
@@ -149,7 +147,7 @@ where
     pub fn search_initial(
         &self,
         query: &SearchQuery,
-        guard: &I::ReadGuard,
+        guard: &OwnedRwLockReadGuard<Index>,
         de_rank_similar: bool,
     ) -> Result<InitialWebsiteResult> {
         let query = query.clone();
@@ -168,7 +166,7 @@ where
         &self,
         websites: &[inverted_index::WebpagePointer],
         query: &str,
-        guard: &I::ReadGuard,
+        guard: &OwnedRwLockReadGuard<Index>,
     ) -> Result<Vec<inverted_index::RetrievedWebpage>> {
         let ctx = guard.inverted_index().local_search_ctx();
         let query = SearchQuery {
@@ -183,7 +181,7 @@ where
     pub fn search_initial_generic<Q: GenericQuery>(
         &self,
         query: &Q,
-        guard: &I::ReadGuard,
+        guard: &OwnedRwLockReadGuard<Index>,
     ) -> Result<<Q::Collector as generic_query::Collector>::Fruit> {
         guard.inverted_index().search_initial_generic(query)
     }
@@ -192,7 +190,7 @@ where
         &self,
         query: &Q,
         fruit: <Q::Collector as generic_query::Collector>::Fruit,
-        guard: &I::ReadGuard,
+        guard: &OwnedRwLockReadGuard<Index>,
     ) -> Result<Q::IntermediateOutput> {
         guard.inverted_index().retrieve_generic(query, fruit)
     }
@@ -200,7 +198,7 @@ where
     pub fn search_generic<Q: GenericQuery>(
         &self,
         query: Q,
-        guard: &I::ReadGuard,
+        guard: &OwnedRwLockReadGuard<Index>,
     ) -> Result<Q::Output> {
         let fruit = self.search_initial_generic(&query, guard)?;
         Ok(Q::merge_results(vec![

+ 12 - 34
crates/core/src/searcher/local/mod.rs

@@ -14,15 +14,13 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-mod guard;
-use guard::ReadGuard;
+//! The local searcher runs the search against a local index.
 
 mod inner;
 use inner::InnerLocalSearcher;
-use tokio::sync::{OwnedRwLockReadGuard, RwLock};
+use tokio::sync::RwLock;
 
 use std::collections::HashMap;
-use std::future::Future;
 use std::sync::Arc;
 
 use itertools::Itertools;
@@ -44,29 +42,12 @@ use crate::{inverted_index, Result};
 use super::WebsitesResult;
 use super::{InitialWebsiteResult, SearchQuery};
 
-pub trait SearchableIndex: Send + Sync + 'static {
-    type ReadGuard: ReadGuard;
-
-    fn read_guard(&self) -> impl Future<Output = Self::ReadGuard>;
-}
-
-impl SearchableIndex for Arc<RwLock<Index>> {
-    type ReadGuard = OwnedRwLockReadGuard<Index>;
-
-    async fn read_guard(&self) -> Self::ReadGuard {
-        self.clone().read_owned().await
-    }
-}
-
-pub struct LocalSearcherBuilder<I: SearchableIndex> {
-    inner: InnerLocalSearcher<I>,
+pub struct LocalSearcherBuilder {
+    inner: InnerLocalSearcher,
 }
 
-impl<I> LocalSearcherBuilder<I>
-where
-    I: SearchableIndex,
-{
-    pub fn new(index: I) -> Self {
+impl LocalSearcherBuilder {
+    pub fn new(index: Arc<RwLock<Index>>) -> Self {
         Self {
             inner: InnerLocalSearcher::new(index),
         }
@@ -87,22 +68,19 @@ where
         self
     }
 
-    pub fn build(self) -> LocalSearcher<I> {
+    pub fn build(self) -> LocalSearcher {
         LocalSearcher {
             inner: Arc::new(self.inner),
         }
     }
 }
 
-pub struct LocalSearcher<I: SearchableIndex> {
-    inner: Arc<InnerLocalSearcher<I>>,
+pub struct LocalSearcher {
+    inner: Arc<InnerLocalSearcher>,
 }
 
-impl<I> LocalSearcher<I>
-where
-    I: SearchableIndex,
-{
-    pub fn builder(index: I) -> LocalSearcherBuilder<I> {
+impl LocalSearcher {
+    pub fn builder(index: Arc<RwLock<Index>>) -> LocalSearcherBuilder {
         LocalSearcherBuilder::new(index)
     }
 
@@ -203,7 +181,7 @@ where
         })
     }
 
-    /// This function is mainly used for tests and benchmarks
+    /// This function is only used for tests and benchmarks
     pub fn search_sync(&self, query: &SearchQuery) -> Result<WebsitesResult> {
         crate::block_on(self.search(query))
     }

+ 6 - 0
crates/core/src/searcher/mod.rs

@@ -14,6 +14,12 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! Searchers are responsible for executing search queries against an index.
+//! There are two types of searchers:
+//! - [`local::LocalSearcher`] which runs the search on the local machine.
+//! - [`distributed::DistributedSearcher`] which runs the search on a remote cluster. Each node
+//!     will run a local searcher and then the results are merged on the coordinator node.
+
 pub mod api;
 pub mod distributed;
 pub mod local;
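
The split described here (per-node local search, then a merge on the coordinator) is essentially "take the union of every shard's top-k and keep the global top-k". A small, self-contained sketch of that idea; the functions and types below are illustrative, not the crate's `LocalSearcher`/`DistributedSearcher` API:

#[derive(Debug, Clone)]
struct Hit {
    url: String,
    score: f64,
}

// Stand-in for a LocalSearcher running on one shard node.
fn local_search(shard: usize, _query: &str, k: usize) -> Vec<Hit> {
    (0..k)
        .map(|i| Hit { url: format!("https://shard{shard}.example/{i}"), score: (shard + i) as f64 })
        .collect()
}

// Stand-in for the coordinator: fan out, then merge to a global top-k.
fn coordinator_search(query: &str, shards: usize, k: usize) -> Vec<Hit> {
    let mut hits: Vec<Hit> = (0..shards).flat_map(|s| local_search(s, query, k)).collect();
    hits.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
    hits.truncate(k);
    hits
}

fn main() {
    for hit in coordinator_search("example query", 3, 5) {
        println!("{:>5.1}  {}", hit.score, hit.url);
    }
}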

+ 5 - 5
crates/core/src/webgraph/query/backlink.rs

@@ -182,7 +182,7 @@ impl Query for BacklinksQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit).enable_offset()
     }
 
@@ -314,7 +314,7 @@ impl Query for HostBacklinksQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit)
             .enable_offset()
             .with_deduplicator(HostDeduplicator)
@@ -467,7 +467,7 @@ impl Query for FullBacklinksQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit).enable_offset()
     }
 
@@ -617,7 +617,7 @@ impl Query for FullHostBacklinksQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit)
             .enable_offset()
             .with_deduplicator(HostDeduplicator)
@@ -742,7 +742,7 @@ impl Query for BacklinksWithLabelsQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit).enable_offset()
     }
 

+ 1 - 1
crates/core/src/webgraph/query/between.rs

@@ -113,7 +113,7 @@ impl Query for FullLinksBetweenQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit).enable_offset()
     }
 

+ 1 - 1
crates/core/src/webgraph/query/degree.rs

@@ -53,7 +53,7 @@ impl Query for InDegreeQuery {
         .with_shard_id(searcher.shard())
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         FastCountCollector::new(
             ToId.name().to_string(),
             FastCountValue::U128(self.node.as_u128()),

+ 4 - 4
crates/core/src/webgraph/query/forwardlink.rs

@@ -135,7 +135,7 @@ impl Query for ForwardlinksQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit).enable_offset()
     }
 
@@ -286,7 +286,7 @@ impl Query for HostForwardlinksQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit)
             .enable_offset()
             .with_deduplicator(HostDeduplicator)
@@ -437,7 +437,7 @@ impl Query for FullForwardlinksQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit).enable_offset()
     }
 
@@ -576,7 +576,7 @@ impl Query for FullHostForwardlinksQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopDocsCollector::from(self.limit)
             .enable_offset()
             .with_deduplicator(HostDeduplicator)

+ 2 - 2
crates/core/src/webgraph/query/group_by.rs

@@ -157,7 +157,7 @@ impl Query for HostGroupSketchQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         GroupSketchCollector::new(self.group, self.value)
     }
 
@@ -306,7 +306,7 @@ impl Query for HostGroupQuery {
         collector
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         GroupExactCollector::new(self.group, self.value)
     }
 

+ 1 - 1
crates/core/src/webgraph/query/id2node.rs

@@ -83,7 +83,7 @@ impl Query for Id2NodeQuery {
         }))
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         FirstDocCollector::without_shard_id()
     }
 

+ 21 - 1
crates/core/src/webgraph/query/mod.rs

@@ -14,6 +14,26 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+//! # Main flow
+//! ```md
+//! `coordinator`  <------>  `searcher`
+//! -----------------------------------
+//! send query to searcher
+//!                        search index
+//!                        collect fruits
+//!                        send fruits to coordinator
+//! merge fruits
+//! filter fruits
+//!     for each shard
+//! send fruits to searchers
+//!                        construct intermediate output
+//!                           from fruits
+//!                        send intermediate output to coordinator
+//! merge intermediate outputs
+//! return final output
+//! ---------------------------------------------------
+//! ```
+
 use super::searcher::Searcher;
 use crate::{ampc::dht::ShardId, Result};
 pub use collector::Collector;
@@ -49,7 +69,7 @@ pub trait Query: Send + Sync + bincode::Encode + bincode::Decode + Clone {
 
     fn tantivy_query(&self, searcher: &Searcher) -> Self::TantivyQuery;
     fn collector(&self, searcher: &Searcher) -> Self::Collector;
-    fn remote_collector(&self) -> Self::Collector;
+    fn coordinator_collector(&self) -> Self::Collector;
 
     fn filter_fruit_shards(
         &self,

+ 2 - 2
crates/core/src/webgraph/remote.rs

@@ -120,7 +120,7 @@ impl RemoteWebgraph {
         <<Q::Collector as webgraph::Collector>::Child as tantivy::collector::SegmentCollector>::Fruit:
             From<<Q::Collector as webgraph::Collector>::Fruit>,
     {
-        let collector = query.remote_collector();
+        let collector = query.coordinator_collector();
 
         let res = self
             .conn()
@@ -250,7 +250,7 @@ impl RemoteWebgraph {
         queries
             .iter()
             .zip_eq(fruits.into_iter())
-            .map(|(query, shard_fruits)| query.remote_collector().merge_fruits(shard_fruits))
+            .map(|(query, shard_fruits)| query.coordinator_collector().merge_fruits(shard_fruits))
             .collect::<Result<Vec<_>, _>>()
     }
 

+ 3 - 1
crates/core/src/webgraph/store.rs

@@ -271,7 +271,9 @@ impl EdgeStore {
             From<<Q::Collector as Collector>::Fruit>,
     {
         let fruit = self.search_initial(query)?;
-        let fruit = query.remote_collector().merge_fruits(vec![fruit.into()])?;
+        let fruit = query
+            .coordinator_collector()
+            .merge_fruits(vec![fruit.into()])?;
         let res = self.retrieve(query, fruit)?;
         Ok(Q::merge_results(vec![res]))
     }