index.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. """
  2. Create a search index
  3. """
  4. import json
  5. import os
  6. from abc import ABC, abstractmethod
  7. from collections import Counter
  8. from dataclasses import dataclass, fields, asdict, astuple
  9. from itertools import islice
  10. from mmap import mmap, PROT_READ
  11. from typing import List, Iterator, TypeVar, Generic, Iterable, Callable
  12. from urllib.parse import unquote
  13. import justext
  14. import mmh3
  15. import pandas as pd
  16. from zstandard import ZstdCompressor, ZstdDecompressor, ZstdError
# Previous (smaller) index geometry, presumably kept for reference:
# NUM_PAGES = 8192
# PAGE_SIZE = 512
NUM_PAGES = 25600  # number of fixed-size pages in the index file
PAGE_SIZE = 4096  # bytes per page; index file size = NUM_PAGES * PAGE_SIZE
NUM_INITIAL_TOKENS = 50  # only the first 50 tokens of a text are indexed
HTTP_START = 'http://'
HTTPS_START = 'https://'
BATCH_SIZE = 100  # NOTE(review): unused in this chunk — presumably consumed elsewhere
  25. def is_content_token(nlp, token):
  26. lexeme = nlp.vocab[token.orth]
  27. return (lexeme.is_alpha or lexeme.is_digit) and not token.is_stop
  28. def tokenize(nlp, cleaned_text):
  29. tokens = nlp.tokenizer(cleaned_text)
  30. content_tokens = [token for token in tokens[:NUM_INITIAL_TOKENS]
  31. if is_content_token(nlp, token)]
  32. lowered = {nlp.vocab[token.orth].text.lower() for token in content_tokens}
  33. return lowered
  34. def clean(content):
  35. text = justext.justext(content, justext.get_stoplist("English"))
  36. pars = [par.text for par in text if not par.is_boilerplate]
  37. cleaned_text = ' '.join(pars)
  38. return cleaned_text
@dataclass
class Document:
    """A searchable document: title, URL and text extract.

    Field order is load-bearing: pages are stored as positional tuples
    (via astuple) and rebuilt positionally by TinyIndexBase.convert_items.
    """
    title: str
    url: str
    extract: str
@dataclass
class TokenizedDocument(Document):
    """A Document plus the tokens extracted from its title, URL and extract."""
    tokens: List[str]
