treat live index as an extra shard in the search index to re-use result merge logic etc.

Mikkel Denker 9 months ago
parent
commit
abef4f26ff
33 changed files with 380 additions and 672 deletions
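
In effect, the commit deletes the dedicated live-search path (crates/core/src/searcher/live.rs, 194 lines removed): a live index node now runs the same SearchService as a backbone shard, announces a separate search_host in the cluster, and registers under ShardId::Live rather than ShardId::Backbone, so the API merges results from every shard through one code path. A minimal sketch of that idea, with a hypothetical ScoredPointer type standing in for the crate's pointer structs:

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum ShardId {
    Backbone(u64),
    Live(u64),
}

// Hypothetical stand-in; the real pointers also carry ranking signals.
struct ScoredPointer {
    shard: ShardId,
    score: f64,
}

// One merge path for every shard, live or backbone alike.
fn merge_top_k(per_shard: HashMap<ShardId, Vec<ScoredPointer>>, k: usize) -> Vec<ScoredPointer> {
    let mut all: Vec<_> = per_shard.into_values().flatten().collect();
    all.sort_by(|a, b| b.score.total_cmp(&a.score));
    all.truncate(k);
    all
}

fn main() {
    let mut shards = HashMap::new();
    shards.insert(
        ShardId::Backbone(0),
        vec![ScoredPointer { shard: ShardId::Backbone(0), score: 1.2 }],
    );
    shards.insert(
        ShardId::Live(0),
        vec![ScoredPointer { shard: ShardId::Live(0), score: 3.4 }],
    );
    // live results flow through the same merge as backbone results
    assert_eq!(merge_top_k(shards, 10).len(), 2);
}
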
  1. + 4  - 5    crates/core/examples/search_preindexed.rs
  2. + 3  - 6    crates/core/src/api/mod.rs
  3. + 17 - 0    crates/core/src/config/mod.rs
  4. + 9  - 3    crates/core/src/distributed/member.rs
  5. + 44 - 47   crates/core/src/entrypoint/live_index/search_server.rs
  6. + 40 - 5    crates/core/src/entrypoint/live_index/tests.rs
  7. + 41 - 23   crates/core/src/entrypoint/search_server.rs
  8. + 4  - 26   crates/core/src/index.rs
  9. + 53 - 43   crates/core/src/live_index/index.rs
 10. + 14 - 13   crates/core/src/query/mod.rs
 11. + 19 - 19   crates/core/src/query/optic.rs
 12. + 4  - 5    crates/core/src/ranking/inbound_similarity.rs
 13. + 14 - 12   crates/core/src/ranking/mod.rs
 14. + 4  - 5    crates/core/src/ranking/optics.rs
 15. + 6  - 10   crates/core/src/ranking/pipeline/mod.rs
 16. + 2  - 2    crates/core/src/ranking/pipeline/modifiers/inbound_similarity.rs
 17. + 1  - 1    crates/core/src/ranking/pipeline/scorers/embedding.rs
 18. + 1  - 1    crates/core/src/ranking/pipeline/scorers/inbound_similarity.rs
 19. + 1  - 1    crates/core/src/ranking/pipeline/scorers/lambdamart.rs
 20. + 3  - 3    crates/core/src/ranking/pipeline/scorers/term_distance.rs
 21. + 9  - 7    crates/core/src/ranking/pipeline/stages/recall.rs
 22. + 16 - 145  crates/core/src/searcher/api/mod.rs
 23. + 2  - 3    crates/core/src/searcher/api/sidebar.rs
 24. + 33 - 22   crates/core/src/searcher/distributed.rs
 25. + 0  - 194  crates/core/src/searcher/live.rs
 26. + 3  - 31   crates/core/src/searcher/local/guard.rs
 27. + 11 - 11   crates/core/src/searcher/local/inner.rs
 28. + 10 - 17   crates/core/src/searcher/local/mod.rs
 29. + 1  - 2    crates/core/src/searcher/mod.rs
 30. + 4  - 3    crates/core/src/snippet.rs
 31. + 4  - 4    crates/core/src/webgraph/store.rs
 32. + 1  - 0    crates/file-store/src/temp.rs
 33. + 2  - 3    crates/simple-wal/src/lib.rs

+ 4 - 5
crates/core/examples/search_preindexed.rs

@@ -10,11 +10,10 @@ use stract::{
     },
     generic_query::TopKeyPhrasesQuery,
     index::Index,
-    searcher::{
-        api::ApiSearcher, live::LiveSearcher, LocalSearchClient, LocalSearcher, SearchQuery,
-    },
+    searcher::{api::ApiSearcher, LocalSearchClient, LocalSearcher, SearchQuery},
     webgraph::Webgraph,
 };
+use tokio::sync::RwLock;
 
 #[tokio::main]
 pub async fn main() {
@@ -56,7 +55,7 @@ pub async fn main() {
         ..Default::default()
     });
 
-    let searcher = LocalSearcher::builder(Arc::new(index))
+    let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index)))
         .set_collector_config(collector_conf)
         .build();
 
@@ -75,7 +74,7 @@ pub async fn main() {
 
     let webgraph = Webgraph::open("data/webgraph", 0u64.into()).unwrap();
 
-    let searcher: ApiSearcher<LocalSearchClient, LiveSearcher, Webgraph> =
+    let searcher: ApiSearcher<LocalSearchClient, Webgraph> =
         ApiSearcher::new(searcher, None, bangs, config)
             .await
             .with_webgraph(webgraph);

+ 3 - 6
crates/core/src/api/mod.rs

@@ -32,7 +32,7 @@ use crate::{
     leaky_queue::LeakyQueue,
     models::dual_encoder::DualEncoder,
     ranking::models::lambdamart::LambdaMART,
-    searcher::{api::ApiSearcher, live::LiveSearcher, DistributedSearcher, SearchClient},
+    searcher::{api::ApiSearcher, DistributedSearcher, SearchClient},
     similar_hosts::SimilarHostsFinder,
     webgraph::remote::RemoteWebgraph,
 };
@@ -73,7 +73,7 @@ pub struct Counters {
 
 pub struct State {
     pub config: ApiConfig,
-    pub searcher: Arc<ApiSearcher<DistributedSearcher, LiveSearcher, Arc<RemoteWebgraph>>>,
+    pub searcher: Arc<ApiSearcher<DistributedSearcher, Arc<RemoteWebgraph>>>,
     pub webgraph: Arc<RemoteWebgraph>,
     pub autosuggest: Autosuggest,
     pub counters: Counters,
@@ -180,7 +180,6 @@ pub async fn router(
     let webgraph = RemoteWebgraph::new(cluster.clone()).await;
 
     let dist_searcher = DistributedSearcher::new(Arc::clone(&cluster)).await;
-    let live_searcher = LiveSearcher::new(Arc::clone(&cluster));
 
     if !cluster
         .members()
@@ -209,9 +208,7 @@ pub async fn router(
         }
 
         let mut searcher =
-            ApiSearcher::new(dist_searcher, Some(cluster.clone()), bangs, config.clone())
-                .await
-                .with_live(live_searcher);
+            ApiSearcher::new(dist_searcher, Some(cluster.clone()), bangs, config.clone()).await;
 
         if let Some(cross_encoder) = cross_encoder {
             searcher = searcher.with_cross_encoder(cross_encoder);

+ 17 - 0
crates/core/src/config/mod.rs

@@ -352,6 +352,22 @@ pub struct SearchServerConfig {
     pub snippet: SnippetConfig,
 }
 
+impl From<LiveIndexConfig> for SearchServerConfig {
+    fn from(config: LiveIndexConfig) -> Self {
+        Self {
+            host: config.search_host,
+            gossip_seed_nodes: config.gossip_seed_nodes,
+            gossip_addr: config.gossip_addr,
+            shard: config.shard_id,
+            index_path: config.index_path,
+            linear_model_path: config.linear_model_path,
+            dual_encoder_model_path: config.lambda_model_path,
+            collector: config.collector,
+            snippet: config.snippet,
+        }
+    }
+}
+
 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
 pub struct EntitySearchServerConfig {
     pub gossip_seed_nodes: Option<Vec<SocketAddr>>,
@@ -613,6 +629,7 @@ pub struct LiveIndexConfig {
     pub linear_model_path: Option<String>,
     pub lambda_model_path: Option<String>,
     pub host: SocketAddr,
+    pub search_host: SocketAddr,
     #[serde(default)]
     pub collector: CollectorConfig,
     #[serde(default)]

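This From impl is what lets the live-index entrypoint hand its own config straight to the shared SearchService via config.into() (see serve further down). A reduced sketch of the conversion, keeping only two fields and using hypothetical simplified types:

use std::net::SocketAddr;

struct LiveIndexConfig { search_host: SocketAddr, shard_id: u64 }
struct SearchServerConfig { host: SocketAddr, shard: u64 }

impl From<LiveIndexConfig> for SearchServerConfig {
    fn from(c: LiveIndexConfig) -> Self {
        // the live node's dedicated search_host becomes the search server's host
        Self { host: c.search_host, shard: c.shard_id }
    }
}

fn main() {
    let live = LiveIndexConfig { search_host: "127.0.0.1:4000".parse().unwrap(), shard_id: 0 };
    let search: SearchServerConfig = live.into(); // as `config.into()` in serve()
    assert_eq!(search.host.port(), 4000);
    assert_eq!(search.shard, 0);
}
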
+ 9 - 3
crates/core/src/distributed/member.rs

@@ -1,5 +1,5 @@
 // Stract is an open source web search engine.
-// Copyright (C) 2023 Stract ApS
+// Copyright (C) 2024 Stract ApS
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Affero General Public License as
@@ -106,6 +106,7 @@ pub enum Service {
     },
     LiveIndex {
         host: SocketAddr,
+        search_host: SocketAddr,
         shard: crate::inverted_index::ShardId,
         state: LiveIndexState,
     },
@@ -141,8 +142,13 @@ impl std::fmt::Display for Service {
         match self {
             Self::Searcher { host, shard } => write!(f, "Searcher {} {}", host, shard),
             Self::EntitySearcher { host } => write!(f, "EntitySearcher {}", host),
-            Self::LiveIndex { host, shard, state } => {
-                write!(f, "LiveIndex {} {} {}", host, shard, state)
+            Self::LiveIndex {
+                host,
+                search_host,
+                shard,
+                state,
+            } => {
+                write!(f, "LiveIndex {} {} {} {}", host, search_host, shard, state)
             }
             Self::Api { host } => write!(f, "Api {}", host),
             Self::Webgraph { host, shard } => {

+ 44 - 47
crates/core/src/entrypoint/live_index/search_server.rs

@@ -29,9 +29,9 @@ use crate::{
         remote_cp,
         sonic::{self, service::sonic_service},
     },
-    inverted_index::{self, ShardId},
+    entrypoint::search_server::SearchService,
+    inverted_index::ShardId,
     live_index::{IndexManager, LiveIndex},
-    searcher::{InitialWebsiteResult, LocalSearcher},
 };
 use anyhow::{Context, Result};
 use futures::stream::FuturesUnordered;
@@ -41,23 +41,14 @@ use tokio::sync::Mutex;
 use tokio_stream::StreamExt;
 use tracing::info;
 
-use crate::entrypoint::{
-    indexer::{self, IndexableWebpage},
-    search_server::{RetrieveWebsites, Search},
-};
+use crate::entrypoint::indexer::{self, IndexableWebpage};
 
 const INDEXING_TIMEOUT: Duration = Duration::from_secs(60);
 const INDEXING_RETRIES: usize = 3;
 
 sonic_service!(
     LiveIndexService,
-    [
-        RetrieveWebsites,
-        Search,
-        IndexWebpages,
-        GetIndexPath,
-        RemoteDownload
-    ]
+    [IndexWebpages, GetIndexPath, RemoteDownload]
 );
 
 fn start_manager(index: Arc<LiveIndex>) {
@@ -95,6 +86,7 @@ async fn other_replicas(
             if member.id != id {
                 if let Service::LiveIndex {
                     host: member_host,
+                    search_host: _,
                     shard: member_shard,
                     state,
                 } = member.service
@@ -120,6 +112,7 @@ async fn setup(index: Arc<LiveIndex>, cluster: Arc<Cluster>, temp_wal: TempWal)
 
     let Service::LiveIndex {
         host,
+        search_host,
         shard,
         state: _,
     } = self_node.service.clone()
@@ -167,6 +160,7 @@ async fn setup(index: Arc<LiveIndex>, cluster: Arc<Cluster>, temp_wal: TempWal)
         .set_service(Service::LiveIndex {
             host,
             shard,
+            search_host,
             state: LiveIndexState::Ready,
         })
         .await?;
@@ -177,7 +171,6 @@ async fn setup(index: Arc<LiveIndex>, cluster: Arc<Cluster>, temp_wal: TempWal)
 type TempWal = Arc<Mutex<Option<Wal<IndexableWebpage>>>>;
 
 pub struct LiveIndexService {
-    local_searcher: LocalSearcher<Arc<LiveIndex>>,
     temp_wal: TempWal,
     index: Arc<LiveIndex>,
     cluster_handle: Arc<Cluster>,
@@ -189,6 +182,7 @@ impl LiveIndexService {
             Cluster::join(
                 Member::new(Service::LiveIndex {
                     host: config.host,
+                    search_host: config.search_host,
                     shard: ShardId::Live(config.shard_id),
                     state: crate::distributed::member::LiveIndexState::InSetup,
                 }),
@@ -215,14 +209,10 @@ impl LiveIndexService {
             )
             .await?,
         );
-        let local_searcher = LocalSearcher::builder(index.clone())
-            .set_collector_config(config.collector)
-            .build();
 
         let temp_wal = Arc::new(Mutex::new(Some(Wal::open(index_path.join("wal.temp"))?)));
 
         Ok(Self {
-            local_searcher,
             cluster_handle,
             index,
             temp_wal,
@@ -260,6 +250,7 @@ impl LiveIndexService {
         let self_id = self_member.id.clone();
         let Service::LiveIndex {
             host: self_host,
+            search_host: _,
             shard: self_shard,
             state: _,
         } = self_member.service
@@ -273,7 +264,13 @@ impl LiveIndexService {
             .await
             .into_iter()
             .filter_map(|member| {
-                if let Service::LiveIndex { host, shard, state } = member.service {
+                if let Service::LiveIndex {
+                    host,
+                    search_host: _,
+                    shard,
+                    state,
+                } = member.service
+                {
                     if member.id != self_id && shard == self_shard && host != self_host {
                         Some(RemoteIndex { host, state })
                     } else {
@@ -327,28 +324,6 @@ impl LiveIndexService {
     }
 }
 
-impl sonic::service::Message<LiveIndexService> for RetrieveWebsites {
-    type Response = Option<Vec<inverted_index::RetrievedWebpage>>;
-    async fn handle(self, server: &LiveIndexService) -> Self::Response {
-        server
-            .local_searcher
-            .retrieve_websites(&self.websites, &self.query)
-            .await
-            .ok()
-    }
-}
-
-impl sonic::service::Message<LiveIndexService> for Search {
-    type Response = Option<InitialWebsiteResult>;
-    async fn handle(self, server: &LiveIndexService) -> Self::Response {
-        server
-            .local_searcher
-            .search_initial(&self.query, true)
-            .await
-            .ok()
-    }
-}
-
 #[derive(Clone, Debug)]
 struct RemoteIndex {
     host: SocketAddr,
@@ -442,17 +417,39 @@ impl sonic::service::Message<LiveIndexService> for RemoteDownload {
 pub async fn serve(config: LiveIndexConfig) -> Result<()> {
     let addr = config.host;
 
-    let service = LiveIndexService::new(config).await?;
+    let service = LiveIndexService::new(config.clone()).await?;
+    let index = service.index().index().await;
+    let cluster = service.cluster_handle();
 
     service.background_setup();
 
     let server = service.bind(&addr).await.unwrap();
 
-    info!("live index is ready to accept requests on {}", addr);
+    let search_addr = config.search_host;
+    let search_server =
+        SearchService::new_from_existing(config.into(), cluster.clone(), index).await?;
 
-    loop {
-        if let Err(e) = server.accept().await {
-            tracing::error!("{:?}", e);
+    let search_server = search_server.bind(&search_addr).await.unwrap();
+
+    let search_server_handle = tokio::spawn(async move {
+        loop {
+            if let Err(e) = search_server.accept().await {
+                tracing::error!("{:?}", e);
+            }
         }
-    }
+    });
+
+    let live_index_handle = tokio::spawn(async move {
+        loop {
+            if let Err(e) = server.accept().await {
+                tracing::error!("{:?}", e);
+            }
+        }
+    });
+
+    info!("live index is ready to accept requests on {}", addr);
+
+    tokio::try_join!(live_index_handle, search_server_handle)?;
+
+    Ok(())
 }
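
serve now runs the live-index RPC server and the search server side by side, each with its own accept loop, and joins the two tasks at the end. A standalone sketch of the same pattern, with plain TcpListeners standing in for the sonic servers and hypothetical ports:

use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let live = TcpListener::bind("127.0.0.1:5000").await?;
    let search = TcpListener::bind("127.0.0.1:5001").await?;

    // one accept loop per server, each on its own task
    let live_handle = tokio::spawn(async move {
        loop {
            if let Err(e) = live.accept().await {
                eprintln!("live accept error: {:?}", e); // keep accepting despite per-connection errors
            }
        }
    });
    let search_handle = tokio::spawn(async move {
        loop {
            if let Err(e) = search.accept().await {
                eprintln!("search accept error: {:?}", e);
            }
        }
    });

    // try_join! surfaces a panic or abort from either loop; otherwise both run forever
    tokio::try_join!(live_handle, search_handle)?;
    Ok(())
}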

+ 40 - 5
crates/core/src/entrypoint/live_index/tests.rs

@@ -59,11 +59,13 @@ fn config<P: AsRef<Path>>(path: P) -> LiveIndexConfig {
         host: free_socket_addr(),
         collector: Default::default(),
         snippet: Default::default(),
+        search_host: free_socket_addr(),
     }
 }
 
 struct RemoteIndex {
     host: SocketAddr,
+    search_host: SocketAddr,
     shard: ShardId,
     gossip_addr: SocketAddr,
     underlying_index: Arc<LiveIndex>,
@@ -78,18 +80,29 @@ impl RemoteIndex {
 
         config.shard_id = shard;
         let host = config.host;
+        let search_host = config.search_host;
         let gossip_addr = config.gossip_addr;
 
         if !gossip_seed.is_empty() {
             config.gossip_seed_nodes = Some(gossip_seed);
         }
 
-        let service = LiveIndexService::new(config).await?;
+        let service = LiveIndexService::new(config.clone()).await?;
         let cluster = service.cluster_handle();
         let index = service.index();
 
         service.background_setup();
 
+        let shard = ShardId::Live(config.shard_id);
+        let search_server = search_server::SearchService::new_from_existing(
+            config.into(),
+            cluster.clone(),
+            index.index().await,
+        )
+        .await?;
+
+        let search_server = search_server.bind(&search_host).await.unwrap();
+
         let server = service.bind(&host).await.unwrap();
 
         tokio::task::spawn(async move {
@@ -100,9 +113,18 @@ impl RemoteIndex {
             }
         });
 
+        tokio::task::spawn(async move {
+            loop {
+                if let Err(e) = search_server.accept().await {
+                    tracing::error!("{:?}", e);
+                }
+            }
+        });
+
         Ok(Self {
             host,
-            shard: ShardId::Live(shard),
+            search_host,
+            shard,
             gossip_addr,
             underlying_index: index,
             cluster,
@@ -110,6 +132,12 @@ impl RemoteIndex {
         })
     }
 
+    async fn search_conn(
+        &self,
+    ) -> Result<sonic::service::Connection<search_server::SearchService>> {
+        Ok(sonic::service::Connection::create(self.search_host).await?)
+    }
+
     async fn conn(&self) -> Result<sonic::service::Connection<LiveIndexService>> {
         Ok(sonic::service::Connection::create(self.host).await?)
     }
@@ -133,10 +161,17 @@ impl RemoteIndex {
     async fn await_ready(&self, cluster: &Cluster) {
         cluster
             .await_member(|member| {
-                if let Service::LiveIndex { host, shard, state } = member.service.clone() {
+                if let Service::LiveIndex {
+                    host,
+                    search_host,
+                    shard,
+                    state,
+                } = member.service.clone()
+                {
                     self.shard == shard
                         && matches!(state, LiveIndexState::Ready)
                         && host == self.host
+                        && search_host == self.search_host
                 } else {
                     false
                 }
@@ -145,7 +180,7 @@ impl RemoteIndex {
     }
 
     async fn search(&self, query: &str) -> Result<Vec<inverted_index::RetrievedWebpage>> {
-        let mut conn = self.conn().await?;
+        let mut conn = self.search_conn().await?;
 
         let websites: Vec<inverted_index::WebpagePointer> = conn
             .send(search_server::Search {
@@ -490,7 +525,7 @@ async fn test_segment_compaction() -> Result<()> {
 
     assert_eq!(index.meta().await.segments().len(), 1);
 
-    let searcher = LocalSearcher::builder(index.clone()).build();
+    let searcher = LocalSearcher::builder(index.index().await).build();
 
     let res = searcher
         .search(&SearchQuery {

+ 41 - 23
crates/core/src/entrypoint/search_server.rs

@@ -16,6 +16,7 @@
 
 use std::sync::Arc;
 
+use tokio::sync::RwLock;
 use tracing::info;
 
 use crate::{
@@ -136,22 +137,19 @@ impl_search!([
 ]);
 
 pub struct SearchService {
-    local_searcher: LocalSearcher<Arc<Index>>,
+    local_searcher: LocalSearcher<Arc<RwLock<Index>>>,
     // dropping the handle leaves the cluster
     #[allow(unused)]
-    cluster_handle: Cluster,
+    cluster_handle: Arc<Cluster>,
 }
 
 impl SearchService {
-    async fn new(config: config::SearchServerConfig) -> Result<Self> {
-        let shard = ShardId::Backbone(config.shard);
-        let mut search_index = Index::open(config.index_path)?;
-        search_index
-            .inverted_index
-            .set_snippet_config(config.snippet);
-        search_index.set_shard_id(shard);
-
-        let mut local_searcher = LocalSearcher::builder(Arc::new(search_index));
+    pub async fn new_from_existing(
+        config: config::SearchServerConfig,
+        cluster: Arc<Cluster>,
+        index: Arc<RwLock<Index>>,
+    ) -> Result<Self> {
+        let mut local_searcher = LocalSearcher::builder(index);
 
         if let Some(model_path) = config.linear_model_path {
             local_searcher = local_searcher.set_linear_model(LinearRegression::open(model_path)?);
@@ -163,21 +161,36 @@ impl SearchService {
 
         local_searcher = local_searcher.set_collector_config(config.collector);
 
-        let cluster_handle = Cluster::join(
-            Member::new(Service::Searcher {
-                host: config.host,
-                shard,
-            }),
-            config.gossip_addr,
-            config.gossip_seed_nodes.unwrap_or_default(),
-        )
-        .await?;
-
         Ok(SearchService {
             local_searcher: local_searcher.build(),
-            cluster_handle,
+            cluster_handle: cluster,
         })
     }
+
+    pub async fn new(config: config::SearchServerConfig, shard: ShardId) -> Result<Self> {
+        let host = config.host;
+        let gossip_addr = config.gossip_addr;
+        let gossip_seed_nodes = config.gossip_seed_nodes.clone().unwrap_or_default();
+
+        let mut search_index = Index::open(&config.index_path)?;
+        search_index
+            .inverted_index
+            .set_snippet_config(config.snippet.clone());
+        search_index.set_shard_id(shard);
+
+        let search_index = Arc::new(RwLock::new(search_index));
+
+        let cluster = Arc::new(
+            Cluster::join(
+                Member::new(Service::Searcher { host, shard }),
+                gossip_addr,
+                gossip_seed_nodes,
+            )
+            .await?,
+        );
+
+        Self::new_from_existing(config, cluster, search_index).await
+    }
 }
 
 #[derive(Debug, Clone, bincode::Encode, bincode::Decode)]
@@ -213,7 +226,12 @@ impl sonic::service::Message<SearchService> for Search {
 
 pub async fn run(config: config::SearchServerConfig) -> Result<()> {
     let addr = config.host;
-    let server = SearchService::new(config).await?.bind(addr).await.unwrap();
+    let shard = ShardId::Backbone(config.shard);
+    let server = SearchService::new(config, shard)
+        .await?
+        .bind(addr)
+        .await
+        .unwrap();
 
     info!("search server is ready to accept requests on {}", addr);
 

+ 4 - 26
crates/core/src/index.rs

@@ -1,5 +1,5 @@
 // Stract is an open source web search engine.
-// Copyright (C) 2023 Stract ApS
+// Copyright (C) 2024 Stract ApS
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Affero General Public License as
@@ -14,16 +14,12 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-use std::collections::HashSet;
 use std::fs;
 use std::path::{Path, PathBuf};
 use std::sync::Mutex;
 
-use crate::collector::MainCollector;
 use crate::inverted_index::{self, InvertedIndex, ShardId};
 use crate::query::Query;
-use crate::search_ctx::Ctx;
-use crate::webgraph::NodeID;
 use crate::webpage::region::{Region, RegionCount};
 use crate::webpage::Webpage;
 use crate::Result;
@@ -100,26 +96,6 @@ impl Index {
         Ok(())
     }
 
-    pub fn top_nodes(
-        &self,
-        query: &Query,
-        ctx: &Ctx,
-        collector: MainCollector,
-    ) -> Result<Vec<NodeID>> {
-        let websites = self
-            .inverted_index
-            .search_initial(query, ctx, collector)?
-            .top_websites;
-
-        let mut hosts = HashSet::with_capacity(websites.len());
-        for website in &websites {
-            if let Some(id) = self.inverted_index.website_host_node(website)? {
-                hosts.insert(id);
-            }
-        }
-        Ok(hosts.into_iter().collect())
-    }
-
     pub fn retrieve_websites(
         &self,
         websites: &[inverted_index::WebpagePointer],
@@ -156,6 +132,8 @@ impl Index {
 mod tests {
     use std::sync::Arc;
 
+    use tokio::sync::RwLock;
+
     use crate::{
         ranking,
         searcher::{LocalSearcher, SearchQuery},
@@ -235,7 +213,7 @@ mod tests {
 
         index.commit().unwrap();
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
         let res = searcher
             .search_sync(&SearchQuery {
                 query: "test".to_string(),

+ 53 - 43
crates/core/src/live_index/index.rs

@@ -20,7 +20,7 @@ use std::{
     sync::Arc,
 };
 
-use tokio::sync::{OwnedRwLockReadGuard, RwLock};
+use tokio::sync::RwLock;
 
 use chrono::{DateTime, NaiveDate, Utc};
 use itertools::Itertools;
@@ -107,7 +107,7 @@ impl Meta {
 }
 
 pub struct InnerIndex {
-    index: crate::index::Index,
+    index: Arc<RwLock<crate::index::Index>>,
     write_ahead_log: Wal<crate::entrypoint::indexer::IndexableWebpage>,
     has_inserts: bool,
     indexing_worker: IndexingWorker,
@@ -133,7 +133,7 @@ impl InnerIndex {
         let meta = Meta::open_or_create(path.as_ref().join("meta.json"));
 
         Ok(Self {
-            index,
+            index: Arc::new(RwLock::new(index)),
             write_ahead_log,
             indexing_worker: worker,
             has_inserts: wal_count > 0,
@@ -142,7 +142,7 @@ impl InnerIndex {
         })
     }
 
-    pub fn prune_segments(&mut self) {
+    pub async fn prune_segments(&mut self) {
         let old_segments: Vec<_> = self
             .meta
             .segments
@@ -157,12 +157,14 @@ impl InnerIndex {
             .collect();
 
         self.index
+            .write()
+            .await
             .inverted_index
             .delete_segments_by_id(&old_segments)
             .unwrap();
 
-        self.sync_meta_with_index();
-        self.re_open();
+        self.sync_meta_with_index().await;
+        self.re_open().await;
     }
 
     pub async fn start_compact_segments_by_date(&self) -> Result<Vec<CompactOperation>> {
@@ -179,6 +181,8 @@ impl InnerIndex {
 
             let (entry, merge_op) = self
                 .index
+                .write()
+                .await
                 .inverted_index
                 .start_merge_segments_by_id(&segment_ids)
                 .await?;
@@ -193,12 +197,18 @@ impl InnerIndex {
         Ok(operations)
     }
 
-    fn end_compact_segments_by_date(&mut self, operations: Vec<CompactOperation>) -> Result<()> {
+    async fn end_compact_segments_by_date(
+        &mut self,
+        operations: Vec<CompactOperation>,
+    ) -> Result<()> {
         for op in operations {
             let newest_creation_date = op.segments.iter().map(|s| s.created).max().unwrap();
             let segment_ids: Vec<SegmentId> = op.segments.iter().map(|s| s.id).collect();
 
-            if let Ok(Some(new_segment_id)) = op.end(&mut self.index.inverted_index) {
+            let mut index = self.index.write().await;
+            if let Ok(Some(new_segment_id)) = op.end(&mut index.inverted_index) {
+                drop(index);
+
                 self.update_meta_after_compaction(
                     segment_ids,
                     new_segment_id,
@@ -208,7 +218,7 @@ impl InnerIndex {
         }
 
         self.save_meta();
-        self.re_open();
+        self.re_open().await;
 
         Ok(())
     }
@@ -241,18 +251,22 @@ impl InnerIndex {
         });
     }
 
-    fn re_open(&mut self) {
-        self.index.inverted_index.re_open().unwrap();
-        self.index.prepare_writer().unwrap();
+    async fn re_open(&mut self) {
+        let mut index = self.index.write().await;
+        let shard_id = index.shard_id();
+        index.inverted_index.re_open().unwrap();
+        index.prepare_writer().unwrap();
 
-        if let Some(shard_id) = self.index.shard_id() {
-            self.index.set_shard_id(shard_id);
+        if let Some(shard_id) = shard_id {
+            index.set_shard_id(shard_id);
         }
     }
 
-    fn sync_meta_with_index(&mut self) {
+    async fn sync_meta_with_index(&mut self) {
         let segments_in_index: HashSet<_> = self
             .index
+            .write()
+            .await
             .inverted_index
             .segment_ids()
             .into_iter()
@@ -298,20 +312,18 @@ impl InnerIndex {
         self.meta.save(self.path.join("meta.json"));
     }
 
-    pub fn index(&self) -> &crate::index::Index {
-        &self.index
-    }
-
-    pub fn delete_all_pages(&mut self) {
-        let segments = self.index.inverted_index.segment_ids();
-        self.index
+    pub async fn delete_all_pages(&mut self) {
+        let mut index = self.index.write().await;
+        let segments = index.inverted_index.segment_ids();
+        index
             .inverted_index
             .delete_segments_by_id(&segments)
             .unwrap();
+        drop(index);
 
         self.meta = Meta::default();
         self.save_meta();
-        self.re_open();
+        self.re_open().await;
     }
 
     pub fn insert(&mut self, pages: &[IndexableWebpage]) {
@@ -320,6 +332,7 @@ impl InnerIndex {
     }
 
     pub async fn commit(&mut self) {
+        let mut index = self.index.write().await;
         for batch in self
             .write_ahead_log
             .iter()
@@ -330,27 +343,21 @@ impl InnerIndex {
         {
             let batch: Vec<_> = batch.collect();
             for webpage in self.indexing_worker.prepare_webpages(&batch).await {
-                self.index.insert(&webpage).unwrap();
+                index.insert(&webpage).unwrap();
             }
         }
-        self.index.commit().unwrap();
+        index.commit().unwrap();
+        drop(index);
+
         self.write_ahead_log.clear().unwrap();
-        self.sync_meta_with_index();
+        self.sync_meta_with_index().await;
         self.has_inserts = false;
-        self.re_open();
+        self.re_open().await;
     }
 
     pub fn has_inserts(&self) -> bool {
         self.has_inserts
     }
-
-    pub fn path(&self) -> &Path {
-        &self.path
-    }
-
-    pub fn meta(&self) -> &Meta {
-        &self.meta
-    }
 }
 
 pub struct LiveIndex {
@@ -377,7 +384,7 @@ impl LiveIndex {
 
     pub async fn prune_segments(&self) {
         tracing::debug!("pruning segments");
-        self.inner.write().await.prune_segments()
+        self.inner.write().await.prune_segments().await
     }
 
     pub async fn has_inserts(&self) -> bool {
@@ -396,25 +403,28 @@ impl LiveIndex {
         self.inner
             .write()
             .await
-            .end_compact_segments_by_date(operations)?;
+            .end_compact_segments_by_date(operations)
+            .await?;
 
         Ok(())
     }
 
+    pub async fn index(&self) -> Arc<RwLock<crate::index::Index>> {
+        self.inner.read().await.index.clone()
+    }
+
     pub async fn insert(&self, pages: &[IndexableWebpage]) {
         tracing::debug!("inserting {} pages into index", pages.len());
         self.inner.write().await.insert(pages)
     }
 
-    pub async fn read(&self) -> OwnedRwLockReadGuard<InnerIndex> {
-        RwLock::read_owned(self.inner.clone()).await
-    }
-
     pub async fn set_snippet_config(&self, config: SnippetConfig) {
         self.inner
             .write()
             .await
             .index
+            .write()
+            .await
             .inverted_index
             .set_snippet_config(config)
     }
@@ -424,11 +434,11 @@ impl LiveIndex {
     }
 
     pub async fn delete_all_pages(&self) {
-        self.inner.write().await.delete_all_pages();
+        self.inner.write().await.delete_all_pages().await
     }
 
     pub async fn re_open(&self) -> Result<()> {
-        self.inner.write().await.re_open();
+        self.inner.write().await.re_open().await;
 
         Ok(())
     }
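
The index now lives behind Arc<RwLock<...>>, and LiveIndex::index() hands out clones of that handle, so the search service can read while commits and compactions take the write lock. A minimal sketch of the sharing pattern, with a stand-in Index type:

use std::sync::Arc;
use tokio::sync::RwLock;

// stand-in for crate::index::Index
struct Index { docs: usize }

#[tokio::main]
async fn main() {
    let index = Arc::new(RwLock::new(Index { docs: 0 }));

    // what LiveIndex::index() now returns: a clone of the same handle
    let search_handle = index.clone();

    // writer side (commit / re_open): exclusive access while mutating
    {
        let mut guard = index.write().await;
        guard.docs += 1;
    } // write guard dropped at end of scope; the diff drops its guard explicitly via drop(index)

    // reader side (SearchService): any number of concurrent readers
    let guard = search_handle.read().await;
    assert_eq!(guard.docs, 1);
}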

+ 14 - 13
crates/core/src/query/mod.rs

@@ -209,6 +209,7 @@ mod tests {
 
     use crate::{index::Index, rand_words, searcher::LocalSearcher, webpage::Webpage};
     use proptest::prelude::*;
+    use tokio::sync::RwLock;
 
     use super::*;
 
@@ -383,7 +384,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let result = searcher.search_sync(&query).expect("Search failed");
         assert_eq!(result.webpages.len(), 1);
@@ -449,7 +450,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "test site:first.com".to_string(),
@@ -523,7 +524,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "test linksto:first.com".to_string(),
@@ -625,7 +626,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "test linkto:second.com".to_string(),
@@ -677,7 +678,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "intitle:website".to_string(),
@@ -729,7 +730,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "test inurl:forum".to_string(),
@@ -824,7 +825,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "test site:first.com".to_string(),
@@ -897,7 +898,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "\"Test website\"".to_string(),
@@ -964,7 +965,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "testwebsite".to_string(),
@@ -1046,7 +1047,7 @@ mod tests {
         index.insert(&webpage).expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "test".to_string(),
@@ -1137,7 +1138,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "test site:.com".to_string(),
@@ -1244,7 +1245,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "test exacturl:https://www.first.com/example".to_string(),
@@ -1329,7 +1330,7 @@ mod tests {
             )
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let query = SearchQuery {
             query: "\"test test\" website".to_string(),

+ 19 - 19
crates/core/src/query/optic.rs

@@ -251,6 +251,7 @@ mod tests {
     use std::sync::Arc;
 
     use optics::{HostRankings, Optic};
+    use tokio::sync::RwLock;
 
     use crate::{
         bangs::Bangs,
@@ -258,7 +259,6 @@ mod tests {
         index::Index,
         searcher::{
             api::{ApiSearcher, Config},
-            live::LiveSearcher,
             LocalSearchClient, LocalSearcher, SearchQuery,
         },
         webgraph::{Edge, Node, Webgraph},
@@ -322,7 +322,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -443,7 +443,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let _ = searcher
             .search_sync(&SearchQuery {
@@ -567,7 +567,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -735,8 +735,8 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher: ApiSearcher<_, LiveSearcher, _> = ApiSearcher::new(
-            LocalSearchClient::from(LocalSearcher::builder(Arc::new(index)).build()),
+        let searcher: ApiSearcher<_, _> = ApiSearcher::new(
+            LocalSearchClient::from(LocalSearcher::builder(Arc::new(RwLock::new(index))).build()),
             None,
             Bangs::empty(),
             Config::default(),
@@ -854,7 +854,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().unwrap();
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -983,7 +983,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1073,7 +1073,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1136,7 +1136,7 @@ mod tests {
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
         let res = searcher
             .search_sync(&SearchQuery {
                 query: "example".to_string(),
@@ -1292,7 +1292,7 @@ mod tests {
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1360,7 +1360,7 @@ mod tests {
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1440,7 +1440,7 @@ mod tests {
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1536,7 +1536,7 @@ mod tests {
         index.insert(&page).expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1632,7 +1632,7 @@ mod tests {
         index.insert(&page).expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1711,7 +1711,7 @@ mod tests {
         index.insert(&page).expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1825,7 +1825,7 @@ mod tests {
 
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1913,7 +1913,7 @@ mod tests {
         index.insert(&page).expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -1955,7 +1955,7 @@ mod tests {
         index.insert(&page).expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {

+ 4 - 5
crates/core/src/ranking/inbound_similarity.rs

@@ -142,14 +142,13 @@ mod tests {
     use std::sync::Arc;
 
     use optics::HostRankings;
+    use tokio::sync::RwLock;
 
     use crate::{
         bangs::Bangs,
         index::Index,
         rand_words,
-        searcher::{
-            api::ApiSearcher, live::LiveSearcher, LocalSearchClient, LocalSearcher, SearchQuery,
-        },
+        searcher::{api::ApiSearcher, LocalSearchClient, LocalSearcher, SearchQuery},
         webgraph::{Edge, EdgeLimit, Node, Webgraph},
         webpage::{html::links::RelFlags, Html, Webpage},
     };
@@ -362,8 +361,8 @@ mod tests {
 
         index.commit().unwrap();
 
-        let searcher: ApiSearcher<_, LiveSearcher, _> = ApiSearcher::new(
-            LocalSearchClient::from(LocalSearcher::builder(Arc::new(index)).build()),
+        let searcher: ApiSearcher<_, _> = ApiSearcher::new(
+            LocalSearchClient::from(LocalSearcher::builder(Arc::new(RwLock::new(index))).build()),
             None,
             Bangs::empty(),
             crate::searcher::api::Config::default(),

+ 14 - 12
crates/core/src/ranking/mod.rs

@@ -131,6 +131,8 @@ mod tests {
 
     use std::{path::Path, sync::Arc};
 
+    use tokio::sync::RwLock;
+
     use crate::{
         config::{IndexerConfig, IndexerDualEncoderConfig, WarcSource},
         entrypoint::indexer::IndexingWorker,
@@ -198,7 +200,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
         let result = searcher
             .search_sync(&SearchQuery {
                 query: "example".to_string(),
@@ -264,7 +266,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
         let result = searcher
             .search_sync(&SearchQuery {
                 query: "example".to_string(),
@@ -332,7 +334,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
         let result = searcher
             .search_sync(&SearchQuery {
                 query: "title".to_string(),
@@ -405,7 +407,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
         let result = searcher
             .search_sync(&SearchQuery {
                 query: "test".to_string(),
@@ -471,7 +473,7 @@ mod tests {
             .expect("failed to insert webpage");
 
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
         let result = searcher
             .search_sync(&SearchQuery {
                 query: "test".to_string(),
@@ -556,7 +558,7 @@ mod tests {
 
         index.commit().unwrap();
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let res = searcher
             .search_sync(&SearchQuery {
@@ -642,7 +644,7 @@ mod tests {
             })
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let result = searcher
             .search_sync(&SearchQuery {
@@ -735,7 +737,7 @@ mod tests {
             })
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let result = searcher
             .search_sync(&SearchQuery {
@@ -856,7 +858,7 @@ mod tests {
 
         index.commit().expect("failed to commit index");
 
-        let mut searcher = LocalSearcher::builder(Arc::new(index));
+        let mut searcher = LocalSearcher::builder(Arc::new(RwLock::new(index)));
         searcher = searcher
             .set_dual_encoder(DualEncoder::open(data_path).expect("failed to open dual encoder"));
         let searcher = searcher.build();
@@ -945,7 +947,7 @@ mod tests {
 
         index.commit().expect("failed to commit index");
 
-        let mut searcher = LocalSearcher::builder(Arc::new(index));
+        let mut searcher = LocalSearcher::builder(Arc::new(RwLock::new(index)));
         searcher = searcher
             .set_dual_encoder(DualEncoder::open(data_path).expect("failed to open dual encoder"));
         let searcher = searcher.build();
@@ -995,7 +997,7 @@ mod tests {
             })
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let result = searcher
             .search_sync(&SearchQuery {
@@ -1075,7 +1077,7 @@ mod tests {
 
         index.insert(&page).expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let result = searcher
             .search_sync(&SearchQuery {

+ 4 - 5
crates/core/src/ranking/optics.rs

@@ -19,13 +19,12 @@ mod tests {
     use std::sync::Arc;
 
     use optics::HostRankings;
+    use tokio::sync::RwLock;
 
     use crate::{
         bangs::Bangs,
         index::Index,
-        searcher::{
-            api::ApiSearcher, live::LiveSearcher, LocalSearchClient, LocalSearcher, SearchQuery,
-        },
+        searcher::{api::ApiSearcher, LocalSearchClient, LocalSearcher, SearchQuery},
         webgraph::{Edge, Node, Webgraph},
         webpage::{html::links::RelFlags, Html, Webpage},
     };
@@ -188,8 +187,8 @@ mod tests {
             })
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
-        let searcher: ApiSearcher<_, LiveSearcher, _> = ApiSearcher::new(
-            LocalSearchClient::from(LocalSearcher::builder(Arc::new(index)).build()),
+        let searcher: ApiSearcher<_, _> = ApiSearcher::new(
+            LocalSearchClient::from(LocalSearcher::builder(Arc::new(RwLock::new(index))).build()),
             None,
             Bangs::empty(),
             crate::searcher::api::Config::default(),

+ 6 - 10
crates/core/src/ranking/pipeline/mod.rs

@@ -171,18 +171,18 @@ mod tests {
         inverted_index::{DocAddress, ShardId, WebpagePointer},
         prehashed::Prehashed,
         ranking::{self, bitvec_similarity::BitVec, initial::Score},
-        searcher::api,
+        searcher::ScoredWebpagePointer,
     };
 
     use super::*;
 
-    fn pipeline() -> RankingPipeline<api::ScoredWebpagePointer> {
+    fn pipeline() -> RankingPipeline<ScoredWebpagePointer> {
         RankingPipeline::new()
             .add_stage(term_distance::TitleDistanceScorer)
             .add_stage(term_distance::BodyDistanceScorer)
     }
 
-    fn sample_websites(n: usize) -> Vec<api::ScoredWebpagePointer> {
+    fn sample_websites(n: usize) -> Vec<ScoredWebpagePointer> {
         (0..n)
             .map(|i| -> LocalRecallRankingWebpage {
                 let pointer = WebpagePointer {
@@ -206,13 +206,9 @@ mod tests {
                 signals.insert(ranking::signals::HostCentrality.into(), calc);
                 LocalRecallRankingWebpage::new_testing(pointer, signals, calc.score)
             })
-            .map(|local| {
-                api::ScoredWebpagePointer::Normal(
-                    crate::searcher::distributed::ScoredWebpagePointer {
-                        website: RecallRankingWebpage::new(local, BitVec::new(vec![])),
-                        shard: ShardId::Backbone(0),
-                    },
-                )
+            .map(|local| ScoredWebpagePointer {
+                website: RecallRankingWebpage::new(local, BitVec::new(vec![])),
+                shard: ShardId::Backbone(0),
             })
             .collect()
     }

+ 2 - 2
crates/core/src/ranking/pipeline/modifiers/inbound_similarity.rs

@@ -16,7 +16,7 @@
 
 use crate::{
     ranking::{self, pipeline::RankableWebpage},
-    searcher::api,
+    searcher::ScoredWebpagePointer,
 };
 
 use super::Modifier;
@@ -26,7 +26,7 @@ const INBOUND_SIMILARITY_SMOOTHING: f64 = 8.0;
 pub struct InboundSimilarity;
 
 impl Modifier for InboundSimilarity {
-    type Webpage = api::ScoredWebpagePointer;
+    type Webpage = ScoredWebpagePointer;
 
     fn boost(&self, webpage: &Self::Webpage) -> f64 {
         webpage

+ 1 - 1
crates/core/src/ranking/pipeline/scorers/embedding.rs

@@ -25,7 +25,7 @@ use crate::{
         pipeline::{stages::StoredEmbeddings, RankableWebpage},
         SignalEnum,
     },
-    searcher::api::ScoredWebpagePointer,
+    searcher::ScoredWebpagePointer,
     Result,
 };
 

+ 1 - 1
crates/core/src/ranking/pipeline/scorers/inbound_similarity.rs

@@ -18,7 +18,7 @@ use std::sync::Mutex;
 
 use crate::{
     ranking::{self, inbound_similarity, pipeline::RankableWebpage},
-    searcher::api::ScoredWebpagePointer,
+    searcher::ScoredWebpagePointer,
 };
 
 use super::FullRankingStage;

+ 1 - 1
crates/core/src/ranking/pipeline/scorers/lambdamart.rs

@@ -20,7 +20,7 @@ use crate::{
         pipeline::{PrecisionRankingWebpage, RankableWebpage, Top},
         SignalCalculation, SignalEnum,
     },
-    searcher::api::ScoredWebpagePointer,
+    searcher::ScoredWebpagePointer,
 };
 use std::sync::Arc;
 

+ 3 - 3
crates/core/src/ranking/pipeline/scorers/term_distance.rs

@@ -18,7 +18,7 @@ use itertools::Itertools;
 
 use crate::ranking::pipeline::RankableWebpage;
 use crate::ranking::{self, SignalCalculation, SignalEnum};
-use crate::searcher::api;
+use crate::searcher::ScoredWebpagePointer;
 
 fn min_slop_two_positions(pos_a: &[u32], pos_b: &[u32]) -> u32 {
     let mut cur_min = u32::MAX;
@@ -61,7 +61,7 @@ fn score_slop(slop: f64) -> f64 {
 pub struct TitleDistanceScorer;
 
 impl super::RankingStage for TitleDistanceScorer {
-    type Webpage = api::ScoredWebpagePointer;
+    type Webpage = ScoredWebpagePointer;
 
     fn compute(&self, webpage: &Self::Webpage) -> (SignalEnum, SignalCalculation) {
         let min_slop = min_slop(webpage.as_local_recall().iter_title_positions()) as f64;
@@ -81,7 +81,7 @@ impl super::RankingStage for TitleDistanceScorer {
 pub struct BodyDistanceScorer;
 
 impl super::RankingStage for BodyDistanceScorer {
-    type Webpage = api::ScoredWebpagePointer;
+    type Webpage = ScoredWebpagePointer;
 
     fn compute(&self, webpage: &Self::Webpage) -> (SignalEnum, SignalCalculation) {
         let min_slop = min_slop(webpage.as_local_recall().iter_clean_body_positions()) as f64;

+ 9 - 7
crates/core/src/ranking/pipeline/stages/recall.rs

@@ -37,7 +37,7 @@ use crate::{
         SignalCalculation, SignalComputer, SignalEnum,
     },
     schema::{numerical_field, text_field},
-    searcher::{api, SearchQuery},
+    searcher::{ScoredWebpagePointer, SearchQuery},
     webgraph,
 };
 
@@ -296,7 +296,7 @@ impl collector::Doc for LocalRecallRankingWebpage {
     }
 }
 
-impl RankingPipeline<api::ScoredWebpagePointer> {
+impl RankingPipeline<ScoredWebpagePointer> {
     pub fn recall_stage(
         query: &SearchQuery,
         inbound: inbound_similarity::Scorer,
@@ -307,15 +307,17 @@ impl RankingPipeline<api::ScoredWebpagePointer> {
             .add_stage(term_distance::TitleDistanceScorer)
             .add_stage(term_distance::BodyDistanceScorer)
             .add_stage(
-                EmbeddingScorer::<api::ScoredWebpagePointer, TitleEmbeddings>::new(
+                EmbeddingScorer::<ScoredWebpagePointer, TitleEmbeddings>::new(
                     query.text().to_string(),
                     dual_encoder.clone(),
                 ),
             )
-            .add_stage(EmbeddingScorer::<
-                api::ScoredWebpagePointer,
-                KeywordEmbeddings,
-            >::new(query.text().to_string(), dual_encoder))
+            .add_stage(
+                EmbeddingScorer::<ScoredWebpagePointer, KeywordEmbeddings>::new(
+                    query.text().to_string(),
+                    dual_encoder,
+                ),
+            )
             .add_stage(InboundScorer::new(inbound))
             .add_modifier(modifiers::InboundSimilarity);
 

+ 16 - 145
crates/core/src/searcher/api/mod.rs

@@ -1,5 +1,5 @@
 // Stract is an open source web search engine.
-// Copyright (C) 2023 Stract ApS
+// Copyright (C) 2024 Stract ApS
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Affero General Public License as
@@ -55,32 +55,10 @@ use crate::{query, webgraph, Result};
 use self::sidebar::SidebarManager;
 use self::widget::WidgetManager;
 
-use super::{distributed, live, SearchQuery, SearchResult, WebsitesResult};
+use super::{distributed, ScoredWebpagePointer, SearchQuery, SearchResult, WebsitesResult};
 
 const NUM_PIPELINE_RANKING_RESULTS: usize = 300;
 
-#[derive(Clone)]
-pub enum ScoredWebpagePointer {
-    Normal(distributed::ScoredWebpagePointer),
-    Live(live::ScoredWebpagePointer),
-}
-
-impl ScoredWebpagePointer {
-    pub fn as_ranking(&self) -> &RecallRankingWebpage {
-        match self {
-            ScoredWebpagePointer::Normal(p) => &p.website,
-            ScoredWebpagePointer::Live(p) => &p.website,
-        }
-    }
-
-    pub fn as_ranking_mut(&mut self) -> &mut RecallRankingWebpage {
-        match self {
-            ScoredWebpagePointer::Normal(p) => &mut p.website,
-            ScoredWebpagePointer::Live(p) => &mut p.website,
-        }
-    }
-}
-
 impl RankableWebpage for ScoredWebpagePointer {
     fn set_raw_score(&mut self, score: f64) {
         self.as_ranking_mut().set_raw_score(score);
@@ -103,17 +81,11 @@ impl RankableWebpage for ScoredWebpagePointer {
     }
 
     fn signals(&self) -> &EnumMap<SignalEnum, SignalCalculation> {
-        match self {
-            ScoredWebpagePointer::Normal(p) => p.website.signals(),
-            ScoredWebpagePointer::Live(p) => p.website.signals(),
-        }
+        self.as_ranking().signals()
     }
 
     fn signals_mut(&mut self) -> &mut EnumMap<SignalEnum, SignalCalculation> {
-        match self {
-            ScoredWebpagePointer::Normal(p) => p.website.signals_mut(),
-            ScoredWebpagePointer::Live(p) => p.website.signals_mut(),
-        }
+        self.as_ranking_mut().signals_mut()
     }
 }
 
@@ -243,10 +215,9 @@ where
     }
 }
 
-pub struct ApiSearcher<S, L, G> {
+pub struct ApiSearcher<S, G> {
     distributed_searcher: Arc<S>,
     sidebar_manager: Option<SidebarManager>,
-    live_searcher: Option<L>,
     cross_encoder: Option<Arc<CrossEncoderModel>>,
     lambda_model: Option<Arc<LambdaMART>>,
     dual_encoder: Option<Arc<DualEncoder>>,
@@ -257,10 +228,9 @@ pub struct ApiSearcher<S, L, G> {
     webgraph: Option<G>,
 }
 
-impl<S, L, G> ApiSearcher<S, L, G>
+impl<S, G> ApiSearcher<S, G>
 where
     S: distributed::SearchClient,
-    L: live::SearchClient,
     G: Graph,
 {
     pub async fn new<C>(
@@ -284,7 +254,6 @@ where
         Self {
             distributed_searcher: dist_searcher,
             sidebar_manager,
-            live_searcher: None,
             cross_encoder: None,
             lambda_model: None,
             dual_encoder: None,
@@ -298,11 +267,6 @@ where
         }
     }
 
-    pub fn with_live(mut self, live_searcher: L) -> Self {
-        self.live_searcher = Some(live_searcher);
-        self
-    }
-
     pub fn with_cross_encoder(mut self, cross_encoder: CrossEncoderModel) -> Self {
         self.cross_encoder = Some(Arc::new(cross_encoder));
         self
@@ -436,64 +400,9 @@ where
         query: &str,
         top_websites: &[ScoredWebpagePointer],
     ) -> Vec<PrecisionRankingWebpage> {
-        let normal: Vec<_> = top_websites
-            .iter()
-            .enumerate()
-            .filter_map(|(i, pointer)| {
-                if let ScoredWebpagePointer::Normal(p) = pointer {
-                    Some((i, p.clone()))
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        let live: Vec<_> = top_websites
-            .iter()
-            .enumerate()
-            .filter_map(|(i, pointer)| {
-                if let ScoredWebpagePointer::Live(p) = pointer {
-                    Some((i, p.clone()))
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        let (retrieved_normal, retrieved_live) = tokio::join!(
-            self.distributed_searcher.retrieve_webpages(&normal, query),
-            self.retrieve_webpages_from_live(&live, query),
-        );
-
-        let mut retrieved_webpages: Vec<_> =
-            retrieved_normal.into_iter().chain(retrieved_live).collect();
-        retrieved_webpages.sort_by(|(a, _), (b, _)| a.cmp(b));
-
-        retrieved_webpages
-            .into_iter()
-            .map(|(_, webpage)| webpage)
-            .collect::<Vec<_>>()
-    }
-
-    async fn search_initial_from_live(
-        &self,
-        query: &SearchQuery,
-    ) -> Option<Vec<live::InitialSearchResultShard>> {
-        match &self.live_searcher {
-            Some(searcher) => Some(searcher.search_initial(query).await),
-            None => None,
-        }
-    }
-
-    async fn retrieve_webpages_from_live(
-        &self,
-        pointers: &[(usize, live::ScoredWebpagePointer)],
-        query: &str,
-    ) -> Vec<(usize, PrecisionRankingWebpage)> {
-        match &self.live_searcher {
-            Some(searcher) => searcher.retrieve_webpages(pointers, query).await,
-            None => vec![],
-        }
+        self.distributed_searcher
+            .retrieve_webpages(top_websites, query)
+            .await
     }
 
     async fn inbound_vecs(&self, ids: &[webgraph::NodeID]) -> Vec<bitvec_similarity::BitVec> {
@@ -507,7 +416,6 @@ where
         &self,
         query: &SearchQuery,
         initial_results: Vec<distributed::InitialSearchResultShard>,
-        live_results: Vec<live::InitialSearchResultShard>,
     ) -> (Vec<ScoredWebpagePointer>, bool) {
         let mut collector =
             BucketCollector::new(NUM_PIPELINE_RANKING_RESULTS, self.collector_config.clone());
@@ -518,17 +426,7 @@ where
             .map(|r| *r.host_id())
             .collect::<Vec<_>>();
 
-        let live_host_nodes = live_results
-            .iter()
-            .flat_map(|r| r.local_result.websites.iter())
-            .map(|r| *r.host_id())
-            .collect::<Vec<_>>();
-
-        let host_nodes = initial_host_nodes
-            .into_iter()
-            .chain(live_host_nodes)
-            .unique()
-            .collect::<Vec<_>>();
+        let host_nodes = initial_host_nodes.into_iter().unique().collect::<Vec<_>>();
 
         let inbound_vecs = if !query.fetch_backlinks() {
             HashMap::default()
@@ -555,26 +453,6 @@ where
                     shard: result.shard,
                 };
 
-                let pointer = ScoredWebpagePointer::Normal(pointer);
-
-                collector.insert(pointer);
-            }
-        }
-
-        for result in live_results {
-            num_results += result.local_result.websites.len();
-            for website in result.local_result.websites {
-                let inbound = inbound_vecs
-                    .get(website.host_id())
-                    .cloned()
-                    .unwrap_or_default();
-                let pointer = live::ScoredWebpagePointer {
-                    website: RecallRankingWebpage::new(website, inbound),
-                    shard_id: result.shard_id,
-                };
-
-                let pointer = ScoredWebpagePointer::Live(pointer);
-
                 collector.insert(pointer);
             }
         }
@@ -648,7 +526,7 @@ where
             .map(|result| result.local_result.num_websites)
             .fold(approx_count::Count::Exact(0), |acc, count| acc + count);
 
-        let (combined, _) = self.combine_results(query, results, vec![]).await;
+        let (combined, _) = self.combine_results(query, results).await;
         let combined: Vec<_> = combined.into_iter().take(query.num_results).collect();
 
         let mut retrieved_webpages: Vec<_> = self
@@ -696,24 +574,17 @@ where
             ..query.clone()
         };
 
-        let (initial_results, live_results) = tokio::join!(
-            self.distributed_searcher.search_initial(&search_query),
-            self.search_initial_from_live(&search_query),
-        );
+        let initial_results = self
+            .distributed_searcher
+            .search_initial(&search_query)
+            .await;
 
         let num_docs = initial_results
             .iter()
             .map(|result| result.local_result.num_websites)
-            .chain(live_results.iter().flat_map(|results| {
-                results
-                    .iter()
-                    .map(|result| result.local_result.num_websites)
-            }))
             .fold(approx_count::Count::Exact(0), |acc, count| acc + count);
 
-        let (top_websites, has_more_results) = self
-            .combine_results(query, initial_results, live_results.unwrap_or_default())
-            .await;
+        let (top_websites, has_more_results) = self.combine_results(query, initial_results).await;
 
         let inbound_scorer = self.inbound_scorer(query).await;
 

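With live results folded into the ordinary shard set, combine_results above consumes a single stream of InitialSearchResultShard values instead of merging two parallel ones. An illustrative reduction of that merge, with plain structs standing in for the crate's types and scores assumed NaN-free:

#[derive(Debug, PartialEq)]
struct ShardHit {
    shard: u64,
    score: f64,
}

fn combine(shards: Vec<Vec<ShardHit>>, top_k: usize) -> Vec<ShardHit> {
    // One flatten + one sort replaces the previous two-source merge.
    let mut all: Vec<ShardHit> = shards.into_iter().flatten().collect();
    all.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
    all.truncate(top_k);
    all
}

fn main() {
    let merged = combine(
        vec![
            vec![ShardHit { shard: 0, score: 0.7 }],
            vec![ShardHit { shard: 1, score: 0.9 }], // live shard, same shape
        ],
        1,
    );
    assert_eq!(merged, vec![ShardHit { shard: 1, score: 0.9 }]);
}
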
+ 2 - 3
crates/core/src/searcher/api/sidebar.rs

@@ -137,14 +137,13 @@ impl SidebarManager {
             tracing::debug!(?score, ?self.thresholds.stackoverflow, "stackoverflow score");
             if website.score() > self.thresholds.stackoverflow {
                 let website = RecallRankingWebpage::new(website, Default::default());
-                let scored_websites =
-                    vec![(0, distributed::ScoredWebpagePointer { website, shard })];
+                let scored_websites = vec![distributed::ScoredWebpagePointer { website, shard }];
                 let mut retrieved = self
                     .distributed_searcher
                     .retrieve_webpages(&scored_websites, &query.query)
                     .await;
 
-                if let Some((_, res)) = retrieved.pop() {
+                if let Some(res) = retrieved.pop() {
                     let res = res.into_retrieved_webpage();
                     return Ok(Some(create_stackoverflow_sidebar(
                         res.schema_org,

+ 33 - 22
crates/core/src/searcher/distributed.rs

@@ -28,9 +28,9 @@ use crate::{
         },
     },
     entrypoint::{
-        entity_search_server,
-        live_index::LiveIndexService,
-        search_server::{self, RetrieveReq, SearchService},
+        entity_search_server,
+        live_index::LiveIndexService,
+        search_server::{self, RetrieveReq, SearchService},
     },
     generic_query::{self, Collector},
     index::Index,
@@ -46,7 +44,7 @@ use futures::{future::join_all, stream::FuturesUnordered, StreamExt};
 use itertools::Itertools;
 use std::future::Future;
 use thiserror::Error;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, RwLock};
 
 use super::{InitialWebsiteResult, LocalSearcher, SearchQuery};
 
@@ -72,9 +70,9 @@ pub trait SearchClient {
 
     fn retrieve_webpages(
         &self,
-        top_websites: &[(usize, ScoredWebpagePointer)],
+        top_websites: &[ScoredWebpagePointer],
         query: &str,
-    ) -> impl Future<Output = Vec<(usize, PrecisionRankingWebpage)>> + Send;
+    ) -> impl Future<Output = Vec<PrecisionRankingWebpage>> + Send;
 
     fn search_initial_generic<Q>(
         &self,
@@ -183,6 +181,16 @@ pub struct ScoredWebpagePointer {
     pub shard: ShardId,
 }
 
+impl ScoredWebpagePointer {
+    pub fn as_ranking(&self) -> &RecallRankingWebpage {
+        &self.website
+    }
+
+    pub fn as_ranking_mut(&mut self) -> &mut RecallRankingWebpage {
+        &mut self.website
+    }
+}
+
 impl ShardIdentifier for ShardId {}
 
 #[derive(Debug)]
@@ -202,6 +210,10 @@ impl ReusableClientManager for SearchService {
         for member in cluster.members().await {
             if let Service::Searcher { host, shard } = member.service {
                 shards.entry(shard).or_insert_with(Vec::new).push(host);
+            } else if let Service::LiveIndex { search_host, shard, state, .. } = member.service {
+                if state == LiveIndexState::Ready {
+                    shards.entry(shard).or_insert_with(Vec::new).push(search_host);
+                }
             }
         }
 
@@ -252,7 +264,7 @@ impl ReusableClientManager for LiveIndexService {
     async fn new_client(cluster: &Cluster) -> ShardedClient<Self::Service, Self::ShardId> {
         let mut shards = HashMap::new();
         for member in cluster.members().await {
-            if let Service::LiveIndex { host, shard, state } = member.service {
+            if let Service::LiveIndex { host, shard, state, .. } = member.service {
                 if state == LiveIndexState::Ready {
                     shards.entry(shard).or_insert_with(Vec::new).push(host);
                 }
@@ -355,19 +367,19 @@ impl SearchClient for DistributedSearcher {
 
     async fn retrieve_webpages(
         &self,
-        top_websites: &[(usize, ScoredWebpagePointer)],
+        top_websites: &[ScoredWebpagePointer],
         query: &str,
-    ) -> Vec<(usize, PrecisionRankingWebpage)> {
+    ) -> Vec<PrecisionRankingWebpage> {
         let mut rankings = FnvHashMap::default();
         let mut pointers: HashMap<_, Vec<_>> = HashMap::new();
 
-        for (i, pointer) in top_websites {
+        for (i, pointer) in top_websites.iter().enumerate() {
             pointers
                 .entry(pointer.shard)
                 .or_default()
-                .push((*i, pointer.website.pointer().clone()));
+                .push((i, pointer.website.pointer().clone()));
 
-            rankings.insert(*i, pointer.website.clone());
+            rankings.insert(i, pointer.website.clone());
         }
 
         let client = self.conn().await;
@@ -387,8 +399,7 @@ impl SearchClient for DistributedSearcher {
         debug_assert_eq!(retrieved_webpages.len(), top_websites.len());
 
         retrieved_webpages.sort_by(|(a, _), (b, _)| a.cmp(b));
-
-        retrieved_webpages
+        retrieved_webpages.into_iter().map(|(_, v)| v).collect()
     }
 
     async fn search_initial_generic<Q>(
@@ -573,9 +584,9 @@ impl SearchClient for DistributedSearcher {
 }
 
 /// This should only be used for testing and benchmarks.
-pub struct LocalSearchClient(LocalSearcher<Arc<Index>>);
-impl From<LocalSearcher<Arc<Index>>> for LocalSearchClient {
-    fn from(searcher: LocalSearcher<Arc<Index>>) -> Self {
+pub struct LocalSearchClient(LocalSearcher<Arc<RwLock<Index>>>);
+impl From<LocalSearcher<Arc<RwLock<Index>>>> for LocalSearchClient {
+    fn from(searcher: LocalSearcher<Arc<RwLock<Index>>>) -> Self {
         Self(searcher)
     }
 }
@@ -592,12 +603,12 @@ impl SearchClient for LocalSearchClient {
 
     async fn retrieve_webpages(
         &self,
-        top_websites: &[(usize, ScoredWebpagePointer)],
+        top_websites: &[ScoredWebpagePointer],
         query: &str,
-    ) -> Vec<(usize, PrecisionRankingWebpage)> {
+    ) -> Vec<PrecisionRankingWebpage> {
         let pointers = top_websites
             .iter()
-            .map(|(_, p)| p.website.pointer().clone())
+            .map(|p| p.website.pointer().clone())
             .collect::<Vec<_>>();
 
         let res = self
@@ -606,8 +617,8 @@ impl SearchClient for LocalSearchClient {
             .await
             .unwrap()
             .into_iter()
-            .zip(top_websites.iter().map(|(i, p)| (*i, p.website.clone())))
-            .map(|(ret, (i, ran))| (i, PrecisionRankingWebpage::new(ret, ran)))
+            .zip(top_websites.iter().map(|p| p.website.clone()))
+            .map(|(ret, ran)| PrecisionRankingWebpage::new(ret, ran))
             .collect::<Vec<_>>();
 
         res

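The retrieve_webpages rewrite above drops the explicit (usize, _) tuples from the public signature and keeps the ordering bookkeeping internal: enumerate once, fan out per shard, then restore the caller's ranking order by the remembered index. A compact sketch of that pattern (types simplified; the per-shard RPC is replaced by a stand-in):

use std::collections::HashMap;

fn fan_out_and_restore(pointers: &[(u64, &str)]) -> Vec<String> {
    // Group by shard, remembering each pointer's original rank.
    let mut by_shard: HashMap<u64, Vec<(usize, String)>> = HashMap::new();
    for (i, (shard, page)) in pointers.iter().enumerate() {
        by_shard.entry(*shard).or_default().push((i, page.to_string()));
    }

    // Stand-in for the per-shard retrieval calls, which may answer
    // in any shard order.
    let mut retrieved: Vec<(usize, String)> = by_shard.into_values().flatten().collect();

    // Restore the original ranking order, then drop the indices,
    // just as the new `sort_by` + `map(|(_, v)| v)` in the hunk does.
    retrieved.sort_by_key(|(i, _)| *i);
    retrieved.into_iter().map(|(_, page)| page).collect()
}

fn main() {
    let pages = [(1, "a"), (0, "b"), (1, "c")];
    assert_eq!(fan_out_and_restore(&pages), vec!["a", "b", "c"]);
}
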
+ 0 - 194
crates/core/src/searcher/live.rs

@@ -1,194 +0,0 @@
-// Stract is an open source web search engine.
-// Copyright (C) 2023 Stract ApS
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program.  If not, see <https://www.gnu.org/licenses/>.
-
-use crate::{
-    distributed::{
-        cluster::Cluster,
-        member::{LiveIndexState, Service},
-        sonic::replication::{
-            AllShardsSelector, RandomReplicaSelector, RemoteClient, ReplicatedClient, Shard,
-            ShardedClient, SpecificShardSelector,
-        },
-    },
-    entrypoint::search_server::{self, SearchService},
-    inverted_index::{RetrievedWebpage, ShardId, WebpagePointer},
-    ranking::pipeline::{PrecisionRankingWebpage, RecallRankingWebpage},
-};
-
-use std::future::Future;
-use std::{collections::HashMap, sync::Arc};
-
-use fnv::FnvHashMap;
-use futures::future::join_all;
-use itertools::Itertools;
-
-use super::{InitialWebsiteResult, SearchQuery};
-
-#[derive(Clone, Debug)]
-pub struct ScoredWebpagePointer {
-    pub website: RecallRankingWebpage,
-    pub shard_id: ShardId,
-}
-
-#[derive(Debug)]
-pub struct InitialSearchResultShard {
-    pub local_result: InitialWebsiteResult,
-    pub shard_id: ShardId,
-}
-
-pub struct LiveSearcher {
-    cluster: Arc<Cluster>,
-}
-
-impl LiveSearcher {
-    pub fn new(cluster: Arc<Cluster>) -> Self {
-        Self { cluster }
-    }
-
-    async fn client(&self) -> ShardedClient<SearchService, ShardId> {
-        let mut shards = HashMap::new();
-        for member in self.cluster.members().await {
-            if let Service::LiveIndex { host, shard, state } = member.service {
-                if matches!(state, LiveIndexState::Ready) {
-                    shards.entry(shard).or_insert_with(Vec::new).push(host);
-                }
-            }
-        }
-
-        let mut shard_clients = Vec::new();
-
-        for (id, replicas) in shards {
-            let replicated =
-                ReplicatedClient::new(replicas.into_iter().map(RemoteClient::new).collect());
-            let shard = Shard::new(id, replicated);
-            shard_clients.push(shard);
-        }
-
-        ShardedClient::new(shard_clients)
-    }
-
-    async fn retrieve_webpages_from_shard(
-        &self,
-        shard: ShardId,
-        client: &ShardedClient<SearchService, ShardId>,
-        query: &str,
-        pointers: Vec<(usize, WebpagePointer)>,
-    ) -> Vec<(usize, RetrievedWebpage)> {
-        let (idxs, pointers): (Vec<usize>, Vec<WebpagePointer>) = pointers.into_iter().unzip();
-
-        match client
-            .send(
-                search_server::RetrieveWebsites {
-                    websites: pointers,
-                    query: query.to_string(),
-                },
-                &SpecificShardSelector(shard),
-                &RandomReplicaSelector,
-            )
-            .await
-        {
-            Ok(v) => v
-                .into_iter()
-                .flatten()
-                .flat_map(|(_, v)| v.into_iter().map(|(_, v)| v))
-                .flatten()
-                .flatten()
-                .zip_eq(idxs)
-                .map(|(v, i)| (i, v))
-                .collect(),
-            _ => vec![],
-        }
-    }
-}
-
-impl SearchClient for LiveSearcher {
-    async fn search_initial(&self, query: &SearchQuery) -> Vec<InitialSearchResultShard> {
-        let client = self.client().await;
-        let mut results = Vec::new();
-
-        if let Ok(res) = client
-            .send(
-                search_server::Search {
-                    query: query.clone(),
-                },
-                &AllShardsSelector,
-                &RandomReplicaSelector,
-            )
-            .await
-        {
-            for (shard_id, mut res) in res.into_iter().flatten() {
-                if let Some((_, Some(res))) = res.pop() {
-                    results.push(InitialSearchResultShard {
-                        local_result: res,
-                        shard_id,
-                    });
-                }
-            }
-        }
-
-        results
-    }
-
-    async fn retrieve_webpages(
-        &self,
-        top_websites: &[(usize, ScoredWebpagePointer)],
-        query: &str,
-    ) -> Vec<(usize, PrecisionRankingWebpage)> {
-        let mut rankings = FnvHashMap::default();
-        let mut pointers: HashMap<_, Vec<_>> = HashMap::new();
-
-        for (i, pointer) in top_websites {
-            pointers
-                .entry(pointer.shard_id)
-                .or_default()
-                .push((*i, pointer.website.pointer().clone()));
-
-            rankings.insert(*i, pointer.website.clone());
-        }
-
-        let client = self.client().await;
-        let mut futures = Vec::new();
-        for (shard, pointers) in pointers {
-            futures.push(self.retrieve_webpages_from_shard(shard, &client, query, pointers));
-        }
-
-        let mut retrieved_webpages = Vec::new();
-        for pages in join_all(futures).await {
-            for (i, page) in pages {
-                retrieved_webpages
-                    .push((i, PrecisionRankingWebpage::new(page, rankings[&i].clone())));
-            }
-        }
-
-        debug_assert_eq!(retrieved_webpages.len(), top_websites.len());
-
-        retrieved_webpages.sort_by(|(a, _), (b, _)| a.cmp(b));
-
-        retrieved_webpages
-    }
-}
-
-pub trait SearchClient {
-    fn search_initial(
-        &self,
-        query: &SearchQuery,
-    ) -> impl Future<Output = Vec<InitialSearchResultShard>> + Send;
-    fn retrieve_webpages(
-        &self,
-        top_websites: &[(usize, ScoredWebpagePointer)],
-        query: &str,
-    ) -> impl Future<Output = Vec<(usize, PrecisionRankingWebpage)>> + Send;
-}

+ 3 - 31
crates/core/src/searcher/local/guard.rs

@@ -14,48 +14,20 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-use std::sync::Arc;
 use tokio::sync::OwnedRwLockReadGuard;
 
 use crate::index::Index;
 use crate::inverted_index::InvertedIndex;
-use crate::live_index;
 
-pub trait SearchGuard: Send + Sync {
+pub trait ReadGuard: Send + Sync {
     fn search_index(&self) -> &Index;
     fn inverted_index(&self) -> &InvertedIndex {
         &self.search_index().inverted_index
     }
 }
 
-pub struct NormalIndexSearchGuard {
-    search_index: Arc<Index>,
-}
-
-impl NormalIndexSearchGuard {
-    pub fn new(search_index: Arc<Index>) -> Self {
-        Self { search_index }
-    }
-}
-
-impl SearchGuard for NormalIndexSearchGuard {
-    fn search_index(&self) -> &Index {
-        self.search_index.as_ref()
-    }
-}
-
-pub struct LiveIndexSearchGuard {
-    lock_guard: OwnedRwLockReadGuard<live_index::index::InnerIndex>,
-}
-
-impl LiveIndexSearchGuard {
-    pub fn new(lock_guard: OwnedRwLockReadGuard<live_index::index::InnerIndex>) -> Self {
-        Self { lock_guard }
-    }
-}
-
-impl SearchGuard for LiveIndexSearchGuard {
+impl ReadGuard for OwnedRwLockReadGuard<Index> {
     fn search_index(&self) -> &Index {
-        self.lock_guard.index()
+        self
     }
 }

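The blanket impl above works because OwnedRwLockReadGuard<Index> derefs to Index, so returning self coerces to &Index. A minimal demonstration of the owned-guard pattern (a sketch assuming tokio with the rt, macros, and sync features; Index here is a stand-in struct, not the crate's):

use std::sync::Arc;
use tokio::sync::RwLock;

struct Index {
    num_docs: usize,
}

#[tokio::main]
async fn main() {
    let index = Arc::new(RwLock::new(Index { num_docs: 42 }));

    // `read_owned` consumes a clone of the Arc, so the guard is
    // 'static and can be held across await points or moved into
    // spawned tasks -- unlike a borrow-based `read()` guard.
    let guard = index.clone().read_owned().await;

    let idx: &Index = &guard; // deref coercion, as in `fn search_index`
    assert_eq!(idx.num_docs, 42);
}
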
+ 11 - 11
crates/core/src/searcher/local/inner.rs

@@ -28,7 +28,7 @@ use crate::{
     ranking::models::linear::LinearRegression, search_ctx::Ctx, searcher::SearchQuery,
 };
 
-use super::{InvertedIndexResult, SearchGuard, SearchableIndex};
+use super::{InvertedIndexResult, ReadGuard, SearchableIndex};
 
 pub struct InnerLocalSearcher<I: SearchableIndex> {
     index: I,
@@ -50,8 +50,8 @@ where
         }
     }
 
-    pub async fn guard(&self) -> I::SearchGuard {
-        self.index.guard().await
+    pub async fn guard(&self) -> I::ReadGuard {
+        self.index.read_guard().await
     }
 
     pub fn set_linear_model(&mut self, model: LinearRegression) {
@@ -66,7 +66,7 @@ where
         self.collector_config = config;
     }
 
-    fn parse_query<G: SearchGuard>(
+    fn parse_query<G: ReadGuard>(
         &self,
         ctx: &Ctx,
         guard: &G,
@@ -75,7 +75,7 @@ where
         Query::parse(ctx, query, guard.inverted_index())
     }
 
-    fn ranker<G: SearchGuard>(
+    fn ranker<G: ReadGuard>(
         &self,
         query: &Query,
         guard: &G,
@@ -99,7 +99,7 @@ where
             .with_offset(query.offset()))
     }
 
-    fn search_inverted_index<G: SearchGuard>(
+    fn search_inverted_index<G: ReadGuard>(
         &self,
         ctx: &Ctx,
         guard: &G,
@@ -149,7 +149,7 @@ where
     pub fn search_initial(
         &self,
         query: &SearchQuery,
-        guard: &I::SearchGuard,
+        guard: &I::ReadGuard,
         de_rank_similar: bool,
     ) -> Result<InitialWebsiteResult> {
         let query = query.clone();
@@ -168,7 +168,7 @@ where
         &self,
         websites: &[inverted_index::WebpagePointer],
         query: &str,
-        guard: &I::SearchGuard,
+        guard: &I::ReadGuard,
     ) -> Result<Vec<inverted_index::RetrievedWebpage>> {
         let ctx = guard.inverted_index().local_search_ctx();
         let query = SearchQuery {
@@ -183,7 +183,7 @@ where
     pub fn search_initial_generic<Q: GenericQuery>(
         &self,
         query: &Q,
-        guard: &I::SearchGuard,
+        guard: &I::ReadGuard,
     ) -> Result<<Q::Collector as generic_query::Collector>::Fruit> {
         guard.inverted_index().search_initial_generic(query)
     }
@@ -192,7 +192,7 @@ where
         &self,
         query: &Q,
         fruit: <Q::Collector as generic_query::Collector>::Fruit,
-        guard: &I::SearchGuard,
+        guard: &I::ReadGuard,
     ) -> Result<Q::IntermediateOutput> {
         guard.inverted_index().retrieve_generic(query, fruit)
     }
@@ -200,7 +200,7 @@ where
     pub fn search_generic<Q: GenericQuery>(
         &self,
         query: Q,
-        guard: &I::SearchGuard,
+        guard: &I::ReadGuard,
     ) -> Result<Q::Output> {
         let fruit = self.search_initial_generic(&query, guard)?;
         Ok(Q::merge_results(vec![

+ 10 - 17
crates/core/src/searcher/local/mod.rs

@@ -15,10 +15,11 @@
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 mod guard;
-use guard::{LiveIndexSearchGuard, NormalIndexSearchGuard, SearchGuard};
+use guard::ReadGuard;
 
 mod inner;
 use inner::InnerLocalSearcher;
+use tokio::sync::{OwnedRwLockReadGuard, RwLock};
 
 use std::collections::HashMap;
 use std::future::Future;
@@ -38,30 +39,22 @@ use crate::ranking::pipeline::{
 };
 use crate::ranking::{SignalEnum, SignalScore};
 use crate::search_prettifier::DisplayedWebpage;
-use crate::{inverted_index, live_index, Result};
+use crate::{inverted_index, Result};
 
 use super::WebsitesResult;
 use super::{InitialWebsiteResult, SearchQuery};
 
 pub trait SearchableIndex: Send + Sync + 'static {
-    type SearchGuard: SearchGuard;
+    type ReadGuard: ReadGuard;
 
-    fn guard(&self) -> impl Future<Output = Self::SearchGuard>;
+    fn read_guard(&self) -> impl Future<Output = Self::ReadGuard>;
 }
 
-impl SearchableIndex for Arc<Index> {
-    type SearchGuard = NormalIndexSearchGuard;
+impl SearchableIndex for Arc<RwLock<Index>> {
+    type ReadGuard = OwnedRwLockReadGuard<Index>;
 
-    async fn guard(&self) -> Self::SearchGuard {
-        NormalIndexSearchGuard::new(self.clone())
-    }
-}
-
-impl SearchableIndex for Arc<live_index::LiveIndex> {
-    type SearchGuard = LiveIndexSearchGuard;
-
-    async fn guard(&self) -> Self::SearchGuard {
-        LiveIndexSearchGuard::new(self.read().await)
+    async fn read_guard(&self) -> Self::ReadGuard {
+        self.clone().read_owned().await
     }
 }
 
@@ -306,7 +299,7 @@ mod tests {
 
         index.commit().unwrap();
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         for p in 0..NUM_PAGES {
             let urls: Vec<_> = searcher

+ 1 - 2
crates/core/src/searcher/mod.rs

@@ -1,5 +1,5 @@
 // Stract is an open source web search engine.
-// Copyright (C) 2023 Stract ApS
+// Copyright (C) 2024 Stract ApS
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Affero General Public License as
@@ -16,7 +16,6 @@
 
 pub mod api;
 pub mod distributed;
-pub mod live;
 pub mod local;
 
 pub use distributed::*;

+ 4 - 3
crates/core/src/snippet.rs

@@ -359,6 +359,7 @@ mod tests {
         webpage::Webpage,
     };
     use proptest::prelude::*;
+    use tokio::sync::RwLock;
 
     const TEST_TEXT: &str = r#"Rust is a systems programming language sponsored by
 Mozilla which describes it as a "safe, concurrent, practical language", supporting functional and
@@ -415,7 +416,7 @@ Survey in 2016, 2017, and 2018."#;
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let result = searcher
             .search_sync(&SearchQuery {
@@ -454,7 +455,7 @@ Survey in 2016, 2017, and 2018."#;
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let result = searcher
             .search_sync(&SearchQuery {
@@ -493,7 +494,7 @@ Survey in 2016, 2017, and 2018."#;
             .expect("failed to insert webpage");
         index.commit().expect("failed to commit index");
 
-        let searcher = LocalSearcher::builder(Arc::new(index)).build();
+        let searcher = LocalSearcher::builder(Arc::new(RwLock::new(index))).build();
 
         let result = searcher
             .search_sync(&SearchQuery {

+ 4 - 4
crates/core/src/webgraph/store.rs

@@ -113,7 +113,7 @@ impl EdgeStore {
         let mut segments_to_merge = Vec::new();
 
         for segment in segments {
-            if num_docs.saturating_add(segment.max_doc()) > tantivy::TERMINATED {
+            if num_docs.saturating_add(segment.max_doc()) >= tantivy::TERMINATED {
                 let segments_to_merge = std::mem::take(&mut segments_to_merge);
                 num_docs = 0;
 
@@ -127,10 +127,10 @@ impl EdgeStore {
                         1,
                     )?;
                 }
-            } else {
-                num_docs += segment.max_doc();
-                segments_to_merge.push(segment);
             }
+
+            num_docs = num_docs.saturating_add(segment.max_doc());
+            segments_to_merge.push(segment);
         }
 
         if !segments_to_merge.is_empty() {

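The accounting fix above changes two things: the overflow check now uses >= against tantivy::TERMINATED, and the segment that triggers a flush starts the next batch instead of being dropped from the accounting by the old else branch. A small self-contained model of the corrected loop (BUDGET is a stand-in for the real doc-id cap):

const BUDGET: u32 = 10; // stand-in for tantivy::TERMINATED

fn batch_segments(segment_docs: &[u32]) -> Vec<Vec<u32>> {
    let mut batches = Vec::new();
    let mut current: Vec<u32> = Vec::new();
    let mut num_docs: u32 = 0;

    for &docs in segment_docs {
        // Flush *before* adding, if this segment would overflow.
        if num_docs.saturating_add(docs) >= BUDGET {
            batches.push(std::mem::take(&mut current));
            num_docs = 0;
        }

        // Unconditionally count the segment into the (possibly fresh)
        // batch -- the old `else` skipped this after a flush.
        num_docs = num_docs.saturating_add(docs);
        current.push(docs);
    }

    if !current.is_empty() {
        batches.push(current);
    }

    batches
}

fn main() {
    // 4 + 5 stays under the budget; adding 3 would reach it, so the
    // first two segments merge and the third opens a new batch.
    assert_eq!(batch_segments(&[4, 5, 3]), vec![vec![4, 5], vec![3]]);
}
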
+ 1 - 0
crates/file-store/src/temp.rs

@@ -21,6 +21,7 @@ use std::{
 
 use crate::gen_temp_path;
 
+#[derive(Debug)]
 pub struct TempDir {
     path: std::path::PathBuf,
 }

+ 2 - 3
crates/simple-wal/src/lib.rs

@@ -85,9 +85,8 @@ pub struct WalIterator<T> {
 
 impl<T> WalIterator<T> {
     pub fn open<P: AsRef<Path>>(file: P) -> Result<Self> {
-        Ok(Self {
-            iter: file_store::iterable::IterableStoreReader::open(file)?,
-        })
+        let iter = file_store::iterable::IterableStoreReader::open(file)?;
+        Ok(Self { iter })
     }
 }