From dd894ebf6137c1882a6c9b6a07476ebeae7ac8fe Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Wed, 27 Nov 2024 11:51:22 -0800
Subject: [PATCH] support posts in ngrams

---
 ngrams/tf_comments.py | 96 +++++++++++++++++++++++++++++--------------
 1 file changed, 66 insertions(+), 30 deletions(-)

diff --git a/ngrams/tf_comments.py b/ngrams/tf_comments.py
index f86548a..ea2b37b 100755
--- a/ngrams/tf_comments.py
+++ b/ngrams/tf_comments.py
@@ -18,24 +18,68 @@ from random import random
 
 # taken from https://stackoverflow.com/questions/3809401/what-is-a-good-regular-expression-to-match-a-url
 urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)")
 
-# compute term frequencies for comments in each subreddit by week
-def weekly_tf(partition, mwe_pass = 'first'):
-    dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-    if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
-        os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
+def tf_comments(subreddit_weeks):
+    for key, posts in subreddit_weeks:
+        subreddit, week = key
+        tfs = Counter([])
+        authors = Counter([])
+        for post in posts:
+            tokens = my_tokenizer(post.body)
+            tfs.update(tokens)
+            authors.update([post.author])
 
-    if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"):
-        os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")
+        for term, tf in tfs.items():
+            yield [True, subreddit, term, week, tf]
+
+        for author, tf in authors.items():
+            yield [False, subreddit, author, week, tf]
+
+def tf_posts(subreddit_weeks):
+    for key, posts in subreddit_weeks:
+        subreddit, week = key
+        tfs = Counter([])
+        authors = Counter([])
+        for post in posts:
+            tokens = my_tokenizer(post.title)
+            tfs.update(tokens)
+            authors.update([post.author])
+
+        for term, tf in tfs.items():
+            yield [True, subreddit, term, week, tf]
+
+        for author, tf in authors.items():
+            yield [False, subreddit, author, week, tf]
+
+# compute term frequencies for comments in each subreddit by week
+def weekly_tf(partition,
+              mwe_pass = 'first',
+              input_parquet='/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/',
+              output_10p_sample_path="/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
+              temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/",
+              output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+              output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+              reddit_dataset = 'comments'):
+
+    if reddit_dataset == 'comments':
+        tf_func = tf_comments
+    elif reddit_dataset == 'posts':
+        tf_func = tf_posts
+
+    dataset = ds.dataset(f"{input_parquet}/{partition}", format='parquet')
+    if not os.path.exists(output_10p_sample_path):
+        os.mkdir(output_10p_sample_path)
+
+    if not os.path.exists(temp_output_tfidf_path):
+        os.mkdir(temp_output_tfidf_path)
 
     ngram_output = partition.replace("parquet","txt")
 
     if mwe_pass == 'first':
-        if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
-            os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
+        if os.path.exists(f"{output_10p_sample_path}/{ngram_output}"):
+            os.remove(f"{output_10p_sample_path}/{ngram_output}")
 
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
-
     schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                         pa.field('term', pa.string(), nullable=False),
                         pa.field('week', pa.date32(), nullable=False),
@@ -134,27 +178,12 @@ def weekly_tf(partition, mwe_pass = 'first'):
             for token in sentence:
                 yield token
 
-    def tf_comments(subreddit_weeks):
-        for key, posts in subreddit_weeks:
-            subreddit, week = key
-            tfs = Counter([])
-            authors = Counter([])
-            for post in posts:
-                tokens = my_tokenizer(post.body)
-                tfs.update(tokens)
-                authors.update([post.author])
-            for term, tf in tfs.items():
-                yield [True, subreddit, term, week, tf]
-
-            for author, tf in authors.items():
-                yield [False, subreddit, author, week, tf]
-
-    outrows = tf_comments(subreddit_weeks)
+    outrows = tf_func(subreddit_weeks)
 
     outchunksize = 10000
 
-    with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+    with pq.ParquetWriter(f"{output_terms_path}/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"{output_authors_path}/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
 
         while True:
 
@@ -183,12 +212,19 @@ def weekly_tf(partition, mwe_pass = 'first'):
 
         author_writer.close()
 
-def gen_task_list(mwe_pass='first'):
-    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
+def gen_task_list(mwe_pass='first',
+                  input_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
+                  output_10p_sample_path="/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
+                  temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/",
+                  output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+                  output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+                  dataset='comments'):
+    files = os.listdir(input_parquet)
+
     with open("tf_task_list",'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
-                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
+                outfile.write(f"./tf_comments.py weekly_tf {f} --mwe-pass {mwe_pass} --input-parquet {input_parquet} --output-10p-sample-path {output_10p_sample_path} --temp-output-tfidf-path {temp_output_tfidf_path} --output-terms-path {output_terms_path} --output-authors-path {output_authors_path} --reddit-dataset {dataset}\n")
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,
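
-- 
A minimal usage sketch of the new options. The posts input path
/gscratch/comdata/output/reddit_posts_by_subreddit.parquet/ and the partition
file name part_00000.parquet are assumed examples; this patch does not
establish the actual posts parquet location. Note that gen_task_list takes
--dataset, while weekly_tf takes --reddit-dataset.

  # generate a task list over the posts dataset instead of comments
  ./tf_comments.py gen_task_list --dataset posts \
      --input-parquet /gscratch/comdata/output/reddit_posts_by_subreddit.parquet/

  # or run a single partition directly; 'posts' tokenizes post.title
  ./tf_comments.py weekly_tf part_00000.parquet --reddit-dataset posts \
      --input-parquet /gscratch/comdata/output/reddit_posts_by_subreddit.parquet/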