extract_process.py

from io import BytesIO
import boto3
from justext import get_stoplist
from justext.core import LENGTH_LOW_DEFAULT, LENGTH_HIGH_DEFAULT, STOPWORDS_LOW_DEFAULT, STOPWORDS_HIGH_DEFAULT, \
    MAX_LINK_DENSITY_DEFAULT, MAX_HEADING_DISTANCE_DEFAULT, NO_HEADINGS_DEFAULT, DEFAULT_ENCODING, DEFAULT_ENC_ERRORS, \
    preprocessor, html_to_dom, ParagraphMaker, classify_paragraphs, revise_paragraph_classification
from langdetect import detect
from lxml.etree import ParserError
from warcio import ArchiveIterator

MAX_URI_LENGTH = 150
NUM_CHARS_TO_ANALYSE = 1000
NUM_TITLE_CHARS = 65
NUM_EXTRACT_CHARS = 155


def fetch_process_warc_records(rows):
    """Fetch all WARC records defined by filenames and offsets in rows,
    parse the records and the contained HTML, and emit
    (uri, title, extract) tuples for pages that pass the filters."""
    s3client = boto3.client('s3')
    for row in rows:
        warc_path = row['warc_filename']
        offset = int(row['warc_record_offset'])
        length = int(row['warc_record_length'])
        rangereq = 'bytes={}-{}'.format(offset, (offset+length-1))
        response = s3client.get_object(Bucket='commoncrawl',
                                       Key=warc_path,
                                       Range=rangereq)
        record_stream = BytesIO(response["Body"].read())
        for record in ArchiveIterator(record_stream):
            for result in process_record(record):
                yield result


def is_html(record):
    """Return true if the (detected) MIME type of a record is HTML"""
    html_types = ['text/html', 'application/xhtml+xml']
    if (('WARC-Identified-Payload-Type' in record.rec_headers) and
            (record.rec_headers['WARC-Identified-Payload-Type'] in
             html_types)):
        return True
    content_type = record.http_headers.get_header('content-type', None)
    if content_type:
        for html_type in html_types:
            if html_type in content_type:
                return True
    return False


def justext(html_text, stoplist, length_low=LENGTH_LOW_DEFAULT,
            length_high=LENGTH_HIGH_DEFAULT, stopwords_low=STOPWORDS_LOW_DEFAULT,
            stopwords_high=STOPWORDS_HIGH_DEFAULT, max_link_density=MAX_LINK_DENSITY_DEFAULT,
            max_heading_distance=MAX_HEADING_DISTANCE_DEFAULT, no_headings=NO_HEADINGS_DEFAULT,
            encoding=None, default_encoding=DEFAULT_ENCODING,
            enc_errors=DEFAULT_ENC_ERRORS, preprocessor=preprocessor):
    """
    Convert an HTML page into a list of classified paragraphs and the page
    title. Each paragraph is an instance of ``justext.paragraph.Paragraph``.
    """
    dom = html_to_dom(html_text, default_encoding, encoding, enc_errors)
    print("Parsed HTML")
    try:
        title = dom.find(".//title").text
    except AttributeError:
        title = None
    preprocessed_dom = preprocessor(dom)
    paragraphs = ParagraphMaker.make_paragraphs(preprocessed_dom)
    print("Got paragraphs")
    classify_paragraphs(paragraphs, stoplist, length_low, length_high,
                        stopwords_low, stopwords_high, max_link_density, no_headings)
    revise_paragraph_classification(paragraphs, max_heading_distance)
    return paragraphs, title


def process_record(record):
    # print("Record", record.format, record.rec_type, record.rec_headers, record.raw_stream,
    #       record.http_headers, record.content_type, record.length)
    if record.rec_type != 'response':
        # skip over WARC request or metadata records
        return
    if not is_html(record):
        return
    uri = record.rec_headers.get_header('WARC-Target-URI')
    if len(uri) > MAX_URI_LENGTH:
        print("URI too long", len(uri))
        return
    # rating = get_domain_rating(uri)
    # print("Rating", rating)
    # if rating is None:
    #     return
    content = record.content_stream().read().strip()
    # print("Content", uri, content[:100])
    if not content:
        return
    try:
        all_paragraphs, full_title = justext(content, get_stoplist('English'))
    except UnicodeDecodeError:
        print("Unable to decode unicode")
        return
    except ParserError:
        print("Unable to parse")
        return
    if full_title is None:
        print("Missing title")
        return
    title = full_title[:NUM_TITLE_CHARS] + '…' \
        if len(full_title) > NUM_TITLE_CHARS else full_title
    text = '\n'.join([p.text for p in all_paragraphs
                      if not p.is_boilerplate])[:NUM_CHARS_TO_ANALYSE]
    print("Paragraphs", text)
    if len(text) < NUM_EXTRACT_CHARS:
        return
    language = detect(text)
    print("Got language", language)
    if language != 'en':
        return
    extract = text[:NUM_EXTRACT_CHARS]
    yield uri, title, extract
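

# A minimal usage sketch, not part of the original script: it assumes you
# already have index rows carrying the 'warc_filename', 'warc_record_offset'
# and 'warc_record_length' fields consumed above (e.g. from a Common Crawl
# index query) and AWS credentials with access to the 'commoncrawl' bucket.
# The row values below are hypothetical placeholders to be replaced with
# real index results.
if __name__ == '__main__':
    sample_rows = [{
        'warc_filename': 'crawl-data/CC-MAIN-.../warc/....warc.gz',  # hypothetical path
        'warc_record_offset': 0,       # placeholder byte offset of the record
        'warc_record_length': 10000,   # placeholder record length in bytes
    }]
    for uri, title, extract in fetch_process_warc_records(sample_rows):
        print(uri, title, extract, sep='\t')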