
Merge pull request #112 from mwmbl/stats

Stats
Daoud Clarke 1 year ago
Parent
Commit
826d3d6ba9
4 changed files with 606 additions and 348 deletions
  1. mwmbl/crawler/app.py    +14 -2
  2. mwmbl/crawler/stats.py    +112 -0
  3. poetry.lock    +479 -346
  4. pyproject.toml    +1 -0

+ 14 - 2
mwmbl/crawler/app.py

@@ -1,6 +1,7 @@
 import gzip
 import hashlib
 import json
+import os
 from datetime import datetime, timezone, date
 from queue import Queue, Empty
 from typing import Union
@@ -13,8 +14,10 @@ from fastapi import HTTPException, APIRouter
 from justext.core import html_to_dom, ParagraphMaker, classify_paragraphs, revise_paragraph_classification, \
     LENGTH_LOW_DEFAULT, STOPWORDS_LOW_DEFAULT, MAX_LINK_DENSITY_DEFAULT, NO_HEADINGS_DEFAULT, LENGTH_HIGH_DEFAULT, \
     STOPWORDS_HIGH_DEFAULT, MAX_HEADING_DISTANCE_DEFAULT, DEFAULT_ENCODING, DEFAULT_ENC_ERRORS, preprocessor
+from redis import Redis
 
 from mwmbl.crawler.batch import Batch, NewBatchRequest, HashedBatch
+from mwmbl.crawler.stats import MwmblStats, StatsManager
 from mwmbl.crawler.urls import URLDatabase, FoundURL, URLStatus
 from mwmbl.database import Database
 from mwmbl.format import format_result
@@ -31,9 +34,11 @@ from mwmbl.settings import (
     PUBLIC_URL_PREFIX,
     PUBLIC_USER_ID_LENGTH,
     FILE_NAME_SUFFIX,
-    DATE_REGEX, NUM_EXTRACT_CHARS, NUM_TITLE_CHARS)
+    DATE_REGEX, NUM_EXTRACT_CHARS)
 from mwmbl.tinysearchengine.indexer import Document
-from mwmbl.url_queue import URLQueue
+
+
+stats_manager = StatsManager(Redis.from_url(os.environ.get("REDIS_URL")))
 
 
 def get_bucket(name):
@@ -128,6 +133,9 @@ def get_router(batch_cache: BatchCache, queued_batches: Queue):
         # Using an approach from https://stackoverflow.com/a/30476450
         epoch_time = (now - datetime(1970, 1, 1, tzinfo=timezone.utc)).total_seconds()
         hashed_batch = HashedBatch(user_id_hash=user_id_hash, timestamp=epoch_time, items=batch.items)
+
+        stats_manager.record_batch(hashed_batch)
+
         data = gzip.compress(hashed_batch.json().encode('utf8'))
         upload(data, filename)
 
@@ -191,6 +199,10 @@ def get_router(batch_cache: BatchCache, queued_batches: Queue):
         prefix = f'1/{VERSION}/{date_str}/1/'
         return get_subfolders(prefix)
 
+    @router.get('/stats')
+    def get_stats() -> MwmblStats:
+        return stats_manager.get_stats()
+
     @router.get('/')
     def status():
         return {
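
The new /stats route above returns the MwmblStats model as JSON. A minimal client sketch for calling it (the host and the prefix under which get_router() is mounted are assumptions, since neither is shown in this diff):

import requests

# Hypothetical base URL: the actual mount prefix of the crawler router is not part of this diff.
response = requests.get("https://api.mwmbl.org/crawler/stats")
response.raise_for_status()
stats = response.json()
print(stats["urls_crawled_today"], "URLs crawled today")
print(len(stats["top_domains"]), "domains reported")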

+ 112 - 0
mwmbl/crawler/stats.py

@@ -0,0 +1,112 @@
+import gzip
+from datetime import datetime
+from glob import glob
+from itertools import islice
+from logging import getLogger
+from urllib.parse import urlparse
+
+from pydantic import BaseModel
+from redis import Redis
+
+from mwmbl.crawler.batch import HashedBatch
+from mwmbl.indexer.update_urls import get_datetime_from_timestamp
+
+logger = getLogger(__name__)
+
+URL_DATE_COUNT_KEY = "url-count-{date}"
+URL_HOUR_COUNT_KEY = "url-count-hour-{hour}"
+USER_COUNT_KEY = "user-count-{date}"
+HOST_COUNT_KEY = "host-count-{date}"
+EXPIRE_SECONDS = 60*60*24
+
+
+class MwmblStats(BaseModel):
+    urls_crawled_today: int
+    urls_crawled_hourly: list[int]
+    top_users: dict[str, int]
+    top_domains: dict[str, int]
+
+
+class StatsManager:
+    def __init__(self, redis: Redis):
+        self.redis = redis
+
+    def record_batch(self, hashed_batch: HashedBatch):
+        date_time = get_datetime_from_timestamp(hashed_batch.timestamp)
+
+        num_crawled_urls = sum(1 for item in hashed_batch.items if item.content is not None)
+
+        url_count_key = URL_DATE_COUNT_KEY.format(date=date_time.date())
+        self.redis.incrby(url_count_key, num_crawled_urls)
+        self.redis.expire(url_count_key, EXPIRE_SECONDS)
+
+        hour = datetime(date_time.year, date_time.month, date_time.day, date_time.hour)
+        hour_key = URL_HOUR_COUNT_KEY.format(hour=hour)
+        self.redis.incrby(hour_key, num_crawled_urls)
+        self.redis.expire(hour_key, EXPIRE_SECONDS)
+
+        user_count_key = USER_COUNT_KEY.format(date=date_time.date())
+        self.redis.zincrby(user_count_key, num_crawled_urls, hashed_batch.user_id_hash)
+        self.redis.expire(user_count_key, EXPIRE_SECONDS)
+
+        host_key = HOST_COUNT_KEY.format(date=date_time.date())
+        for item in hashed_batch.items:
+            if item.content is None:
+                continue
+
+            host = urlparse(item.url).netloc
+            self.redis.zincrby(host_key, 1, host)
+        self.redis.expire(host_key, EXPIRE_SECONDS)
+
+    def get_stats(self) -> MwmblStats:
+        date_time = datetime.now()
+        date = date_time.date()
+        url_count_key = URL_DATE_COUNT_KEY.format(date=date)
+        url_count = self.redis.get(url_count_key)
+
+        if url_count is None:
+            url_count = 0
+
+        hour_counts = []
+        for i in range(date_time.hour + 1):
+            hour = datetime(date_time.year, date_time.month, date_time.day, i)
+            hour_key = URL_HOUR_COUNT_KEY.format(hour=hour)
+            hour_count = self.redis.get(hour_key)
+            if hour_count is None:
+                hour_count = 0
+            hour_counts.append(hour_count)
+
+        user_count_key = USER_COUNT_KEY.format(date=date_time.date())
+        user_counts = self.redis.zrevrange(user_count_key, 0, 100, withscores=True)
+
+        host_key = HOST_COUNT_KEY.format(date=date_time.date())
+        host_counts = self.redis.zrevrange(host_key, 0, 100, withscores=True)
+
+        return MwmblStats(
+            urls_crawled_today=url_count,
+            urls_crawled_hourly=hour_counts,
+            top_users=user_counts,
+            top_domains=host_counts,
+        )
+
+
+def get_test_batches():
+    for path in glob("./devdata/batches/**/*.json.gz", recursive=True):
+        print("Processing path", path)
+        with gzip.open(path) as gzip_file:
+            yield HashedBatch.parse_raw(gzip_file.read())
+
+
+if __name__ == '__main__':
+    redis = Redis(host='localhost', port=6379, decode_responses=True)
+    stats = StatsManager(redis)
+    batches = get_test_batches()
+    start = datetime.now()
+    processed = 0
+    for batch in islice(batches, 100):
+        stats.record_batch(batch)
+        processed += 1
+    total_time = (datetime.now() - start).total_seconds()
+    print("Processed", processed)
+    print("Total time", total_time)
+    print("Time per batch", total_time/processed)

The file diff is not shown due to its large size
+ 479 - 346
poetry.lock


+ 1 - 0
pyproject.toml

@@ -34,6 +34,7 @@ pyarrow = {version= "==6.0.0", optional = true}
 pyspark = {version= "==3.2.0", optional = true}
 Levenshtein = {version= "==0.16.0", optional = true}
 requests-cache = "^1.1.0"
+redis = {extras = ["hiredis"], version = "^5.0.1"}
 
 [tool.poetry.extras]
 indexer = [
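
The new dependency is declared with the hiredis extra, which pulls in the optional C response parser; redis-py picks it up automatically when it is importable. A quick sanity check after installing (a sketch, assuming redis-py 5.x):

# Check whether the optional hiredis parser installed via the extra is available;
# redis-py falls back to its pure-Python parser when it is not.
import redis

print("redis-py version:", redis.__version__)
try:
    import hiredis
    print("hiredis version:", hiredis.__version__)
except ImportError:
    print("hiredis not installed; using the pure-Python parser")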

Some files were not shown because too many files changed in this diff