#!/usr/bin/env python3
import os
import re
import string
from collections import Counter
from itertools import groupby, islice, chain
from pathlib import Path
from random import random

import fire
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize
from nltk.corpus import stopwords
from nltk.util import ngrams

# regex for stripping urls
# taken from https://stackoverflow.com/questions/3809401/what-is-a-good-regular-expression-to-match-a-url
urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)")


def remove_punct(sentence):
    # strip punctuation characters from each token; drop tokens that become empty
    new_sentence = []
    for token in sentence:
        new_token = ''
        for c in token:
            if c not in string.punctuation:
                new_token += c
        if len(new_token) > 0:
            new_sentence.append(new_token)
    return new_sentence


def my_tokenizer(text, mwe_pass, mwe_tokenize, stopWords, ngram_output):
    # lowercase, remove urls, sentence- and word-tokenize, strip punctuation and stopwords
    if text is None:
        return

    text = text.lower()

    # remove urls
    text = urlregex.sub("", text)

    # sentence tokenize
    sentences = sent_tokenize(text)

    # wordpunct_tokenize
    sentences = map(wordpunct_tokenize, sentences)

    # remove punctuation
    sentences = map(remove_punct, sentences)

    # drop sentences with fewer than three tokens
    sentences = filter(lambda sentence: len(sentence) > 2, sentences)

    # Datta et al. select relatively common phrases from the Reddit corpus, but they don't
    # really explain how; we try that in a second phase. They say that they extract 1-4 grams
    # from 10% of the sentences and then find phrases that appear often relative to the
    # original terms. Here we take a 10 percent sample of sentences.
    if mwe_pass == 'first':
        sentences = list(sentences)
        for sentence in sentences:
            if random() <= 0.1:
                # extract 1- to 4-grams from the sampled sentence
                grams = list(chain(*map(lambda i: ngrams(sentence, i), range(1, 5))))
                Path(ngram_output).parent.mkdir(parents=True, exist_ok=True)
                with open(ngram_output, 'a') as gram_file:
                    for ng in grams:
                        gram_file.write(' '.join(ng) + '\n')
            for token in sentence:
                if token not in stopWords:
                    yield token
    else:
        # apply the multiword-expression tokenizer, then remove stopwords
        sentences = map(mwe_tokenize, sentences)
        sentences = map(lambda s: filter(lambda token: token not in stopWords, s), sentences)
        for sentence in sentences:
            for token in sentence:
                yield token


def tf_comments(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords, ngram_output):
    # yield [is_token, subreddit, term_or_author, week, tf] rows for each (subreddit, week) group
    for key, comments in subreddit_weeks:
        subreddit, week = key
        tfs = Counter()
        authors = Counter()
        for comment in comments:
            tokens = my_tokenizer(comment.body, mwe_pass, mwe_tokenize, stopWords, ngram_output)
            tfs.update(tokens)
            authors.update([comment.author])

        for term, tf in tfs.items():
            yield [True, subreddit, term, week, tf]

        for author, tf in authors.items():
            yield [False, subreddit, author, week, tf]


def tf_posts(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords, ngram_output):
    # same as tf_comments, but counts terms from post titles and selftext
    for key, posts in subreddit_weeks:
        subreddit, week = key
        tfs = Counter()
        authors = Counter()
        for post in posts:
            title_tokens = my_tokenizer(post.title, mwe_pass, mwe_tokenize, stopWords, ngram_output)
            tfs.update(title_tokens)
            if post.selftext is not None and post.selftext != "":
                selftext_tokens = my_tokenizer(post.selftext, mwe_pass, mwe_tokenize, stopWords, ngram_output)
                tfs.update(selftext_tokens)
            authors.update([post.author])

        for term, tf in tfs.items():
            yield [True, subreddit, term, week, tf]

        for author, tf in authors.items():
            yield [False, subreddit, author, week, tf]


# compute term frequencies for comments in each subreddit by week
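#
# A rough usage sketch (assumptions: this file is saved as term_frequencies.py, which the
# task list generated below expects, and is driven from the command line via python-fire;
# the partition name "comments_2016-01.parquet" is hypothetical):
#
#   python3 term_frequencies.py weekly_tf comments_2016-01.parquet --mwe-pass first
#   # ... build reddit_multiword_expressions.feather from the sampled 1-4 grams ...
#   python3 term_frequencies.py weekly_tf comments_2016-01.parquet --mwe-pass second
#
# The first pass writes a 10% sample of n-grams used to detect multiword expressions;
# any --mwe-pass value other than 'first' re-tokenizes using the phrases read from
# reddit_multiword_expressions.feather.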
def weekly_tf(partition,
              mwe_pass='first',
              input_parquet='/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/',
              output_10p_sample_path="/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
              temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/",
              output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
              output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
              reddit_dataset='comments',
              limit=None):

    if reddit_dataset == 'comments':
        tf_func = tf_comments
        nullable_schema = False
    elif reddit_dataset == 'posts':
        tf_func = tf_posts
        nullable_schema = True
    else:
        raise ValueError(f"reddit_dataset must be 'comments' or 'posts', got {reddit_dataset!r}")

    dataset = ds.dataset(f"{input_parquet}/{partition}", format='parquet')
    Path(output_10p_sample_path).mkdir(parents=True, exist_ok=True)
    Path(temp_output_tfidf_path).mkdir(parents=True, exist_ok=True)
    ngram_output = partition.replace("parquet", "txt")

    # start the n-gram sample file fresh on the first pass
    if mwe_pass == 'first':
        if os.path.exists(f"{output_10p_sample_path}/{ngram_output}"):
            os.remove(f"{output_10p_sample_path}/{ngram_output}")

    if reddit_dataset == 'comments':
        batches = dataset.to_batches(columns=['CreatedAt', 'subreddit', 'body', 'author'])
    elif reddit_dataset == 'posts':
        batches = dataset.to_batches(columns=['CreatedAt', 'subreddit', 'title', 'selftext', 'author'])

    schema = pa.schema([pa.field('subreddit', pa.string(), nullable=nullable_schema),
                        pa.field('term', pa.string(), nullable=nullable_schema),
                        pa.field('week', pa.date32(), nullable=nullable_schema),
                        pa.field('tf', pa.int64(), nullable=nullable_schema)])

    author_schema = pa.schema([pa.field('subreddit', pa.string(), nullable=nullable_schema),
                               pa.field('author', pa.string(), nullable=nullable_schema),
                               pa.field('week', pa.date32(), nullable=nullable_schema),
                               pa.field('tf', pa.int64(), nullable=nullable_schema)])

    dfs = (b.to_pandas() for b in batches)

    def add_week(df):
        # the week starts on the Monday of the CreatedAt date
        df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
        return df

    dfs = (add_week(df) for df in dfs)

    def iterate_rows(dfs):
        for df in dfs:
            for row in df.itertuples():
                yield row

    rows = iterate_rows(dfs)

    # itertools.groupby only merges consecutive rows, so the input must already be
    # ordered by subreddit and week
    subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))

    if mwe_pass != 'first':
        # on later passes, tokenize with the multiword expressions detected from the first-pass sample
        mwe_dataset = pd.read_feather('/gscratch/comdata/output/reddit_ngrams/reddit_multiword_expressions.feather')
        mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'], ascending=False)
        mwe_phrases = list(mwe_dataset.phrase)
        mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
        mwe_tokenizer = MWETokenizer(mwe_phrases)
        mwe_tokenize = mwe_tokenizer.tokenize
    else:
        mwe_tokenize = MWETokenizer().tokenize

    stopWords = set(stopwords.words('english'))

    # we follow the approach described in datta, phelan, adar 2017
    outrows = tf_func(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords,
                      Path(output_10p_sample_path) / ngram_output)

    outchunksize = 10000

    Path(output_terms_path).mkdir(parents=True, exist_ok=True)
    Path(output_authors_path).mkdir(parents=True, exist_ok=True)

    if limit is not None:
        n_lines_out = 0

    with pq.ParquetWriter(f"{output_terms_path}/{partition}", schema=schema,
                          compression='snappy', flavor='spark') as writer, \
         pq.ParquetWriter(f"{output_authors_path}/{partition}", schema=author_schema,
                          compression='snappy', flavor='spark') as author_writer:

        while True:
            if limit is not None:
                n_lines_left = limit - n_lines_out
                if n_lines_left < outchunksize:
                    outchunksize = n_lines_left

            chunk = islice(outrows, outchunksize)
            # skip rows with a null subreddit
            chunk = (c for c in chunk if c[1] is not None)
            pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
            author_pddf = pddf.loc[pddf.is_token == False, schema.names]
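            # Rows from tf_comments/tf_posts are lists of
            # [is_token, subreddit, term_or_author, week, tf], matching the
            # ["is_token"] + schema.names columns above. is_token == True marks
            # term counts and is_token == False marks per-author comment counts,
            # so the frame is split here and the author half renamed below to
            # match author_schema.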
            pddf = pddf.loc[pddf.is_token == True, schema.names]
            author_pddf = author_pddf.rename({'term': 'author'}, axis='columns')
            author_pddf = author_pddf.loc[:, author_schema.names]

            table = pa.Table.from_pandas(pddf, schema=schema)
            author_table = pa.Table.from_pandas(author_pddf, schema=author_schema)

            do_break = True

            if table.shape[0] != 0:
                writer.write_table(table)
                do_break = False
            if author_table.shape[0] != 0:
                author_writer.write_table(author_table)
                do_break = False

            if limit is not None:
                if n_lines_out < limit:
                    n_lines_out += outchunksize
                else:
                    do_break = True

            # stop once the generator is exhausted (both tables empty) or the row limit is reached
            if do_break:
                break


def sort_tf(input_parquet="/gscratch/comdata/output/temp_reddit_comments_by_subreddit.parquet/",
            output_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
            tf_name='term'):
    # sort and repartition a parquet dataset by tf_name, week, and subreddit using Spark
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.config(map={'spark.executor.memory': '900g',
                                             'spark.executor.cores': 128}).getOrCreate()
    df = spark.read.parquet(input_parquet)
    df = df.repartition(2000, tf_name)
    df = df.sort([tf_name, 'week', 'subreddit'])
    df = df.sortWithinPartitions([tf_name, 'week', 'subreddit'])
    df.write.parquet(output_parquet, mode='overwrite', compression='snappy')


def gen_task_list(mwe_pass='first',
                  input_parquet="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/",
                  output_10p_sample_path="/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/",
                  temp_output_tfidf_path="/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/",
                  output_terms_path="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
                  output_authors_path="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
                  reddit_dataset='comments'):
    # write one weekly_tf command per input partition to the file "tf_task_list"
    files = os.listdir(input_parquet)
    curdir = Path('.')
    if curdir.absolute().name == 'cdsc_reddit':
        curdir = str(curdir.absolute() / "ngrams")
    else:
        curdir = str(curdir.absolute() / "cdsc_reddit" / "ngrams")

    with open("tf_task_list", 'w') as outfile:
        for filename in files:
            if filename.endswith(".parquet"):
                outfile.write(f"{curdir}/term_frequencies.py weekly_tf {filename} --mwe-pass {mwe_pass} --input-parquet {input_parquet} --output-10p-sample-path {output_10p_sample_path} --temp-output-tfidf-path {temp_output_tfidf_path} --output-terms-path {output_terms_path} --output-authors-path {output_authors_path} --reddit-dataset {reddit_dataset}\n")


if __name__ == "__main__":
    fire.Fire({"gen_task_list": gen_task_list,
               "weekly_tf": weekly_tf,
               "sort_tf": sort_tf})