From 271cbea7d9433a7e209121f95831e7c53a82a0b6 Mon Sep 17 00:00:00 2001 From: Nathan TeBlunthuis Date: Sun, 1 Dec 2024 09:51:49 -0800 Subject: [PATCH] add a 'limit' parameter for testing. --- ngrams/term_frequencies.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/ngrams/term_frequencies.py b/ngrams/term_frequencies.py index c6d0be3..1ca68ca 100755 --- a/ngrams/term_frequencies.py +++ b/ngrams/term_frequencies.py @@ -118,7 +118,8 @@ def weekly_tf(partition, temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/", output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet", output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet", - reddit_dataset = 'comments'): + reddit_dataset = 'comments', + limit = None): if reddit_dataset == 'comments': tf_func = tf_comments @@ -195,10 +196,18 @@ def weekly_tf(partition, Path(output_terms_path).mkdir(parents=True, exist_ok=True) + if limit is not None: + n_lines_out = 0 + with pq.ParquetWriter(f"{output_terms_path}/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"{output_authors_path}/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer: while True: + if limit is not None: + n_lines_left = limit - n_lines_out + if n_lines_left < outchunksize: + outchunksize = n_lines_left + chunk = islice(outrows,outchunksize) chunk = (c for c in chunk if c[1] is not None) pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names) @@ -218,6 +227,12 @@ def weekly_tf(partition, author_writer.write_table(author_table) do_break = False + if limit is not None: + if n_lines_out < limit: + n_lines_out += outchunksize + else: + do_break = True + if do_break: break