index.py

  1. """
  2. Create a search index
  3. """
  4. import gzip
  5. import json
  6. import os
  7. import sqlite3
  8. from dataclasses import dataclass
  9. from glob import glob
  10. from itertools import chain, count, islice
  11. from mmap import mmap, PROT_READ
  12. from typing import List, Iterator
  13. from urllib.parse import unquote
  14. import bs4
  15. import justext
  16. import mmh3
  17. from spacy.lang.en import English
  18. from zstandard import ZstdCompressor, ZstdDecompressor, ZstdError
  19. from paths import CRAWL_GLOB, INDEX_PATH
  20. NUM_PAGES = 8192
  21. PAGE_SIZE = 512
  22. NUM_INITIAL_TOKENS = 50
  23. HTTP_START = 'http://'
  24. HTTPS_START = 'https://'
  25. BATCH_SIZE = 100


def is_content_token(nlp, token):
    # Keep alphabetic or numeric tokens that are not stop words
    lexeme = nlp.vocab[token.orth]
    return (lexeme.is_alpha or lexeme.is_digit) and not token.is_stop


def tokenize(nlp, cleaned_text):
    # Only the first NUM_INITIAL_TOKENS tokens of the text are considered
    tokens = nlp.tokenizer(cleaned_text)
    content_tokens = [token for token in tokens[:NUM_INITIAL_TOKENS]
                      if is_content_token(nlp, token)]
    lowered = {nlp.vocab[token.orth].text.lower() for token in content_tokens}
    return lowered


def clean(content):
    # Strip boilerplate paragraphs (navigation, ads and similar) from raw HTML using justext
    text = justext.justext(content, justext.get_stoplist("English"))
    pars = [par.text for par in text if not par.is_boilerplate]
    cleaned_text = ' '.join(pars)
    return cleaned_text
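

# Minimal usage sketch of the helpers above, kept as an uncalled function so it
# has no effect at import time. The sample text is illustrative only.
def _example_tokenize():
    nlp = English()
    # tokenize() returns a set of lowercased content words drawn from the first
    # NUM_INITIAL_TOKENS tokens, with stop words and punctuation dropped
    tokens = tokenize(nlp, "rust is a fast and memory safe programming language")
    print(tokens)  # e.g. {'rust', 'fast', 'memory', 'safe', 'programming', 'language'}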


@dataclass
class Document:
    url: str
    title: str


@dataclass
class TokenizedDocument(Document):
    tokens: List[str]


class TinyIndexBase:
    def __init__(self, num_pages, page_size):
        self.num_pages = num_pages
        self.page_size = page_size
        self.decompressor = ZstdDecompressor()
        self.mmap = None

    def retrieve(self, token):
        index = self._get_token_page_index(token)
        return self.get_page(index)

    def _get_token_page_index(self, token):
        # Hash the token to choose one of the fixed number of pages
        token_hash = mmh3.hash(token, signed=False)
        return token_hash % self.num_pages

    def get_page(self, i):
        """
        Get the page at index i, decompress and deserialise it using JSON
        """
        page_data = self.mmap[i * self.page_size:(i + 1) * self.page_size]
        try:
            decompressed_data = self.decompressor.decompress(page_data)
        except ZstdError:
            return None
        return json.loads(decompressed_data.decode('utf8'))


class TinyIndex(TinyIndexBase):
    def __init__(self, index_path, num_pages, page_size):
        super().__init__(num_pages, page_size)
        self.index_path = index_path
        self.index_file = open(self.index_path, 'rb')
        self.mmap = mmap(self.index_file.fileno(), 0, prot=PROT_READ)
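

# Illustrative sketch of the read path, kept as an uncalled function so it has no
# effect at import time. It assumes the index file at INDEX_PATH already exists.
def _example_retrieve(token):
    tiny_index = TinyIndex(INDEX_PATH, NUM_PAGES, PAGE_SIZE)
    # retrieve() hashes the token to a page and returns that page's contents:
    # a list of [title, url] pairs, or None if the page is still empty
    return tiny_index.retrieve(token)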


class TinyIndexer(TinyIndexBase):
    def __init__(self, index_path, num_pages, page_size):
        super().__init__(num_pages, page_size)
        self.index_path = index_path
        self.compressor = ZstdCompressor()
        self.decompressor = ZstdDecompressor()
        self.index_file = None
        self.mmap = None

    def __enter__(self):
        self.create_if_not_exists()
        self.index_file = open(self.index_path, 'r+b')
        self.mmap = mmap(self.index_file.fileno(), 0)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.mmap.close()
        self.index_file.close()

    def index(self, documents: List[TokenizedDocument]):
        for document in documents:
            for token in document.tokens:
                self._index_document(document, token)

    def _index_document(self, document: Document, token: str):
        page_index = self._get_token_page_index(token)
        current_page = self.get_page(page_index)
        if current_page is None:
            current_page = []
        current_page.append([document.title, document.url])
        try:
            self._write_page(current_page, page_index)
        except ValueError:
            # The page is full; drop this entry rather than fail the whole run
            pass

    def _write_page(self, data, i):
        """
        Serialise the data using JSON, compress it and store it at index i.
        If the data is too big, it will raise a ValueError and not store anything.
        """
        serialised_data = json.dumps(data)
        compressed_data = self.compressor.compress(serialised_data.encode('utf8'))
        page_length = len(compressed_data)
        if page_length > self.page_size:
            raise ValueError(f"Data is too big ({page_length}) for page size ({self.page_size})")
        padding = b'\x00' * (self.page_size - page_length)
        self.mmap[i * self.page_size:(i + 1) * self.page_size] = compressed_data + padding

    def create_if_not_exists(self):
        if not os.path.isfile(self.index_path):
            file_length = self.num_pages * self.page_size
            with open(self.index_path, 'wb') as index_file:
                index_file.write(b'\x00' * file_length)

    def document_indexed(self, url):
        raise NotImplementedError()

    def get_num_tokens(self):
        raise NotImplementedError()

    def get_random_terms(self, n):
        raise NotImplementedError()
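

# With the defaults above, the index file created by create_if_not_exists() is
# NUM_PAGES * PAGE_SIZE = 8192 * 512 bytes = 4 MiB: a fixed array of pages, each
# holding zstd-compressed JSON followed by zero padding. Tokens are looked up by
# hashing them straight to a page number, so no separate term dictionary is stored.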


def run():
    nlp = English()
    with TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer:
        for path in glob(CRAWL_GLOB):
            print("Path", path)
            with gzip.open(path, 'rt') as html_file:
                url = html_file.readline().strip()
                content = html_file.read()

            # NOTE: deduplication is skipped here because document_indexed()
            # is not implemented yet, so every crawled page is indexed
            cleaned_text = clean(content)
            try:
                title = bs4.BeautifulSoup(content, features="lxml").find('title').string
            except AttributeError:
                title = cleaned_text[:80]
            tokens = tokenize(nlp, cleaned_text)
            print("URL", url)
            print("Tokens", tokens)
            print("Title", title)
            indexer.index([TokenizedDocument(tokens=list(tokens), url=url, title=title)])


def prepare_url_for_tokenizing(url: str):
    # Strip the scheme and replace separators with spaces so the URL can be
    # tokenized like ordinary text
    if url.startswith(HTTP_START):
        url = url[len(HTTP_START):]
    elif url.startswith(HTTPS_START):
        url = url[len(HTTPS_START):]
    for c in '/._':
        if c in url:
            url = url.replace(c, ' ')
    return url
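

# For example (illustrative values only):
#   prepare_url_for_tokenizing('https://en.wikipedia.org/wiki/Search_engine')
#   returns 'en wikipedia org wiki Search engine', which tokenize() then splits
#   into content words alongside the page title.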


def get_pages(nlp, titles_and_urls):
    for i, (title_cleaned, url) in enumerate(titles_and_urls):
        title_tokens = tokenize(nlp, title_cleaned)
        prepared_url = prepare_url_for_tokenizing(unquote(url))
        url_tokens = tokenize(nlp, prepared_url)
        tokens = title_tokens | url_tokens
        yield TokenizedDocument(tokens=list(tokens), url=url, title=title_cleaned)
        if i % 1000 == 0:
            print("Processed", i)


def grouper(n: int, iterator: Iterator):
    # Yield tuples of up to n items drawn from the iterator
    while True:
        chunk = tuple(islice(iterator, n))
        if not chunk:
            return
        yield chunk


def index_titles_and_urls(indexer: TinyIndexer, nlp, titles_and_urls):
    indexer.create_if_not_exists()
    pages = get_pages(nlp, titles_and_urls)
    for chunk in grouper(BATCH_SIZE, pages):
        indexer.index(list(chunk))
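

# Illustrative end-to-end sketch, kept as an uncalled function so it has no effect
# at import time. The titles and URLs below are made-up sample data.
def _example_index_titles_and_urls():
    nlp = English()
    titles_and_urls = [
        ("Python (programming language) - Wikipedia",
         "https://en.wikipedia.org/wiki/Python_(programming_language)"),
        ("Welcome to Python.org", "https://www.python.org/"),
    ]
    # TinyIndexer must be used as a context manager so the index file is created
    # and memory-mapped before any pages are written
    with TinyIndexer(INDEX_PATH, NUM_PAGES, PAGE_SIZE) as indexer:
        index_titles_and_urls(indexer, nlp, titles_and_urls)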


if __name__ == '__main__':
    run()