
Merge branch 'master' of code:cdsc_reddit into master

This commit is contained in:
Nathan TeBlunthuis 2020-11-01 21:50:44 -08:00
commit 9075a8153c
16 changed files with 474 additions and 68 deletions

check_comments_shas.py Normal file → Executable file

@@ -5,8 +5,10 @@ import requests
from os import path
import hashlib
-shasums = requests.get("https://files.pushshift.io/reddit/comments/sha256sums.txt").text
+shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
+shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
+shasums = shasums1 + shasums2
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
for l in shasums.strip().split('\n'):
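
Each line of these sha256sum files pairs a hex digest with a dump file name; the loop above iterates those lines to check the local dumps. A minimal sketch of that kind of check, assuming a hypothetical helper name verify_dump (the real loop body is outside this hunk):

    import hashlib
    from os import path

    def verify_dump(dumpdir, digest, filename):
        # hash the local dump in 1 MiB blocks and compare against the published digest
        fpath = path.join(dumpdir, filename)
        if not path.isfile(fpath):
            return False
        h = hashlib.sha256()
        with open(fpath, 'rb') as f:
            for block in iter(lambda: f.read(1 << 20), b''):
                h.update(block)
        return h.hexdigest() == digest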

checkpoint_parallelsql.sbatch

@@ -0,0 +1,22 @@
#!/bin/bash
## parallel_sql_job.sh
#SBATCH --job-name=tf_subreddit_comments
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=12:00:00
## Memory per node
#SBATCH --mem=32G
#SBATCH --cpus-per-task=4
#SBATCH --ntasks=1
module load parallel_sql
#Put here commands to load other modules (e.g. matlab etc.)
#The command below makes parallel_sql pull tasks from its database and run
#them on the node in parallel; with --jobs 4 (matching --cpus-per-task=4),
#four tasks run at a time.
parallel-sql --sql -a parallel --exit-on-term --jobs 4

comments_2_parquet.sh

@@ -1,5 +1,6 @@
-#!/usr/bin/env bash
+## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
+#!/usr/bin/env bash
echo "#!/usr/bin/bash" > job_script.sh
echo "source $(pwd)/../bin/activate" >> job_script.sh
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh

comments_2_parquet_part2.py

@@ -26,4 +26,4 @@ df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet
df = df.repartition('author')
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite')
+df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
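
The repartition / sort / sortWithinPartitions / write sequence above is the general pattern this pipeline uses to produce Parquet data clustered by a key. A minimal, self-contained sketch of the same pattern on a toy DataFrame (the column names and the /tmp output path are illustrative only):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    toy = spark.createDataFrame(
        [("alice", "2020-01-01", "askreddit"), ("bob", "2020-01-02", "politics")],
        ["author", "CreatedAt", "subreddit"])
    toy = toy.repartition("author")  # co-locate each author's rows in one partition
    toy = toy.sortWithinPartitions(["author", "CreatedAt"], ascending=True)
    toy.write.parquet("/tmp/toy_by_author.parquet", mode="overwrite", compression="snappy")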

helper.py

@@ -40,6 +40,8 @@ def open_input_file(input_filename):
        cmd = ["xzcat",'-dk', '-T 20',input_filename]
    elif re.match(r'.*\.zst',input_filename):
        cmd = ['zstd','-dck', input_filename]
+    elif re.match(r'.*\.gz',input_filename):
+        cmd = ['gzip','-dc', input_filename]
    try:
        input_file = Popen(cmd, stdout=PIPE).stdout
    except NameError as e:
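
open_input_file shells out to the right decompressor (now including gzip for the daily dumps) and hands back the process's stdout pipe. A sketch of how such a stream is typically consumed, assuming open_input_file is importable from this module and that each line is one JSON record (both are assumptions made for the example):

    import json

    def iter_records(input_filename):
        stream = open_input_file(input_filename)  # bytes, one record per line
        for line in stream:
            yield json.loads(line.decode('utf-8', errors='replace'))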

idf_authors.py Normal file

@@ -0,0 +1,43 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")
max_subreddit_week_authors = df.groupby(['subreddit','week']).max('tf')
max_subreddit_week_authors = max_subreddit_week_authors.withColumnRenamed('max(tf)','sr_week_max_tf')
df = df.join(max_subreddit_week_authors, ['subreddit','week'])
df = df.withColumn("relative_tf", df.tf / df.sr_week_max_tf)
# group by author / week
idf = df.groupby(['author','week']).count()
idf = idf.withColumnRenamed('count','idf')
# output: author | week | df
#idf.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
# collect the distinct authors and assign each an integer id
authors = idf.select('author').distinct()
authors = authors.withColumn('author_id',f.monotonically_increasing_id())
# map authors to ids in the tfs and the idfs
df = df.join(authors,on='author')
idf = idf.join(authors,on='author')
# join on author/week to create tf/dfs indexed by author
df = df.join(idf, on=['author_id','week','author'])
# agg terms by subreddit to make sparse tf/df vectors
df = df.withColumn("tf_idf",df.relative_tf / df.sr_week_max_tf)
df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('author_id','tf_idf')).alias('tfidf_maps'))
df = df.withColumn('tfidf_vec', f.map_from_entries('tfidf_maps'))
# output: subreddit | week | tf/df
df.write.parquet('/gscratch/comdata/users/nathante/test_tfidf_authors.parquet',mode='overwrite',compression='snappy')

idf_comments.py Normal file

@@ -0,0 +1,58 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp")
max_subreddit_week_terms = df.groupby(['subreddit','week']).max('tf')
max_subreddit_week_terms = max_subreddit_week_terms.withColumnRenamed('max(tf)','sr_week_max_tf')
df = df.join(max_subreddit_week_terms, ['subreddit','week'])
df = df.withColumn("relative_tf", df.tf / df.sr_week_max_tf)
# group by term / week
idf = df.groupby(['term','week']).count()
idf = idf.withColumnRenamed('count','idf')
# output: term | week | df
#idf.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select('term').distinct()
terms = terms.withColumn('term_id',f.monotonically_increasing_id())
# print('collected terms')
# terms = [t.term for t in terms]
# NTerms = len(terms)
# term_id_map = {term:i for i,term in enumerate(sorted(terms))}
# term_id_map = spark.sparkContext.broadcast(term_id_map)
# print('term_id_map is broadcasted')
# def map_term(x):
# return term_id_map.value[x]
# map_term_udf = f.udf(map_term)
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on='term')
idf = idf.join(terms,on='term')
# join on subreddit/term/week to create tf/dfs indexed by term
df = df.join(idf, on=['term_id','week','term'])
# agg terms by subreddit to make sparse tf/df vectors
df = df.withColumn("tf_idf",df.relative_tf / df.sr_week_max_tf)
df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('term_id','tf_idf')).alias('tfidf_maps'))
df = df.withColumn('tfidf_vec', f.map_from_entries('tfidf_maps'))
# output: subreddit | week | tf/df
df.write.parquet('/gscratch/comdata/users/nathante/test_tfidf.parquet',mode='overwrite',compression='snappy')
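
The last few lines above turn each subreddit-week's term weights into a sparse map keyed by term_id via collect_list(struct(...)) followed by map_from_entries. A toy, self-contained illustration of that step (the rows and values are made up to mirror the script's column names):

    from pyspark.sql import SparkSession, functions as f

    spark = SparkSession.builder.getOrCreate()
    toy = spark.createDataFrame(
        [("AskReddit", "2019-01-07", 0, 0.8), ("AskReddit", "2019-01-07", 1, 0.3)],
        ["subreddit", "week", "term_id", "tf_idf"])
    vecs = toy.groupby(["subreddit", "week"]).agg(
        f.collect_list(f.struct("term_id", "tf_idf")).alias("tfidf_maps"))
    vecs = vecs.withColumn("tfidf_vec", f.map_from_entries("tfidf_maps"))
    vecs.show(truncate=False)  # one sparse term_id -> tf_idf map per subreddit-week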

pull_pushshift_comments.sh

@@ -4,8 +4,11 @@ user_agent='nathante teblunthuis <nathante@uw.edu>'
output_dir='/gscratch/comdata/raw_data/reddit_dumps/comments'
base_url='https://files.pushshift.io/reddit/comments/'
-wget -r --no-parent -A 'RC_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
-wget -r --no-parent -A 'RC_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
-wget -r --no-parent -A 'RC_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
-./check_comment_shas.py
+wget -r --no-parent -A 'RC_201*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
+wget -r --no-parent -A 'RC_201*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
+wget -r --no-parent -A 'RC_201*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
+# starting in 2020 we use daily dumps not monthly dumps
+wget -r --no-parent -A 'RC_202*.gz' -U $user_agent -P $output_dir -nd -nc $base_url/daily/
+./check_comments_shas.py

pull_pushshift_submissions.sh

@@ -11,4 +11,4 @@ wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
-./check_submissions_shas.py
+./check_submission_shas.py

run_tf_jobs.sh Executable file

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
module load parallel_sql
source ./bin/activate
python3 tf_comments.py gen_task_list
psu --del --Y
cat tf_task_list | psu --load
for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done;

sort_tf_comments.py Normal file

@@ -0,0 +1,13 @@
#!/usr/bin/env python3
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/")
df = df.repartition(2000,'term')
df = df.sort(['term','week','subreddit'])
df = df.sortWithinPartitions(['term','week','subreddit'])
df.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')

submissions_2_parquet.sh

@@ -1,10 +1,8 @@
+## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
#!/usr/bin/env bash
-echo "!#/usr/bin/bash" > job_script.sh
-echo "source $(pwd)/../bin/activate" >> job_script.sh
-echo "python3 $(pwd)/submissions_2_parquet_part1.py" >> job_script.sh
-srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 job_script.sh
+./parse_submissions.sh
start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py

submissions_2_parquet_part1.py

@@ -4,7 +4,6 @@
# 1. from gz to arrow parquet (this script)
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
-import json
from datetime import datetime
from multiprocessing import Pool
from itertools import islice
@@ -12,19 +11,23 @@ from helper import find_dumps, open_fileset
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
+import simdjson
+import fire
+import os
+parser = simdjson.Parser()

def parse_submission(post, names = None):
    if names is None:
        names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']

    try:
-        post = json.loads(post)
+        post = parser.parse(post)
-    except (json.decoder.JSONDecodeError, UnicodeDecodeError) as e:
+    except (ValueError) as e:
        # print(e)
        # print(post)
        row = [None for _ in names]
-        row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,post)
+        row[-1] = "Error parsing json|{0}|{1}".format(e,post)
        return tuple(row)

    row = []
@@ -55,18 +58,11 @@ def parse_submission(post, names = None):
            row.append(post[name])
    return tuple(row)

-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
-files = list(find_dumps(dumpdir))
-pool = Pool(28)
-stream = open_fileset(files)
-N = 100000
-rows = pool.imap_unordered(parse_submission, stream, chunksize=int(N/28))
+def parse_dump(partition):
+    N=10000
+    stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
+    rows = map(parse_submission,stream)
schema = pa.schema([
    pa.field('id', pa.string(),nullable=True),
    pa.field('author', pa.string(),nullable=True),
@@ -96,7 +92,10 @@ schema = pa.schema([
    pa.field('quarantine',pa.bool_(),nullable=True),
    pa.field('error',pa.string(),nullable=True)])

-with pq.ParquetWriter("/gscratch/comdata/output/reddit_submissions.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
+if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
+    os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
+with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
    while True:
        chunk = islice(rows,N)
        pddf = pd.DataFrame(chunk, columns=schema.names)
@@ -107,3 +106,13 @@ with pq.ParquetWriter("/gscratch/comdata/output/reddit_submissions.parquet_temp
    writer.close()

+def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
+    files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
+    with open("parse_submissions_task_list",'w') as of:
+        for fpath in files:
+            partition = os.path.split(fpath)[1]
+            of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
+
+if __name__ == "__main__":
+    fire.Fire({'parse_dump':parse_dump,
+               'gen_task_list':gen_task_list})
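
The switch from json.loads to a reusable simdjson.Parser above keeps the same error-handling shape, since pysimdjson raises ValueError on malformed input. A small standalone sketch of that parse-with-fallback pattern (the sample byte strings and the 'id' field are made up):

    import simdjson

    parser = simdjson.Parser()

    def parse_or_none(line):
        try:
            post = parser.parse(line)
        except ValueError:
            return None  # malformed record; the real script records an error row instead
        return post['id']

    print(parse_or_none(b'{"id": "t3_abc123"}'))  # t3_abc123
    print(parse_or_none(b'{not json'))            # None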

submissions_2_parquet_part2.py

@@ -17,7 +17,7 @@ conf = conf.set('spark.sql.crossJoin.enabled',"true")
conf = conf.set('spark.debug.maxToStringFields',200)
sqlContext = pyspark.SQLContext(sc)
-df = spark.read.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet")
+df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
@@ -32,13 +32,11 @@ df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
df = df.repartition("subreddit")
df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
+df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
# # we also want to have parquet files sorted by author then reddit.
df = df.repartition("author")
df3 = df.sort(["author","CreatedAt","id"],ascending=True)
df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
+df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
-os.remove("/gscratch/comdata/output/reddit_submissions.parquet_temp")

tf_comments.py Executable file

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from itertools import groupby, islice, chain
import fire
from collections import Counter
import pandas as pd
import os
import datetime
import re
from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize
from nltk.corpus import stopwords
from nltk.util import ngrams
import string
from random import random
# remove urls
# taken from https://stackoverflow.com/questions/3809401/what-is-a-good-regular-expression-to-match-a-url
urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)")
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, mwe_pass = 'first'):
    dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')

    if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
        os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")

    if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"):
        os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")

    ngram_output = partition.replace("parquet","txt")

    if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
        os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")

    batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])

    schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                        pa.field('term', pa.string(), nullable=False),
                        pa.field('week', pa.date32(), nullable=False),
                        pa.field('tf', pa.int64(), nullable=False)]
    )

    author_schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
                               pa.field('author', pa.string(), nullable=False),
                               pa.field('week', pa.date32(), nullable=False),
                               pa.field('tf', pa.int64(), nullable=False)]
    )

    dfs = (b.to_pandas() for b in batches)

    def add_week(df):
        df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
        return(df)

    dfs = (add_week(df) for df in dfs)

    def iterate_rows(dfs):
        for df in dfs:
            for row in df.itertuples():
                yield row

    rows = iterate_rows(dfs)

    subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))

    if mwe_pass != 'first':
        mwe_dataset = pd.read_feather(f'/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather')
        mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
        mwe_phrases = list(mwe_dataset.phrase)
        mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
        mwe_tokenizer = MWETokenizer(mwe_phrases)
        mwe_tokenize = mwe_tokenizer.tokenize
    else:
        mwe_tokenize = MWETokenizer().tokenize

    def remove_punct(sentence):
        new_sentence = []
        for token in sentence:
            new_token = ''
            for c in token:
                if c not in string.punctuation:
                    new_token += c
            if len(new_token) > 0:
                new_sentence.append(new_token)
        return new_sentence

    stopWords = set(stopwords.words('english'))

    # we follow the approach described in datta, phelan, adar 2017
    def my_tokenizer(text):
        # remove stopwords, punctuation, urls, lower case
        # lowercase
        text = text.lower()

        # remove urls
        text = urlregex.sub("", text)

        # sentence tokenize
        sentences = sent_tokenize(text)

        # wordpunct_tokenize
        sentences = map(wordpunct_tokenize, sentences)

        # remove punctuation
        sentences = map(remove_punct, sentences)

        # remove sentences with fewer than 3 words
        sentences = filter(lambda sentence: len(sentence) > 2, sentences)

        # datta et al. select relatively common phrases from the reddit corpus, but they don't really explain how. We'll try that in a second phase.
        # they say that they extract 1-4 grams from 10% of the sentences and then find phrases that appear often relative to the original terms
        # here we take a 10 percent sample of sentences
        if mwe_pass == 'first':
            sentences = list(sentences)
            for sentence in sentences:
                if random() <= 0.1:
                    grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
                    with open(f'/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
                        for ng in grams:
                            gram_file.write(' '.join(ng) + '\n')
                for token in sentence:
                    if token not in stopWords:
                        yield token
        else:
            # remove stopWords
            sentences = map(mwe_tokenize, sentences)
            sentences = map(lambda s: filter(lambda token: token not in stopWords, s), sentences)
            for sentence in sentences:
                for token in sentence:
                    yield token

    def tf_comments(subreddit_weeks):
        for key, posts in subreddit_weeks:
            subreddit, week = key
            tfs = Counter([])
            authors = Counter([])

            for post in posts:
                tokens = my_tokenizer(post.body)
                tfs.update(tokens)
                authors.update([post.author])

            for term, tf in tfs.items():
                yield [True, subreddit, term, week, tf]

            for author, tf in authors.items():
                yield [False, subreddit, author, week, tf]

    outrows = tf_comments(subreddit_weeks)

    outchunksize = 10000

    with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
        while True:
            chunk = islice(outrows,outchunksize)
            chunk = (c for c in chunk if c[1] is not None)
            pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
            author_pddf = pddf.loc[pddf.is_token == False, schema.names]
            pddf = pddf.loc[pddf.is_token == True, schema.names]
            author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
            author_pddf = author_pddf.loc[:,author_schema.names]
            table = pa.Table.from_pandas(pddf,schema=schema)
            author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
            if table.shape[0] == 0:
                break
            writer.write_table(table)
            author_writer.write_table(author_table)

        writer.close()
        author_writer.close()

def gen_task_list(mwe_pass='first'):
    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
    with open("tf_task_list",'w') as outfile:
        for f in files:
            if f.endswith(".parquet"):
                outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")

if __name__ == "__main__":
    fire.Fire({"gen_task_list":gen_task_list,
               "weekly_tf":weekly_tf})

top_comment_phrases.py Normal file

@@ -0,0 +1,58 @@
from pyspark.sql import functions as f
from pyspark.sql import Window
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.getOrCreate()
df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
df = df.withColumnRenamed("value","phrase")
# count phrase occurrences
phrases = df.groupby('phrase').count()
phrases = phrases.withColumnRenamed('count','phraseCount')
phrases = phrases.filter(phrases.phraseCount > 10)
# count overall
N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount
print(f'analyzing PMI on a sample of {N} phrases')
logN = np.log(N)
phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)
# count term occurrences
phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' '))
terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')])
win = Window.partitionBy('term')
terms = terms.withColumn('termCount',f.sum('phraseCount').over(win))
terms = terms.withColumnRenamed('count','termCount')
terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN)
terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb')
terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb')
terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb'))
# join phrases to term counts
df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI'])
df = df.sort(['phrasePWMI'],ascending=False)
df = df.sortWithinPartitions(['phrasePWMI'],ascending=False)
df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy')
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/")
df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none')
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet")
df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
# choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yields about 65000 expressions.
#
df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
df = df.toPandas()
df.to_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather")
df.to_csv("/gscratch/comdata/users/nathante/reddit_multiword_expressions.csv")
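
For intuition, the PWMI computed above is log P(phrase) minus the sum of log P(term) over the phrase's constituent terms, with each probability estimated as a count divided by the same sample total N. A tiny worked example with made-up counts:

    import numpy as np

    N = 1_000_000                       # total phrase occurrences in the sample
    phrase_count = 500                  # count of "new york"
    term_counts = {"new": 20_000, "york": 600}

    phrase_log_prob = np.log(phrase_count) - np.log(N)
    terms_log_prob = sum(np.log(c) - np.log(N) for c in term_counts.values())
    pwmi = phrase_log_prob - terms_log_prob
    print(round(float(pwmi), 2))        # ~3.73: the phrase co-occurs far more than chance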