@@ -19,7 +19,6 @@ use futures::StreamExt;
 use indicatif::ProgressIterator;
 use itertools::Itertools;
 use rayon::{prelude::*, ThreadPoolBuilder};
-use rustc_hash::FxHashSet;
 use std::cmp::Reverse;
 use std::collections::BTreeMap;
 use std::hash::{Hash, Hasher};
@@ -49,7 +48,6 @@ use super::Domain;
 const MAX_UNCOMMITTED_INSERTS_PER_GROUP: usize = 50_000;
 const NUM_GROUPS: usize = 1024;
 const CONCURRENCY_LIMIT: usize = 32;
-const CONVERT_HIGH_WANDER_BUDGET: bool = false;
 
 #[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, Hash)]
 struct StoredUrl(#[bincode(with_serde)] Url);
@@ -205,7 +203,7 @@ impl CrawlPlanner {
             .map(|id| self.host_centrality.get(&id).unwrap().unwrap_or_default())
             .sum();
 
-        let mut wander_budget = ((wander_budget * host_centrality) / total_host_centralities)
+        let wander_budget = ((wander_budget * host_centrality) / total_host_centralities)
             .max(1.0)
             .round() as u64;
 
@@ -230,53 +228,6 @@ impl CrawlPlanner {
         urls.sort_by(|a, b| a.weight.total_cmp(&b.weight));
         urls.reverse();
 
-        if CONVERT_HIGH_WANDER_BUDGET {
-            // convert some of the wander budget to scheduled urls if the wander budget is too large
-            if wander_budget > urls.len() as u64 {
-                // we should maybe convert the budget on a per-host basis
-                // instead of aggregating all hosts for the domain. some blog sites
-                // like wordpress/blogspot/github.io etc. will have lots of hosts,
-                // and if we simply convert the budget for the domain, we might end up
-                // with a lot of urls from the same host.
-
-                let mut bloom = bloom::U64BloomFilter::new(urls.len() as u64, 0.05);
-                let mut hosts = FxHashSet::default();
-                for url in &urls {
-                    let node = Node::from(url.url.clone());
-                    bloom.insert(node.id().as_u64());
-                    hosts.insert(node.into_host().id());
-                }
-
-                let budget_to_convert = wander_budget - urls.len() as u64;
-
-                let hosts = hosts.into_iter().collect::<Vec<_>>();
-                let nodes: Vec<_> = crate::block_on(self.page_graph.pages_by_hosts(&hosts))
-                    .unwrap_or_default()
-                    .into_iter()
-                    .filter(|node| !bloom.contains(node.as_u64()))
-                    .take(budget_to_convert as usize)
-                    .collect();
-
-                let nodes =
-                    crate::block_on(self.page_graph.batch_get_node(&nodes)).unwrap_or_default();
-
-                let mut new_scheduled = 0;
-                for node in nodes.into_iter().flatten() {
-                    if let Ok(url) = Url::parse(&format!("https://{}", node.as_str())) {
-                        urls.push(WeightedUrl { url, weight: 0.0 });
-                        new_scheduled += 1;
-                    }
-                }
-
-                tracing::info!(
-                    "converted {} wander budget to scheduled urls",
-                    new_scheduled
-                );
-
-                wander_budget -= new_scheduled as u64;
-            }
-        }
-
         let job = Job {
             domain: domain.clone(),
             urls: urls.into_iter().collect(),