support posts in ngrams

commit dd894ebf61
parent 53f5b8c03c
@@ -18,24 +18,68 @@ from random import random
 # taken from https://stackoverflow.com/questions/3809401/what-is-a-good-regular-expression-to-match-a-url
 urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)")
 
-# compute term frequencies for comments in each subreddit by week
-def weekly_tf(partition, mwe_pass = 'first'):
-    dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-    if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
-        os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
-
-    if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"):
-        os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")
+def tf_comments(subreddit_weeks):
+    for key, posts in subreddit_weeks:
+        subreddit, week = key
+        tfs = Counter([])
+        authors = Counter([])
+        for post in posts:
+            tokens = my_tokenizer(post.body)
+            tfs.update(tokens)
+            authors.update([post.author])
+
+        for term, tf in tfs.items():
+            yield [True, subreddit, term, week, tf]
+
+        for author, tf in authors.items():
+            yield [False, subreddit, author, week, tf]
+
+def tf_posts(subreddit_weeks):
+    for key, posts in subreddit_weeks:
+        subreddit, week = key
+        tfs = Counter([])
+        authors = Counter([])
+        for post in posts:
+            tokens = my_tokenizer(post.title)
+            tfs.update(tokens)
+            authors.update([post.author])
+
+        for term, tf in tfs.items():
+            yield [True, subreddit, term, week, tf]
+
+        for author, tf in authors.items():
+            yield [False, subreddit, author, week, tf]
+
+# compute term frequencies for comments in each subreddit by week
+def weekly_tf(partition,
+              mwe_pass = 'first',
+              input_parquet='/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/',
+              output_10p_sample_path="/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
+              temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/",
+              output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+              output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+              reddit_dataset = 'comments'):
+
+    if reddit_dataset == 'comments':
+        tf_func = tf_comments
+    elif reddit_dataset == 'posts':
+        tf_func = tf_posts
+
+    dataset = ds.dataset(f"{input_parquet}/{partition}", format='parquet')
+    if not os.path.exists(output_10p_sample_path):
+        os.mkdir(output_10p_sample_path)
+
+    if not os.path.exists(temp_output_tfidf_path):
+        os.mkdir(temp_output_tfidf_path)
 
     ngram_output = partition.replace("parquet","txt")
 
     if mwe_pass == 'first':
-        if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
-            os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
+        if os.path.exists(f"{output_10p_sample_path}/{ngram_output}"):
+            os.remove(f"{output_10p_sample_path}/{ngram_output}")
 
     batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
 
 
     schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                         pa.field('term', pa.string(), nullable=False),
                         pa.field('week', pa.date32(), nullable=False),
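
For context on the new generators: tf_comments tokenizes post.body while tf_posts tokenizes post.title, and both yield one row per (term, week) count and per (author, week) count, with a leading boolean distinguishing term rows from author rows. A minimal, self-contained sketch of that behavior on toy input (SimplePost, toy_tokenizer and toy_tf_posts below are illustrative stand-ins, not code from this repository):

from collections import Counter, namedtuple

SimplePost = namedtuple('SimplePost', ['title', 'author'])

def toy_tokenizer(text):
    # stand-in for my_tokenizer: lowercase and split on whitespace
    return text.lower().split()

def toy_tf_posts(subreddit_weeks):
    # mirrors the new tf_posts, but using the toy tokenizer above
    for key, posts in subreddit_weeks:
        subreddit, week = key
        tfs = Counter()
        authors = Counter()
        for post in posts:
            tfs.update(toy_tokenizer(post.title))
            authors.update([post.author])
        for term, tf in tfs.items():
            yield [True, subreddit, term, week, tf]
        for author, tf in authors.items():
            yield [False, subreddit, author, week, tf]

rows = list(toy_tf_posts([(("AskReddit", "2020-06-01"),
                           [SimplePost("What book changed your life?", "alice"),
                            SimplePost("What game changed your life?", "alice")])]))
# rows contains term rows such as [True, 'AskReddit', 'what', '2020-06-01', 2]
# followed by the author row [False, 'AskReddit', 'alice', '2020-06-01', 2]
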
@@ -134,27 +178,12 @@ def weekly_tf(partition, mwe_pass = 'first'):
             for token in sentence:
                 yield token
 
-    def tf_comments(subreddit_weeks):
-        for key, posts in subreddit_weeks:
-            subreddit, week = key
-            tfs = Counter([])
-            authors = Counter([])
-            for post in posts:
-                tokens = my_tokenizer(post.body)
-                tfs.update(tokens)
-                authors.update([post.author])
-
-            for term, tf in tfs.items():
-                yield [True, subreddit, term, week, tf]
-
-            for author, tf in authors.items():
-                yield [False, subreddit, author, week, tf]
-
-    outrows = tf_comments(subreddit_weeks)
+    outrows = tf_func(subreddit_weeks)
 
     outchunksize = 10000
 
-    with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+    with pq.ParquetWriter(f"{output_terms_path}/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"{output_authors_path}/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
 
         while True:
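
The body of the while loop sits outside this hunk, but the surrounding code implies the pattern: weekly_tf now takes its rows from tf_func(subreddit_weeks), pulls them in chunks of outchunksize, and appends each chunk through the two open ParquetWriters, routing on the leading boolean. A rough sketch of that chunked-write pattern for a single writer (write_rows and the [subreddit, term, week, tf] column order are illustrative assumptions, not the commit's actual loop):

from itertools import islice
import pyarrow as pa

def write_rows(rows, schema, writer, chunksize=10000):
    # illustrative only: transpose a chunk of [subreddit, term, week, tf] rows
    # into columns and append them as one pyarrow table per chunk
    while True:
        chunk = list(islice(rows, chunksize))
        if not chunk:
            break
        columns = list(zip(*chunk))
        table = pa.table({name: list(col) for name, col in zip(schema.names, columns)},
                         schema=schema)
        writer.write_table(table)
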
@@ -183,12 +212,19 @@ def weekly_tf(partition, mwe_pass = 'first'):
         author_writer.close()
 
 
-def gen_task_list(mwe_pass='first'):
-    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
+def gen_task_list(mwe_pass='first',
+                  input_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
+                  output_10p_sample_path="/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
+                  temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/",
+                  output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
+                  output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
+                  dataset='comments'):
+    files = os.listdir(input_parquet)
 
     with open("tf_task_list",'w') as outfile:
         for f in files:
             if f.endswith(".parquet"):
-                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
+                outfile.write(f"./tf_comments.py weekly_tf {f} --mwe-pass {mwe_pass} --input-parquet {input_parquet} --output-10p-sample-path {output_10p_sample_path} --temp-output-tfidf-path {temp_output_tfidf_path} --output-terms-path {output_terms_path} --output-authors-path {output_authors_path} --dataset {dataset}\n")
 
 if __name__ == "__main__":
     fire.Fire({"gen_task_list":gen_task_list,
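
With the new keyword arguments, every line gen_task_list writes to tf_task_list now carries the full path configuration, so a single partition can be rerun exactly as scheduled. Using the defaults above, the line for a hypothetical partition file part-00000.parquet would read:

./tf_comments.py weekly_tf part-00000.parquet --mwe-pass first --input-parquet /gscratch/comdata/output/reddit_comments_by_subreddit.parquet/ --output-10p-sample-path /gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/ --temp-output-tfidf-path /gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/ --output-terms-path /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet --output-authors-path /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet --dataset comments

fire.Fire maps these flags back onto weekly_tf's keyword arguments, and the same dispatch dict exposes gen_task_list itself, e.g. ./tf_comments.py gen_task_list --dataset posts to build a task list over the posts dataset.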