13
0
cdsc_reddit/tf_reddit_comments.py

80 lines
2.6 KiB
Python
Raw Normal View History

2020-08-04 05:43:57 +00:00
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from itertools import groupby, islice, chain
import fire
from collections import Counter
import pandas as pd
import os
import datetime
2020-08-04 05:55:10 +00:00
from nltk import wordpunct_tokenize, MWETokenizer
2020-08-04 05:43:57 +00:00
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition):
    """Compute weekly term frequencies for one partition of the comments dataset.

    Reads the ``CreatedAt``, ``subreddit`` and ``body`` columns of the given
    parquet partition, tokenizes each lower-cased comment body, counts terms
    per (subreddit, week) group and writes rows of
    ``(subreddit, term, week, tf)`` to a parquet file named after the
    partition.

    Args:
        partition: File name of the partition inside
            ``reddit_comments_by_subreddit.parquet``; it is also used as the
            name of the output file.
    """
    dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
    batches = dataset.to_batches(columns=['CreatedAt', 'subreddit', 'body'])

    schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                        pa.field('term', pa.string(), nullable=False),
                        pa.field('week', pa.date32(), nullable=False),
                        pa.field('tf', pa.int64(), nullable=False)]
                       )

    def add_week(df):
        # Subtracting the day-of-week offset moves every timestamp back to the
        # first day of its week; only the date part is kept.
        df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
        return df

    def iterate_rows(dfs):
        # Flatten the stream of per-batch data frames into a stream of rows.
        for df in dfs:
            yield from df.itertuples()

    dfs = (add_week(b.to_pandas()) for b in batches)
    rows = iterate_rows(dfs)

    # NOTE(review): itertools.groupby only merges *consecutive* rows with equal
    # keys, so this assumes the partition is already ordered by subreddit and
    # then by time -- confirm against the job that writes the input.
    subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))

    # No multi-word expressions are registered on the tokenizer here.
    tokenizer = MWETokenizer()

    def tf_comments(subreddit_weeks):
        # Emit one output row per distinct term in each (subreddit, week) group.
        for (subreddit, week), posts in subreddit_weeks:
            tfs = Counter()
            for post in posts:
                tfs.update(tokenizer.tokenize(wordpunct_tokenize(post.body.lower())))
            for term, tf in tfs.items():
                yield [subreddit, term, week, tf]

    outrows = tf_comments(subreddit_weeks)
    outchunksize = 10000

    # The f-prefix is required so each partition gets its own output file; the
    # original wrote to a file literally named "{partition}".
    outpath = f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}"

    # The context manager closes the writer, so no explicit close() is needed.
    with pq.ParquetWriter(outpath, schema=schema, compression='snappy', flavor='spark') as writer:
        while True:
            # Write in bounded chunks so the full result never sits in memory.
            chunk = list(islice(outrows, outchunksize))
            if not chunk:
                break
            pddf = pd.DataFrame(chunk, columns=schema.names)
            # Progress/debug output kept from the original implementation.
            print(pddf)
            table = pa.Table.from_pandas(pddf, schema=schema)
            writer.write_table(table)
def gen_task_list():
    """Write one ``weekly_tf`` command line per parquet partition to ``tf_task_list``.

    Every entry of the comments dataset directory whose name ends in
    ``.parquet`` yields one line in the task file, in directory-listing order.
    """
    partition_dir = "/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/"
    # The directory is listed eagerly (before the task file is opened); the
    # command strings themselves are produced lazily while writing.
    commands = (
        f"python3 tf_reddit_comments.py weekly_tf {name}\n"
        for name in os.listdir(partition_dir)
        if name.endswith(".parquet")
    )
    with open("tf_task_list", 'w') as task_file:
        task_file.writelines(commands)
if __name__ == "__main__":
    # Expose both entry points as sub-commands of the command-line interface.
    commands = {
        "gen_task_list": gen_task_list,
        "weekly_tf": weekly_tf,
    }
    fire.Fire(commands)