Update submissions to parse using the backfill queue.

Author: Nate E TeBlunthuis
Date: 2020-08-11 22:37:36 -07:00
parent c92b50e050
commit 2d425600a8
4 changed files with 69 additions and 60 deletions

--- a/comments_2_parquet.sh
+++ b/comments_2_parquet.sh

@@ -1,5 +1,9 @@
+## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
 #!/usr/bin/env bash
 
 echo "#!/usr/bin/bash" > job_script.sh
 echo "source $(pwd)/../bin/activate" >> job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh

--- a/submissions_2_parquet.sh
+++ b/submissions_2_parquet.sh

@@ -1,10 +1,8 @@
+## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
 #!/usr/bin/env bash
 
-echo "!#/usr/bin/bash" > job_script.sh
-echo "source $(pwd)/../bin/activate" >> job_script.sh
-echo "python3 $(pwd)/submissions_2_parquet_part1.py" >> job_script.sh
-srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 job_script.sh
+./parse_submissions.sh
 start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py
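Note that parse_submissions.sh itself is not part of this commit. Judging from the Python changes below, it presumably drives the new task-list workflow (see the notes after submissions_2_parquet_part1.py) in place of the old job_script.sh/srun submission, which is why the script must now be run by hand rather than waiting on a parallel_sql job.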

--- a/submissions_2_parquet_part1.py
+++ b/submissions_2_parquet_part1.py

@@ -4,7 +4,6 @@
 # 1. from gz to arrow parquet (this script)
 # 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
 
-import json
 from datetime import datetime
 from multiprocessing import Pool
 from itertools import islice
@@ -12,19 +11,23 @@ from helper import find_dumps, open_fileset
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+import simdjson
+import fire
+import os
 
+parser = simdjson.Parser()
+
 def parse_submission(post, names = None):
     if names is None:
         names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
 
     try:
-        post = json.loads(post)
-    except (json.decoder.JSONDecodeError, UnicodeDecodeError) as e:
+        post = parser.parse(post)
+    except (ValueError) as e:
         # print(e)
         # print(post)
         row = [None for _ in names]
-        row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,post)
+        row[-1] = "Error parsing json|{0}|{1}".format(e,post)
         return tuple(row)
 
     row = []
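The swap from json.loads to pysimdjson changes the error contract: simdjson.Parser.parse signals malformed input with ValueError rather than json.decoder.JSONDecodeError (both of the old exceptions are in fact ValueError subclasses, so the new clause is strictly broader). A minimal sketch of the new pattern, where try_parse is a hypothetical helper and not part of this commit:

    import simdjson

    # One Parser instance is reused across calls; pysimdjson recycles its
    # internal buffer, which also means a parsed document is invalidated
    # by the next call to parse().
    parser = simdjson.Parser()

    def try_parse(line):
        try:
            return parser.parse(line)
        except ValueError:
            # malformed JSON; mirror the diff's behavior of falling back to a sentinel
            return None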
@@ -55,55 +58,61 @@ def parse_submission(post, names = None):
         row.append(post[name])
     return tuple(row)
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
-files = list(find_dumps(dumpdir))
-pool = Pool(28)
-stream = open_fileset(files)
-N = 100000
-rows = pool.imap_unordered(parse_submission, stream, chunksize=int(N/28))
-schema = pa.schema([
-    pa.field('id', pa.string(),nullable=True),
-    pa.field('author', pa.string(),nullable=True),
-    pa.field('subreddit', pa.string(),nullable=True),
-    pa.field('title', pa.string(),nullable=True),
-    pa.field('created_utc', pa.timestamp('ms'),nullable=True),
-    pa.field('permalink', pa.string(),nullable=True),
-    pa.field('url', pa.string(),nullable=True),
-    pa.field('domain', pa.string(),nullable=True),
-    pa.field('score', pa.int64(),nullable=True),
-    pa.field('ups', pa.int64(),nullable=True),
-    pa.field('downs', pa.int64(),nullable=True),
-    pa.field('over_18', pa.bool_(),nullable=True),
-    pa.field('has_media',pa.bool_(),nullable=True),
-    pa.field('selftext',pa.string(),nullable=True),
-    pa.field('retrieved_on', pa.timestamp('ms'),nullable=True),
-    pa.field('num_comments', pa.int64(),nullable=True),
-    pa.field('gilded',pa.int64(),nullable=True),
-    pa.field('edited',pa.bool_(),nullable=True),
-    pa.field('time_edited',pa.timestamp('ms'),nullable=True),
-    pa.field('subreddit_type',pa.string(),nullable=True),
-    pa.field('subreddit_id',pa.string(),nullable=True),
-    pa.field('subreddit_subscribers',pa.int64(),nullable=True),
-    pa.field('name',pa.string(),nullable=True),
-    pa.field('is_self',pa.bool_(),nullable=True),
-    pa.field('stickied',pa.bool_(),nullable=True),
-    pa.field('quarantine',pa.bool_(),nullable=True),
-    pa.field('error',pa.string(),nullable=True)])
-
-with pq.ParquetWriter("/gscratch/comdata/output/reddit_submissions.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
-    while True:
-        chunk = islice(rows,N)
-        pddf = pd.DataFrame(chunk, columns=schema.names)
-        table = pa.Table.from_pandas(pddf,schema=schema)
-        if table.shape[0] == 0:
-            break
-        writer.write_table(table)
-    writer.close()
+def parse_dump(partition):
+    N=10000
+    stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
+    rows = map(parse_submission,stream)
+    schema = pa.schema([
+        pa.field('id', pa.string(),nullable=True),
+        pa.field('author', pa.string(),nullable=True),
+        pa.field('subreddit', pa.string(),nullable=True),
+        pa.field('title', pa.string(),nullable=True),
+        pa.field('created_utc', pa.timestamp('ms'),nullable=True),
+        pa.field('permalink', pa.string(),nullable=True),
+        pa.field('url', pa.string(),nullable=True),
+        pa.field('domain', pa.string(),nullable=True),
+        pa.field('score', pa.int64(),nullable=True),
+        pa.field('ups', pa.int64(),nullable=True),
+        pa.field('downs', pa.int64(),nullable=True),
+        pa.field('over_18', pa.bool_(),nullable=True),
+        pa.field('has_media',pa.bool_(),nullable=True),
+        pa.field('selftext',pa.string(),nullable=True),
+        pa.field('retrieved_on', pa.timestamp('ms'),nullable=True),
+        pa.field('num_comments', pa.int64(),nullable=True),
+        pa.field('gilded',pa.int64(),nullable=True),
+        pa.field('edited',pa.bool_(),nullable=True),
+        pa.field('time_edited',pa.timestamp('ms'),nullable=True),
+        pa.field('subreddit_type',pa.string(),nullable=True),
+        pa.field('subreddit_id',pa.string(),nullable=True),
+        pa.field('subreddit_subscribers',pa.int64(),nullable=True),
+        pa.field('name',pa.string(),nullable=True),
+        pa.field('is_self',pa.bool_(),nullable=True),
+        pa.field('stickied',pa.bool_(),nullable=True),
+        pa.field('quarantine',pa.bool_(),nullable=True),
+        pa.field('error',pa.string(),nullable=True)])
+
+    if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
+        os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
+
+    with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
+        while True:
+            chunk = islice(rows,N)
+            pddf = pd.DataFrame(chunk, columns=schema.names)
+            table = pa.Table.from_pandas(pddf,schema=schema)
+            if table.shape[0] == 0:
+                break
+            writer.write_table(table)
+        writer.close()
+
+def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
+    files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
+    with open("parse_submissions_task_list",'w') as of:
+        for fpath in files:
+            partition = os.path.split(fpath)[1]
+            of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
+
+if __name__ == "__main__":
+    fire.Fire({'parse_dump':parse_dump,
+               'gen_task_list':gen_task_list})
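With the fire.Fire entry point, the module now exposes two subcommands. Running python3 submissions_2_parquet_part1.py gen_task_list scans the dump directory and writes one parse_dump command per dump file into parse_submissions_task_list; each line is an independent shell command that the parallel-sql backfill queue can execute in any order. With hypothetical dump names, the generated file would look like:

    python3 submissions_2_parquet_part1.py parse_dump RS_2019-06.zst
    python3 submissions_2_parquet_part1.py parse_dump RS_2019-07.zst

Each parse_dump task writes its own file under /gscratch/comdata/output/temp/reddit_submissions.parquet/, so the tasks share no state beyond that directory. One caveat: the os.path.exists/os.mkdir pair is racy when several tasks start at once; os.makedirs(path, exist_ok=True) would be the atomic equivalent.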

--- a/submissions_2_parquet_part2.py
+++ b/submissions_2_parquet_part2.py

@@ -17,7 +17,7 @@ conf = conf.set('spark.sql.crossJoin.enabled',"true")
 conf = conf.set('spark.debug.maxToStringFields',200)
 sqlContext = pyspark.SQLContext(sc)
 
-df = spark.read.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet")
+df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
 
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
@@ -32,13 +32,11 @@ df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
 df = df.repartition("subreddit")
 df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
 df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
+df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
 
 # # we also want to have parquet files sorted by author then reddit.
 df = df.repartition("author")
 df3 = df.sort(["author","CreatedAt","id"],ascending=True)
 df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
-os.remove("/gscratch/comdata/output/reddit_submissions.parquet_temp")
+df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
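Dropping the os.remove call is consistent with the new layout: the intermediate data is now a directory of per-dump parquet files rather than the single reddit_submissions.parquet_temp file, and os.remove cannot delete a directory anyway (that would take something like shutil.rmtree). Cleanup of /gscratch/comdata/output/temp/ is presumably deferred to a step outside this commit.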