# index.py
  1. """
  2. Create a search index
  3. """
  4. import json
  5. import os
  6. from abc import ABC, abstractmethod
  7. from collections import Counter
  8. from dataclasses import dataclass, fields, asdict, astuple
  9. from itertools import islice
  10. from mmap import mmap, PROT_READ
  11. from typing import List, Iterator, TypeVar, Generic, Iterable
  12. from urllib.parse import unquote
  13. import justext
  14. import mmh3
  15. import pandas as pd
  16. from zstandard import ZstdCompressor, ZstdDecompressor, ZstdError
# Total number of fixed-size pages in an index file.
NUM_PAGES = 8192
# Size in bytes of each page; compressed page data is zero-padded to this length.
PAGE_SIZE = 512
# Only the first N tokens of a text are considered when tokenizing.
NUM_INITIAL_TOKENS = 50
# URL scheme prefixes stripped before tokenizing a URL.
HTTP_START = 'http://'
HTTPS_START = 'https://'
# NOTE(review): BATCH_SIZE is not referenced in this file — presumably used by callers; confirm.
BATCH_SIZE = 100
  23. def is_content_token(nlp, token):
  24. lexeme = nlp.vocab[token.orth]
  25. return (lexeme.is_alpha or lexeme.is_digit) and not token.is_stop
  26. def tokenize(nlp, cleaned_text):
  27. tokens = nlp.tokenizer(cleaned_text)
  28. content_tokens = [token for token in tokens[:NUM_INITIAL_TOKENS]
  29. if is_content_token(nlp, token)]
  30. lowered = {nlp.vocab[token.orth].text.lower() for token in content_tokens}
  31. return lowered
  32. def clean(content):
  33. text = justext.justext(content, justext.get_stoplist("English"))
  34. pars = [par.text for par in text if not par.is_boilerplate]
  35. cleaned_text = ' '.join(pars)
  36. return cleaned_text
@dataclass
class Document:
    """An indexed document: a page title and its URL."""
    title: str
    url: str
@dataclass
class TokenizedDocument(Document):
    """A Document together with the search tokens extracted from its title and URL."""
    tokens: List[str]
  44. class TinyIndexBase:
  45. def __init__(self, item_type: type, num_pages: int, page_size: int):
  46. self.item_type = item_type
  47. self.num_pages = num_pages
  48. self.page_size = page_size
  49. self.decompressor = ZstdDecompressor()
  50. self.mmap = None
  51. def retrieve(self, key: str):
  52. index = self._get_key_page_index(key)
  53. page = self.get_page(index)
  54. if page is None:
  55. return []
  56. print("REtrieve", self.index_path, page)
  57. return self.convert_items(page)
  58. def _get_key_page_index(self, key):
  59. key_hash = mmh3.hash(key, signed=False)
  60. return key_hash % self.num_pages
  61. def get_page(self, i):
  62. """
  63. Get the page at index i, decompress and deserialise it using JSON
  64. """
  65. page_data = self.mmap[i * self.page_size:(i + 1) * self.page_size]
  66. try:
  67. decompressed_data = self.decompressor.decompress(page_data)
  68. except ZstdError:
  69. return None
  70. return json.loads(decompressed_data.decode('utf8'))
  71. def convert_items(self, items):
  72. converted = [self.item_type(*item) for item in items]
  73. # print("Converted", items, converted)
  74. return converted
