diff --git a/ngrams/term_frequencies.py b/ngrams/term_frequencies.py
index 9cdf9a0..22b5a68 100755
--- a/ngrams/term_frequencies.py
+++ b/ngrams/term_frequencies.py
@@ -192,7 +192,7 @@ def weekly_tf(partition,
 
     outrows = tf_func(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords)
 
-    outchunksize = 10000
+    outchunksize = 100000
 
     Path(output_terms_path).mkdir(parents=True, exist_ok=True)
 
@@ -239,6 +239,17 @@ def weekly_tf(partition,
     writer.close()
     author_writer.close()
 
+def sort_tf_comments(input_parquet="/gscratch/comdata/output/temp_reddit_comments_by_subreddit.parquet/",
+                     output_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/"):
+
+    from pyspark.sql import functions as f
+    from pyspark.sql import SparkSession
+    spark = SparkSession.builder.getOrCreate()
+    df = spark.read.parquet(input_parquet)
+    df = df.repartition(2000,'term')
+    df = df.sort(['term','week','subreddit'])
+    df = df.sortWithinPartitions(['term','week','subreddit'])
+    df.write.parquet(output_parquet,mode='overwrite',compression='snappy')
 
 def gen_task_list(mwe_pass='first',
                   input_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
@@ -262,4 +273,5 @@ def gen_task_list(mwe_pass='first',
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,
-               "weekly_tf":weekly_tf})
+               "weekly_tf":weekly_tf,
+               "sort":sort_tf_comments})
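
For context, here is a minimal sketch of the repartition-then-sort sequence that the new `sort_tf_comments` helper applies, run on a toy DataFrame instead of the `/gscratch/comdata` parquet paths. The sample rows, local SparkSession, and two-partition count are assumptions for illustration only and are not part of the patch.

```python
# Sketch only: mirrors the call sequence in sort_tf_comments on toy data.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

rows = [
    ("cats", 3, "aww"),
    ("dogs", 1, "aww"),
    ("cats", 1, "pics"),
]
df = spark.createDataFrame(rows, ["term", "week", "subreddit"])

# Hash-partition by term, then order rows globally and within each
# partition, following the same sequence of calls as the patch.
out = (df.repartition(2, "term")
         .sort(["term", "week", "subreddit"])
         .sortWithinPartitions(["term", "week", "subreddit"]))

out.show()
```

The Fire dictionary at the bottom of the diff exposes the helper as a `sort` subcommand alongside `gen_task_list` and `weekly_tf`, so it can be invoked from the module's command line interface with its default parquet paths.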