
changes from dirty branch.

2023-05-18 10:29:08 -07:00
parent c190791364
commit 811a0d87c4
7 changed files with 30 additions and 22 deletions

View File

@@ -9,7 +9,7 @@ from pyspark.sql import SparkSession
 spark = SparkSession.builder.getOrCreate()
 conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
-conf = conf.set("spark.sql.shuffle.partitions",2000)
+conf = conf.set("spark.sql.shuffle.partitions",2400)
 conf = conf.set('spark.sql.crossJoin.enabled',"true")
 conf = conf.set('spark.debug.maxToStringFields',200)
 sc = spark.sparkContext
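The only change in this hunk raises spark.sql.shuffle.partitions from 2000 to 2400, which sets how many tasks Spark creates after wide operations such as the sorts and repartitions below. A minimal local sketch of the same knob (the session, data, and demo job here are illustrative, not the pipeline's):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[2]")
         .appName("shuffle-partitions-demo")
         .config("spark.sql.shuffle.partitions", "2400")
         .config("spark.sql.adaptive.enabled", "false")  # keep the count fixed
         .getOrCreate())

df = spark.range(1000000)
# Any wide transformation (groupBy, join, sort) now shuffles into 2400 tasks.
counts = df.groupBy((df.id % 10).alias("bucket")).count()
print(counts.rdd.getNumPartitions())  # 2400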
@@ -25,12 +25,13 @@ df = df.withColumn("Month",f.month(f.col("CreatedAt")))
 df = df.withColumn("Year",f.year(f.col("CreatedAt")))
 df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
-df = df.repartition('subreddit')
-df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
+# df = df.repartition(1200,'subreddit')
+# df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
+# df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
+# df2.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
-df = df.repartition('author')
-df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
+#df = spark.read.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet")
+df = df.repartition(2400,'author','subreddit',"Year","Month","Day")
+df3 = df.sort(["author","subreddit","Year","Month","Day","CreatedAt","link_id","parent_id"],ascending=True)
+df3 = df3.sortWithinPartitions(["author","subreddit","Year","Month","Day","CreatedAt","link_id","parent_id"],ascending=True)
+df3.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')

View File

@@ -1,4 +1,6 @@
 #!/usr/bin/bash
+source ~/.bashrc
+echo $(hostname)
 start_spark_cluster.sh
-singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 comments_2_parquet_part2.py
-singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh
+spark-submit --verbose --master spark://$(hostname):43015 submissions_2_parquet_part2.py
+stop-all.sh
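The rewritten script submits directly to the standalone master (note the nonstandard port 43015; Spark's default is 7077) instead of going through the Singularity container. The same master URL can also be set from inside a PySpark script; the hostname and port below are placeholders for whatever start_spark_cluster.sh actually binds:

import socket
from pyspark.sql import SparkSession

# Assumption: a standalone Spark master is already running on this
# host, on the same port the cluster-start script chose.
master_url = f"spark://{socket.gethostname()}:43015"

spark = (SparkSession.builder
         .master(master_url)
         .appName("submissions_2_parquet_part2")
         .getOrCreate())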

View File

@@ -58,7 +58,7 @@ def parse_submission(post, names = None):
 def parse_dump(partition):
     N=10000
-    stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
+    stream = open_fileset([f"/gscratch/comdata/raw_data/submissions/{partition}"])
     rows = map(parse_submission,stream)
     schema = pa.schema([
         pa.field('id', pa.string(),nullable=True),
@@ -102,7 +102,7 @@ def parse_dump(partition):
     writer.close()
-def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
+def gen_task_list(dumpdir="/gscratch/comdata/raw_data/submissions"):
     files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
     with open("submissions_task_list.sh",'w') as of:
         for fpath in files:
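Both hunks in this file drop the reddit_dumps/ segment from the raw-data paths. For context, gen_task_list just enumerates the dump files and writes one shell command per file so they can run as independent jobs; a self-contained approximation follows (find_dumps and the exact command template are project internals, so glob and a hypothetical command line stand in):

from pathlib import Path

def gen_task_list(dumpdir="/gscratch/comdata/raw_data/submissions",
                  outfile="submissions_task_list.sh"):
    # One command per dump file; GNU parallel or a job array can then
    # run the lines independently.
    files = sorted(Path(dumpdir).glob("RS_20*.*"))
    with open(outfile, "w") as of:
        for fpath in files:
            # Hypothetical command; the real script writes its own.
            of.write(f"python3 submissions_2_parquet_part1.py parse_dump {fpath.name}\n")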

View File

@@ -29,14 +29,14 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
 # next we gotta resort it all.
-df = df.repartition("subreddit")
-df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
+df = df.repartition(800,"subreddit","Year","Month")
+df2 = df.sort(["subreddit","Year","Month","CreatedAt","id"],ascending=True)
 df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
 df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
 # # we also want to have parquet files sorted by author then reddit.
-df = df.repartition("author")
-df3 = df.sort(["author","CreatedAt","id"],ascending=True)
+df = df.repartition(800,"author","subreddit","Year","Month")
+df3 = df.sort(["author","Year","Month","CreatedAt","id"],ascending=True)
 df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
 df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')