Browse source code

Overview docs (#73)

* Begin overview documentation in mdbook format

* Overview of the different docs

* Move overview documentation to mkdocs

* Reduce webgraph segment merges by introducing a webgraph commit mode that commits the live segment directly to the stored segment (a sketch of the idea follows this changelog)

* Parallel harmonic centrality calculations

* Even more parallelism in harmonic centrality calculations

* Way faster hyperloglog but also less accurate

* Dynamic exact counting threshold proportional to size of graph

* improve inbound similarity speed and fix hyperloglog out-of-bounds bug

* no need to load all nodes into memory for harmonic centrality

* Use rayon directly in indexer.
Hopefully this fixes the bug where the indexer takes a new job before it has finished the first one. I think what happened was that the indexer thread took a new job when hitting the webgraph executor.

* single threaded webgraph when indexing

* No need for node2id anymore

* Use single thread in tantivy by default.
We introduce a method to optimize the index for search, which currently just sets the tantivy executor to be multithreaded. This should improve the indexing performance.

* Reduce memory arena in tantivy

* try jemalloc

* Revert tantivy memory arena reduction. Caused too many files to be created when indexing warc files
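
Most of the points above show up directly in the diff below. The single-segment commit mode only appears there as `.commit_mode(webgraph::CommitMode::SingleSegment)`, so here is a minimal, self-contained sketch of the idea; the `CommitMode` variants, `Segment`, and `ToyGraph` names are illustrative assumptions, not the actual webgraph API. Instead of writing each committed live segment to a new stored segment and merging segments afterwards, every commit folds the live segment straight into a single stored segment, so no separate merge pass is needed.

```rust
// Illustrative sketch only: these names are assumptions, not the real webgraph API.
#[derive(Clone, Copy)]
enum CommitMode {
    /// Each commit writes the live segment to a new stored segment;
    /// the stored segments are merged in a separate pass later.
    NewSegment,
    /// Each commit folds the live segment directly into one stored segment,
    /// so the graph never needs a merge pass.
    SingleSegment,
}

#[derive(Default)]
struct Segment {
    edges: Vec<(u64, u64)>, // (source node id, destination node id)
}

struct ToyGraph {
    mode: CommitMode,
    live: Segment,
    stored: Vec<Segment>,
}

impl ToyGraph {
    fn new(mode: CommitMode) -> Self {
        Self {
            mode,
            live: Segment::default(),
            stored: Vec::new(),
        }
    }

    fn insert(&mut self, from: u64, to: u64) {
        self.live.edges.push((from, to));
    }

    fn commit(&mut self) {
        let live = std::mem::take(&mut self.live);
        match self.mode {
            CommitMode::NewSegment => self.stored.push(live),
            CommitMode::SingleSegment => {
                if self.stored.is_empty() {
                    self.stored.push(Segment::default());
                }
                self.stored[0].edges.extend(live.edges);
            }
        }
    }
}

fn main() {
    let mut graph = ToyGraph::new(CommitMode::SingleSegment);
    graph.insert(1, 2);
    graph.commit();
    graph.insert(2, 3);
    graph.commit();
    assert_eq!(graph.stored.len(), 1); // still a single stored segment after two commits
}
```
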
Mikkel Denker 2 years ago
parent
commit
36f22e801e

+ 138 - 5
Cargo.lock

@@ -796,7 +796,7 @@ dependencies = [
  "bitflags",
  "clap_derive 3.2.18",
  "clap_lex 0.2.4",
- "indexmap",
+ "indexmap 1.9.3",
  "once_cell",
  "strsim",
  "termcolor",
@@ -1209,6 +1209,7 @@ dependencies = [
  "lock_api",
  "once_cell",
  "parking_lot_core",
+ "rayon",
 ]
 
 [[package]]
@@ -1370,6 +1371,12 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "equivalent"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
+
 [[package]]
 name = "errno"
 version = "0.3.1"
@@ -1783,7 +1790,7 @@ dependencies = [
  "futures-sink",
  "futures-util",
  "http",
- "indexmap",
+ "indexmap 1.9.3",
  "slab",
  "tokio",
  "tokio-util",
@@ -1824,6 +1831,12 @@ dependencies = [
  "ahash 0.8.3",
 ]
 
+[[package]]
+name = "hashbrown"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
+
 [[package]]
 name = "heck"
 version = "0.3.3"
@@ -2065,6 +2078,17 @@ dependencies = [
  "hashbrown 0.12.3",
 ]
 
+[[package]]
+name = "indexmap"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
+dependencies = [
+ "equivalent",
+ "hashbrown 0.14.0",
+ "serde",
+]
+
 [[package]]
 name = "indicatif"
 version = "0.15.0"
@@ -2212,7 +2236,7 @@ version = "0.8.1"
 dependencies = [
  "cssparser",
  "html5ever",
- "indexmap",
+ "indexmap 1.9.3",
  "matches",
  "selectors",
  "tempfile",
@@ -2309,6 +2333,8 @@ dependencies = [
  "glob",
  "libc",
  "libz-sys",
+ "pkg-config",
+ "tikv-jemalloc-sys",
  "zstd-sys",
 ]
 
@@ -3047,7 +3073,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4"
 dependencies = [
  "fixedbitset",
- "indexmap",
+ "indexmap 1.9.3",
 ]
 
 [[package]]
@@ -3713,6 +3739,41 @@ dependencies = [
  "librocksdb-sys",
 ]
 
+[[package]]
+name = "rust-embed"
+version = "6.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a36224c3276f8c4ebc8c20f158eca7ca4359c8db89991c4925132aaaf6702661"
+dependencies = [
+ "rust-embed-impl",
+ "rust-embed-utils",
+ "walkdir",
+]
+
+[[package]]
+name = "rust-embed-impl"
+version = "6.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49b94b81e5b2c284684141a2fb9e2a31be90638caf040bf9afbc5a0416afe1ac"
+dependencies = [
+ "proc-macro2 1.0.56",
+ "quote 1.0.26",
+ "rust-embed-utils",
+ "shellexpand",
+ "syn 2.0.15",
+ "walkdir",
+]
+
+[[package]]
+name = "rust-embed-utils"
+version = "7.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d38ff6bf570dc3bb7100fce9f7b60c33fa71d80e88da3f2580df4ff2bdded74"
+dependencies = [
+ "sha2",
+ "walkdir",
+]
+
 [[package]]
 name = "rust-ini"
 version = "0.18.0"
@@ -4110,6 +4171,15 @@ dependencies = [
  "lazy_static",
 ]
 
+[[package]]
+name = "shellexpand"
+version = "2.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ccc8076840c4da029af4f87e4e8daeb0fca6b87bbb02e10cb60b791450e11e4"
+dependencies = [
+ "dirs",
+]
+
 [[package]]
 name = "shlex"
 version = "1.1.0"
@@ -4292,6 +4362,7 @@ dependencies = [
  "tantivy",
  "tch",
  "thiserror",
+ "tikv-jemallocator",
  "tokenizers",
  "tokio",
  "tokio-stream",
@@ -4302,6 +4373,8 @@ dependencies = [
  "tracing-subscriber",
  "url",
  "urlencoding",
+ "utoipa",
+ "utoipa-swagger-ui",
  "uuid",
  "whatlang",
 ]
@@ -4632,6 +4705,26 @@ dependencies = [
  "weezl",
 ]
 
+[[package]]
+name = "tikv-jemalloc-sys"
+version = "0.5.4+5.3.0-patched"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
+dependencies = [
+ "cc",
+ "libc",
+]
+
+[[package]]
+name = "tikv-jemallocator"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
+dependencies = [
+ "libc",
+ "tikv-jemalloc-sys",
+]
+
 [[package]]
 name = "time"
 version = "0.1.45"
@@ -4827,7 +4920,7 @@ version = "0.19.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "239410c8609e8125456927e6707163a3b1fdb40561e4b803bc041f466ccfdc13"
 dependencies = [
- "indexmap",
+ "indexmap 1.9.3",
  "toml_datetime",
  "winnow",
 ]
@@ -5110,6 +5203,46 @@ version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
 
+[[package]]
+name = "utoipa"
+version = "3.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de634b7f8178c9c246c88ea251f3a0215c9a4d80778db2d7bd4423a78b5170ec"
+dependencies = [
+ "indexmap 2.0.0",
+ "serde",
+ "serde_json",
+ "utoipa-gen",
+]
+
+[[package]]
+name = "utoipa-gen"
+version = "3.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fcba79cb3e5020d9bcc8313cd5aadaf51d6d54a6b3fd08c3d0360ae6b3c83d0"
+dependencies = [
+ "proc-macro-error",
+ "proc-macro2 1.0.56",
+ "quote 1.0.26",
+ "syn 2.0.15",
+]
+
+[[package]]
+name = "utoipa-swagger-ui"
+version = "3.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4602d7100d3cfd8a086f30494e68532402ab662fa366c9d201d677e33cee138d"
+dependencies = [
+ "axum",
+ "mime_guess",
+ "regex",
+ "rust-embed",
+ "serde",
+ "serde_json",
+ "utoipa",
+ "zip",
+]
+
 [[package]]
 name = "uuid"
 version = "1.3.1"

+ 5 - 2
Cargo.toml

@@ -43,7 +43,7 @@ html-escape = "0.2.11"
 logos = "0.12.1"
 csv = "1.1.6"
 fst = { version = "0.4.7", features = ["levenshtein"] }
-rocksdb = "0.19.0"
+rocksdb = { version = "0.19.0", features = ["default", "jemalloc", "io-uring"] }
 image = "0.24.3"
 chrono = {version = "0.4.23", features = ["serde"] }
 uuid = "1.1.2"
@@ -79,7 +79,7 @@ scylla = "0.7.0"
 chitchat = "0.5.0"
 memmap2 = "0.5.10"
 rkyv =  "0.7.41"
-dashmap = "5.4.0"
+dashmap = { version = "5.4.0", features = ["rayon"] }
 tch = "0.13.0"
 torch-sys = "0.13.0"
 libc = "0.2.142"
@@ -95,4 +95,7 @@ anyhow = { version = "1.0.72", features = ["backtrace"] }
 bytemuck = { version = "1.13.1", features = ["derive"] }
 proptest = "1.2.0"
 proptest-derive = "0.3.0"
+utoipa = "3.4.4"
+utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] }
+tikv-jemallocator = "0.5"
 

+ 9 - 0
core/Cargo.toml

@@ -93,6 +93,11 @@ lru = { workspace = true }
 url = { workspace = true }
 anyhow = { workspace = true }
 bytemuck = { workspace = true }
+utoipa = { workspace = true }
+utoipa-swagger-ui = { workspace = true }
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = { workspace = true }
 
 [build-dependencies]
 lalrpop = { workspace = true }
@@ -134,3 +139,7 @@ harness = false
 [[bench]]
 name = "similar-sites"
 harness = false
+
+[[bench]]
+name = "hyperloglog"
+harness = false

+ 21 - 0
core/benches/hyperloglog.rs

@@ -0,0 +1,21 @@
+use criterion::{criterion_group, criterion_main, Criterion};
+use stract::hyperloglog::HyperLogLog;
+
+pub fn criterion_benchmark(c: &mut Criterion) {
+    c.bench_function("Hyperloglog", |b| {
+        b.iter(|| {
+            let mut log: HyperLogLog<128> = HyperLogLog::default();
+            for i in 0..10_000_000 {
+                log.add(i);
+                log.size();
+            }
+
+            for _ in 0..1_000_000_000 {
+                log.size();
+            }
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);

+ 47 - 0
core/src/api/docs.rs

@@ -0,0 +1,47 @@
+// Stract is an open source web search engine.
+// Copyright (C) 2023 Stract ApS
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+use axum::Router;
+use utoipa::{Modify, OpenApi};
+use utoipa_swagger_ui::SwaggerUi;
+
+#[derive(OpenApi)]
+#[openapi(
+        paths(),
+        modifiers(&ApiModifier),
+        tags(
+            (name = "stract"),
+        )
+    )]
+struct ApiDoc;
+
+struct ApiModifier;
+
+impl Modify for ApiModifier {
+    fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
+        openapi.info.description = Some(
+            r#"Stract is an open source web search engine. The API is totally free while in beta, but some endpoints will most likely be paid by consumption in the future.
The API might also change quite a bit during the beta period, but we will try to keep it as stable as possible. We look forward to seeing what you will build!"#.to_string(),
+        );
+    }
+}
+
+pub fn router<S: Clone + Send + Sync + 'static, B: axum::body::HttpBody + Send + Sync + 'static>(
+) -> impl Into<Router<S, B>> {
+    SwaggerUi::new("/beta/api/docs")
+        .url("/beta/api/docs/openapi.json", ApiDoc::openapi())
+        .config(utoipa_swagger_ui::Config::default().use_base_layout())
+}
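
The `paths()` list in `ApiDoc` above is still empty. As a hedged illustration of how an endpoint would later be documented (the handler, response type, and route below are hypothetical, not part of this commit), a handler gets a `#[utoipa::path]` annotation and is then listed in the `openapi` derive:

```rust
use axum::Json;
use serde::Serialize;
use utoipa::ToSchema;

// Hypothetical response type, for illustration only.
#[derive(Serialize, ToSchema)]
struct SummarizeResponse {
    summary: String,
}

// Hypothetical documented handler; the real summarize route is defined elsewhere.
#[utoipa::path(
    get,
    path = "/beta/api/summarize",
    responses(
        (status = 200, description = "Summary of the requested page", body = SummarizeResponse)
    )
)]
async fn summarize() -> Json<SummarizeResponse> {
    Json(SummarizeResponse {
        summary: String::new(),
    })
}

// The handler would then be registered in the derive:
// #[derive(OpenApi)]
// #[openapi(paths(summarize), ...)]
// struct ApiDoc;
```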

+ 10 - 10
core/src/api/mod.rs

@@ -57,6 +57,7 @@ mod alice;
 mod autosuggest;
 mod chat;
 mod crawler;
+mod docs;
 mod explore;
 mod fact_check;
 pub mod improvement;
@@ -206,17 +207,16 @@ pub async fn router(config: &FrontendConfig, counters: Counters) -> Result<Route
         )
         .fallback(get_service(ServeDir::new("frontend/dist/")))
         .layer(CompressionLayer::new())
-        .merge(
+        .merge(docs::router())
+        .nest(
+            "/beta",
             Router::new()
-                .route("/beta/api/summarize", get(summarize::route))
-                .route(
-                    "/beta/api/webgraph/similar_sites",
-                    post(webgraph::similar_sites),
-                )
-                .route("/beta/api/webgraph/knows_site", get(webgraph::knows_site))
-                .route("/beta/api/alice", get(alice::route))
-                .route("/beta/api/alice/save_state", post(alice::save_state))
-                .route("/beta/api/fact_check", post(fact_check::route)),
+                .route("/api/summarize", get(summarize::route))
+                .route("/api/webgraph/similar_sites", post(webgraph::similar_sites))
+                .route("/api/webgraph/knows_site", get(webgraph::knows_site))
+                .route("/api/alice", get(alice::route))
+                .route("/api/alice/save_state", post(alice::save_state))
+                .route("/api/fact_check", post(fact_check::route)),
         )
         .with_state(state))
 }

+ 8 - 8
core/src/entrypoint/configure.rs

@@ -61,12 +61,8 @@ fn download_files() {
 
 fn create_webgraph() -> Result<()> {
     debug!("Creating webgraph");
-    let out_path_tmp = Path::new(DATA_PATH).join("webgraph_tmp");
     let out_path = Path::new(DATA_PATH).join("webgraph");
 
-    if out_path_tmp.exists() {
-        std::fs::remove_dir_all(&out_path_tmp)?;
-    }
     if out_path.exists() {
         std::fs::remove_dir_all(&out_path)?;
     }
@@ -79,14 +75,18 @@ fn create_webgraph() -> Result<()> {
             names: vec![warc_path.to_str().unwrap().to_string()],
         }),
         warc_paths: vec![warc_path.to_str().unwrap().to_string()],
-        graph_base_path: out_path_tmp.to_str().unwrap().to_string(),
+        graph_base_path: out_path.to_str().unwrap().to_string(),
         level: crate::config::WebgraphLevel::Host,
     };
 
-    let worker = webgraph::WebgraphWorker { redirect: None };
-    let graph = worker.process_job(&job);
+    let mut worker = webgraph::WebgraphWorker {
+        redirect: None,
+        graph: webgraph::open_graph(&job.graph_base_path),
+    };
+
+    worker.process_job(&job);
+    let graph = worker.graph;
     std::fs::rename(graph.path, out_path)?;
-    std::fs::remove_dir_all(&out_path_tmp)?;
 
     Ok(())
 }

+ 133 - 145
core/src/entrypoint/indexer.rs

@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 use chrono::Utc;
+use rayon::prelude::*;
 use std::path::Path;
 use std::thread;
 use url::Url;
@@ -25,12 +26,10 @@ use tracing::{debug, info, trace, warn};
 
 use crate::config;
 use crate::entrypoint::download_all_warc_files;
-use crate::executor::Executor;
 use crate::index::{FrozenIndex, Index};
 use crate::mapreduce::{Map, Reduce, Worker};
 use crate::ranking::centrality_store::IndexerCentralityStore;
 use crate::ranking::SignalAggregator;
-use crate::warc::WarcFile;
 use crate::webgraph::{Node, Webgraph, WebgraphBuilder};
 use crate::webpage::{Html, Link, Webpage};
 use crate::{human_website_annotations, Result};
@@ -90,7 +89,7 @@ impl IndexingWorker {
         Self {
             host_centrality_store: IndexerCentralityStore::open(host_centrality_store_path),
             page_centrality_store: page_centrality_store_path.map(IndexerCentralityStore::open),
-            webgraph: webgraph_path.map(|path| WebgraphBuilder::new(path).open()),
+            webgraph: webgraph_path.map(|path| WebgraphBuilder::new(path).single_threaded().open()),
             topics: topics_path.map(|path| human_website_annotations::Mapper::open(path).unwrap()),
         }
     }
@@ -115,162 +114,150 @@ pub fn process_job(job: &Job, worker: &IndexingWorker) -> Index {
     let current_timestamp = Utc::now().timestamp().max(0) as usize;
 
     for file in warc_files.by_ref() {
-        let name = file.split('/').last().unwrap();
-        let path = Path::new(&job.base_path).join("warc_files").join(name);
-
-        if let Ok(file) = WarcFile::open(&path) {
-            for record in
-                file.records()
-                    .flatten()
-                    .filter(|record| match &record.response.payload_type {
-                        Some(payload_type) => !matches!(payload_type.as_str(), "application/pdf"),
-                        None => true,
-                    })
-            {
-                let mut html =
-                    match Html::parse_without_text(&record.response.body, &record.request.url) {
-                        Ok(html) => html,
-                        Err(err) => {
-                            debug!("error parsing html: {:?}", err);
-                            continue;
-                        }
-                    };
+        for record in
+            file.records()
+                .flatten()
+                .filter(|record| match &record.response.payload_type {
+                    Some(payload_type) => !matches!(payload_type.as_str(), "application/pdf"),
+                    None => true,
+                })
+        {
+            let mut html =
+                match Html::parse_without_text(&record.response.body, &record.request.url) {
+                    Ok(html) => html,
+                    Err(err) => {
+                        debug!("error parsing html: {:?}", err);
+                        continue;
+                    }
+                };
 
-                if html.is_no_index() {
-                    continue;
-                }
+            if html.is_no_index() {
+                continue;
+            }
 
-                let node = Node::from(html.url());
-                let node_id = worker
-                    .host_centrality_store
-                    .node2id
-                    .get(&node.clone().into_host());
+            let title = html.title().unwrap_or_default();
+            if title.is_empty() || title.chars().all(|c| c.is_whitespace()) {
+                continue;
+            }
 
-                let host_centrality = node_id
-                    .and_then(|node_id| worker.host_centrality_store.harmonic.get(&node_id))
-                    .unwrap_or_default();
+            let node = Node::from(html.url());
+            let host_node_id = node.clone().into_host().id();
 
-                if let Some(host_centrality_threshold) = job.host_centrality_threshold {
-                    if host_centrality < host_centrality_threshold {
-                        debug!("skipping due to low host_centrality value");
-                        continue;
-                    }
+            let host_centrality = worker
+                .host_centrality_store
+                .harmonic
+                .get(&host_node_id)
+                .unwrap_or_default();
+
+            if let Some(host_centrality_threshold) = job.host_centrality_threshold {
+                if host_centrality < host_centrality_threshold {
+                    debug!("skipping due to low host_centrality value");
+                    continue;
                 }
+            }
 
-                html.parse_text();
-                if let Some(minimum_clean_words) = job.minimum_clean_words {
-                    match html.clean_text() {
-                        Some(clean_text) => {
-                            if clean_text.split_whitespace().count() < minimum_clean_words {
-                                continue;
-                            }
+            html.parse_text();
+            if let Some(minimum_clean_words) = job.minimum_clean_words {
+                match html.clean_text() {
+                    Some(clean_text) => {
+                        if clean_text.split_whitespace().count() < minimum_clean_words {
+                            continue;
                         }
-                        None => continue,
                     }
+                    None => continue,
                 }
+            }
 
-                let backlinks: Vec<Link> = worker
-                    .webgraph
-                    .as_ref()
-                    .map(|webgraph| {
-                        webgraph
-                            .ingoing_edges(Node::from(html.url()))
-                            .into_iter()
-                            .map(|edge| Link {
-                                source: Url::parse(
-                                    &("http://".to_string() + edge.from.name.as_str()),
-                                )
-                                .unwrap(),
-                                destination: Url::parse(
-                                    &("http://".to_string() + edge.to.name.as_str()),
-                                )
+            let backlinks: Vec<Link> = worker
+                .webgraph
+                .as_ref()
+                .map(|webgraph| {
+                    webgraph
+                        .ingoing_edges(Node::from(html.url()))
+                        .into_iter()
+                        .map(|edge| Link {
+                            source: Url::parse(&("http://".to_string() + edge.from.name.as_str()))
                                 .unwrap(),
-                                text: edge.label,
-                            })
-                            .collect()
-                    })
-                    .unwrap_or_else(Vec::new);
-
-                let mut page_centrality = 0.0;
-
-                if let Some(store) = worker.page_centrality_store.as_ref() {
-                    let node_id = store.node2id.get(&node);
-
-                    page_centrality = node_id
-                        .and_then(|node_id| store.harmonic.get(&node_id))
-                        .unwrap_or_default();
-                }
-
-                if host_centrality > 0.0 {
-                    has_host_centrality = true;
-                }
+                            destination: Url::parse(
+                                &("http://".to_string() + edge.to.name.as_str()),
+                            )
+                            .unwrap(),
+                            text: edge.label,
+                        })
+                        .collect()
+                })
+                .unwrap_or_else(Vec::new);
+
+            let mut page_centrality = 0.0;
+
+            if let Some(store) = worker.page_centrality_store.as_ref() {
+                let node_id = node.id();
+
+                page_centrality = store.harmonic.get(&node_id).unwrap_or_default();
+            }
 
-                if page_centrality > 0.0 {
-                    has_page_centrality = true;
-                }
+            if host_centrality > 0.0 {
+                has_host_centrality = true;
+            }
 
-                if !backlinks.is_empty() {
-                    has_backlinks = true;
-                }
+            if page_centrality > 0.0 {
+                has_page_centrality = true;
+            }
 
-                let fetch_time_ms = record.metadata.fetch_time_ms as u64;
+            if !backlinks.is_empty() {
+                has_backlinks = true;
+            }
 
-                trace!("inserting webpage: {:?}", html.url());
+            let fetch_time_ms = record.metadata.fetch_time_ms as u64;
 
-                trace!("title = {:?}", html.title());
-                trace!("text = {:?}", html.clean_text());
+            trace!("inserting webpage: {:?}", html.url());
 
-                let node_id = worker
-                    .host_centrality_store
-                    .node2id
-                    .get(&Node::from(html.url()).into_host());
+            trace!("title = {:?}", html.title());
+            trace!("text = {:?}", html.clean_text());
 
-                let mut dmoz_description = None;
+            let mut dmoz_description = None;
 
-                if let Some(mapper) = worker.topics.as_ref() {
-                    if let Some(info) =
-                        mapper.get(&html.url().host_str().unwrap_or_default().to_string())
-                    {
-                        dmoz_description = Some(info.description.clone())
-                    }
+            if let Some(mapper) = worker.topics.as_ref() {
+                if let Some(info) =
+                    mapper.get(&html.url().host_str().unwrap_or_default().to_string())
+                {
+                    dmoz_description = Some(info.description.clone())
                 }
+            }
 
-                let mut webpage = Webpage {
-                    html,
-                    backlinks,
-                    page_centrality,
-                    host_centrality,
-                    fetch_time_ms,
-                    pre_computed_score: 0.0,
-                    node_id,
-                    dmoz_description,
-                };
+            let mut webpage = Webpage {
+                html,
+                backlinks,
+                page_centrality,
+                host_centrality,
+                fetch_time_ms,
+                pre_computed_score: 0.0,
+                node_id: Some(host_node_id),
+                dmoz_description,
+            };
 
-                let mut signal_aggregator = SignalAggregator::new(None);
-                signal_aggregator.set_current_timestamp(current_timestamp);
+            let mut signal_aggregator = SignalAggregator::new(None);
+            signal_aggregator.set_current_timestamp(current_timestamp);
 
-                webpage.pre_computed_score = signal_aggregator.precompute_score(&webpage);
+            webpage.pre_computed_score = signal_aggregator.precompute_score(&webpage);
 
-                if let Err(err) = index.insert(webpage) {
-                    debug!("{:?}", err);
-                }
+            if let Err(err) = index.insert(webpage) {
+                warn!("{:?}", err);
             }
         }
 
         index.commit().unwrap();
-
-        std::fs::remove_file(path).unwrap();
     }
 
     if !has_host_centrality {
         warn!("no host centrality values found in {}", name);
     }
 
-    if !has_page_centrality {
+    if !has_page_centrality && worker.page_centrality_store.is_some() {
         warn!("no page centrality values found in {}", name);
     }
 
-    if !has_backlinks {
+    if !has_backlinks && worker.webgraph.is_some() {
         warn!("no backlinks found in {}", name);
     }
 
@@ -346,29 +333,30 @@ impl Indexer {
             config.topics_path.clone(),
         );
 
-        let executor = Executor::multi_thread("indexer").unwrap();
-
-        let indexes = executor
-            .map(
-                |job| -> IndexPointer { job.map(&worker) },
-                warc_paths
-                    .into_iter()
-                    .skip(config.skip_warc_files.unwrap_or(0))
-                    .take(config.limit_warc_files.unwrap_or(usize::MAX))
-                    .chunks(config.batch_size.unwrap_or(1))
-                    .into_iter()
-                    .map(|warc_paths| Job {
-                        source_config: job_config.clone(),
-                        warc_paths: warc_paths.collect_vec(),
-                        host_centrality_threshold: config.host_centrality_threshold,
-                        base_path: config
-                            .output_path
-                            .clone()
-                            .unwrap_or_else(|| "data/index".to_string()),
-                        minimum_clean_words: config.minimum_clean_words,
-                    }),
-            )
-            .unwrap_or_default();
+        let indexes = warc_paths
+            .into_iter()
+            .skip(config.skip_warc_files.unwrap_or(0))
+            .take(config.limit_warc_files.unwrap_or(usize::MAX))
+            .chunks(config.batch_size.unwrap_or(1))
+            .into_iter()
+            .map(|paths| paths.collect_vec())
+            .collect_vec()
+            .into_par_iter()
+            .map(|warc_paths| Job {
+                source_config: job_config.clone(),
+                warc_paths,
+                host_centrality_threshold: config.host_centrality_threshold,
+                base_path: config
+                    .output_path
+                    .clone()
+                    .unwrap_or_else(|| "data/index".to_string()),
+                minimum_clean_words: config.minimum_clean_words,
+            })
+            .map(|job| {
+                let pointer: IndexPointer = job.map(&worker);
+                pointer
+            })
+            .collect();
 
         Self::merge(indexes)?;
         Ok(())

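For context on the indexer change above: the batches are now collected into a `Vec` and driven through rayon's parallel iterator, so each worker thread runs one job to completion before taking the next, which is the hoped-for fix for jobs being picked up early. A minimal sketch of that pattern, with stand-in `Job`, `IndexPointer`, and `process` names (assumptions, not the project's types):

```rust
use rayon::prelude::*;

// Stand-ins for the real Job / IndexPointer types; illustrative only.
struct Job {
    warc_paths: Vec<String>,
}

struct IndexPointer(String);

fn process(job: &Job) -> IndexPointer {
    // ... index every page referenced by the job's WARC files ...
    IndexPointer(format!("index built from {} WARC files", job.warc_paths.len()))
}

fn main() {
    let jobs: Vec<Job> = (0..8)
        .map(|i| Job {
            warc_paths: vec![format!("warc_{i}.warc.gz")],
        })
        .collect();

    // Each rayon worker thread finishes `process` for one job before it
    // picks up the next one from the pool.
    let pointers: Vec<IndexPointer> = jobs.par_iter().map(process).collect();

    assert_eq!(pointers.len(), 8);
}
```
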
+ 6 - 13
core/src/entrypoint/mod.rs

@@ -29,7 +29,7 @@ pub mod search_server;
 mod webgraph;
 pub mod webgraph_server;
 
-use std::{fs::File, path::Path};
+use std::path::Path;
 
 pub use centrality::Centrality;
 pub use entity::EntityIndexer;
@@ -43,7 +43,7 @@ fn download_all_warc_files<'a>(
     warc_paths: &'a [String],
     source: &'a config::WarcSource,
     base_path: &'a str,
-) -> impl Iterator<Item = String> + 'a {
+) -> impl Iterator<Item = WarcFile> + 'a {
     let download_path = Path::new(base_path).join("warc_files");
 
     if !download_path.exists() {
@@ -55,24 +55,17 @@ fn download_all_warc_files<'a>(
         .map(|warc_path| warc_path.to_string())
         .collect();
 
-    warc_paths.into_iter().map(|warc_path| {
-        let download_path = Path::new(base_path).join("warc_files");
-        let name = warc_path.split('/').last().unwrap();
-        let mut file = File::options()
-            .create(true)
-            .truncate(true)
-            .write(true)
-            .open(download_path.join(name))
-            .unwrap();
+    warc_paths.into_iter().filter_map(|warc_path| {
         debug!("downloading warc file {}", &warc_path);
-        let res = WarcFile::download_into_buf(source, &warc_path, &mut file);
+        let res = WarcFile::download(source, &warc_path);
 
         if let Err(err) = res {
             error!("error while downloading: {:?}", err);
+            return None;
         }
 
         debug!("finished downloading");
 
-        warc_path
+        Some(res.unwrap())
     })
 }

+ 80 - 159
core/src/entrypoint/webgraph.rs

@@ -19,16 +19,14 @@ use crate::{
     config::{self, WebgraphConstructConfig},
     crawler::crawl_db::RedirectDb,
     entrypoint::download_all_warc_files,
-    mapreduce::{Map, Reduce, Worker},
-    warc::WarcFile,
-    webgraph::{self, FrozenWebgraph, Node, WebgraphBuilder},
+    mapreduce::Worker,
+    webgraph::{self, Node, WebgraphBuilder},
     webpage::Html,
     Result,
 };
 use itertools::Itertools;
-use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
-use std::{path::Path, thread};
+use std::{path::Path, sync::Arc};
 use tokio::pin;
 use tracing::{info, trace};
 
@@ -64,7 +62,7 @@ impl From<JobConfig> for config::WarcSource {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct Job {
     pub level: WebgraphLevel,
     pub config: JobConfig,
@@ -72,39 +70,32 @@ pub struct Job {
     pub graph_base_path: String,
 }
 
-fn open_graph<P: AsRef<Path>>(path: P) -> webgraph::Webgraph {
-    WebgraphBuilder::new(path).open()
+pub fn open_graph<P: AsRef<Path>>(path: P) -> webgraph::Webgraph {
+    WebgraphBuilder::new(path)
+        .commit_mode(webgraph::CommitMode::SingleSegment)
+        .open()
 }
 
 pub struct WebgraphWorker {
-    pub redirect: Option<RedirectDb>,
+    pub graph: webgraph::Webgraph,
+    pub redirect: Option<Arc<RedirectDb>>,
 }
 
 impl WebgraphWorker {
-    pub fn process_job(&self, job: &Job) -> webgraph::Webgraph {
+    pub fn process_job(&mut self, job: &Job) {
         let name = job.warc_paths.first().unwrap().split('/').last().unwrap();
 
         info!("processing {}", name);
 
-        let mut graph = open_graph(Path::new(&job.graph_base_path).join(name));
-
         let source = WarcSource::from(job.config.clone());
 
         let warc_files = download_all_warc_files(&job.warc_paths, &source, &job.graph_base_path);
         pin!(warc_files);
 
-        for warc_path in warc_files.by_ref() {
-            let name = warc_path.split('/').last().unwrap();
-            let path = Path::new(&job.graph_base_path)
-                .join("warc_files")
-                .join(name);
-
-            if let Ok(file) = WarcFile::open(&path) {
-                for record in file.records().flatten() {
-                    let webpage = match Html::parse_without_text(
-                        &record.response.body,
-                        &record.request.url,
-                    ) {
+        for file in warc_files.by_ref() {
+            for record in file.records().flatten() {
+                let webpage =
+                    match Html::parse_without_text(&record.response.body, &record.request.url) {
                         Ok(webpage) => webpage,
                         Err(err) => {
                             tracing::error!("error parsing webpage: {}", err);
@@ -112,112 +103,53 @@ impl WebgraphWorker {
                         }
                     };
 
-                    for link in webpage
-                        .anchor_links()
-                        .into_iter()
-                        .filter(|link| matches!(link.destination.scheme(), "http" | "https"))
-                        .filter(|link| link.source.domain() != link.destination.domain())
-                        .filter(|link| {
-                            link.source.domain().is_some() && link.destination.domain().is_some()
-                        })
-                    {
-                        let source = link.source.clone();
-                        let mut destination = link.destination.clone();
-
-                        if let Some(redirect) = &self.redirect {
-                            if let Some(new_destination) = redirect.get(&destination).unwrap() {
-                                trace!("redirecting {:?} to {:?}", destination, new_destination);
-                                destination = new_destination;
-                            }
+                for link in webpage
+                    .anchor_links()
+                    .into_iter()
+                    .filter(|link| matches!(link.destination.scheme(), "http" | "https"))
+                    .filter(|link| link.source.domain() != link.destination.domain())
+                    .filter(|link| {
+                        link.source.domain().is_some() && link.destination.domain().is_some()
+                    })
+                {
+                    let source = link.source.clone();
+                    let mut destination = link.destination.clone();
+
+                    if let Some(redirect) = &self.redirect {
+                        if let Some(new_destination) = redirect.get(&destination).unwrap() {
+                            trace!("redirecting {:?} to {:?}", destination, new_destination);
+                            destination = new_destination;
                         }
+                    }
 
-                        if source.domain() == destination.domain() {
-                            continue;
-                        }
+                    if source.domain() == destination.domain() {
+                        continue;
+                    }
 
-                        trace!("inserting link {:?}", link);
-                        let mut source = Node::from(source);
+                    trace!("inserting link {:?}", link);
+                    let mut source = Node::from(source);
 
-                        let mut destination = Node::from(destination);
+                    let mut destination = Node::from(destination);
 
-                        if let WebgraphLevel::Host = job.level {
-                            source = source.into_host();
-                            destination = destination.into_host();
-                        }
-
-                        graph.insert(source, destination, link.text);
+                    if let WebgraphLevel::Host = job.level {
+                        source = source.into_host();
+                        destination = destination.into_host();
                     }
+
+                    self.graph.insert(source, destination, link.text);
                 }
             }
 
-            graph.commit();
-
-            std::fs::remove_file(path).unwrap();
+            self.graph.commit();
         }
-        graph.merge_segments(1);
+        self.graph.merge_segments(1);
 
         info!("{} done", name);
-
-        graph
     }
 }
 
 impl Worker for WebgraphWorker {}
 
-impl Map<WebgraphWorker, FrozenWebgraph> for Job {
-    fn map(&self, worker: &WebgraphWorker) -> FrozenWebgraph {
-        let graph = worker.process_job(self);
-        graph.into()
-    }
-}
-
-impl Map<WebgraphWorker, GraphPointer> for Job {
-    fn map(&self, worker: &WebgraphWorker) -> GraphPointer {
-        let graph = worker.process_job(self);
-        GraphPointer { path: graph.path }
-    }
-}
-
-impl Reduce<FrozenWebgraph> for webgraph::Webgraph {
-    fn reduce(self, other: FrozenWebgraph) -> webgraph::Webgraph {
-        let other: webgraph::Webgraph = other.into();
-        self.reduce(other)
-    }
-}
-
-impl Reduce<webgraph::Webgraph> for webgraph::Webgraph {
-    fn reduce(mut self, element: webgraph::Webgraph) -> Self {
-        let other_path = element.path.clone();
-
-        self.merge(element);
-
-        std::fs::remove_dir_all(other_path).unwrap();
-        self
-    }
-}
-
-impl Reduce<GraphPointer> for GraphPointer {
-    fn reduce(self, other: GraphPointer) -> Self {
-        let self_clone = self.clone();
-
-        {
-            let graph = open_graph(self.path);
-            let other_graph = open_graph(other.path);
-
-            let _ = graph.reduce(other_graph);
-        }
-
-        self_clone
-    }
-}
-
-impl Reduce<GraphPointer> for webgraph::Webgraph {
-    fn reduce(self, other: GraphPointer) -> Self {
-        let other = open_graph(other.path);
-        self.reduce(other)
-    }
-}
-
 pub struct Webgraph {}
 
 impl Webgraph {
@@ -227,13 +159,11 @@ impl Webgraph {
         let job_config = JobConfig::from(config.warc_source.clone());
 
         let redirect = match &config.redirect_db_path {
-            Some(path) => Some(RedirectDb::open(path)?),
+            Some(path) => Some(Arc::new(RedirectDb::open(path)?)),
             None => None,
         };
 
-        let worker = WebgraphWorker { redirect };
-
-        let graphs: Vec<_> = warc_paths
+        let jobs: Vec<_> = warc_paths
             .into_iter()
             .take(config.limit_warc_files.unwrap_or(usize::MAX))
             .chunks(config.batch_size.unwrap_or(1))
@@ -247,57 +177,48 @@ impl Webgraph {
                     .clone()
                     .unwrap_or_else(|| "data/webgraph".to_string()),
             })
-            .collect_vec()
-            .into_par_iter()
-            .map(|job| -> GraphPointer { job.map(&worker) })
-            .collect();
-
-        if graphs.len() > 1 {
-            Self::merge(graphs);
-        }
-
-        Ok(())
-    }
-
-    fn merge(graphs: Vec<GraphPointer>) {
-        let num_graphs = graphs.len();
-        let mut it = graphs.into_iter();
-        let num_cores = num_cpus::get();
-
-        let mut threads = Vec::new();
-
-        for _ in 0..(num_cores + 1) {
-            let graphs = it
-                .by_ref()
-                .take(((num_graphs as f64) / (num_cores as f64)).ceil() as usize)
-                .collect_vec();
-
-            if graphs.is_empty() {
-                break;
-            }
-
-            threads.push(thread::spawn(move || {
-                let mut it = graphs.into_iter();
-                let mut graph = open_graph(it.next().unwrap().path);
-
-                for other in it {
-                    graph = graph.reduce(other);
+            .collect_vec();
+
+        let num_workers = num_cpus::get();
+
+        let mut handlers = Vec::new();
+        let parent_path = config
+            .graph_base_path
+            .clone()
+            .unwrap_or_else(|| "data/webgraph".to_string());
+
+        for i in 0..num_workers {
+            let path = parent_path.clone();
+            let path = Path::new(&path);
+            let path = path.join(format!("worker_{i}"));
+
+            let mut worker = WebgraphWorker {
+                redirect: redirect.clone(),
+                graph: open_graph(path),
+            };
+
+            let jobs = jobs.clone();
+            handlers.push(std::thread::spawn(move || {
+                for job in jobs.iter().skip(i).step_by(num_workers) {
+                    worker.process_job(job);
                 }
-                graph.merge_segments(1);
 
-                graph
+                worker.graph
             }));
         }
 
         let mut graphs = Vec::new();
-        for thread in threads {
-            graphs.push(thread.join().unwrap());
+        for handler in handlers {
+            graphs.push(handler.join().unwrap());
         }
 
         let mut graph = graphs.pop().unwrap();
-
-        for other in graphs {
-            graph = graph.reduce(other);
+        for other_graph in graphs {
+            graph.merge(other_graph);
         }
+
+        graph.move_to(&parent_path);
+
+        Ok(())
     }
 }

+ 293 - 12
core/src/hyperloglog.rs

@@ -4034,6 +4034,265 @@ pub(crate) const BIAS_DATA_VEC: &[&[f64]] = &[
     ],
 ];
 
+const ONE_OVER_POWER_OF_TWO: [f64; 256] = [
+    1.0,
+    0.5,
+    0.25,
+    0.125,
+    0.0625,
+    0.03125,
+    0.015625,
+    0.0078125,
+    0.00390625,
+    0.001953125,
+    0.0009765625,
+    0.00048828125,
+    0.000244140625,
+    0.0001220703125,
+    6.103515625e-05,
+    3.0517578125e-05,
+    1.52587890625e-05,
+    7.62939453125e-06,
+    3.814697265625e-06,
+    1.9073486328125e-06,
+    9.5367431640625e-07,
+    4.76837158203125e-07,
+    2.384_185_791_015_625e-7,
+    1.192_092_895_507_812_5e-7,
+    5.960_464_477_539_063e-8,
+    2.980_232_238_769_531_3e-8,
+    1.490_116_119_384_765_6e-8,
+    7.450_580_596_923_828e-9,
+    3.725_290_298_461_914e-9,
+    1.862_645_149_230_957e-9,
+    9.313225746154785e-10,
+    4.656612873077393e-10,
+    2.3283064365386963e-10,
+    1.1641532182693481e-10,
+    5.820766091346741e-11,
+    2.9103830456733704e-11,
+    1.4551915228366852e-11,
+    7.275957614183426e-12,
+    3.637978807091713e-12,
+    1.8189894035458565e-12,
+    9.094947017729282e-13,
+    4.547473508864641e-13,
+    2.2737367544323206e-13,
+    1.1368683772161603e-13,
+    5.684341886080802e-14,
+    2.842170943040401e-14,
+    1.4210854715202004e-14,
+    7.105427357601002e-15,
+    3.552713678800501e-15,
+    1.7763568394002505e-15,
+    8.881784197001252e-16,
+    4.440892098500626e-16,
+    2.220446049250313e-16,
+    1.1102230246251565e-16,
+    5.551115123125783e-17,
+    2.7755575615628914e-17,
+    1.3877787807814457e-17,
+    6.938893903907228e-18,
+    3.469446951953614e-18,
+    1.734723475976807e-18,
+    8.673617379884035e-19,
+    4.336808689942018e-19,
+    2.168404344971009e-19,
+    1.0842021724855044e-19,
+    5.421010862427522e-20,
+    2.710505431213761e-20,
+    1.3552527156068805e-20,
+    6.776263578034403e-21,
+    3.3881317890172014e-21,
+    1.6940658945086007e-21,
+    8.470329472543003e-22,
+    4.235164736271502e-22,
+    2.117582368135751e-22,
+    1.0587911840678754e-22,
+    5.293955920339377e-23,
+    2.6469779601696886e-23,
+    1.3234889800848443e-23,
+    6.617444900424222e-24,
+    3.308722450212111e-24,
+    1.6543612251060553e-24,
+    8.271806125530277e-25,
+    4.1359030627651384e-25,
+    2.0679515313825692e-25,
+    1.0339757656912846e-25,
+    5.169878828456423e-26,
+    2.5849394142282115e-26,
+    1.2924697071141057e-26,
+    6.462348535570529e-27,
+    3.2311742677852644e-27,
+    1.6155871338926322e-27,
+    8.077935669463161e-28,
+    4.0389678347315804e-28,
+    2.0194839173657902e-28,
+    1.0097419586828951e-28,
+    5.048709793414476e-29,
+    2.524354896707238e-29,
+    1.262177448353619e-29,
+    6.310887241768095e-30,
+    3.1554436208840472e-30,
+    1.5777218104420236e-30,
+    7.888609052210118e-31,
+    3.944304526105059e-31,
+    1.9721522630525295e-31,
+    9.860761315262648e-32,
+    4.930380657631324e-32,
+    2.465190328815662e-32,
+    1.232595164407831e-32,
+    6.162975822039155e-33,
+    3.0814879110195774e-33,
+    1.5407439555097887e-33,
+    7.703719777548943e-34,
+    3.851859888774472e-34,
+    1.925929944387236e-34,
+    9.62964972193618e-35,
+    4.81482486096809e-35,
+    2.407412430484045e-35,
+    1.2037062152420224e-35,
+    6.018531076210112e-36,
+    3.009265538105056e-36,
+    1.504632769052528e-36,
+    7.52316384526264e-37,
+    3.76158192263132e-37,
+    1.88079096131566e-37,
+    9.4039548065783e-38,
+    4.70197740328915e-38,
+    2.350988701644575e-38,
+    1.1754943508222875e-38,
+    5.877471754111438e-39,
+    2.938735877055719e-39,
+    1.4693679385278594e-39,
+    7.346839692639297e-40,
+    3.6734198463196485e-40,
+    1.8367099231598242e-40,
+    9.183549615799121e-41,
+    4.591774807899561e-41,
+    2.2958874039497803e-41,
+    1.1479437019748901e-41,
+    5.739718509874451e-42,
+    2.8698592549372254e-42,
+    1.4349296274686127e-42,
+    7.174648137343064e-43,
+    3.587324068671532e-43,
+    1.793662034335766e-43,
+    8.96831017167883e-44,
+    4.484155085839415e-44,
+    2.2420775429197073e-44,
+    1.1210387714598537e-44,
+    5.605193857299268e-45,
+    2.802596928649634e-45,
+    1.401298464324817e-45,
+    7.006492321624085e-46,
+    3.503246160812043e-46,
+    1.7516230804060213e-46,
+    8.758115402030107e-47,
+    4.3790577010150533e-47,
+    2.1895288505075267e-47,
+    1.0947644252537633e-47,
+    5.473822126268817e-48,
+    2.7369110631344083e-48,
+    1.3684555315672042e-48,
+    6.842277657836021e-49,
+    3.4211388289180104e-49,
+    1.7105694144590052e-49,
+    8.552847072295026e-50,
+    4.276423536147513e-50,
+    2.1382117680737565e-50,
+    1.0691058840368783e-50,
+    5.345529420184391e-51,
+    2.6727647100921956e-51,
+    1.3363823550460978e-51,
+    6.681911775230489e-52,
+    3.3409558876152446e-52,
+    1.6704779438076223e-52,
+    8.352389719038111e-53,
+    4.176194859519056e-53,
+    2.088097429759528e-53,
+    1.044048714879764e-53,
+    5.22024357439882e-54,
+    2.61012178719941e-54,
+    1.305060893599705e-54,
+    6.525304467998525e-55,
+    3.2626522339992623e-55,
+    1.6313261169996311e-55,
+    8.156630584998156e-56,
+    4.078315292499078e-56,
+    2.039157646249539e-56,
+    1.0195788231247695e-56,
+    5.0978941156238473e-57,
+    2.5489470578119236e-57,
+    1.2744735289059618e-57,
+    6.372367644529809e-58,
+    3.1861838222649046e-58,
+    1.5930919111324523e-58,
+    7.965459555662261e-59,
+    3.982729777831131e-59,
+    1.9913648889155653e-59,
+    9.956824444577827e-60,
+    4.9784122222889134e-60,
+    2.4892061111444567e-60,
+    1.2446030555722283e-60,
+    6.223015277861142e-61,
+    3.111507638930571e-61,
+    1.5557538194652854e-61,
+    7.778769097326427e-62,
+    3.8893845486632136e-62,
+    1.9446922743316068e-62,
+    9.723461371658034e-63,
+    4.861730685829017e-63,
+    2.4308653429145085e-63,
+    1.2154326714572542e-63,
+    6.077163357286271e-64,
+    3.0385816786431356e-64,
+    1.5192908393215678e-64,
+    7.596454196607839e-65,
+    3.7982270983039195e-65,
+    1.8991135491519597e-65,
+    9.495567745759799e-66,
+    4.7477838728798994e-66,
+    2.3738919364399497e-66,
+    1.1869459682199748e-66,
+    5.934729841099874e-67,
+    2.967364920549937e-67,
+    1.4836824602749686e-67,
+    7.418412301374843e-68,
+    3.7092061506874214e-68,
+    1.8546030753437107e-68,
+    9.273015376718553e-69,
+    4.636507688359277e-69,
+    2.3182538441796384e-69,
+    1.1591269220898192e-69,
+    5.795634610449096e-70,
+    2.897817305224548e-70,
+    1.448908652612274e-70,
+    7.24454326306137e-71,
+    3.622271631530685e-71,
+    1.8111358157653425e-71,
+    9.055679078826712e-72,
+    4.527839539413356e-72,
+    2.263919769706678e-72,
+    1.131959884853339e-72,
+    5.659799424266695e-73,
+    2.8298997121333476e-73,
+    1.4149498560666738e-73,
+    7.074749280333369e-74,
+    3.5373746401666845e-74,
+    1.7686873200833423e-74,
+    8.843436600416711e-75,
+    4.421718300208356e-75,
+    2.210859150104178e-75,
+    1.105429575052089e-75,
+    5.527147875260445e-76,
+    2.7635739376302223e-76,
+    1.3817869688151111e-76,
+    6.908934844075556e-77,
+    3.454467422037778e-77,
+    1.727233711018889e-77,
+];
+
 #[derive(Clone)]
 pub struct HyperLogLog<const N: usize> {
     registers: Vec<u8>,
@@ -4048,10 +4307,12 @@ impl<const N: usize> Default for HyperLogLog<N> {
 }
 
 impl<const N: usize> HyperLogLog<N> {
+    #[inline]
     fn hash(item: u64) -> u64 {
         item.wrapping_mul(11400714819323198549)
     }
 
+    #[inline]
     fn am(&self) -> f64 {
         let m = self.registers.len();
 
@@ -4066,6 +4327,7 @@ impl<const N: usize> HyperLogLog<N> {
         }
     }
 
+    #[inline]
     fn b(&self) -> usize {
         (N as f64).log2() as usize
     }
@@ -4086,9 +4348,11 @@ impl<const N: usize> HyperLogLog<N> {
     pub fn estimate_bias(&self, e: f64, b: usize) -> f64 {
         // binary search first nearest neighbor
         let lookup_array = RAW_ESTIMATE_DATA_VEC[b - 1 - RAW_ESTIMATE_DATA_OFFSET];
+
         let mut idx_left = match lookup_array.binary_search_by(|v| v.partial_cmp(&e).unwrap()) {
-            Ok(i) => Some(i),  // exact match
-            Err(i) => Some(i), // no match, i points to left neighbor
+            Ok(i) => Some(i),                                 // exact match
+            Err(i) if i == lookup_array.len() => Some(i - 1), // no match, i points to end of array
+            Err(i) => Some(i),                                // no match, i points to left neighbor
         };
 
         let mut idx_right = match idx_left {
@@ -4105,8 +4369,8 @@ impl<const N: usize> HyperLogLog<N> {
         // collect k nearest neighbors
         const K: usize = 6;
         debug_assert!(lookup_array.len() >= K);
-        let mut neighbors = Vec::with_capacity(K);
-        for _ in 0..K {
+        let mut neighbors = [0; K];
+        for neighbor in neighbors.iter_mut() {
             let (right_instead_left, idx) = match (idx_left, idx_right) {
                 (Some(i_left), Some(i_right)) => {
                     // 2 candidates, find better one
@@ -4128,7 +4392,7 @@ impl<const N: usize> HyperLogLog<N> {
                 }
                 _ => panic!("neighborhood search failed, this is bug!"),
             };
-            neighbors.push(idx);
+            *neighbor = idx;
             if right_instead_left {
                 idx_right = if idx < lookup_array.len() - 1 {
                     Some(idx + 1)
@@ -4141,7 +4405,8 @@ impl<const N: usize> HyperLogLog<N> {
         }
 
         // calculate mean of neighbors
-        let bias_data = BIAS_DATA_VEC[b - BIAS_DATA_OFFSET];
+        let bias_data = BIAS_DATA_VEC[b - 1 - BIAS_DATA_OFFSET];
+
         neighbors.iter().map(|&i| bias_data[i]).sum::<f64>() / (K as f64)
     }
 
@@ -4159,12 +4424,13 @@ impl<const N: usize> HyperLogLog<N> {
         let m = self.registers.len() as f64;
         let b = self.b();
 
-        let z = 1f64
-            / self
-                .registers
-                .iter()
-                .map(|&val| 2_f64.powi(-(i32::from(val))))
-                .sum::<f64>();
+        let sum: f64 = self
+            .registers
+            .iter()
+            .map(|&val| ONE_OVER_POWER_OF_TWO[val as usize])
+            .sum();
+
+        let z = 1f64 / sum;
 
         let e = self.am() * m.powi(2) * z;
 
@@ -4222,6 +4488,21 @@ mod tests {
 
         assert!(set.size() > lower_bound && set.size() < upper_bound);
     }
+    #[test]
+    fn many_different_sizes() {
+        let mut set: HyperLogLog<128> = HyperLogLog::default();
+
+        for item in 0..10_000 {
+            set.add(item);
+            set.size();
+        }
+
+        let delta = (set.relative_error() * (set.size() as f64)) as usize;
+        let lower_bound = set.size() - delta;
+        let upper_bound = set.size() + delta;
+
+        assert!(set.size() > lower_bound && set.size() < upper_bound);
+    }
 
     #[test]
     fn merge() {

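A large part of the HyperLogLog speed-up visible in this diff is replacing the per-register `2_f64.powi(-val)` call with a lookup into the precomputed `ONE_OVER_POWER_OF_TWO` table when summing register contributions in `size()`. A small standalone sketch of the same trick (table truncated to eight entries here; the real one covers all 256 possible register values):

```rust
// Sketch of the reciprocal power-of-two lookup; the real table has 256 entries.
const ONE_OVER_POWER_OF_TWO: [f64; 8] =
    [1.0, 0.5, 0.25, 0.125, 0.0625, 0.03125, 0.015625, 0.0078125];

fn sum_with_powi(registers: &[u8]) -> f64 {
    registers.iter().map(|&v| 2_f64.powi(-i32::from(v))).sum()
}

fn sum_with_lookup(registers: &[u8]) -> f64 {
    registers.iter().map(|&v| ONE_OVER_POWER_OF_TWO[v as usize]).sum()
}

fn main() {
    let registers = [0u8, 1, 3, 7, 2, 5];
    // Both sums agree exactly; the lookup just avoids a powi call per register.
    assert_eq!(sum_with_powi(&registers), sum_with_lookup(&registers));
}
```
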
+ 6 - 0
core/src/index.rs

@@ -67,6 +67,12 @@ impl Index {
         })
     }
 
+    pub fn optimize_for_search(&mut self) -> Result<()> {
+        self.inverted_index.optimize_for_search()?;
+
+        Ok(())
+    }
+
     pub fn tokenizers(&self) -> &TokenizerManager {
         self.inverted_index.tokenizers()
     }

+ 0 - 12
core/src/intmap.rs

@@ -131,18 +131,10 @@ impl<K: Key, V> IntMap<K, V> {
             .map(|(_, val)| val)
     }
 
-    pub fn len(&self) -> usize {
-        self.len
-    }
-
     pub fn into_iter(self) -> impl Iterator<Item = (K, V)> {
         self.bins.into_iter().flat_map(|bin| bin.into_iter())
     }
 
-    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
-        self.bins.iter_mut().flat_map(|bin| bin.iter_mut())
-    }
-
     pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
         self.bins.iter().flat_map(|bin| bin.iter())
     }
@@ -202,10 +194,6 @@ impl<K: Key> IntSet<K> {
         self.map.into_iter().map(|(key, _)| key)
     }
 
-    pub fn len(&self) -> usize {
-        self.map.len()
-    }
-
     pub fn contains(&self, item: &K) -> bool {
         self.map.contains_key(item)
     }

+ 7 - 3
core/src/inverted_index.rs

@@ -111,7 +111,7 @@ impl InvertedIndex {
     pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
         let schema = create_schema();
 
-        let mut tantivy_index = if path.as_ref().exists() {
+        let tantivy_index = if path.as_ref().exists() {
             let mmap_directory = MmapDirectory::open(&path)?;
             tantivy::Index::open(mmap_directory)?
         } else {
@@ -128,8 +128,6 @@ impl InvertedIndex {
             tantivy::Index::create(mmap_directory, schema.clone(), index_settings)?
         };
 
-        tantivy_index.set_default_multithread_executor()?;
-
         let tokenizer = Tokenizer::default();
         tantivy_index
             .tokenizers()
@@ -493,6 +491,12 @@ impl InvertedIndex {
         res.pop()
             .map(|(_, doc)| self.retrieve_doc(doc.into(), &tv_searcher).unwrap())
     }
+
+    pub fn optimize_for_search(&mut self) -> Result<()> {
+        self.tantivy_index.set_default_multithread_executor()?;
+
+        Ok(())
+    }
 }
 
 #[derive(Debug, Serialize)]

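The intent of the new `optimize_for_search` (see also core/src/index.rs above) is that tantivy stays on its single-threaded executor while indexing, and the multithreaded executor is only enabled for an index that will serve queries. A hedged usage sketch; the module path, the `Index::open` signature, and the `Result` alias are assumptions, only `optimize_for_search` itself comes from this diff:

```rust
// Hedged sketch: the imports and the `Index::open` call are assumptions based on
// other parts of this diff; only `optimize_for_search` is introduced here.
use stract::index::Index;
use stract::Result;

fn open_index_for_serving(path: &str) -> Result<Index> {
    let mut index = Index::open(path)?;

    // Indexing keeps tantivy single-threaded; switch to the multithreaded
    // executor only when the index is about to answer search queries.
    index.optimize_for_search()?;

    Ok(index)
}
```
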
+ 3 - 1
core/src/kahan_sum.rs

@@ -16,7 +16,7 @@
 
 use std::ops::{Add, AddAssign};
 
-#[derive(Default)]
+#[derive(Default, Clone, Copy, Debug)]
 pub struct KahanSum {
     sum: f64,
     err: f64,
@@ -59,6 +59,8 @@ mod tests {
     #[test]
     fn it_works() {
         let mut sum = KahanSum::default();
+        assert_eq!(0.0, f64::from(sum));
+
         for elem in [
             10000.0f64,
             std::f64::consts::PI,

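For context on the `KahanSum` type touched above: compensated (Kahan) summation carries a running error term so that many small additions to a large running sum are not lost to floating-point rounding. A standalone version of the algorithm (not the project's implementation, which lives in core/src/kahan_sum.rs):

```rust
// Standalone Kahan (compensated) summation, mirroring the idea behind KahanSum.
#[derive(Default, Clone, Copy, Debug)]
struct Kahan {
    sum: f64,
    err: f64,
}

impl Kahan {
    fn add(&mut self, value: f64) {
        let y = value - self.err;      // compensate for the previous rounding error
        let t = self.sum + y;          // big + small: low-order bits of y are lost here...
        self.err = (t - self.sum) - y; // ...and recovered here for the next addition
        self.sum = t;
    }

    fn value(self) -> f64 {
        self.sum
    }
}

fn main() {
    let mut kahan = Kahan::default();
    let mut naive = 0.0f64;

    for _ in 0..10_000_000 {
        kahan.add(0.1);
        naive += 0.1;
    }

    // The compensated sum stays much closer to the exact value 1_000_000.
    assert!((kahan.value() - 1_000_000.0).abs() < (naive - 1_000_000.0).abs());
}
```
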
+ 1 - 0
core/src/kv/rocksdb_store.rs

@@ -41,6 +41,7 @@ impl RocksDbStore {
         options.create_if_missing(true);
         options.set_max_open_files(1);
         options.set_max_file_opening_threads(1);
+        options.optimize_for_point_lookup(512 * 1024 * 1024); // 512 MB
 
         Box::new(DB::open(&options, path).expect("unable to open rocks db"))
     }

+ 4 - 2
core/src/lib.rs

@@ -15,6 +15,8 @@
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 //! Main library for Stract.
+
+#![doc(html_logo_url = "https://trystract.com/images/biglogo.svg")]
 // #![warn(clippy::pedantic)]
 // #![warn(missing_docs)]
 // #![warn(clippy::missing_docs_in_private_items)]
@@ -45,7 +47,7 @@ mod executor;
 mod fact_check_model;
 mod fastfield_reader;
 mod human_website_annotations;
-mod hyperloglog;
+pub mod hyperloglog;
 mod image_downloader;
 mod image_store;
 mod improvement;
@@ -83,7 +85,7 @@ pub enum Error {
     #[error("Failed to parse WARC file")]
     WarcParse(&'static str),
 
-    #[error("Encountered an empty required field when converting to tantivy")]
+    #[error("Encountered an empty required field ({0}) when converting to tantivy")]
     EmptyField(&'static str),
 
     #[error("Parsing error")]

+ 7 - 0
core/src/main.rs

@@ -30,6 +30,13 @@ use stract::webgraph::WebgraphBuilder;
 use tracing::Level;
 use tracing_subscriber::FmtSubscriber;
 
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
 #[derive(Parser)]
 #[clap(author, version, about, long_about = None)]
 #[clap(propagate_version = true)]

+ 6 - 126
core/src/ranking/centrality_store.rs

@@ -16,139 +16,26 @@
 
 use std::{fs::File, path::Path};
 
-use rocksdb::BlockBasedOptions;
 use tracing::debug;
 use tracing::log::trace;
 
 use crate::{
     kv::{rocksdb_store::RocksDbStore, Kv},
-    webgraph::{centrality::harmonic::HarmonicCentrality, Node, NodeID, Webgraph},
+    webgraph::{centrality::harmonic::HarmonicCentrality, NodeID, Webgraph},
 };
 
 use super::inbound_similarity::InboundSimilarity;
 
-pub struct Node2Id {
-    db: rocksdb::DB,
-    // from rocksdb docs: `Cache must outlive DB instance which uses it.`
-    #[allow(dead_code)]
-    cache: rocksdb::Cache,
-}
-
-impl Node2Id {
-    pub fn open<P: AsRef<Path>>(path: P) -> Self {
-        let mut options = rocksdb::Options::default();
-        options.create_if_missing(true);
-        options.increase_parallelism(8);
-        options.set_write_buffer_size(256 * 1024 * 1024); // 256 MB memtable
-        options.set_max_write_buffer_number(8);
-
-        let mut block_options = BlockBasedOptions::default();
-        block_options.set_ribbon_filter(10.0);
-
-        let cache = rocksdb::Cache::new_lru_cache(256 * 1024 * 1024).unwrap(); // 256 MB cache
-        block_options.set_block_cache(&cache);
-
-        options.set_block_based_table_factory(&block_options);
-
-        let db = rocksdb::DB::open(&options, path.as_ref().to_str().unwrap()).unwrap();
-
-        Self { db, cache }
-    }
-
-    pub fn get(&self, key: &Node) -> Option<NodeID> {
-        let bytes = bincode::serialize(key).unwrap();
-
-        self.db
-            .get(bytes)
-            .unwrap()
-            .map(|bytes| bincode::deserialize(&bytes).unwrap())
-    }
-
-    pub fn put(&self, key: &Node, value: &NodeID) {
-        let key_bytes = bincode::serialize(key).unwrap();
-        let value_bytes = bincode::serialize(value).unwrap();
-
-        self.db.put(key_bytes, value_bytes).unwrap();
-    }
-
-    pub fn batch_put(&self, it: impl Iterator<Item = (Node, NodeID)>) {
-        let mut batch = rocksdb::WriteBatch::default();
-
-        for (key, value) in it {
-            let key_bytes = bincode::serialize(&key).unwrap();
-            let value_bytes = bincode::serialize(&value).unwrap();
-
-            batch.put(key_bytes, value_bytes);
-        }
-
-        self.db.write(batch).unwrap();
-    }
-
-    pub fn contains(&self, key: &Node) -> bool {
-        let bytes = bincode::serialize(key).unwrap();
-
-        self.db.get(bytes).unwrap().is_some()
-    }
-
-    pub fn nodes(&self) -> impl Iterator<Item = Node> + '_ {
-        let mut read_opts = rocksdb::ReadOptions::default();
-
-        read_opts.set_readahead_size(4_194_304); // 4 MB
-
-        self.db
-            .iterator_opt(rocksdb::IteratorMode::Start, read_opts)
-            .map(|res| {
-                let (key, _) = res.unwrap();
-                bincode::deserialize(&key).unwrap()
-            })
-    }
-
-    pub fn ids(&self) -> impl Iterator<Item = NodeID> + '_ {
-        let mut read_opts = rocksdb::ReadOptions::default();
-
-        read_opts.set_readahead_size(4_194_304); // 4 MB
-
-        self.db
-            .iterator_opt(rocksdb::IteratorMode::Start, read_opts)
-            .map(|res| {
-                let (_, val) = res.unwrap();
-                bincode::deserialize(&val).unwrap()
-            })
-    }
-
-    pub fn iter(&self) -> impl Iterator<Item = (Node, NodeID)> + '_ {
-        let mut read_opts = rocksdb::ReadOptions::default();
-
-        read_opts.set_readahead_size(4_194_304); // 4 MB
-
-        self.db
-            .iterator_opt(rocksdb::IteratorMode::Start, read_opts)
-            .map(|res| {
-                let (key, val) = res.unwrap();
-                (
-                    bincode::deserialize(&key).unwrap(),
-                    bincode::deserialize(&val).unwrap(),
-                )
-            })
-    }
-
-    pub fn flush(&self) {
-        self.db.flush().unwrap();
-    }
-}
-
 pub type HarmonicCentralityStore = Box<dyn Kv<NodeID, f64> + Send + Sync>;
 
 pub struct IndexerCentralityStore {
     pub harmonic: HarmonicCentralityStore,
-    pub node2id: Node2Id,
 }
 
 impl IndexerCentralityStore {
     pub fn open<P: AsRef<Path>>(path: P) -> Self {
         Self {
             harmonic: RocksDbStore::open(path.as_ref().join("harmonic")),
-            node2id: Node2Id::open(path.as_ref().join("node2id")),
         }
     }
 }
@@ -156,7 +43,6 @@ impl IndexerCentralityStore {
 impl From<CentralityStore> for IndexerCentralityStore {
     fn from(store: CentralityStore) -> Self {
         Self {
-            node2id: store.node2id,
             harmonic: store.harmonic,
         }
     }
@@ -164,7 +50,6 @@ impl From<CentralityStore> for IndexerCentralityStore {
 
 pub struct SearchCentralityStore {
     pub inbound_similarity: InboundSimilarity,
-    pub node2id: Node2Id,
 }
 
 impl SearchCentralityStore {
@@ -172,7 +57,6 @@ impl SearchCentralityStore {
         Self {
             inbound_similarity: InboundSimilarity::open(path.as_ref().join("inbound_similarity"))
                 .unwrap(),
-            node2id: Node2Id::open(path.as_ref().join("node2id")),
         }
     }
 }
@@ -181,7 +65,6 @@ impl From<CentralityStore> for SearchCentralityStore {
     fn from(store: CentralityStore) -> Self {
         Self {
             inbound_similarity: store.inbound_similarity,
-            node2id: store.node2id,
         }
     }
 }
@@ -189,7 +72,6 @@ impl From<CentralityStore> for SearchCentralityStore {
 pub struct CentralityStore {
     pub harmonic: HarmonicCentralityStore,
     pub inbound_similarity: InboundSimilarity,
-    pub node2id: Node2Id,
     pub base_path: String,
 }
 
@@ -200,7 +82,6 @@ impl CentralityStore {
             inbound_similarity: InboundSimilarity::open(path.as_ref().join("inbound_similarity"))
                 .ok()
                 .unwrap_or_default(),
-            node2id: Node2Id::open(path.as_ref().join("node2id")),
             base_path: path.as_ref().to_str().unwrap().to_string(),
         }
     }
@@ -218,6 +99,11 @@ impl CentralityStore {
             .open(output_path.as_ref().join("harmonic.csv"))
             .unwrap();
 
+        for (node_id, centrality) in harmonic_centrality.iter() {
+            store.harmonic.insert(*node_id, centrality);
+        }
+        store.harmonic.flush();
+
         let mut harmonic: Vec<_> = harmonic_centrality
             .iter()
             .map(|(node, centrality)| (*node, centrality))
@@ -229,12 +115,10 @@ impl CentralityStore {
         for (node_id, centrality) in harmonic {
             let node = graph.id2node(&node_id).unwrap();
 
-            store.harmonic.insert(node_id, centrality);
             wtr.write_record(&[node.name, centrality.to_string()])
                 .unwrap();
         }
         wtr.flush().unwrap();
-        store.harmonic.flush();
     }
 
     pub fn build<P: AsRef<Path>>(graph: &Webgraph, output_path: P) -> Self {
@@ -245,7 +129,6 @@ impl CentralityStore {
     pub fn build_harmonic<P: AsRef<Path>>(graph: &Webgraph, output_path: P) -> Self {
         let mut store = CentralityStore::open(output_path.as_ref());
 
-        store.node2id.batch_put(graph.node_ids());
         let harmonic_centrality = HarmonicCentrality::calculate(graph);
         Self::store_harmonic(&output_path, &mut store, harmonic_centrality, graph);
 
@@ -271,8 +154,5 @@ impl CentralityStore {
         self.inbound_similarity
             .save(Path::new(&self.base_path).join("inbound_similarity"))
             .unwrap();
-
-        trace!("saving node2id");
-        self.node2id.flush();
     }
 }

+ 16 - 15
core/src/ranking/inbound_similarity.rs

@@ -24,6 +24,7 @@ use std::{
 };
 
 use dashmap::DashMap;
+use rayon::prelude::ParallelIterator;
 use serde::{Deserialize, Serialize};
 
 use crate::{
@@ -33,7 +34,7 @@ use crate::{
 };
 
 use super::{bitvec_similarity, centrality_store::HarmonicCentralityStore};
-const DEFAULT_NUM_TOP_HARMONIC_CENTRALITY_FOR_NODES: usize = 1_000_000;
+const DEFAULT_NUM_TOP_HARMONIC_CENTRALITY_NODES: usize = 1_000_000;
 
 pub struct Scorer {
     liked: Vec<NodeScorer>,
@@ -144,11 +145,7 @@ pub struct InboundSimilarity {
 
 impl InboundSimilarity {
     pub fn build(graph: &Webgraph, harmonic: &HarmonicCentralityStore) -> Self {
-        Self::build_with_threshold(
-            graph,
-            harmonic,
-            DEFAULT_NUM_TOP_HARMONIC_CENTRALITY_FOR_NODES,
-        )
+        Self::build_with_threshold(graph, harmonic, DEFAULT_NUM_TOP_HARMONIC_CENTRALITY_NODES)
     }
 
     fn build_with_threshold(
@@ -179,18 +176,22 @@ impl InboundSimilarity {
 
         let top_nodes: IntSet<NodeID> = top_nodes.into_iter().map(|n| n.node).collect();
 
-        for node_id in graph.nodes() {
-            let mut ranks = Vec::new();
+        let adjacency: DashMap<NodeID, Vec<NodeID>> = DashMap::new();
 
-            for edge in graph.raw_ingoing_edges(&node_id) {
-                if !top_nodes.contains(&edge.from) {
-                    continue;
-                }
-
-                ranks.push(edge.from.bit_128());
+        graph.par_edges().for_each(|edge| {
+            if top_nodes.contains(&edge.from) {
+                adjacency
+                    .entry(edge.to)
+                    .or_insert_with(Vec::new)
+                    .push(edge.from);
             }
+        });
 
-            vectors.insert(node_id, bitvec_similarity::BitVec::new(ranks));
+        for (node_id, inbound) in adjacency {
+            vectors.insert(
+                node_id,
+                bitvec_similarity::BitVec::new(inbound.into_iter().map(|n| n.bit_128()).collect()),
+            );
         }
 
         Self {

+ 6 - 2
core/src/searcher/local.rs

@@ -66,6 +66,10 @@ struct InvertedIndexResult {
 impl LocalSearcher {
     pub fn new(index: Index) -> Self {
         let spell = Spell::for_index(&index);
+
+        let mut index = index;
+        index.optimize_for_search().unwrap();
+
         LocalSearcher {
             index,
             spell,
@@ -184,7 +188,7 @@ impl LocalSearcher {
                 .liked
                 .iter()
                 .map(|site| Node::from(site.clone()).into_host())
-                .filter_map(|node| store.node2id.get(&node))
+                .map(|node| node.id())
                 .collect();
 
             let disliked_sites: Vec<_> = parsed_query
@@ -192,7 +196,7 @@ impl LocalSearcher {
                 .disliked
                 .iter()
                 .map(|site| Node::from(site.clone()).into_host())
-                .filter_map(|node| store.node2id.get(&node))
+                .map(|node| node.id())
                 .collect();
 
             aggregator.set_inbound_similarity(

+ 3 - 1
core/src/warc.rs

@@ -81,10 +81,10 @@ impl WarcFile {
         }
     }
 
-    #[allow(unused)]
     pub(crate) fn download(source: &WarcSource, warc_path: &str) -> Result<Self> {
         let mut cursor = Cursor::new(Vec::new());
         Self::download_into_buf(source, warc_path, &mut cursor)?;
+        cursor.rewind()?;
 
         let mut buf = Vec::new();
         cursor.read_to_end(&mut buf)?;
@@ -131,7 +131,9 @@ impl WarcFile {
         let mut reader = BufReader::new(f);
 
         buf.rewind()?;
+
         std::io::copy(&mut reader, buf)?;
+
         Ok(())
     }
 

+ 78 - 65
core/src/webgraph/centrality/harmonic.rs

@@ -14,20 +14,25 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-use std::collections::BTreeMap;
+use std::{
+    collections::BTreeMap,
+    sync::{atomic::AtomicBool, Mutex},
+};
+
+use std::sync::atomic::Ordering;
 
 use bitvec::vec::BitVec;
+use dashmap::{DashMap, DashSet};
+use rayon::prelude::*;
 use tracing::info;
 
 use crate::{
     hyperloglog::HyperLogLog,
-    intmap::{IntMap, IntSet},
     kahan_sum::KahanSum,
     webgraph::{NodeID, Webgraph},
 };
 
 const HYPERLOGLOG_COUNTERS: usize = 64;
-const EXACT_COUNTING_THRESHOLD: u64 = 1_000_000;
 
 #[derive(Clone)]
 struct JankyBloomFilter {
@@ -80,50 +85,55 @@ impl JankyBloomFilter {
 }
 
 fn calculate_centrality(graph: &Webgraph) -> BTreeMap<NodeID, f64> {
-    let nodes: Vec<_> = graph.nodes().collect();
-    info!("Found {} nodes in the graph", nodes.len());
-    let norm_factor = (nodes.len() - 1) as f64;
-
-    let mut counters: IntMap<NodeID, HyperLogLog<HYPERLOGLOG_COUNTERS>> = nodes
-        .iter()
-        .map(|node| {
-            let mut counter = HyperLogLog::default();
-            counter.add(node.bit_64());
-
-            (*node, counter)
-        })
-        .collect();
+    let mut num_nodes = 0;
 
-    let mut exact_counting = false;
-    let mut has_changes = true;
-    let mut t = 0;
-    let mut centralities: IntMap<NodeID, KahanSum> = nodes
-        .iter()
-        .map(|node| (*node, KahanSum::default()))
-        .collect();
+    let mut counters: DashMap<NodeID, HyperLogLog<HYPERLOGLOG_COUNTERS>> = DashMap::new();
+    let centralities: DashMap<NodeID, KahanSum> = DashMap::new();
+
+    for node in graph.nodes() {
+        let mut counter = HyperLogLog::default();
+        counter.add(node.bit_64());
+
+        counters.insert(node, counter);
+        centralities.insert(node, KahanSum::default());
 
-    let mut exact_changed_nodes: IntSet<NodeID> = IntSet::default();
-    let mut changed_nodes = JankyBloomFilter::new(nodes.len() as u64, 0.05);
-    for node in &nodes {
+        num_nodes += 1;
+    }
+
+    let mut changed_nodes = JankyBloomFilter::new(num_nodes as u64, 0.05);
+
+    for node in graph.nodes() {
         changed_nodes.insert(node.bit_64());
     }
 
+    info!("Found {} nodes in the graph", num_nodes);
+    let exact_counting_threshold = (num_nodes as f64).sqrt().max(0.0).round() as u64;
+    let norm_factor = (num_nodes - 1) as f64;
+
+    let mut exact_counting = false;
+    let has_changes = AtomicBool::new(true);
+    let mut t = 0;
+
+    let mut exact_changed_nodes: DashSet<NodeID> = DashSet::default();
+
     loop {
-        if !has_changes {
+        if !has_changes.load(Ordering::Relaxed) {
             break;
         }
 
-        let mut new_counters: IntMap<_, _> = counters.clone();
+        let new_counters: DashMap<_, _> = counters.clone();
 
-        has_changes = false;
-        let mut new_changed_nodes = JankyBloomFilter::new(nodes.len() as u64, 0.05);
+        has_changes.store(false, Ordering::Relaxed);
+        let new_changed_nodes = Mutex::new(JankyBloomFilter::new(num_nodes as u64, 0.05));
 
-        if exact_changed_nodes.len() > 0 {
-            let mut new_exact_changed_nodes = IntSet::default();
+        if !exact_changed_nodes.is_empty()
+            && exact_changed_nodes.len() as u64 <= exact_counting_threshold
+        {
+            let new_exact_changed_nodes = DashSet::default();
 
-            for changed_node in exact_changed_nodes.into_iter() {
+            exact_changed_nodes.par_iter().for_each(|changed_node| {
                 for edge in graph.raw_outgoing_edges(&changed_node) {
-                    if let (Some(counter_to), Some(counter_from)) =
+                    if let (Some(mut counter_to), Some(counter_from)) =
                         (new_counters.get_mut(&edge.to), counters.get(&edge.from))
                     {
                         if counter_to
@@ -132,48 +142,47 @@ fn calculate_centrality(graph: &Webgraph) -> BTreeMap<NodeID, f64> {
                             .zip(counter_from.registers().iter())
                             .any(|(to, from)| *from > *to)
                         {
-                            counter_to.merge(counter_from);
-                            new_changed_nodes.insert(edge.to.bit_64());
+                            counter_to.merge(&counter_from);
+                            new_changed_nodes.lock().unwrap().insert(edge.to.bit_64());
 
                             new_exact_changed_nodes.insert(edge.to);
 
-                            has_changes = true;
+                            has_changes.store(true, Ordering::Relaxed);
                         }
                     }
                 }
-            }
+            });
 
             exact_changed_nodes = new_exact_changed_nodes;
         } else {
-            exact_changed_nodes = IntSet::default();
-            for edge in graph.edges() {
-                if !changed_nodes.contains(&edge.from.bit_64()) {
-                    continue;
-                }
-
-                if let (Some(counter_to), Some(counter_from)) =
-                    (new_counters.get_mut(&edge.to), counters.get(&edge.from))
-                {
-                    if counter_to
-                        .registers()
-                        .iter()
-                        .zip(counter_from.registers().iter())
-                        .any(|(to, from)| *from > *to)
+            exact_changed_nodes = DashSet::default();
+            graph.par_edges().for_each(|edge| {
+                if changed_nodes.contains(&edge.from.bit_64()) {
+                    if let (Some(mut counter_to), Some(counter_from)) =
+                        (new_counters.get_mut(&edge.to), counters.get(&edge.from))
                     {
-                        counter_to.merge(counter_from);
-                        new_changed_nodes.insert(edge.to.bit_64());
+                        if counter_to
+                            .registers()
+                            .iter()
+                            .zip(counter_from.registers().iter())
+                            .any(|(to, from)| *from > *to)
+                        {
+                            counter_to.merge(&counter_from);
+                            new_changed_nodes.lock().unwrap().insert(edge.to.bit_64());
 
-                        if exact_counting {
-                            exact_changed_nodes.insert(edge.to);
-                        }
+                            if exact_counting {
+                                exact_changed_nodes.insert(edge.to);
+                            }
 
-                        has_changes = true;
+                            has_changes.store(true, Ordering::Relaxed);
+                        }
                     }
                 }
-            }
+            })
         }
 
-        for (node, score) in centralities.iter_mut() {
+        centralities.par_iter_mut().for_each(|mut r| {
+            let (node, score) = r.pair_mut();
             *score += new_counters
                 .get(node)
                 .map(|counter| counter.size())
@@ -186,23 +195,27 @@ fn calculate_centrality(graph: &Webgraph) -> BTreeMap<NodeID, f64> {
                 )
                 .unwrap_or_default() as f64
                 / (t + 1) as f64;
-        }
+        });
 
         counters = new_counters;
-        changed_nodes = new_changed_nodes;
+        changed_nodes = new_changed_nodes.into_inner().unwrap();
         t += 1;
 
-        if changed_nodes.estimate_card() <= EXACT_COUNTING_THRESHOLD {
+        if changed_nodes.estimate_card() <= exact_counting_threshold {
             exact_counting = true;
         }
     }
 
-    centralities
+    let res = centralities
         .into_iter()
         .map(|(node_id, sum)| (node_id, f64::from(sum)))
         .filter(|(_, centrality)| *centrality > 0.0)
         .map(|(node_id, centrality)| (node_id, centrality / norm_factor))
-        .collect()
+        .collect();
+
+    info!("Harmonic centrality calculated");
+
+    res
 }
 
 pub struct HarmonicCentrality(BTreeMap<NodeID, f64>);

+ 140 - 113
core/src/webgraph/mod.rs

@@ -23,6 +23,7 @@ use std::sync::{Arc, Mutex};
 use std::{cmp, fs};
 
 use lru::LruCache;
+use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
 use url::Url;
 
@@ -220,8 +221,17 @@ pub struct FullEdge {
     pub label: String,
 }
 
+#[derive(Default, Clone, Copy)]
+pub enum CommitMode {
+    #[default]
+    NewSegment,
+    SingleSegment,
+}
+
 pub struct WebgraphBuilder {
+    commit_mode: CommitMode,
     path: Box<Path>,
+    executor: Executor,
 }
 
 impl WebgraphBuilder {
@@ -235,12 +245,24 @@ impl WebgraphBuilder {
 
     pub fn new<P: AsRef<Path>>(path: P) -> Self {
         Self {
+            commit_mode: CommitMode::default(),
             path: path.as_ref().into(),
+            executor: Executor::multi_thread("webgraph").unwrap(),
         }
     }
 
+    pub fn commit_mode(mut self, mode: CommitMode) -> Self {
+        self.commit_mode = mode;
+        self
+    }
+
+    pub fn single_threaded(mut self) -> Self {
+        self.executor = Executor::single_thread();
+        self
+    }
+
     pub fn open(self) -> Webgraph {
-        Webgraph::open(self.path)
+        Webgraph::open(self.path, self.commit_mode, self.executor)
     }
 }
 
@@ -339,6 +361,7 @@ pub struct Webgraph {
     executor: Arc<Executor>,
     id2node: Store<NodeID, Node>,
     id2node_cache: Mutex<LruCache<NodeID, Node>>,
+    commit_mode: CommitMode,
     meta: Meta,
 }
 
@@ -374,7 +397,7 @@ impl Webgraph {
         writer.write_all(json.as_bytes()).unwrap();
     }
 
-    fn open<P: AsRef<Path>>(path: P) -> Self {
+    fn open<P: AsRef<Path>>(path: P, commit_mode: CommitMode, executor: Executor) -> Self {
         fs::create_dir_all(&path).unwrap();
         let meta = Self::meta(&path);
 
@@ -391,7 +414,8 @@ impl Webgraph {
             path: path.as_ref().as_os_str().to_str().unwrap().to_string(),
             live_segment: LiveSegment::default(),
             segments,
-            executor: Arc::new(Executor::multi_thread("webgraph").unwrap()),
+            commit_mode,
+            executor: Arc::new(executor),
             id2node: Store::open(path.as_ref().join("id2node")),
             id2node_cache: Mutex::new(LruCache::new(10_000.try_into().unwrap())),
             meta,
@@ -416,9 +440,7 @@ impl Webgraph {
     pub fn merge(&mut self, mut other: Webgraph) {
         other.commit();
 
-        for (other_id, node) in other.id2node.iter() {
-            self.id2node.put(&other_id, &node);
-        }
+        self.id2node.batch_put_owned(other.id2node.iter());
 
         for segment in other.segments {
             let id = segment.id();
@@ -436,10 +458,18 @@ impl Webgraph {
     pub fn commit(&mut self) {
         if !self.live_segment.is_empty() {
             let live_segment = std::mem::take(&mut self.live_segment);
-            let segment = live_segment.commit(Path::new(&self.path).join("segments"));
 
-            self.meta.comitted_segments.push(segment.id());
-            self.segments.push(segment);
+            match (self.commit_mode, self.segments.first_mut()) {
+                (CommitMode::SingleSegment, Some(segment)) => {
+                    segment.add(live_segment);
+                }
+                _ => {
+                    let segment = live_segment.commit(Path::new(&self.path).join("segments"));
+
+                    self.meta.comitted_segments.push(segment.id());
+                    self.segments.push(segment);
+                }
+            }
         }
 
         self.save_metadata();
@@ -532,6 +562,12 @@ impl Webgraph {
         self.segments.iter().flat_map(|segment| segment.edges())
     }
 
+    pub fn par_edges(&self) -> impl ParallelIterator<Item = Edge> + '_ {
+        self.segments
+            .par_iter()
+            .flat_map(|segment| segment.edges().par_bridge())
+    }
+
     pub fn merge_segments(&mut self, num_segments: usize) {
         if num_segments >= self.segments.len() {
             return;
@@ -590,6 +626,24 @@ impl Webgraph {
             }
         }
     }
+
+    pub fn move_to<P: AsRef<Path>>(&mut self, new_path: P) {
+        let path = Path::new(&self.path);
+        let new_path = new_path.as_ref();
+
+        if path == new_path {
+            return;
+        }
+
+        fs::rename(path, new_path).unwrap();
+
+        let path = Path::new(&self.path);
+        if path.exists() {
+            fs::remove_dir_all(path).unwrap();
+        }
+
+        self.path = new_path.as_os_str().to_str().unwrap().to_string();
+    }
 }
 
 impl From<FrozenWebgraph> for Webgraph {
@@ -667,6 +721,68 @@ mod test {
         graph
     }
 
+    fn verify_graph(graph: &Webgraph) {
+        let mut res = graph.outgoing_edges(Node::from("A"));
+        res.sort_by(|a, b| a.to.name.cmp(&b.to.name));
+
+        assert_eq!(
+            res,
+            vec![
+                FullEdge {
+                    from: Node::from("A"),
+                    to: Node::from("B"),
+                    label: String::new()
+                },
+                FullEdge {
+                    from: Node::from("A"),
+                    to: Node::from("C"),
+                    label: String::new()
+                }
+            ]
+        );
+
+        let mut res = graph.ingoing_edges(Node::from("C"));
+        res.sort_by(|a, b| a.from.name.cmp(&b.from.name));
+
+        assert_eq!(
+            res,
+            vec![
+                FullEdge {
+                    from: Node::from("A"),
+                    to: Node::from("C"),
+                    label: String::new()
+                },
+                FullEdge {
+                    from: Node::from("B"),
+                    to: Node::from("C"),
+                    label: String::new()
+                },
+                FullEdge {
+                    from: Node::from("D"),
+                    to: Node::from("C"),
+                    label: String::new()
+                },
+            ]
+        );
+
+        let res = graph.outgoing_edges(Node::from("D"));
+
+        assert_eq!(
+            res,
+            vec![FullEdge {
+                from: Node::from("D"),
+                to: Node::from("C"),
+                label: String::new()
+            },]
+        );
+
+        let distances = graph.distances(Node::from("D"));
+
+        assert_eq!(distances.get(&Node::from("C")), Some(&1));
+        assert_eq!(distances.get(&Node::from("A")), Some(&2));
+        assert_eq!(distances.get(&Node::from("B")), Some(&3));
+    }
+
     #[test]
     fn distance_calculation() {
         let graph = test_graph();
@@ -824,117 +940,28 @@ mod test {
         graph.commit();
 
         assert_eq!(num_edges, graph.segments.len());
-
-        let distances = graph.distances(Node::from("D"));
-
-        assert_eq!(distances.get(&Node::from("C")), Some(&1));
-        assert_eq!(distances.get(&Node::from("A")), Some(&2));
-        assert_eq!(distances.get(&Node::from("B")), Some(&3));
-
-        let mut res = graph.outgoing_edges(Node::from("A"));
-        res.sort_by(|a, b| a.to.name.cmp(&b.to.name));
-
-        assert_eq!(
-            res,
-            vec![
-                FullEdge {
-                    from: Node::from("A"),
-                    to: Node::from("B"),
-                    label: String::new()
-                },
-                FullEdge {
-                    from: Node::from("A"),
-                    to: Node::from("C"),
-                    label: String::new()
-                }
-            ]
-        );
-
-        let mut res = graph.ingoing_edges(Node::from("C"));
-        res.sort_by(|a, b| a.from.name.cmp(&b.from.name));
-
-        assert_eq!(
-            res,
-            vec![
-                FullEdge {
-                    from: Node::from("A"),
-                    to: Node::from("C"),
-                    label: String::new()
-                },
-                FullEdge {
-                    from: Node::from("B"),
-                    to: Node::from("C"),
-                    label: String::new()
-                },
-                FullEdge {
-                    from: Node::from("D"),
-                    to: Node::from("C"),
-                    label: String::new()
-                },
-            ]
-        );
+        verify_graph(&graph);
 
         graph.merge_segments(2);
         assert_eq!(graph.segments.len(), 2);
 
-        let mut res = graph.outgoing_edges(Node::from("A"));
-        res.sort_by(|a, b| a.to.name.cmp(&b.to.name));
-
-        assert_eq!(
-            res,
-            vec![
-                FullEdge {
-                    from: Node::from("A"),
-                    to: Node::from("B"),
-                    label: String::new()
-                },
-                FullEdge {
-                    from: Node::from("A"),
-                    to: Node::from("C"),
-                    label: String::new()
-                }
-            ]
-        );
-
-        let mut res = graph.ingoing_edges(Node::from("C"));
-        res.sort_by(|a, b| a.from.name.cmp(&b.from.name));
-
-        assert_eq!(
-            res,
-            vec![
-                FullEdge {
-                    from: Node::from("A"),
-                    to: Node::from("C"),
-                    label: String::new()
-                },
-                FullEdge {
-                    from: Node::from("B"),
-                    to: Node::from("C"),
-                    label: String::new()
-                },
-                FullEdge {
-                    from: Node::from("D"),
-                    to: Node::from("C"),
-                    label: String::new()
-                },
-            ]
-        );
+        verify_graph(&graph);
+    }
 
-        let res = graph.outgoing_edges(Node::from("D"));
+    #[test]
+    fn single_segment_commit_mode() {
+        let mut graph = WebgraphBuilder::new_memory()
+            .commit_mode(CommitMode::SingleSegment)
+            .open();
 
-        assert_eq!(
-            res,
-            vec![FullEdge {
-                from: Node::from("D"),
-                to: Node::from("C"),
-                label: String::new()
-            },]
-        );
+        for (from, to, label) in test_edges() {
+            graph.insert(from, to, label);
+            graph.commit();
+        }
 
-        let distances = graph.distances(Node::from("D"));
+        graph.commit();
 
-        assert_eq!(distances.get(&Node::from("C")), Some(&1));
-        assert_eq!(distances.get(&Node::from("A")), Some(&2));
-        assert_eq!(distances.get(&Node::from("B")), Some(&3));
+        assert_eq!(graph.segments.len(), 1);
+        verify_graph(&graph);
     }
 }

+ 37 - 1
core/src/webgraph/segment.rs

@@ -187,7 +187,12 @@ impl StoredSegment {
     }
 
     pub fn merge(segments: Vec<StoredSegment>) -> Self {
-        debug_assert!(segments.len() > 1);
+        debug_assert!(!segments.is_empty());
+
+        if segments.len() == 1 {
+            let mut segments = segments;
+            return segments.pop().unwrap();
+        }
 
         let new_segment_id = uuid::Uuid::new_v4().to_string();
 
@@ -239,6 +244,37 @@ impl StoredSegment {
 
         res
     }
+
+    pub fn add(&mut self, live_segment: LiveSegment) {
+        for (node_id, adjacency) in live_segment.adjacency {
+            let mut existing = self.full_adjacency.get(&node_id).unwrap_or_default();
+            existing.extend(adjacency);
+            self.full_adjacency.put(&node_id, &existing);
+            self.small_adjacency.put(
+                &node_id,
+                &existing
+                    .into_iter()
+                    .map(|edge| SmallStoredEdge { other: edge.other })
+                    .collect::<HashSet<_>>(),
+            );
+        }
+
+        for (node_id, adjacency) in live_segment.reversed_adjacency {
+            let mut existing = self
+                .full_reversed_adjacency
+                .get(&node_id)
+                .unwrap_or_default();
+            existing.extend(adjacency);
+            self.full_reversed_adjacency.put(&node_id, &existing);
+            self.small_reversed_adjacency.put(
+                &node_id,
+                &existing
+                    .into_iter()
+                    .map(|edge| SmallStoredEdge { other: edge.other })
+                    .collect::<HashSet<_>>(),
+            );
+        }
+    }
 }
 
 #[derive(Default)]

+ 15 - 1
core/src/webgraph/store.rs

@@ -25,7 +25,7 @@ pub struct Store<K, V> {
 
 impl<K, V> Store<K, V>
 where
-    K: serde::de::DeserializeOwned + serde::Serialize,
+    K: serde::de::DeserializeOwned + serde::Serialize + Send,
     V: serde::de::DeserializeOwned + serde::Serialize,
 {
     pub fn open<P: AsRef<Path>>(path: P) -> Self {
@@ -77,6 +77,19 @@ where
     pub fn batch_put<'a>(&'a self, it: impl Iterator<Item = (&'a K, &'a V)>) {
         let mut batch = rocksdb::WriteBatch::default();
 
+        for (key, value) in it {
+            let key_bytes = bincode::serialize(key).unwrap();
+            let value_bytes = bincode::serialize(value).unwrap();
+
+            batch.put(key_bytes, value_bytes);
+        }
+
+        self.db.write(batch).unwrap();
+    }
+
+    pub fn batch_put_owned(&self, it: impl Iterator<Item = (K, V)>) {
+        let mut batch = rocksdb::WriteBatch::default();
+
         for (key, value) in it {
             let key_bytes = bincode::serialize(&key).unwrap();
             let value_bytes = bincode::serialize(&value).unwrap();
@@ -91,6 +104,7 @@ where
         let mut read_opts = rocksdb::ReadOptions::default();
 
         read_opts.set_readahead_size(4_194_304); // 4 MB
+        read_opts.set_pin_data(true);
 
         self.db
             .iterator_opt(rocksdb::IteratorMode::Start, read_opts)

+ 5 - 1
core/src/webpage/mod.rs

@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program.  If not, see <https://www.gnu.org/licenses/>.
 use crate::{
+    ceil_char_boundary,
     enum_map::EnumSet,
     prehashed::hash,
     schema::{FastField, TextField},
@@ -1007,7 +1008,10 @@ impl Html {
                             .domain()
                             .unwrap_or_default()
                             .find('.')
-                            .map(|index| &domain.text[..index])
+                            .map(|index| {
+                                &domain.text[..ceil_char_boundary(&domain.text, index)
+                                    .min(domain.text.len())]
+                            })
                             .unwrap_or_default();
 
                         doc.add_pre_tokenized_text(

+ 1 - 0
docs/.gitignore

@@ -0,0 +1 @@
+site

+ 36 - 0
docs/mkdocs.yml

@@ -0,0 +1,36 @@
+site_name: Overview Docs
+docs_dir: src
+
+nav:
+  - Introduction: index.md
+  - Overview: overview.md
+  - Crawler: crawler.md
+  - Webgraph: webgraph.md
+  - Search Index: search_index.md
+  - Optics: ()
+
+theme:
+  name: material
+  logo: assets/images/biglogo.svg
+  favicon: assets/images/favicon.ico
+  features:
+    - navigation.expand
+  palette:
+    primary: white
+    accent: light blue
+
+extra_javascript:
+  - assets/js/katex.js
+  - https://cdnjs.cloudflare.com/ajax/libs/KaTeX/0.16.7/katex.min.js
+  - https://cdnjs.cloudflare.com/ajax/libs/KaTeX/0.16.7/contrib/auto-render.min.js
+
+extra_css:
+  - assets/styles/main.css
+  - https://cdnjs.cloudflare.com/ajax/libs/KaTeX/0.16.7/katex.min.css
+
+extra:
+  expand: true
+  generator: false
+  homepage: https://trystract.com
+plugins:
+  - search

File diff not shown because the file is too large
+ 2 - 0
docs/src/assets/images/biglogo.svg


File diff not shown because the file is too large
+ 6 - 0
docs/src/assets/images/crawler_overview.svg


Binary
docs/src/assets/images/favicon.ico


+ 12 - 0
docs/src/assets/js/katex.js

@@ -0,0 +1,12 @@
+document$.subscribe(({ body }) => {
+
+
+    renderMathInElement(body, {
+        delimiters: [
+            { left: "$$", right: "$$", display: true },
+            { left: "$", right: "$", display: false },
+            { left: "\\(", right: "\\)", display: false },
+            { left: "\\[", right: "\\]", display: true }
+        ],
+    })
+})

+ 18 - 0
docs/src/assets/styles/main.css

@@ -0,0 +1,18 @@
+.md-header__title {
+    font-size: 0.8rem;
+}
+
+.md-logo>img {
+    width: 100% !important;
+    height: 100% !important;
+}
+
+.md-logo {
+    width: 5rem;
+    height: auto;
+}
+
+.md-footer,
+.md-footer-meta {
+    background-color: white !important;
+}

+ 27 - 0
docs/src/crawler.md

@@ -0,0 +1,27 @@
+# Crawler
+![Overview of Crawler Architecture](assets/images/crawler_overview.svg)
+The crawler is a distributed system that scours the web. It has a coordinator process that determines which URLs to crawl and a set of worker processes that fetch the content of those URLs. Each worker receives a batch of crawl jobs to process, stores the fetched contents in an S3 bucket and sends newly discovered URLs back to the coordinator. This continues until the coordinator has determined that the crawl is complete.
+
+## Coordinator
+This is the brains of the crawl operation. The coordinator is responsible for determining which URLs to crawl and distributing them to the workers.
+
+### URL Frontier
+The coordinator starts with a list of seed urls, schedules these to the available workers and receives a list of newly discovered urls from each worker. These newly discovered urls are added to the url frontier, which is a list of urls to crawl.
+
+You can imagine that the url frontier can grow very large, very quickly. This raises the question: how does the coordinator determine which urls to crawl next? We could just crawl the urls in the order they were discovered, but this might not lead to the most interesting results.
+
+Instead, the coordinator assigns a score to each url and performs a weighted random selection of the next url to crawl. The score is determined by the number of incoming links to the url from other urls on different domains. The more incoming links a url has from other domains, the higher its score and the more likely it is to be selected for crawling.
+This prioritizes urls that are more likely to be interesting to the user. After a url has been chosen, we again sample from the url frontier but this time only choosing urls from the same domain as the chosen url. This ensures that we get a fairly good coverage of the domain before moving on to the next one.
+
+The sampled urls are then scheduled to the available workers and the process repeats.
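Below is a minimal sketch of the weighted selection described above, using the `rand` crate's `WeightedIndex`. The `FrontierUrl` type and its `inbound_domains` field are illustrative stand-ins rather than the coordinator's actual data structures.

```rust
use rand::distributions::WeightedIndex;
use rand::prelude::*;

/// Illustrative frontier entry: a URL and the number of distinct
/// domains that link to it (its score).
struct FrontierUrl {
    url: String,
    inbound_domains: usize,
}

/// Pick the next URL to crawl by weighted random selection, where the
/// weight of each URL is its inbound-domain count.
fn sample_next_url<'a>(
    frontier: &'a [FrontierUrl],
    rng: &mut impl Rng,
) -> Option<&'a FrontierUrl> {
    if frontier.is_empty() {
        return None;
    }

    // Add 1 so URLs without any known inbound links still have a chance.
    let weights: Vec<usize> = frontier.iter().map(|u| u.inbound_domains + 1).collect();
    let dist = WeightedIndex::new(&weights).ok()?;

    Some(&frontier[dist.sample(rng)])
}
```

Called as `sample_next_url(&frontier, &mut rand::thread_rng())`, this keeps the frontier biased toward pages the rest of the web already links to, while still giving every URL a non-zero chance of being crawled.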
+
+### Respectfulness
+It is of utmost importance that we are respectful of the websites we crawl. We do not want to overload a website with requests, and we do not want to crawl pages that the website owner does not want us to crawl.
+
+When a domain has been sampled it is therefore marked as `CrawlInProgress` until the worker sends results back to the coordinator for the job it was assigned. This ensures that each domain is only scheduled to a single worker at a time. It is then the responsibility of the worker to respect the `robots.txt` file of the domain and to not overload the domain with requests.
+
+## Worker
+The worker is quite simple and is responsible for fetching data from urls scheduled by the coordinator. It is completely stateless and stores the fetched data directly to an S3 bucket while sending newly discovered urls back to the coordinator.
+
+When a worker is tasked to crawl a new site, it first checks the `robots.txt` file for the site to see which urls (if any) it is allowed to crawl.
+If the worker receives a `429 Too Many Requests` response from the site, it backs off for a while before trying again. The specific backoff time depends on how fast the server responds. Further details can be found [here](https://trystract.com/webmasters).
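The exact politeness rules are an implementation detail of the worker, but they look roughly like the sketch below: the delay before the next request to a domain scales with how slowly the server responded, and a `429` grows it further up to some cap. The constants here are made up for illustration.

```rust
use std::time::Duration;

/// Illustrative per-domain politeness state; the real worker's
/// constants and rules may differ.
struct Politeness {
    /// Delay to wait before the next request to this domain.
    delay: Duration,
}

impl Politeness {
    fn new() -> Self {
        Self {
            delay: Duration::from_secs(1),
        }
    }

    /// After a successful fetch, wait at least a multiple of the time
    /// the server took to respond, so slow servers are queried less often.
    fn on_success(&mut self, response_time: Duration) {
        self.delay = (response_time * 10).max(Duration::from_secs(1));
    }

    /// After a `429 Too Many Requests`, back off more aggressively.
    fn on_too_many_requests(&mut self) {
        self.delay = (self.delay * 2).min(Duration::from_secs(180));
    }
}
```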

+ 10 - 0
docs/src/index.md

@@ -0,0 +1,10 @@
+# Introduction
+Stract is an open source web search engine written in Rust. It is designed to be fast, customizable, easy to use and scalable.
+
+This documentation provides a high-level overview of the project and its components. For more detailed information, please refer to the documentation directly in the source code.
+
+# Contributing
+We welcome and greatly appreciate contributions of all kinds. Please refer to [CONTRIBUTING.md](https://github.com/StractOrg/stract/blob/main/CONTRIBUTING.md) for more information on how you can contribute to the project.
+
+# License
+Stract is offered under the terms defined under the [LICENSE.md](https://github.com/StractOrg/stract/blob/main/LICENSE.md) file.

+ 11 - 0
docs/src/overview.md

@@ -0,0 +1,11 @@
+# Overview
+Stract (and most other web search engines) is composed of three main components: the crawler, the webgraph and the search index.
+
+## Crawler
+The crawler, often also referred to as a spider or bot, is the component responsible for collecting and scanning websites across the internet. It begins with a seed list of URLs, which it visits to fetch web pages. The crawler then parses these pages to extract additional URLs, which are then added to the list of URLs to be crawled in the future. This process repeats in a cycle, allowing the crawler to discover new web pages or updates to existing pages continuously. The content fetched by the crawler is passed on to the next components of the search engine: the webgraph and the search index.
+
+## Webgraph
+The webgraph is a data structure that represents the relationships between different web pages. Each node in the webgraph represents a unique web page, and each edge represents a hyperlink from one page to another. The webgraph helps the search engine understand the structure of the web and the authority of different web pages. Authority is determined by factors such as the number of other pages linking to a given page (also known as "backlinks"), which is an important factor in ranking search results. This concept is often referred to as "link analysis."
+
+## Search Index
+The search index is the component that facilitates fast and accurate search results. It is akin to the index at the back of a book, providing a direct mapping from words or phrases to the web pages in which they appear. This data structure is often referred to as an "inverted index". The search index is designed to handle complex search queries and return relevant results in a fraction of a second. The index uses the information gathered by the crawler and the structure of the webgraph to rank search results according to their relevance.
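As a rough illustration of the idea (not of the tantivy-backed implementation Stract actually uses), an inverted index can be sketched as a map from terms to posting lists of document ids:

```rust
use std::collections::HashMap;

/// A toy inverted index: each term maps to the ids of the documents
/// that contain it.
#[derive(Default)]
struct ToyInvertedIndex {
    postings: HashMap<String, Vec<u64>>,
}

impl ToyInvertedIndex {
    fn index(&mut self, doc_id: u64, text: &str) {
        for term in text.split_whitespace() {
            self.postings
                .entry(term.to_lowercase())
                .or_default()
                .push(doc_id);
        }
    }

    /// Documents that contain every term of the query (a simple AND query).
    fn search(&self, query: &str) -> Vec<u64> {
        let mut result: Option<Vec<u64>> = None;
        for term in query.split_whitespace() {
            let docs = self
                .postings
                .get(&term.to_lowercase())
                .cloned()
                .unwrap_or_default();
            result = Some(match result {
                None => docs,
                Some(prev) => prev.into_iter().filter(|d| docs.contains(d)).collect(),
            });
        }
        result.unwrap_or_default()
    }
}
```

A real index additionally stores term positions, frequencies and per-field statistics so that results can be ranked, not just matched.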

+ 1 - 0
docs/src/search_index.md

@@ -0,0 +1 @@
+# Search Index

+ 34 - 0
docs/src/webgraph.md

@@ -0,0 +1,34 @@
+# Webgraph
+The webgraph, often conceptualized as the "internet's map," provides a structured view of the interconnectedness of pages across the World Wide Web. With billions of pages linked together, the webgraph is a crucial tool for understanding the structure, patterns, and dynamics of the internet.
+
+There are two primary ways of constructing the webgraph:
+
+- **Page-Level Webgraph**: This method involves constructing the graph by analyzing individual pages and their outbound links. The nodes in this graph represent individual web pages, while the edges represent hyperlinks between them. This detailed view is especially helpful for understanding specific page connections.
+
+- **Host-Level Webgraph**: Instead of examining individual pages, this approach consolidates all the links associated with a particular host, effectively simplifying the webgraph. In this representation, nodes represent entire websites or hosts, and edges represent connections between them. This broader perspective is suitable for understanding the authority and influence of entire websites.
+
+## Segments
+Given the extreme size of the internet, managing the webgraph as a single monolithic structure in memory is neither efficient nor practical. Thus, it's segmented into smaller parts called segments. Each segment is essentially a portion of the overall webgraph stored in a [RocksDB](https://rocksdb.org/) database on disk. This allows us to create webgraphs that are much larger than what we would otherwise be able to fit in memory.
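Conceptually, a segment is little more than a set of adjacency lists keyed by node id and persisted through RocksDB. The sketch below shows that idea in isolation, using `u128` in place of the real `NodeID` type; the actual `StoredSegment` maintains several such stores (forward and reversed, with and without edge labels).

```rust
use std::path::Path;

/// Minimal sketch of a webgraph segment: adjacency lists keyed by node
/// id, serialized with bincode and stored in RocksDB.
struct SegmentSketch {
    db: rocksdb::DB,
}

impl SegmentSketch {
    fn open(path: &Path) -> Self {
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);

        Self {
            db: rocksdb::DB::open(&options, path).expect("unable to open rocksdb"),
        }
    }

    /// Store the outgoing edges of `node`.
    fn put_adjacency(&self, node: u128, outgoing: &[u128]) {
        let key = bincode::serialize(&node).unwrap();
        let value = bincode::serialize(outgoing).unwrap();
        self.db.put(key, value).unwrap();
    }

    /// Load the outgoing edges of `node`, or an empty list if unknown.
    fn adjacency(&self, node: u128) -> Vec<u128> {
        let key = bincode::serialize(&node).unwrap();
        self.db
            .get(key)
            .unwrap()
            .map(|bytes| bincode::deserialize(&bytes).unwrap())
            .unwrap_or_default()
    }
}
```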
+
+## Webgraph Uses
+The structure of the web can provide highly valuable information when determining the relevance of a page to a user's search query. PageRank, which is a centrality measure developed by Larry Page and Sergey Brin, was one of the primary reasons why Google provided much better search results than their competitors in the early days.
+
+Stract uses a similar centrality measure called Harmonic Centrality, which has been shown to satisfy some useful axioms for centrality ([paper](https://arxiv.org/abs/1308.2140)).
+
+### Harmonic Centrality
+Harmonic centrality is a measure used to identify the importance of a node within a network. In the context of a webgraph, nodes (whether they be individual pages or entire hosts) that have a high harmonic centrality are ones that are, on average, closer to all other nodes in the network. The closeness of a node in this context refers to its average distance from all other nodes.
+
+In practical terms, a web page with high harmonic centrality might be seen as an influential page on the World Wide Web, indicating that it can be reached with fewer clicks, on average, from any other page on the internet. A page with high harmonic centrality therefore has a higher likelihood of being relevant to a user's search query.
+
+In formulaic terms, the harmonic centrality $C_{H}(u)$ of a node $u$ is calculated as the sum of the reciprocals of the shortest paths from all nodes to $u$:
+
+$$C_{H}(u) = \frac{1}{n-1} \sum_{v \neq u} \frac{1}{d(v,u)}$$
+
+Where $d(v,u)$ is the shortest path from node $v$ to node $u$. We normalize the harmonic centrality by dividing by the number of nodes in the network minus one.
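For small graphs the definition can be evaluated directly with a breadth-first search from every node, as in the sketch below. On web-scale graphs Stract instead estimates, for each node, the number of nodes within a given distance using HyperLogLog counters, but the quantity being approximated is the same.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Exact harmonic centrality computed straight from the definition:
/// for every node u, sum 1/d(v, u) over all other nodes v that can
/// reach u, then divide by n - 1.
fn harmonic_centrality(edges: &[(u64, u64)]) -> HashMap<u64, f64> {
    let mut adjacency: HashMap<u64, Vec<u64>> = HashMap::new();
    let mut nodes: HashSet<u64> = HashSet::new();

    for (from, to) in edges {
        adjacency.entry(*from).or_default().push(*to);
        nodes.insert(*from);
        nodes.insert(*to);
    }

    let norm = nodes.len().saturating_sub(1).max(1) as f64;
    let mut centralities: HashMap<u64, f64> = nodes.iter().map(|n| (*n, 0.0)).collect();

    // BFS from every node v; a node u reached at distance d contributes
    // 1/d to the centrality of u.
    for &v in &nodes {
        let mut dist: HashMap<u64, u64> = HashMap::from([(v, 0)]);
        let mut queue = VecDeque::new();
        queue.push_back(v);

        while let Some(current) = queue.pop_front() {
            let d = dist[&current];

            for &next in adjacency.get(&current).map(|a| a.as_slice()).unwrap_or(&[]) {
                if !dist.contains_key(&next) {
                    dist.insert(next, d + 1);
                    *centralities.get_mut(&next).unwrap() += 1.0 / (d + 1) as f64;
                    queue.push_back(next);
                }
            }
        }
    }

    for centrality in centralities.values_mut() {
        *centrality /= norm;
    }

    centralities
}
```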
+
+### Inbound Similarity
+Inbound similarity plays a crucial role in enhancing personalized search results. When a user likes or dislikes results from a certain site, we can adjust the ranking of results from similar sites according to that preference. The idea is that the similarity between two sites can be estimated from which sites link to them: two sites that have a lot of incoming links in common are likely to be topically similar.
+
+Let's denote the set of inbound links for site $u$ as $I_{u}$ and the set for site $v$ as $I_{v}$. The similarity between the two sites is calculated as the cosine similarity between their inbound link vectors:
+$$S(u, v) = \frac{I_{u} \cdot I_{v}}{\lVert I_{u} \rVert \, \lVert I_{v} \rVert}$$
+
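With 0/1 indicator vectors this reduces to the size of the intersection of the two inbound-link sets divided by the geometric mean of their sizes. A minimal sketch of that computation follows; the real implementation stores the inbound sets as compact bit vectors (`bitvec_similarity::BitVec`) keyed by `NodeID`.

```rust
use std::collections::HashSet;

/// Cosine similarity between two sites, each represented by the set of
/// hosts that link to it.
fn inbound_similarity(inbound_u: &HashSet<u64>, inbound_v: &HashSet<u64>) -> f64 {
    if inbound_u.is_empty() || inbound_v.is_empty() {
        return 0.0;
    }

    let intersection = inbound_u.intersection(inbound_v).count() as f64;

    intersection / ((inbound_u.len() as f64).sqrt() * (inbound_v.len() as f64).sqrt())
}
```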

Some files were not shown because too many files changed in this diff