class TinyIndex(TinyIndexBase):
    """Read-only view of an on-disk index of Documents, memory-mapped for fast page access."""

    def __init__(self, index_path, num_pages, page_size):
        super().__init__(Document, num_pages, page_size)
        self.index_path = index_path
        # The file object must be kept open for the lifetime of the mmap.
        self.index_file = open(self.index_path, 'rb')
        # Map the whole file read-only; pages are sliced out of this mmap.
        self.mmap = mmap(self.index_file.fileno(), 0, prot=PROT_READ)
  82. class TinyIndexer(TinyIndexBase):
  83. def __init__(self, item_type: type, index_path: str, num_pages: int, page_size: int):
  84. super().__init__(item_type, num_pages, page_size)
  85. self.index_path = index_path
  86. self.compressor = ZstdCompressor()
  87. self.decompressor = ZstdDecompressor()
  88. self.index_file = None
  89. self.mmap = None
  90. def __enter__(self):
  91. self.create_if_not_exists()
  92. self.index_file = open(self.index_path, 'r+b')
  93. self.mmap = mmap(self.index_file.fileno(), 0)
  94. return self
  95. def __exit__(self, exc_type, exc_val, exc_tb):
  96. self.mmap.close()
  97. self.index_file.close()
  98. # def index(self, documents: List[TokenizedDocument]):
  99. # for document in documents:
  100. # for token in document.tokens:
  101. # self._index_document(document, token)
  102. def index(self, key: str, value):
  103. print("Index", value)
  104. assert type(value) == self.item_type, f"Can only index the specified type" \
  105. f" ({self.item_type.__name__})"
  106. page_index = self._get_key_page_index(key)
  107. current_page = self.get_page(page_index)
  108. if current_page is None:
  109. current_page = []
  110. value_tuple = astuple(value)
  111. print("Value tuple", value_tuple)
  112. current_page.append(value_tuple)
  113. try:
  114. # print("Page", current_page)
  115. self._write_page(current_page, page_index)
  116. except ValueError:
  117. pass
  118. def _write_page(self, data, i):
  119. """
  120. Serialise the data using JSON, compress it and store it at index i.
  121. If the data is too big, it will raise a ValueError and not store anything
  122. """
  123. serialised_data = json.dumps(data)
  124. compressed_data = self.compressor.compress(serialised_data.encode('utf8'))
  125. page_length = len(compressed_data)
  126. if page_length > self.page_size:
  127. raise ValueError(f"Data is too big ({page_length}) for page size ({self.page_size})")
  128. padding = b'\x00' * (self.page_size - page_length)
  129. self.mmap[i * self.page_size:(i+1) * self.page_size] = compressed_data + padding
  130. def create_if_not_exists(self):
  131. if not os.path.isfile(self.index_path):
  132. file_length = self.num_pages * self.page_size
  133. with open(self.index_path, 'wb') as index_file:
  134. index_file.write(b'\x00' * file_length)
  135. def prepare_url_for_tokenizing(url: str):
  136. if url.startswith(HTTP_START):
  137. url = url[len(HTTP_START):]
  138. elif url.startswith(HTTPS_START):
  139. url = url[len(HTTPS_START):]
  140. for c in '/._':
  141. if c in url:
  142. url = url.replace(c, ' ')
  143. return url
  144. def get_pages(nlp, titles_and_urls) -> Iterable[TokenizedDocument]:
  145. for i, (title_cleaned, url) in enumerate(titles_and_urls):
  146. title_tokens = tokenize(nlp, title_cleaned)
  147. prepared_url = prepare_url_for_tokenizing(unquote(url))
  148. url_tokens = tokenize(nlp, prepared_url)
  149. tokens = title_tokens | url_tokens
  150. yield TokenizedDocument(tokens=list(tokens), url=url, title=title_cleaned)
  151. if i % 1000 == 0:
  152. print("Processed", i)
  153. def grouper(n: int, iterator: Iterator):
  154. while True:
  155. chunk = tuple(islice(iterator, n))
  156. if not chunk:
  157. return
  158. yield chunk
  159. def index_titles_and_urls(indexer: TinyIndexer, nlp, titles_and_urls, terms_path):
  160. indexer.create_if_not_exists()
  161. terms = Counter()
  162. pages = get_pages(nlp, titles_and_urls)
  163. for page in pages:
  164. for token in page.tokens:
  165. indexer.index(token, Document(url=page.url, title=page.title))
  166. terms.update([t.lower() for t in page.tokens])
  167. term_df = pd.DataFrame({
  168. 'term': terms.keys(),
  169. 'count': terms.values(),
  170. })
  171. term_df.to_csv(terms_path)