import re
from logging import getLogger
from operator import itemgetter
from pathlib import Path

from fastapi import FastAPI
from starlette.responses import FileResponse
from starlette.staticfiles import StaticFiles

from tinysearchengine.indexer import TinyIndex, Document

logger = getLogger(__name__)


STATIC_FILES_PATH = Path(__file__).parent / 'static'
# Results whose score is not strictly greater than this are dropped.
SCORE_THRESHOLD = 0.25


def create(tiny_index: TinyIndex):
    """Build the FastAPI application that serves search over ``tiny_index``.

    Registers three routes and a static mount:

    * ``GET /search?s=...``   - results with the matched terms marked bold.
    * ``GET /complete?q=...`` - ``[q, ["<title> — <url>", ...]]`` suggestions.
    * ``GET /``               - ``index.html`` from ``STATIC_FILES_PATH``.
    * everything else under ``/`` is served from ``STATIC_FILES_PATH``.

    Args:
        tiny_index: Index queried via ``tiny_index.retrieve(term)``. The
            returned items are read through their ``title``, ``extract`` and
            ``url`` attributes; ``None`` is treated as "no items".

    Returns:
        The configured ``FastAPI`` instance.
    """
    app = FastAPI()

    @app.get("/search")
    def search(s: str):
        """Return the ranked results for ``s`` split into bold/plain segments.

        Each result is a dict with ``'title'`` and ``'extract'`` lists of
        ``{'value': str, 'is_bold': bool}`` segments, plus a ``'url'`` string.
        """
        results, terms = get_results(s)
        # The pattern depends only on the query, so build it once rather than
        # once per result.
        pattern = get_query_regex(terms)

        formatted_results = []
        for result in results:
            formatted_result = {}
            for content_type, content in [('title', result.title), ('extract', result.extract)]:
                matches = re.finditer(pattern, content, re.IGNORECASE)
                # Flatten the match spans into a list of cut points:
                # [0, start1, end1, start2, end2, ..., len(content)].
                all_spans = [0] + sum((list(m.span()) for m in matches), []) + [len(content)]
                # Consecutive cut points delimit segments; the odd-numbered
                # segments are the ones that lie inside a match.
                content_result = [
                    {'value': content[start:end], 'is_bold': i % 2 == 1}
                    for i, (start, end) in enumerate(zip(all_spans, all_spans[1:]))
                ]
                formatted_result[content_type] = content_result
            formatted_result['url'] = result.url
            formatted_results.append(formatted_result)

        logger.info("Return results: %r", formatted_results)
        return formatted_results

    def get_query_regex(terms):
        """Return a regex source string matching any of ``terms`` as a whole word.

        Terms come straight from the user's query, so each one is escaped:
        regex metacharacters (``c++``, ``(``, ``*`` ...) are matched literally
        instead of raising ``re.error`` or changing what the pattern matches.
        """
        # NOTE: ``\b`` only matches next to a word character, so a term that
        # starts or ends with punctuation matches only when a word character
        # is adjacent on that side.
        term_patterns = [rf'\b{re.escape(term)}\b' for term in terms]
        return '|'.join(term_patterns)

    def score_result(terms, result: Document):
        """Score ``result`` against ``terms``; higher is better.

        The score is::

            (total length of the distinct matched strings + 1 / last_match_char)
            / (total length of all terms + 1)

        where ``last_match_char`` is the end offset, within
        ``"<title> <extract>"``, of the latest first occurrence of a distinct
        match (1 when nothing matches). Matching more of the query raises the
        score, and matching it earlier in the text adds a small bonus.
        """
        result_string = f"{result.title.strip()} {result.extract.strip()}"
        query_regex = get_query_regex(terms)
        matches = list(re.finditer(query_regex, result_string, flags=re.IGNORECASE))
        match_strings = {x.group(0).lower() for x in matches}
        match_length = sum(len(x) for x in match_strings)

        last_match_char = 1
        seen_matches = set()
        for match in matches:
            value = match.group(0).lower()
            if value not in seen_matches:
                # Only the first occurrence of each distinct match moves the
                # marker; repeats later in the text do not.
                last_match_char = match.span()[1]
                seen_matches.add(value)

        total_possible_match_length = sum(len(x) for x in terms)
        return (match_length + 1. / last_match_char) / (total_possible_match_length + 1)

    def order_results(terms: list[str], results: list[Document]):
        """Return ``results`` sorted by descending score, minus low scorers.

        Results whose score is not above ``SCORE_THRESHOLD`` are removed.
        """
        results_and_scores = [(score_result(terms, result), result) for result in results]
        # Sort on the score only, so results themselves are never compared.
        ordered_results = sorted(results_and_scores, key=itemgetter(0), reverse=True)
        return [result for score, result in ordered_results if score > SCORE_THRESHOLD]

    @app.get("/complete")
    def complete(q: str):
        """Return ``[q, suggestions]`` for ``q``, or ``[]`` when nothing matches.

        Each suggestion is ``"<title> — <url>"`` with newlines removed.
        """
        ordered_results, terms = get_results(q)
        results = [item.title.replace("\n", "") + ' — ' + item.url.replace("\n", "")
                   for item in ordered_results]
        if not results:
            return []
        return [q, results]

    def get_results(q):
        """Look up every term of ``q`` in the index and rank the hits.

        The query is split on whitespace and ``'.'`` and lower-cased. An item
        is kept only if the term occurs in its lower-cased title or extract,
        and only the first item seen for any given title is kept.

        Returns:
            A ``(ordered_results, terms)`` tuple.
        """
        terms = [x.lower() for x in q.replace('.', ' ').split()]

        pages = []
        seen_items = set()
        for term in terms:
            items = tiny_index.retrieve(term)
            if items is None:
                continue
            for item in items:
                if term not in item.title.lower() and term not in item.extract.lower():
                    continue
                if item.title in seen_items:
                    continue
                pages.append(item)
                seen_items.add(item.title)

        ordered_results = order_results(terms, pages)
        return ordered_results, terms

    @app.get('/')
    def index():
        """Serve the front-end entry page."""
        return FileResponse(STATIC_FILES_PATH / 'index.html')

    app.mount('/', StaticFiles(directory=STATIC_FILES_PATH), name="static")
    return app