1
0

repartition for parallelism.

This commit is contained in:
Nathan TeBlunthuis 2024-12-19 17:53:27 -08:00
parent c6c9ec173b
commit 189330198c

View File

@@ -8,7 +8,7 @@ from functools import partial
 def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=None, min_df=None, max_df=None):
     spark = SparkSession.builder.config(map={'spark.executor.memory':'900g','spark.executor.cores':128}).getOrCreate()
     df = spark.read.parquet(inpath)
+    df = df.repartition(1280*15, cols=['subreddit',term_colname])
     df = df.filter(~ f.col(term_colname).isin(exclude))
     if included_subreddits is not None: