Speed up domain parsing
parent 2b36f2ccc1
commit 66700f8a3e
5 changed files with 57 additions and 6 deletions
analyse/url_queue.py (normal file, 35 additions)
@@ -0,0 +1,35 @@
+import logging
+import os
+import pickle
+import sys
+from datetime import datetime
+from pathlib import Path
+from queue import Queue
+
+from mwmbl.url_queue import URLQueue
+
+FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT)
+
+
+def run_url_queue():
+    data = pickle.load(open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "rb"))
+    print("First URLs", [x.url for x in data[:1000]])
+
+    new_item_queue = Queue()
+    queued_batches = Queue()
+    queue = URLQueue(new_item_queue, queued_batches)
+
+    new_item_queue.put(data)
+
+    start = datetime.now()
+    queue.update()
+    total_time = (datetime.now() - start).total_seconds()
+    print(f"Total time: {total_time}")
+
+
+
+
+
+if __name__ == '__main__':
+    run_url_queue()
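The new analyse/url_queue.py script times a single URLQueue.update() pass over a pickled dump of found URLs and prints the wall-clock total. To see where that time actually goes, a profiling sketch along these lines could be dropped into the script (illustrative only, not part of the commit; profile_update is a hypothetical helper that takes the URLQueue built in run_url_queue above):

import cProfile
import pstats


def profile_update(queue):
    # Illustrative sketch: profile one update pass of the URLQueue from
    # run_url_queue() to see which calls dominate the runtime.
    profiler = cProfile.Profile()
    profiler.enable()
    queue.update()
    profiler.disable()
    # Show the 20 most expensive calls by cumulative time.
    pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)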
Binary file not shown.
@@ -24,6 +24,10 @@ def run(batch_cache: BatchCache, start_status: BatchStatus, end_status: BatchSta
         batch_data = batch_cache.get_cached([batch.url for batch in batches])
         logger.info(f"Got {len(batch_data)} cached batches")
 
+        missing_batches = {batch.url for batch in batches} - batch_data.keys()
+        logger.info(f"Got {len(missing_batches)} missing batches")
+        index_db.update_batch_status(list(missing_batches), BatchStatus.REMOTE)
+
         process(batch_data.values(), *args)
 
         index_db.update_batch_status(list(batch_data.keys()), end_status)
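The hunk above finds batches that were requested but not returned by the cache using set difference; a dict's .keys() view participates in set operations directly, so no intermediate set is built for the cached side. A standalone sketch with made-up batch URLs (not real cache contents):

# Standalone illustration; the URLs and values are invented.
batch_data = {
    "https://example.org/batches/1.json.gz": "cached contents",
    "https://example.org/batches/2.json.gz": "cached contents",
}
requested = {
    "https://example.org/batches/1.json.gz",
    "https://example.org/batches/2.json.gz",
    "https://example.org/batches/3.json.gz",
}

# Same shape as the diff: requested set minus the cached dict's keys view.
missing_batches = requested - batch_data.keys()
print(missing_batches)  # {'https://example.org/batches/3.json.gz'}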
@@ -1,6 +1,5 @@
 import argparse
 import logging
-import os
 import sys
 from multiprocessing import Process, Queue
 from pathlib import Path
@@ -8,7 +7,7 @@ from pathlib import Path
 import uvicorn
 from fastapi import FastAPI
 
-from mwmbl import background, url_queue
+from mwmbl import background
 from mwmbl.crawler import app as crawler
 from mwmbl.indexer.batch_cache import BatchCache
 from mwmbl.indexer.paths import INDEX_NAME, BATCH_DIR_NAME
@@ -16,9 +15,10 @@ from mwmbl.tinysearchengine import search
 from mwmbl.tinysearchengine.completer import Completer
 from mwmbl.tinysearchengine.indexer import TinyIndex, Document, PAGE_SIZE
 from mwmbl.tinysearchengine.rank import HeuristicRanker
-from mwmbl.url_queue import URLQueue, update_queue_continuously
+from mwmbl.url_queue import update_queue_continuously
 
-logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT)
 
 
 MODEL_PATH = Path(__file__).parent / 'resources' / 'model.pickle'
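The logging change above switches this entry point from plain INFO-level logging to DEBUG with an explicit format that prefixes the level, logger name and timestamp. A minimal standalone sketch of what the configured output looks like (the timestamp shown is obviously illustrative):

import logging
import sys

FORMAT = '%(levelname)s %(name)s %(asctime)s %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT)

logger = logging.getLogger("mwmbl.url_queue")
logger.info("Processing found URLs")
# Example output: INFO mwmbl.url_queue 2023-06-18 10:15:42,123 Processing found URLs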
@@ -1,10 +1,14 @@
+import os
+import pickle
 import random
+import re
 import time
 from collections import defaultdict
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from logging import getLogger
 from multiprocessing import Queue
+from pathlib import Path
 from queue import Empty
 from time import sleep
 from typing import KeysView, Union

@@ -29,6 +33,8 @@ MAX_URLS_PER_TOP_DOMAIN = 100
 MAX_URLS_PER_OTHER_DOMAIN = 5
 MAX_OTHER_DOMAINS = 10000
+
+DOMAIN_REGEX = re.compile(r".*://([^/]*)")
 
 
 @dataclass
 class URLScore:

@@ -66,6 +72,11 @@ class URLQueue:
         return num_processed
 
     def _process_found_urls(self, found_urls: list[FoundURL]):
+        logger.info("Processing found URLs")
+        # with open(Path(os.environ["HOME"]) / "data" / "mwmbl" / "found-urls.pickle", "wb") as output_file:
+        #     pickle.dump(found_urls, output_file)
+        #     logger.info("Dumped")
+
         min_updated_date = datetime.utcnow() - timedelta(hours=REASSIGN_MIN_HOURS)
 
         logger.info(f"Found URLS: {len(found_urls)}")

@@ -87,10 +98,12 @@ class URLQueue:
 
     def _sort_urls(self, valid_urls: list[FoundURL]):
         for found_url in valid_urls:
-            domain = urlparse(found_url.url).hostname
+            domain = DOMAIN_REGEX.search(found_url.url)[0]
             url_store = self._top_urls if domain in TOP_DOMAINS else self._other_urls
             url_store[domain].append(URLScore(found_url.url, found_url.score))
 
+        logger.info(f"URL store updated: {len(self._top_urls)} top domains, {len(self._other_urls)} other domains")
+
         _sort_and_limit_urls(self._top_urls, MAX_TOP_URLS)
         _sort_and_limit_urls(self._other_urls, MAX_OTHER_URLS)
 

@@ -125,7 +138,6 @@ def _sort_and_limit_urls(domain_urls: dict[str, list[str]], max_urls: int):
 def _add_urls(domains: Union[set[str], KeysView], domain_urls: dict[str, list[URLScore]], urls: list[str], max_urls: int):
     for domain in list(domains & domain_urls.keys()):
         new_urls = domain_urls[domain][:max_urls]
-        logger.info(f"Adding URLs {new_urls}")
         urls += [url_score.url for url_score in new_urls]
         new_domain_urls = domain_urls[domain][max_urls:]
         if len(new_domain_urls) > 0:
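The _sort_urls change above is the heart of the speed-up: a precompiled regex replaces urllib.parse.urlparse for pulling the domain out of each URL. A standalone comparison sketch (not part of the diff; the example URL and timings are illustrative) shows what each approach returns and how the relative cost can be measured. Note that .hostname is lower-cased and port-free, while the regex match's item 0 is the whole scheme://host prefix and item 1 is just the captured host part:

import re
import timeit
from urllib.parse import urlparse

DOMAIN_REGEX = re.compile(r".*://([^/]*)")
URL = "https://en.wikipedia.org/wiki/Search_engine"  # illustrative example

# urlparse builds a full ParseResult object before .hostname is read.
print(urlparse(URL).hostname)   # en.wikipedia.org

# The precompiled regex captures whatever sits between '://' and the next '/'.
match = DOMAIN_REGEX.search(URL)
print(match[1])                 # en.wikipedia.org  (captured group)
print(match[0])                 # https://en.wikipedia.org  (entire match, as used in the diff)

# Rough relative cost; absolute numbers depend on the machine.
print(timeit.timeit(lambda: urlparse(URL).hostname, number=100_000))
print(timeit.timeit(lambda: DOMAIN_REGEX.search(URL)[0], number=100_000))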