# Generic item type stored in an index page (e.g. Document).
T = TypeVar('T')
  48. class TinyIndexBase(Generic[T]):
  49. def __init__(self, item_factory: Callable[..., T], num_pages: int, page_size: int):
  50. self.item_factory = item_factory
  51. self.num_pages = num_pages
  52. self.page_size = page_size
  53. self.decompressor = ZstdDecompressor()
  54. self.mmap = None
  55. def retrieve(self, key: str) -> List[T]:
  56. index = self._get_key_page_index(key)
  57. page = self.get_page(index)
  58. if page is None:
  59. return []
  60. # print("REtrieve", self.index_path, page)
  61. return self.convert_items(page)
  62. def _get_key_page_index(self, key):
  63. key_hash = mmh3.hash(key, signed=False)
  64. return key_hash % self.num_pages
  65. def get_page(self, i):
  66. """
  67. Get the page at index i, decompress and deserialise it using JSON
  68. """
  69. page_data = self.mmap[i * self.page_size:(i + 1) * self.page_size]
  70. try:
  71. decompressed_data = self.decompressor.decompress(page_data)
  72. except ZstdError:
  73. return None
  74. return json.loads(decompressed_data.decode('utf8'))
  75. def convert_items(self, items) -> List[T]:
  76. converted = [self.item_factory(*item) for item in items]
  77. # print("Converted", items, converted)
  78. return converted
  79. class TinyIndex(TinyIndexBase[T]):
  80. def __init__(self, item_factory: Callable[..., T], index_path, num_pages, page_size):
  81. super().__init__(item_factory, num_pages, page_size)
  82. # print("REtrieve path", index_path)
  83. self.index_path = index_path
  84. self.index_file = open(self.index_path, 'rb')
  85. self.mmap = mmap(self.index_file.fileno(), 0, prot=PROT_READ)
  86. class TinyIndexer(TinyIndexBase[T]):
  87. def __init__(self, item_factory: Callable[..., T], index_path: str, num_pages: int, page_size: int):
  88. super().__init__(item_factory, num_pages, page_size)
  89. self.index_path = index_path
  90. self.compressor = ZstdCompressor()
  91. self.decompressor = ZstdDecompressor()
  92. self.index_file = None
  93. self.mmap = None
  94. def __enter__(self):
  95. self.create_if_not_exists()
  96. self.index_file = open(self.index_path, 'r+b')
  97. self.mmap = mmap(self.index_file.fileno(), 0)
  98. return self
  99. def __exit__(self, exc_type, exc_val, exc_tb):
  100. self.mmap.close()
  101. self.index_file.close()
  102. # def index(self, documents: List[TokenizedDocument]):
  103. # for document in documents:
  104. # for token in document.tokens:
  105. # self._index_document(document, token)
  106. def index(self, key: str, value: T):
  107. # print("Index", value)
  108. assert type(value) == self.item_factory, f"Can only index the specified type" \
  109. f" ({self.item_factory.__name__})"
  110. page_index = self._get_key_page_index(key)
  111. current_page = self.get_page(page_index)
  112. if current_page is None:
  113. current_page = []
  114. value_tuple = astuple(value)
  115. # print("Value tuple", value_tuple)
  116. current_page.append(value_tuple)
  117. try:
  118. # print("Page", current_page)
  119. self._write_page(current_page, page_index)
  120. except ValueError:
  121. pass
  122. def _write_page(self, data, i):
  123. """
  124. Serialise the data using JSON, compress it and store it at index i.
  125. If the data is too big, it will raise a ValueError and not store anything
  126. """
  127. serialised_data = json.dumps(data)
  128. compressed_data = self.compressor.compress(serialised_data.encode('utf8'))
  129. page_length = len(compressed_data)
  130. if page_length > self.page_size:
  131. raise ValueError(f"Data is too big ({page_length}) for page size ({self.page_size})")
  132. padding = b'\x00' * (self.page_size - page_length)
  133. self.mmap[i * self.page_size:(i+1) * self.page_size] = compressed_data + padding
  134. def create_if_not_exists(self):
  135. if not os.path.isfile(self.index_path):
  136. file_length = self.num_pages * self.page_size
  137. with open(self.index_path, 'wb') as index_file:
  138. index_file.write(b'\x00' * file_length)
  139. def prepare_url_for_tokenizing(url: str):
  140. if url.startswith(HTTP_START):
  141. url = url[len(HTTP_START):]
  142. elif url.startswith(HTTPS_START):
  143. url = url[len(HTTPS_START):]
  144. for c in '/._':
  145. if c in url:
  146. url = url.replace(c, ' ')
  147. return url
  148. def get_pages(nlp, titles_urls_and_extracts) -> Iterable[TokenizedDocument]:
  149. for i, (title_cleaned, url, extract) in enumerate(titles_urls_and_extracts):
  150. title_tokens = tokenize(nlp, title_cleaned)
  151. prepared_url = prepare_url_for_tokenizing(unquote(url))
  152. url_tokens = tokenize(nlp, prepared_url)
  153. extract_tokens = tokenize(nlp, extract)
  154. print("Extract tokens", extract_tokens)
  155. tokens = title_tokens | url_tokens | extract_tokens
  156. yield TokenizedDocument(tokens=list(tokens), url=url, title=title_cleaned, extract=extract)
  157. if i % 1000 == 0:
  158. print("Processed", i)
  159. def grouper(n: int, iterator: Iterator):
  160. while True:
  161. chunk = tuple(islice(iterator, n))
  162. if not chunk:
  163. return
  164. yield chunk
  165. def index_titles_urls_and_extracts(indexer: TinyIndexer, nlp, titles_urls_and_extracts, terms_path):
  166. indexer.create_if_not_exists()
  167. terms = Counter()
  168. pages = get_pages(nlp, titles_urls_and_extracts)
  169. for page in pages:
  170. for token in page.tokens:
  171. indexer.index(token, Document(url=page.url, title=page.title, extract=page.extract))
  172. terms.update([t.lower() for t in page.tokens])
  173. term_df = pd.DataFrame({
  174. 'term': terms.keys(),
  175. 'count': terms.values(),
  176. })
  177. term_df.to_csv(terms_path)