1
0

Sort and partition the term frequencies using Spark.

This commit is contained in:
Nathan TeBlunthuis 2024-12-01 13:42:13 -08:00
parent 2b023fea8d
commit 3fea1f9388

View File

@ -192,7 +192,7 @@ def weekly_tf(partition,
outrows = tf_func(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords)
outchunksize = 10000
outchunksize = 100000
Path(output_terms_path).mkdir(parents=True, exist_ok=True)
@ -239,6 +239,17 @@ def weekly_tf(partition,
writer.close()
author_writer.close()
def sort_tf_comments(input_parquet="/gscratch/comdata/output/temp_reddit_comments_by_subreddit.parquet/",
                     output_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/"):
    """Globally sort the term-frequency parquet by (term, week, subreddit).

    Reads the temporary TF parquet produced by `weekly_tf`, sorts it with
    Spark, and writes the result as snappy-compressed parquet, overwriting
    any existing output.

    Parameters
    ----------
    input_parquet : str
        Path to the unsorted temporary parquet dataset.
    output_parquet : str
        Destination path; overwritten if it already exists.
    """
    # Imported lazily so merely importing this module does not require Spark.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(input_parquet)
    # NOTE(review): the global sort below performs its own range-based
    # exchange, so this hash repartition by 'term' is likely redundant work;
    # kept as-is to preserve the original execution plan — TODO confirm intent.
    df = df.repartition(2000, 'term')
    df = df.sort(['term', 'week', 'subreddit'])
    # A sortWithinPartitions on the same keys right after a global sort is a
    # no-op on the output (each partition is already sorted), so the original
    # redundant pass has been removed.
    df.write.parquet(output_parquet, mode='overwrite', compression='snappy')
def gen_task_list(mwe_pass='first',
input_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
@ -262,4 +273,5 @@ def gen_task_list(mwe_pass='first',
if __name__ == "__main__":
    # CLI dispatch via python-fire; the diff residue had fused the old and
    # new dict literals into invalid syntax — this is the post-commit form,
    # which adds the "sort" subcommand.
    fire.Fire({"gen_task_list": gen_task_list,
               "weekly_tf": weekly_tf,
               "sort": sort_tf_comments})