1
0
cdsc_reddit/ngrams/term_frequencies.py
2024-12-04 07:47:47 -08:00

282 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from itertools import groupby, islice, chain
import fire
from collections import Counter
import os
import re
from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize
from nltk.corpus import stopwords
from nltk.util import ngrams
import string
from random import random
from pathlib import Path
# remove urls
# taken from https://stackoverflow.com/questions/3809401/what-is-a-good-regular-expression-to-match-a-url
urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)")
def remove_punct(sentence):
new_sentence = []
for token in sentence:
new_token = ''
for c in token:
if c not in string.punctuation:
new_token += c
if len(new_token) > 0:
new_sentence.append(new_token)
return new_sentence
def my_tokenizer(text, mwe_pass, mwe_tokenize, stopWords, ngram_output):
# remove stopwords, punctuation, urls, lower case
# lowercase
if text is None:
return ""
text = text.lower()
# remove urls
text = urlregex.sub("", text)
# sentence tokenize
sentences = sent_tokenize(text)
# wordpunct_tokenize
sentences = map(wordpunct_tokenize, sentences)
# remove punctuation
sentences = map(remove_punct, sentences)
# remove sentences with less than 2 words
sentences = filter(lambda sentence: len(sentence) > 2, sentences)
# datta et al. select relatively common phrases from the reddit corpus, but they don't really explain how. We'll try that in a second phase.
# they say that the extract 1-4 grams from 10% of the sentences and then find phrases that appear often relative to the original terms
# here we take a 10 percent sample of sentences
if mwe_pass == 'first':
sentences = list(sentences)
for sentence in sentences:
if random() <= 0.1:
grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
Path(ngram_output).parent.mkdir(parents=True, exist_ok=True)
with open(ngram_output,'a') as gram_file:
for ng in grams:
gram_file.write(' '.join(ng) + '\n')
for token in sentence:
if token not in stopWords:
yield token
else:
# remove stopWords
sentences = map(mwe_tokenize, sentences)
sentences = map(lambda s: filter(lambda token: token not in stopWords, s), sentences)
for sentence in sentences:
for token in sentence:
yield token
def tf_comments(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords, ngram_output):
for key, posts in subreddit_weeks:
subreddit, week = key
tfs = Counter([])
authors = Counter([])
for post in posts:
tokens = my_tokenizer(post.selftext, mwe_pass, mwe_tokenize, stopWords, ngram_output)
tfs.update(tokens)
authors.update([post.author])
for term, tf in tfs.items():
yield [True, subreddit, term, week, tf]
for author, tf in authors.items():
yield [False, subreddit, author, week, tf]
def tf_posts(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords, ngram_output):
for key, posts in subreddit_weeks:
subreddit, week = key
tfs = Counter([])
authors = Counter([])
for post in posts:
title_tokens = my_tokenizer(post.title, mwe_pass, mwe_tokenize, stopWords, ngram_output)
tfs.update(title_tokens)
if post.selftext is not None and post.selftext != "":
selftext_tokens = my_tokenizer(post.selftext, mwe_pass, mwe_tokenize, stopWords, ngram_output)
tfs.update(selftext_tokens)
authors.update([post.author])
for term, tf in tfs.items():
yield [True, subreddit, term, week, tf]
for author, tf in authors.items():
yield [False, subreddit, author, week, tf]
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition,
mwe_pass = 'first',
input_parquet='/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/',
output_10p_sample_path="/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/",
output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
reddit_dataset = 'comments',
limit = None):
if reddit_dataset == 'comments':
tf_func = tf_comments
nullable_schema = False
elif reddit_dataset == 'posts':
tf_func = tf_posts
nullable_schema = True
dataset = ds.dataset(f"{input_parquet}/{partition}", format='parquet')
Path(output_10p_sample_path).mkdir(parents=True, exist_ok=True)
Path(temp_output_tfidf_path).mkdir(parents=True, exist_ok=True)
ngram_output = partition.replace("parquet","txt")
if mwe_pass == 'first':
if os.path.exists(f"{output_10p_sample_path}/{ngram_output}"):
os.remove(f"{output_10p_sample_path}/{ngram_output}")
if reddit_dataset == 'comments':
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
if reddit_dataset == 'posts':
batches = dataset.to_batches(columns=['CreatedAt','subreddit','title','selftext','author'])
schema = pa.schema([pa.field('subreddit', pa.string(), nullable=nullable_schema),
pa.field('term', pa.string(), nullable=nullable_schema),
pa.field('week', pa.date32(), nullable=nullable_schema),
pa.field('tf', pa.int64(), nullable=nullable_schema)]
)
author_schema = pa.schema([pa.field('subreddit', pa.string(), nullable=nullable_schema),
pa.field('author', pa.string(), nullable=nullable_schema),
pa.field('week', pa.date32(), nullable=nullable_schema),
pa.field('tf', pa.int64(), nullable=nullable_schema)]
)
dfs = (b.to_pandas() for b in batches)
def add_week(df):
df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
return(df)
dfs = (add_week(df) for df in dfs)
def iterate_rows(dfs):
for df in dfs:
for row in df.itertuples():
yield row
rows = iterate_rows(dfs)
subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
if mwe_pass != 'first':
mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather')
mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
mwe_phrases = list(mwe_dataset.phrase)
mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
mwe_tokenizer = MWETokenizer(mwe_phrases)
mwe_tokenize = mwe_tokenizer.tokenize
else:
mwe_tokenize = MWETokenizer().tokenize
stopWords = set(stopwords.words('english'))
# we follow the approach described in datta, phelan, adar 2017
outrows = tf_func(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords, Path(output_10p_sample_path) / ngram_output)
outchunksize = 100000
Path(output_terms_path).mkdir(parents=True, exist_ok=True)
Path(output_authors_path).mkdir(parents=True, exist_ok=True)
if limit is not None:
n_lines_out = 0
with pq.ParquetWriter(f"{output_terms_path}/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"{output_authors_path}/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
while True:
if limit is not None:
n_lines_left = limit - n_lines_out
if n_lines_left < outchunksize:
outchunksize = n_lines_left
chunk = islice(outrows,outchunksize)
chunk = (c for c in chunk if c[1] is not None)
pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
author_pddf = pddf.loc[pddf.is_token == False, schema.names]
pddf = pddf.loc[pddf.is_token == True, schema.names]
author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
author_pddf = author_pddf.loc[:,author_schema.names]
table = pa.Table.from_pandas(pddf,schema=schema)
author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
do_break = True
if table.shape[0] != 0:
writer.write_table(table)
do_break = False
if author_table.shape[0] != 0:
author_writer.write_table(author_table)
do_break = False
if limit is not None:
if n_lines_out < limit:
n_lines_out += outchunksize
else:
do_break = True
if do_break:
break
writer.close()
author_writer.close()
def sort_tf(input_parquet="/gscratch/comdata/output/temp_reddit_comments_by_subreddit.parquet/",
output_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
tf_name='term'):
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(map={'spark.executor.memory':'900g'}).getOrCreate()
spark = SparkSession.builder.config(map={'spark.executor.cores':128}).getOrCreate()
df = spark.read.parquet(input_parquet)
df = df.repartition(2000,tf_name)
df = df.sort([tf_name,'week','subreddit'])
df = df.sortWithinPartitions([tf_name,'week','subreddit'])
df.write.parquet(output_parquet,mode='overwrite',compression='snappy')
def gen_task_list(mwe_pass='first',
input_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
output_10p_sample_path="/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/",
output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
dataset='comments'):
files = os.listdir(input_parquet)
curdir = Path('.')
if curdir.absolute().name == 'cdsc_reddit':
curdir = str(curdir.absolute()) / "ngrams"
else:
curdir = str(curdir.absolute() / "cdsc_reddit" / "ngrams")
with open("tf_task_list",'w') as outfile:
for f in files:
if f.endswith(".parquet"):
outfile.write(f"{curdir}/term_frequencies.py weekly_tf {f} --mwe-pass {mwe_pass} --input-parquet {input_parquet} --output-10p_sample-path {output_10p_sample_path} --temp-output-tfidf-path {temp_output_tfidf_path} --output-terms-path {output_terms_path} --output-authors-path {output_authors_path} --reddit-dataset {dataset}\n")
if __name__ == "__main__":
fire.Fire({"gen_task_list":gen_task_list,
"weekly_tf":weekly_tf,
"sort_tf":sort_tf})