From c151fe377766b06c3489817539bd932812d27429 Mon Sep 17 00:00:00 2001
From: Daoud Clarke
Date: Sun, 5 Dec 2021 21:42:23 +0000
Subject: [PATCH] Extract archive info

---
 deploy.sh  |  2 +-
 extract.py | 40 ++++++++++++++++++++++++++--------------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/deploy.sh b/deploy.sh
index 63e8364..b17ddb2 100644
--- a/deploy.sh
+++ b/deploy.sh
@@ -14,7 +14,7 @@ aws emr create-cluster \
     --bootstrap-actions '{"Path": "s3://tinysearch/code/bootstrap.sh"}' \
     --steps '[{"Args":["spark-submit","--deploy-mode","cluster","s3n://tinysearch/code/runextract.py"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"command-runner.jar","Properties":"","Name":"Spark application"}]' \
     --name 'TinySearch' \
-    --instance-groups '[{"InstanceCount":5,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"CORE","InstanceType":"m4.large","Name":"Core Instance Group"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"MASTER","InstanceType":"m4.large","Name":"Master Instance Group"}]' \
+    --instance-groups '[{"InstanceCount":2,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"CORE","InstanceType":"m4.large","Name":"Core Instance Group"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"MASTER","InstanceType":"m4.large","Name":"Master Instance Group"}]' \
     --configurations '[{"Classification":"spark","Properties":{}}]' \
     --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region us-east-1 \
     --auto-terminate
diff --git a/extract.py b/extract.py
index c1f8ba4..11a968e 100644
--- a/extract.py
+++ b/extract.py
@@ -15,9 +15,10 @@ from langdetect import detect
 from lxml.etree import ParserError
 from pyspark.sql import SparkSession, SQLContext
 from pyspark.sql.functions import col
-from pyspark.sql.types import StructType, StructField, StringType, LongType
+from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
 from warcio import ArchiveIterator
 
+RECORDS_PATH = 's3://tinysearch/outputs/records'
 OUTPUT_PATH = 's3://tinysearch/outputs/index'
 MAX_URI_LENGTH = 150
 
@@ -35,6 +36,22 @@ index_schema = StructType([
     StructField("top", StringType(), False),
 ])
 
+
+output_schema = StructType([
+    StructField("uri", StringType(), False),
+    StructField("title", StringType(), False),
+    StructField("extract", StringType(), False),
+])
+
+
+record_schema = StructType([
+    StructField("url", StringType(), False),
+    StructField("warc_filename", StringType(), False),
+    StructField("warc_record_offset", IntegerType(), False),
+    StructField("warc_record_length", IntegerType(), False),
+])
+
+
 spark = SparkSession \
     .builder \
     .appName("Python Spark SQL basic example") \
@@ -43,7 +60,7 @@ spark = SparkSession \
 
 
 def run():
-    sqlc = SQLContext(sparkContext=spark)
+    # sqlc = SQLContext(sparkContext=spark)
 
     df = spark.read.load('s3://commoncrawl/cc-index/table/cc-main/warc/')
     df.createOrReplaceTempView('ccindex')
@@ -56,11 +73,13 @@ def run():
     sqldf = sqldf.filter(col('url_host_name').isin(list(DOMAINS.keys())))
     print("Got rows", sqldf.take(10))
     print("Num rows", sqldf.count())
-    sqldf = sqldf.sample(fraction=0.001)
-    warc_recs = sqldf.select("url", "warc_filename", "warc_record_offset", "warc_record_length").rdd
-    rdd = warc_recs.mapPartitions(fetch_process_warc_records)
-    output = sqlc.createDataFrame(rdd, schema=output_schema)
-    output.write.option('compression', 'gzip').format('json').mode('overwrite').save(OUTPUT_PATH)
+    sqldf = sqldf.sample(fraction=0.01)
+    sqldf.write.option('compression', 'gzip').format('json').mode('overwrite').save(RECORDS_PATH)
+
+    # warc_recs = sqldf.select("url", "warc_filename", "warc_record_offset", "warc_record_length").rdd
+    # rdd = warc_recs.mapPartitions(fetch_process_warc_records)
+    # output = sqlc.createDataFrame(rdd, schema=output_schema)
+    # output.write.option('compression', 'gzip').format('json').mode('overwrite').save(OUTPUT_PATH)
 
 
 def fetch_process_warc_records(rows):
@@ -132,13 +151,6 @@ def justext(html_text, stoplist, length_low=LENGTH_LOW_DEFAULT,
     return paragraphs, title
 
 
-output_schema = StructType([
-    StructField("uri", StringType(), False),
-    StructField("title", StringType(), False),
-    StructField("extract", StringType(), False),
-])
-
-
 def process_record(record):
     # print("Record", record.format, record.rec_type, record.rec_headers, record.raw_stream,
     #       record.http_headers, record.content_type, record.length)