18
0

9 Commits

61 changed files with 658 additions and 1324 deletions

View File

@@ -1,74 +0,0 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import FloatType
import zlib
def zlib_entropy_rate(s):
sb = s.encode()
if len(sb) == 0:
return None
else:
return len(zlib.compress(s.encode(),level=6))/len(s.encode())
zlib_entropy_rate_udf = f.udf(zlib_entropy_rate,FloatType())
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet",compression='snappy')
df = df.withColumn("saidbot",f.lower(f.col("body")).like("%bot%"))
# df = df.filter(df.subreddit=='seattle')
# df = df.cache()
botreplies = df.filter(f.lower(df.body).rlike(".*[good|bad] bot.*"))
botreplies = botreplies.select([f.col("parent_id").substr(4,100).alias("bot_comment_id"),f.lower(f.col("body")).alias("good_bad_bot"),f.col("link_id").alias("gbbb_link_id")])
botreplies = botreplies.groupby(['bot_comment_id']).agg(f.count('good_bad_bot').alias("N_goodbad_votes"),
f.sum((f.lower(f.col('good_bad_bot')).like('%good bot%').astype("double"))).alias("n_good_votes"),
f.sum((f.lower(f.col('good_bad_bot')).like('%bad bot%').astype("double"))).alias("n_bad_votes"))
comments_by_author = df.select(['author','id','saidbot']).groupBy('author').agg(f.count('id').alias("N_comments"),
f.mean(f.col('saidbot').astype("double")).alias("prop_saidbot"),
f.sum(f.col('saidbot').astype("double")).alias("n_saidbot"))
# pd_comments_by_author = comments_by_author.toPandas()
# pd_comments_by_author['frac'] = 500 / pd_comments_by_author['N_comments']
# pd_comments_by_author.loc[pd_comments_by_author.frac > 1, 'frac'] = 1
# fractions = pd_comments_by_author.loc[:,['author','frac']]
# fractions = fractions.set_index('author').to_dict()['frac']
# sampled_author_comments = df.sampleBy("author",fractions).groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))
df = df.withColumn("randn",f.randn(seed=1968))
win = Window.partitionBy("author").orderBy("randn")
df = df.withColumn("randRank",f.rank().over(win))
sampled_author_comments = df.filter(f.col("randRank") <= 1000)
sampled_author_comments = sampled_author_comments.groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))
author_entropy_rates = sampled_author_comments.select(['author',zlib_entropy_rate_udf(f.col('comments')).alias("entropy_rate")])
parents = df.join(botreplies, on=df.id==botreplies.bot_comment_id,how='right_outer')
win1 = Window.partitionBy("author")
parents = parents.withColumn("first_bot_reply",f.min(f.col("CreatedAt")).over(win1))
first_bot_reply = parents.filter(f.col("first_bot_reply")==f.col("CreatedAt"))
first_bot_reply = first_bot_reply.withColumnRenamed("CreatedAt","FB_CreatedAt")
first_bot_reply = first_bot_reply.withColumnRenamed("id","FB_id")
comments_since_first_bot_reply = df.join(first_bot_reply,on = 'author',how='right_outer').filter(f.col("CreatedAt")>=f.col("first_bot_reply"))
comments_since_first_bot_reply = comments_since_first_bot_reply.groupBy("author").agg(f.count("id").alias("N_comments_since_firstbot"))
bots = parents.groupby(['author']).agg(f.sum('N_goodbad_votes').alias("N_goodbad_votes"),
f.sum(f.col('n_good_votes')).alias("n_good_votes"),
f.sum(f.col('n_bad_votes')).alias("n_bad_votes"),
f.count(f.col('author')).alias("N_bot_posts"))
bots = bots.join(comments_by_author,on="author",how='left_outer')
bots = bots.join(comments_since_first_bot_reply,on="author",how='left_outer')
bots = bots.join(author_entropy_rates,on='author',how='left_outer')
bots = bots.orderBy("N_goodbad_votes",ascending=False)
bots = bots.repartition(1)
bots.write.parquet("/gscratch/comdata/output/reddit_good_bad_bot.parquet",mode='overwrite')

View File

@@ -1,199 +1,36 @@
#srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28'
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
similarity_data=/gscratch/comdata/output/reddit_similarity
clustering_data=/gscratch/comdata/output/reddit_clustering
srun_singularity=srun -p compute-bigmem -A comdata --time=48:00:00 --mem=362G -c 40 /bin/bash -c
similarity_data=../../data/reddit_similarity
clustering_data=../../data/reddit_clustering
kmeans_selection_grid=--max_iters=[3000] --n_inits=[10] --n_clusters=[100,500,1000,1250,1500,1750,2000]
hdbscan_selection_grid=--min_cluster_sizes=[2,3,4,5] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf]
affinity_selection_grid=--dampings=[0.5,0.6,0.7,0.8,0.95,0.97,0.99] --preference_quantiles=[0.1,0.3,0.5,0.7,0.9] --convergence_iters=[15]
authors_10k_input=$(similarity_data)/subreddit_comment_authors_10k.feather
authors_10k_input_lsi=$(similarity_data)/subreddit_comment_authors_10k_LSI
authors_10k_output=$(clustering_data)/subreddit_comment_authors_10k
authors_10k_output_lsi=$(clustering_data)/subreddit_comment_authors_10k_LSI
authors_tf_10k_input=$(similarity_data)/subreddit_comment_authors-tf_10k.feather
authors_tf_10k_input_lsi=$(similarity_data)/subreddit_comment_authors-tf_10k_LSI
authors_tf_10k_output=$(clustering_data)/subreddit_comment_authors-tf_10k
authors_tf_10k_output_lsi=$(clustering_data)/subreddit_comment_authors-tf_10k_LSI
terms_10k_input=$(similarity_data)/subreddit_comment_terms_10k.feather
terms_10k_input_lsi=$(similarity_data)/subreddit_comment_terms_10k_LSI
terms_10k_output=$(clustering_data)/subreddit_comment_terms_10k
terms_10k_output_lsi=$(clustering_data)/subreddit_comment_terms_10k_LSI
all:terms_10k authors_10k authors_tf_10k terms_10k_lsi authors_10k_lsi authors_tf_10k_lsi
terms_10k:${terms_10k_output}/kmeans/selection_data.csv ${terms_10k_output}/affinity/selection_data.csv ${terms_10k_output}/hdbscan/selection_data.csv
authors_10k:${authors_10k_output}/kmeans/selection_data.csv ${authors_10k_output}/hdbscan/selection_data.csv ${authors_10k_output}/affinity/selection_data.csv
authors_tf_10k:${authors_tf_10k_output}/kmeans/selection_data.csv ${authors_tf_10k_output}/hdbscan/selection_data.csv ${authors_tf_10k_output}/affinity/selection_data.csv
terms_10k_lsi:${terms_10k_output_lsi}/kmeans/selection_data.csv ${terms_10k_output_lsi}/affinity/selection_data.csv ${terms_10k_output_lsi}/hdbscan/selection_data.csv
authors_10k_lsi:${authors_10k_output_lsi}/kmeans/selection_data.csv ${authors_10k_output_lsi}/hdbscan/selection_data.csv ${authors_10k_output_lsi}/affinity/selection_data.csv
all:authors_tf_10k_lsi
authors_tf_10k_lsi:${authors_tf_10k_output_lsi}/kmeans/selection_data.csv ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
${authors_10k_output}/kmeans/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/kmeans --savefile=${authors_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${terms_10k_output}/kmeans/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/kmeans --savefile=${terms_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_tf_10k_output}/kmeans/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/kmeans --savefile=${authors_tf_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_10k_output}/affinity/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/affinity --savefile=${authors_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${terms_10k_output}/affinity/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/affinity --savefile=${terms_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_tf_10k_output}/affinity/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/affinity --savefile=${authors_tf_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_10k_output}/hdbscan/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/hdbscan --savefile=${authors_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output}/hdbscan/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/hdbscan --savefile=${terms_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${authors_tf_10k_output}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/hdbscan --savefile=${authors_tf_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
## LSI Models
${authors_10k_output_lsi}/kmeans/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/kmeans --savefile=${authors_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
${terms_10k_output_lsi}/kmeans/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/kmeans --savefile=${terms_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_tf_10k_output_lsi}/kmeans/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/kmeans --savefile=${authors_tf_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_10k_output_lsi}/affinity/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/affinity --savefile=${authors_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
${terms_10k_output_lsi}/affinity/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/affinity --savefile=${terms_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
$(srun_singularity) -c "source ~/.bashrc; python3 kmeans_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/kmeans --savefile=${authors_tf_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)"
${authors_tf_10k_output_lsi}/affinity/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/affinity --savefile=${authors_tf_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_10k_output_lsi}/hdbscan/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/hdbscan --savefile=${authors_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output_lsi}/hdbscan/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/hdbscan --savefile=${terms_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
$(srun_singularity) -c "source ~/.bashrc; python3 affinity_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/affinity --savefile=${authors_tf_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)"
${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/hdbscan --savefile=${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output_lsi}/best_hdbscan.feather:${terms_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
$(srun_singularity) -c "source ~/.bashrc; python3 hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/hdbscan --savefile=${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)"
${authors_tf_10k_output_lsi}/best_hdbscan.feather:${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
$(srun_singularity) -c "source ~/.bashrc; python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2"
clean_affinity:
rm -f ${authors_10k_output}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output}/affinity/selection_data.csv
rm -f ${terms_10k_output}/affinity/selection_data.csv
${authors_tf_10k_input_lsi}:
$(MAKE) -C ../similarities
clean_kmeans:
rm -f ${authors_10k_output}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output}/kmeans/selection_data.csv
rm -f ${terms_10k_output}/kmeans/selection_data.csv
clean_hdbscan:
rm -f ${authors_10k_output}/hdbscan/selection_data.csv
rm -f ${authors_tf_10k_output}/hdbscan/selection_data.csv
rm -f ${terms_10k_output}/hdbscan/selection_data.csv
clean_authors:
rm -f ${authors_10k_output}/affinity/selection_data.csv
rm -f ${authors_10k_output}/kmeans/selection_data.csv
rm -f ${authors_10k_output}/hdbscan/selection_data.csv
clean_authors_tf:
rm -f ${authors_tf_10k_output}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output}/hdbscan/selection_data.csv
clean_terms:
rm -f ${terms_10k_output}/affinity/selection_data.csv
rm -f ${terms_10k_output}/kmeans/selection_data.csv
rm -f ${terms_10k_output}/hdbscan/selection_data.csv
clean_lsi_affinity:
rm -f ${authors_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
rm -f ${terms_10k_output_lsi}/affinity/selection_data.csv
clean_lsi_kmeans:
rm -f ${authors_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${terms_10k_output_lsi}/kmeans/selection_data.csv
clean_lsi_hdbscan:
rm -f ${authors_10k_output_lsi}/hdbscan/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv
rm -f ${terms_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_authors:
rm -f ${authors_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_authors_tf:
clean:
rm -f ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_terms:
rm -f ${terms_10k_output_lsi}/affinity/selection_data.csv
rm -f ${terms_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${terms_10k_output_lsi}/hdbscan/selection_data.csv
clean: clean_affinity clean_kmeans clean_hdbscan
PHONY: clean clean_affinity clean_kmeans clean_hdbscan clean_authors clean_authors_tf clean_terms terms_10k authors_10k authors_tf_10k
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_authors_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_30k.feather $(clustering_data)/subreddit_comment_authors_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS
# $(clustering_data)/subreddit_comment_terms_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_terms_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_terms_30k.feather $(clustering_data)/subreddit_comment_terms_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_terms_30k.feather/SUCCESS
# $(clustering_data)/subreddit_authors-tf_similarities_30k.feather/SUCCESS:clustering.py $(similarity_data)/subreddit_comment_authors-tf_30k.feather
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors-tf_30k.feather $(clustering_data)/subreddit_comment_authors-tf_30k $(selection_grid) -J 8 && touch $(clustering_data)/subreddit_authors-tf_similarities_30k.feather/SUCCESS
# $(clustering_data)/subreddit_comment_authors_100k.feather:clustering.py $(similarity_data)/subreddit_comment_authors_100k.feather
# $(srun_singularity) python3 clustering.py $(similarity_data)/subreddit_comment_authors_100k.feather $(clustering_data)/subreddit_comment_authors_100k.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.85 --damping=0.85
# $(clustering_data)/comment_terms_100k.feather:clustering.py $(similarity_data)/subreddit_comment_terms_100k.feather
# $(srun_singularity) python3 clustering.py $(similarity_data)/comment_terms_10000.feather $(clustering_data)/comment_terms_10000.feather ---max_iter=1000 --convergence_iter=15 --preference_quantile=0.9 --damping=0.5
# $(clustering_data)/subreddit_comment_author-tf_100k.feather:clustering.py $(similarity_data)/subreddit_comment_author-tf_100k.feather
# $(srun_singularity) python3 clustering.py $(similarity_data)/subreddit_comment_author-tf_100k.parquet $(clustering_data)/subreddit_comment_author-tf_100k.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.5 --damping=0.85
# it's pretty difficult to get a result that isn't one huge megacluster. A sign that it's bullcrap
# /gscratch/comdata/output/reddit_clustering/wang_similarity_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather
# ./clustering.py /gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather /gscratch/comdata/output/reddit_clustering/wang_similarity_10000.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.9 --damping=0.85
# /gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather:fit_tsne.py /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
# start_spark_and_run.sh 1 fit_tsne.py --similarities=/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet --output=/gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather
# /gscratch/comdata/output/reddit_tsne/wang_similarity_10000.feather:fit_tsne.py /gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather
# python3 fit_tsne.py --similarities=/gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather --output=/gscratch/comdata/output/reddit_tsne/wang_similarity_10000.feather
# /gscratch/comdata/output/reddit_tsne/comment_authors_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
# # $srun_cdsc python3
# start_spark_and_run.sh 1 fit_tsne.py --similarities=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather --output=/gscratch/comdata/output/reddit_tsne/comment_authors_10000.feather
PHONY: clean

View File

@@ -1,3 +1,4 @@
import pickle
from pathlib import Path
import numpy as np
import pandas as pd
@@ -20,10 +21,17 @@ class clustering_job:
self.subreddits, self.mat = self.read_distance_mat(self.infile)
self.clustering = self.call(self.mat, *self.args, **self.kwargs)
self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
self.score = self.silhouette()
self.outpath.mkdir(parents=True, exist_ok=True)
self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
self.hasrun = True
self.cleanup()
def cleanup(self):
self.cluster_data = None
self.mat = None
self.clustering=None
self.subreddits=None
def get_info(self):
if not self.hasrun:
@@ -54,11 +62,13 @@ class clustering_job:
else:
score = None
self.silsampout = None
return score
def read_distance_mat(self, similarities, use_threads=True):
print(similarities)
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
mat = np.array(df.drop('_subreddit',axis=1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,1-mat)
@@ -72,9 +82,13 @@ class clustering_job:
self.n_clusters = len(set(clusters))
print(f"found {self.n_clusters} clusters")
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
self.score = self.silhouette()
print(f"silhouette_score:{self.score}")
cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
@@ -95,6 +109,38 @@ class clustering_job:
return cluster_data
class twoway_clustering_job(clustering_job):
def __init__(self, infile, outpath, name, call1, call2, args1, args2):
self.outpath = Path(outpath)
self.call1 = call1
self.args1 = args1
self.call2 = call2
self.args2 = args2
self.infile = Path(infile)
self.name = name
self.hasrun = False
self.args = args1|args2
def run(self):
self.subreddits, self.mat = self.read_distance_mat(self.infile)
self.step1 = self.call1(self.mat, **self.args1)
self.clustering = self.call2(self.mat, self.step1, **self.args2)
self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
self.hasrun = True
self.after_run()
self.cleanup()
def after_run(self):
self.score = self.silhouette()
self.outpath.mkdir(parents=True, exist_ok=True)
print(self.outpath/(self.name+".feather"))
self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
def cleanup(self):
super().cleanup()
self.step1 = None
@dataclass
class clustering_result:
outpath:Path

View File

@@ -1,34 +0,0 @@
import fire
import pyarrow
import pandas as pd
from numpy import random
import numpy as np
from sklearn.manifold import TSNE
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet"
def fit_tsne(similarities, output, learning_rate=750, perplexity=50, n_iter=10000, early_exaggeration=20):
'''
similarities: feather file with a dataframe of similarity scores
learning_rate: parameter controlling how fast the model converges. Too low and you get outliers. Too high and you get a ball.
perplexity: number of neighbors to use. the default of 50 is often good.
'''
df = pd.read_feather(similarities)
n = df.shape[0]
mat = np.array(df.drop('_subreddit',1),dtype=np.float64)
mat[range(n),range(n)] = 1
mat[mat > 1] = 1
dist = 2*np.arccos(mat)/np.pi
tsne_model = TSNE(2,learning_rate=750,perplexity=50,n_iter=10000,metric='precomputed',early_exaggeration=20,n_jobs=-1)
tsne_fit_model = tsne_model.fit(dist)
tsne_fit_whole = tsne_fit_model.fit_transform(dist)
plot_data = pd.DataFrame({'x':tsne_fit_whole[:,0],'y':tsne_fit_whole[:,1], '_subreddit':df['_subreddit']})
plot_data.to_feather(output)
if __name__ == "__main__":
fire.Fire(fit_tsne)

View File

@@ -31,3 +31,19 @@ class grid_sweep:
outcsv = Path(outcsv)
outcsv.parent.mkdir(parents=True, exist_ok=True)
self.infos.to_csv(outcsv)
class twoway_grid_sweep(grid_sweep):
def __init__(self, jobtype, inpath, outpath, namer, args1, args2, *args, **kwargs):
self.jobtype = jobtype
self.namer = namer
prod1 = product(* args1.values())
prod2 = product(* args2.values())
grid1 = [dict(zip(args1.keys(), pargs)) for pargs in prod1]
grid2 = [dict(zip(args2.keys(), pargs)) for pargs in prod2]
grid = product(grid1, grid2)
inpath = Path(inpath)
outpath = Path(outpath)
self.hasrun = False
self.grid = [(inpath,outpath,namer(**(g[0] | g[1])), g[0], g[1], *args) for g in grid]
self.jobs = [jobtype(*g) for g in self.grid]

View File

@@ -1,5 +1,5 @@
from clustering_base import clustering_job, clustering_result
from grid_sweep import grid_sweep
from grid_sweep import grid_sweep, twoway_grid_sweep
from dataclasses import dataclass
from itertools import chain
from pathlib import Path
@@ -27,3 +27,18 @@ class lsi_grid_sweep(grid_sweep):
self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))
class twoway_lsi_grid_sweep(twoway_grid_sweep):
def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, args1, args2):
self.jobtype = jobtype
self.subsweep = subsweep
inpath = Path(inpath)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*.feather"))
else:
lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]
lsi_nums = [int(p.stem) for p in lsi_paths]
self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, args1, args2) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))

4
clustering/validation.py Normal file
View File

@@ -0,0 +1,4 @@
from sklearn import metrics
from sklearn.cluster import AffinityPropagation
from functools import partial
# sillouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.

28
datasets/Makefile Normal file
View File

@@ -0,0 +1,28 @@
all: ../../data/reddit_comments_by_subreddit.parquet ../../data/reddit_submissions_by_subreddit.parquet
../../data/reddit_comments_by_subreddit.parquet:../../data/temp/reddit_comments.parquet
../start_spark_and_run.sh 4 comments_2_parquet_part2.py
../../data/temp/reddit_comments.parquet: comments_task_list.sh run_comments_jobs.sbatch
mkdir -p comments_jobs
mkdir -p ../../data/temp/
sbatch --wait --array=1-$(shell cat comments_task_list.sh | wc -l) run_comments_jobs.sbatch 0
temp_reddit_comments.parquet: ../../data/temp/reddit_comments.parquet
comments_task_list.sh: comments_2_parquet_part1.py
srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 bash -c "source ~/.bashrc && python3 comments_2_parquet_part1.py gen_task_list --overwrite=False"
submissions_task_list.sh: submissions_2_parquet_part1.py
srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 python3 submissions_2_parquet_part1.py gen_task_list
../../data/reddit_submissions_by_subreddit.parquet:../../data/temp/reddit_submissions.parquet
../start_spark_and_run.sh 4 submissions_2_parquet_part2.py
../../data/temp/reddit_submissions.parquet: submissions_task_list.sh run_submissions_jobs.sbatch
mkdir -p submissions_jobs
rm -rf ../../data/temp/reddit_submissions.parquet
mkdir -p ../../data/temp/
sbatch --wait --array=1-$(shell cat submissions_task_list.sh | wc -l) run_submissions_jobs.sbatch 0
temp_reddit_submissions.parquet: ../../data/temp/reddit_submissions.parquet

View File

@@ -1,26 +0,0 @@
#!/bin/bash
## parallel_sql_job.sh
#SBATCH --job-name=tf_subreddit_comments
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=12:00:00
## Memory per node
#SBATCH --mem=32G
#SBATCH --cpus-per-task=4
#SBATCH --ntasks=1
#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
source ./bin/activate
module load parallel_sql
echo $(which perl)
conda list pyarrow
which python3
#Put here commands to load other modules (e.g. matlab etc.)
#Below command means that parallel_sql will get tasks from the database
#and run them on the node (in parallel). So a 16 core node will have
#16 tasks running at one time.
parallel-sql --sql -a parallel --exit-on-term --jobs 4

View File

@@ -1,10 +1,10 @@
#!/usr/bin/env bash
## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
#!/usr/bin/env bash
echo "#!/usr/bin/bash" > job_script.sh
#echo "source $(pwd)/../bin/activate" >> job_script.sh
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 --pty job_script.sh
start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py

View File

@@ -1,12 +1,15 @@
#!/usr/bin/env python3
import os
import json
from datetime import datetime
from multiprocessing import Pool
from itertools import islice
from helper import find_dumps, open_fileset
from helper import open_input_file, find_dumps
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
import fire
def parse_comment(comment, names= None):
if names is None:
@@ -44,19 +47,14 @@ def parse_comment(comment, names= None):
return tuple(row)
# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','../../data/spark_tmp')])
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
def parse_dump(partition):
files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
dumpdir = f"../../data/reddit_dumps/comments/{partition}"
pool = Pool(28)
stream = open_fileset(files)
N = int(1e4)
rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
stream = open_input_file(dumpdir)
rows = map(parse_comment, stream)
schema = pa.schema([
pa.field('id', pa.string(), nullable=True),
@@ -78,33 +76,16 @@ schema = pa.schema([
pa.field('error', pa.string(), nullable=True),
])
from pathlib import Path
p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
p = Path("../../data/temp/reddit_comments.parquet")
p.mkdir(exist_ok=True,parents=True)
if not p.is_dir():
if p.exists():
p.unlink()
p.mkdir()
else:
list(map(Path.unlink,p.glob('*')))
part_size = int(1e7)
part = 1
n_output = 0
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
N=10000
with pq.ParquetWriter(f"../../data/temp/reddit_comments.parquet/{partition}.parquet",
schema=schema,
compression='snappy',
flavor='spark') as writer:
while True:
if n_output > part_size:
if part > 1:
writer.close()
part = part + 1
n_output = 0
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
n_output += N
chunk = islice(rows,N)
pddf = pd.DataFrame(chunk, columns=schema.names)
table = pa.Table.from_pandas(pddf,schema=schema)
@@ -112,4 +93,19 @@ while True:
break
writer.write_table(table)
writer.close()
def gen_task_list(dumpdir="../../data/raw_data/reddit_dumps/comments", overwrite=True):
files = list(find_dumps(dumpdir,base_pattern="RC_20*.*"))
with open("comments_task_list.sh",'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
if (not Path(f"../../data/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n')
if __name__ == '__main__':
fire.Fire({'parse_dump':parse_dump,
'gen_task_list':gen_task_list})

View File

@@ -2,12 +2,19 @@
# spark script to make sorted, and partitioned parquet files
import pyspark
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
conf = conf.set("spark.sql.shuffle.partitions",2400)
conf = conf.set('spark.sql.crossJoin.enabled',"true")
conf = conf.set('spark.debug.maxToStringFields',200)
sc = spark.sparkContext
df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_comments.parquet",compression='snappy')
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
@@ -18,12 +25,13 @@ df = df.withColumn("Month",f.month(f.col("CreatedAt")))
df = df.withColumn("Year",f.year(f.col("CreatedAt")))
df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
df = df.repartition('subreddit')
df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
# df = df.repartition(1200,'subreddit')
# df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
# df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
# df2.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
df = df.repartition('author')
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
#df = spark.read.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet")
df = df.repartition(2400,'author','subreddit',"Year","Month","Day")
df3 = df.sort(["author","subreddit","Year","Month","Day","CreatedAt","link_id","parent_id"],ascending=True)
df3 = df3.sortWithinPartitions(["author","subreddit","Year","Month","Day","CreatedAt","link_id","parent_id"],ascending=True)
df3.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')

View File

@@ -24,8 +24,7 @@ def open_fileset(files):
for fh in files:
print(fh)
lines = open_input_file(fh)
for line in lines:
yield line
yield from lines
def open_input_file(input_filename):
if re.match(r'.*\.7z$', input_filename):
@@ -39,7 +38,7 @@ def open_input_file(input_filename):
elif re.match(r'.*\.xz', input_filename):
cmd = ["xzcat",'-dk', '-T 20',input_filename]
elif re.match(r'.*\.zst',input_filename):
cmd = ['zstd','-dck', input_filename]
cmd = ['/kloneusr/bin/zstd','-dck', input_filename, '--memory=2048MB --stdout']
elif re.match(r'.*\.gz',input_filename):
cmd = ['gzip','-dc', input_filename]
try:

View File

@@ -1,4 +0,0 @@
#!/usr/bin/bash
start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000
stop-all.sh

View File

@@ -0,0 +1,24 @@
#!/bin/bash
## tf reddit comments
#SBATCH --job-name="cdsc_reddit; parse comment dumps"
## Allocation Definition
#SBATCH --account=comdata
#SBATCH --partition=compute-bigmem
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=24:00:00
## Memory per node
#SBATCH --mem=8G
#SBATCH --cpus-per-task=1
#SBATCH --ntasks=1
#SBATCH
#SBATCH --chdir /gscratch/comdata/users/nathante/partitioning_reddit/dataverse/cdsc_reddit/datasets
#SBATCH --output=comments_jobs/%A_%a.out
#SBATCH --error=comments_jobs/%A_%a.out
. /opt/ohpc/admin/lmod/lmod/init/profile
source ~/.bashrc
TASK_NUM=$(( SLURM_ARRAY_TASK_ID + $1))
TASK_CALL=$(sed -n ${TASK_NUM}p ./comments_task_list.sh)
${TASK_CALL}

View File

@@ -0,0 +1,23 @@
#!/bin/bash
## tf reddit comments
#SBATCH --job-name="cdsc_reddit; parse submission dumps"
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=24:00:00
## Memory per node
#SBATCH --mem=8G
#SBATCH --cpus-per-task=1
#SBATCH --ntasks=1
#SBATCH
#SBATCH --chdir /gscratch/comdata/users/nathante/cdsc_reddit/datasets
#SBATCH --output=submissions_jobs/%A_%a.out
#SBATCH --error=submissions_jobs/%A_%a.out
TASK_NUM=$(( SLURM_ARRAY_TASK_ID + $1))
TASK_CALL=$(sed -n ${TASK_NUM}p ./submissions_task_list.sh)
${TASK_CALL}

4
datasets/submissions_2_parquet.sh Normal file → Executable file
View File

@@ -1,8 +1,8 @@
#!/usr/bin/env bash
## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
#!/usr/bin/env bash
./parse_submissions.sh
srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 python3 $(pwd)/submissions_2_parquet_part1.py gen_task_list
start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py

View File

@@ -3,26 +3,23 @@
# two stages:
# 1. from gz to arrow parquet (this script)
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
from datetime import datetime
from multiprocessing import Pool
from pathlib import Path
from itertools import islice
from helper import find_dumps, open_fileset
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import simdjson
import fire
import os
parser = simdjson.Parser()
import json
def parse_submission(post, names = None):
if names is None:
names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
try:
post = parser.parse(post)
post = json.loads(post)
except (ValueError) as e:
# print(e)
# print(post)
@@ -61,7 +58,7 @@ def parse_submission(post, names = None):
def parse_dump(partition):
N=10000
stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
stream = open_fileset([f"/gscratch/comdata/raw_data/submissions/{partition}"])
rows = map(parse_submission,stream)
schema = pa.schema([
pa.field('id', pa.string(),nullable=True),
@@ -92,8 +89,7 @@ def parse_dump(partition):
pa.field('quarantine',pa.bool_(),nullable=True),
pa.field('error',pa.string(),nullable=True)])
if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
Path("/gscratch/comdata/output/temp/reddit_submissions.parquet/").mkdir(exist_ok=True,parents=True)
with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
while True:
@@ -106,9 +102,9 @@ def parse_dump(partition):
writer.close()
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/submissions"):
files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
with open("parse_submissions_task_list",'w') as of:
with open("submissions_task_list.sh",'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')

View File

@@ -29,14 +29,14 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
# next we gotta resort it all.
df = df.repartition("subreddit")
df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
df = df.repartition(800,"subreddit","Year","Month")
df2 = df.sort(["subreddit","Year","Month","CreatedAt","id"],ascending=True)
df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
# # we also want to have parquet files sorted by author then reddit.
df = df.repartition("author")
df3 = df.sort(["author","CreatedAt","id"],ascending=True)
df = df.repartition(800,"author","subreddit","Year","Month")
df3 = df.sort(["author","Year","Month","CreatedAt","id"],ascending=True)
df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')

View File

@@ -1,16 +1,7 @@
all: /gscratch/comdata/output/reddit_density/comment_terms_10000.feather /gscratch/comdata/output/reddit_density/comment_authors_10000.feather /gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather
all: ../../data/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather
/gscratch/comdata/output/reddit_density/comment_terms_10000.feather:overlap_density.py /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
start_spark_and_run.sh 1 overlap_density.py terms --inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather" --outpath="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather" --agg=pd.DataFrame.sum
../../data/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather: overlap_density.py ../../data/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather
../start_spark_and_run.sh 1 overlap_density.py authors --inpath="../../data/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather" --outpath="../../data/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/comment_authors_10000.feather:overlap_density.py /gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather" --outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/850.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/850.feather
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/850.feather" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/850.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather" --agg=pd.DataFrame.sum
../../data/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather:
$(MAKE) -C ../similarities

View File

@@ -1,4 +1,6 @@
#!/usr/bin/bash
source ~/.bashrc
echo $(hostname)
start_spark_cluster.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname).hyak.local:7077 overlap_density.py authors --inpath=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather --outpath=/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather --agg=pd.DataFrame.sum
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh
spark-submit --verbose --master spark://$(hostname):43015 overlap_density.py authors --inpath=../../data/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather --outpath=../../data/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather --agg=pd.DataFrame.sum
stop-all.sh

View File

@@ -4,9 +4,9 @@ from pathlib import Path
import fire
import numpy as np
import sys
sys.path.append("..")
sys.path.append("../similarities")
from similarities.similarities_helper import reindex_tfidf
# sys.path.append("..")
# sys.path.append("../similarities")
# from similarities.similarities_helper import pull_tfidf
# this is the mean of the ratio of the overlap to the focal size.
# mean shared membership per focal community member

View File

@@ -8,7 +8,7 @@ import hashlib
shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
#shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
shasums = shasums1 + shasums2
shasums = shasums1
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
for l in shasums.strip().split('\n'):

View File

@@ -0,0 +1,34 @@
from pathlib import Path
from itertools import chain, groupby
dumpdir = Path("/gscratch/comdata/raw_data/reddit_dumps/comments")
zst_files = dumpdir.glob("*.zst")
bz2_files = dumpdir.glob("*.bz2")
xz_files = dumpdir.glob("*.xz")
all_files = sorted(list(chain(zst_files, bz2_files, xz_files)))
groups = groupby(all_files, key = lambda p: p.stem)
kept_paths = []
removed_paths = []
priority = ['.zst','.xz','.bz2']
for stem, files in groups:
keep_file = None
remove_files = []
for f in files:
if keep_file is None:
keep_file = f
elif priority.index(keep_file.suffix) > priority.index(f.suffix):
remove_files.append(keep_file)
keep_file = f
else:
remove_files.append(f)
kept_paths.append(keep_file)
removed_paths.extend(remove_files)
(dumpdir / "to_remove").mkdir()
for f in removed_paths:
f.rename(f.parent / "to_remove" / f.name)

View File

@@ -0,0 +1,34 @@
from pathlib import Path
from itertools import chain, groupby
dumpdir = Path("/gscratch/comdata/raw_data/reddit_dumps/submissions")
zst_files = dumpdir.glob("*.zst")
bz2_files = dumpdir.glob("*.bz2")
xz_files = dumpdir.glob("*.xz")
all_files = sorted(list(chain(zst_files, bz2_files, xz_files)))
groups = groupby(all_files, key = lambda p: p.stem)
kept_paths = []
removed_paths = []
priority = ['.zst','.xz','.bz2']
for stem, files in groups:
keep_file = None
remove_files = []
for f in files:
if keep_file is None:
keep_file = f
elif priority.index(keep_file.suffix) > priority.index(f.suffix):
remove_files.append(keep_file)
keep_file = f
else:
remove_files.append(f)
kept_paths.append(keep_file)
removed_paths.extend(remove_files)
(dumpdir / "to_remove").mkdir()
for f in removed_paths:
f.rename(f.parent / "to_remove" / f.name)

View File

@@ -1,17 +0,0 @@
import pyarrow.dataset as ds
# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory.
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
dataset = ds.dataset('/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/', format='parquet')
# let's get all the comments to two subreddits:
subreddits_to_pull = ['seattle','seattlewa']
# a table is a low-level structured data format. This line pulls data into memory. Setting metadata_n_threads > 1 gives a little speed boost.
table = dataset.to_table(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
# Since data from just these 2 subreddits fits in memory we can just turn our table into a pandas dataframe.
df = table.to_pandas()
# We should save this smaller dataset so we don't have to wait 15 min to pull from parquet next time.
df.to_csv("mydataset.csv")

View File

@@ -1,38 +0,0 @@
import pyarrow.dataset as ds
from itertools import groupby
# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory.
dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet')
# let's get all the comments to two subreddits:
subreddits_to_pull = ['seattlewa','seattle']
# instead of loading the data into a pandas dataframe all at once we can stream it.
scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
# simple function to execute scantasks and generate rows
def iterate_rows(scan_tasks):
for st in scan_tasks:
for rb in st.execute():
df = rb.to_pandas()
for t in df.itertuples():
yield t
row_iter = iterate_rows(scan_tasks)
# now we can use python's groupby function to read one author at a time
# note that the same author can appear more than once since the record batches may not be in the correct order.
author_submissions = groupby(row_iter, lambda row: row.author)
count_dict = {}
for auth, posts in author_submissions:
if auth in count_dict:
count_dict[auth] = count_dict[auth] + 1
else:
count_dict[auth] = 1
# since it's partitioned and sorted by author, we get one group for each author
any([ v != 1 for k,v in count_dict.items()])

25
ngrams/Makefile Normal file
View File

@@ -0,0 +1,25 @@
outputdir=../../data/reddit_ngrams/
inputdir=../../data/reddit_comments_by_subreddit.parquet
authors_tfdir=${outputdir}/comment_authors.parquet
srun=sbatch --wait --verbose run_job.sbatch
all: ${outputdir}/comment_authors_sorted.parquet/_SUCCESS
tf_task_list_1: tf_comments.py
${srun} bash -c "python3 tf_comments.py gen_task_list --mwe_pass='first' --outputdir=${outputdir} --tf_task_list=$@ --inputdir=${inputdir}"
${outputdir}/comment_terms.parquet:tf_task_list_1
mkdir -p sbatch_log
sbatch --wait --verbose --array=1-$(shell cat $< | wc -l) run_array.sbatch 0 $<
${outputdir}/comment_authors.parquet:${outputdir}/comment_terms.parquet
-
${outputdir}/comment_authors_sorted.parquet:${outputdir}/comment_authors.parquet sort_tf_comments.py
../start_spark_and_run.sh 3 sort_tf_comments.py --inparquet=$< --outparquet=$@ --colname=author
${outputdir}/comment_authors_sorted.parquet/_SUCCESS:${outputdir}/comment_authors_sorted.parquet
${inputdir}:
$(MAKE) -C ../datasets

19
ngrams/run_array.sbatch Executable file
View File

@@ -0,0 +1,19 @@
#!/bin/bash
#SBATCH --job-name=reddit_comment_term_frequencies
#SBATCH --account=comdata
#SBATCH --partition=compute-bigmem
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=9g
#SBATCH --ntasks=1
#SBATCH --export=ALL
#SBATCH --time=48:00:00
#SBATCH --chdir=/gscratch/comdata/users/nathante/partitioning_reddit/dataverse/cdsc_reddit/ngrams
#SBATCH --error="sbatch_log/%A_%a.out"
#SBATCH --output="sbatch_log/%A_%a.out"
TASK_NUM=$(($SLURM_ARRAY_TASK_ID + $1))
TASK_CALL=$(sed -n ${TASK_NUM}p $2)
${TASK_CALL}

18
ngrams/run_job.sbatch Normal file
View File

@@ -0,0 +1,18 @@
#!/bin/bash
#SBATCH --job-name="simulate measurement error models"
## Allocation Definition
#SBATCH --account=comdata
#SBATCH --partition=compute-bigmem
## Resources
#SBATCH --nodes=1
## Walltime (4 hours)
#SBATCH --time=4:00:00
## Memory per node
#SBATCH --mem=4G
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=1
#SBATCH --chdir /gscratch/comdata/users/nathante/partitioning_reddit/dataverse/cdsc_reddit/ngrams/
#SBATCH --output=sbatch_log/%A_%a.out
#SBATCH --error=sbatch_log/%A_%a.err
echo "$@"
"$@"

View File

@@ -1,8 +1,6 @@
#!/usr/bin/env bash
module load parallel_sql
source ./bin/activate
python3 tf_comments.py gen_task_list
psu --del --Y
cat tf_task_list | psu --load
for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done;

View File

@@ -2,12 +2,17 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
import fire
def main(inparquet, outparquet, colname):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/")
df = spark.read.parquet(inparquet)
df = df.repartition(2000,'term')
df = df.sort(['term','week','subreddit'])
df = df.sortWithinPartitions(['term','week','subreddit'])
df = df.repartition(2000,colname)
df = df.sort([colname,'week','subreddit'])
df = df.sortWithinPartitions([colname,'week','subreddit'])
df.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
df.write.parquet(outparquet,mode='overwrite',compression='snappy')
if __name__ == '__main__':
fire.Fire(main)

View File

@@ -3,6 +3,7 @@ import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pyarrow.compute as pc
from itertools import groupby, islice, chain
import fire
from collections import Counter
@@ -14,22 +15,32 @@ from nltk.util import ngrams
import string
from random import random
from redditcleaner import clean
from pathlib import Path
from datetime import datetime
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, mwe_pass = 'first'):
dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")
dataset = ds.dataset(Path(inputdir)/partition, format='parquet')
outputdir = Path(outputdir)
samppath = outputdir / "reddit_comment_ngrams_10p_sample"
if not samppath.exists():
samppath.mkdir(parents=True, exist_ok=True)
ngram_output = partition.replace("parquet","txt")
if mwe_pass == 'first':
if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
if excluded_users is not None:
excluded_users = set(map(str.strip,open(excluded_users)))
df = df.filter(~ (f.col("author").isin(excluded_users)))
ngram_path = samppath / ngram_output
if mwe_pass == 'first':
if ngram_path.exists():
ngram_path.unlink()
dataset = dataset.filter(pc.field("CreatedAt") <= pa.scalar(datetime(2020,4,13)))
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
@@ -62,8 +73,10 @@ def weekly_tf(partition, mwe_pass = 'first'):
subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
mwe_path = outputdir / "multiword_expressions.feather"
if mwe_pass != 'first':
mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather')
mwe_dataset = pd.read_feather(mwe_path)
mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
mwe_phrases = list(mwe_dataset.phrase)
mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
@@ -115,7 +128,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
for sentence in sentences:
if random() <= 0.1:
grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
with open(ngram_path,'a') as gram_file:
for ng in grams:
gram_file.write(' '.join(ng) + '\n')
for token in sentence:
@@ -150,7 +163,14 @@ def weekly_tf(partition, mwe_pass = 'first'):
outchunksize = 10000
with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
termtf_outputdir = (outputdir / "comment_terms.parquet")
termtf_outputdir.mkdir(parents=True, exist_ok=True)
authortf_outputdir = (outputdir / "comment_authors.parquet")
authortf_outputdir.mkdir(parents=True, exist_ok=True)
termtf_path = termtf_outputdir / partition
authortf_path = authortf_outputdir / partition
with pq.ParquetWriter(termtf_path, schema=schema, compression='snappy', flavor='spark') as writer, \
pq.ParquetWriter(authortf_path, schema=author_schema, compression='snappy', flavor='spark') as author_writer:
while True:
@@ -179,12 +199,12 @@ def weekly_tf(partition, mwe_pass = 'first'):
author_writer.close()
def gen_task_list(mwe_pass='first'):
files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
with open("tf_task_list",'w') as outfile:
def gen_task_list(mwe_pass='first', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
files = os.listdir(inputdir)
with open(tf_task_list,'w') as outfile:
for f in files:
if f.endswith(".parquet"):
outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --inputdir {inputdir} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
if __name__ == "__main__":
fire.Fire({"gen_task_list":gen_task_list,

View File

@@ -1,58 +0,0 @@
from pyspark.sql import functions as f
from pyspark.sql import Window
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.getOrCreate()
df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
df = df.withColumnRenamed("value","phrase")
# count phrase occurrances
phrases = df.groupby('phrase').count()
phrases = phrases.withColumnRenamed('count','phraseCount')
phrases = phrases.filter(phrases.phraseCount > 10)
# count overall
N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount
print(f'analyzing PMI on a sample of {N} phrases')
logN = np.log(N)
phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)
# count term occurrances
phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' '))
terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')])
win = Window.partitionBy('term')
terms = terms.withColumn('termCount',f.sum('phraseCount').over(win))
terms = terms.withColumnRenamed('count','termCount')
terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN)
terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb')
terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb')
terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb'))
# join phrases to term counts
df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI'])
df = df.sort(['phrasePWMI'],descending=True)
df = df.sortWithinPartitions(['phrasePWMI'],descending=True)
df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy')
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/")
df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none')
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet")
df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
# choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions.
#
df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
df = df.toPandas()
df.to_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather")
df.to_csv("/gscratch/comdata/users/nathante/reddit_multiword_expressions.csv")

22
run_array.sbatch Normal file
View File

@@ -0,0 +1,22 @@
#!/bin/bash
## tf reddit comments
#SBATCH --job-name="wikia ecology; fit var models"
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=24:00:00
## Memory per node
#SBATCH --mem=8G
#SBATCH --cpus-per-task=1
#SBATCH --ntasks=1
#SBATCH
#SBATCH --chdir /gscratch/comdata/users/nathante/wikia_ecology
#SBATCH --output=var_jobs/%A_%a.out
#SBATCH --error=var_jobs/%A_%a.out
TASK_NUM=$(( SLURM_ARRAY_TASK_ID + $1))
TASK_CALL=$(sed -n ${TASK_NUM}p ./var_jobs.sh)
${TASK_CALL}

View File

@@ -1,130 +1,28 @@
#all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
base_data=/gscratch/comdata/output
similarity_data=${base_data}/reddit_similarity
srun=srun -p compute-bigmem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40
srun_huge=srun -p compute-hugemem -A comdata --mem=724g --time=200:00:00 -c 40
similarity_data=../../data/reddit_similarity
tfidf_data=${similarity_data}/tfidf
tfidf_weekly_data=${similarity_data}/tfidf_weekly
similarity_weekly_data=${similarity_data}/weekly
lsi_components=[10,50,100,200,300,400,500,600,700,850,1000,1500]
lsi_components=[10,50,100,200,300,400,500,600,700,850]
lsi_similarities: ${similarity_data}/subreddit_comment_terms_10k_LSI ${similarity_data}/subreddit_comment_authors-tf_10k_LSI ${similarity_data}/subreddit_comment_authors_10k_LSI ${similarity_data}/subreddit_comment_terms_30k_LSI ${similarity_data}/subreddit_comment_authors-tf_30k_LSI ${similarity_data}/subreddit_comment_authors_30k_LSI
lsi_similarities: ${similarity_data}/subreddit_comment_authors-tf_10k_LSI
all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
all: ${similarity_data}/subreddit_comment_authors-tf_10k.feather
#${tfidf_weekly_data}/comment_terms_100k.parquet ${tfidf_weekly_data}/comment_authors_100k.parquet ${tfidf_weekly_data}/comment_terms_30k.parquet ${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet ${similarity_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_authors_30k.parquet
${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
${srun_huge} /bin/bash -c "source ~/.bashrc; python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$<"
# /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_weekly_130k.parquet
${similarity_data}/subreddits_by_num_comments_nonsfw.csv: ../../data/reddit_submissions_by_subreddit.parquet ../../data/reddit_comments_by_subreddit.parquet
../start_spark_and_run.sh 3 top_subreddits_by_comments.py
# all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet
${tfidf_data}/comment_authors_100k.parquet: ../../data/reddit_ngrams/comment_authors_sorted.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
../start_spark_and_run.sh 3 tfidf.py authors --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet
../../data/reddit_ngrams/comment_authors_sorted.parquet:
$(MAKE) -C ../ngrams
${similarity_data}/subreddit_comment_terms_10k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000
../../data/reddit_submissions_by_subreddit.parquet:
$(MAKE) -C ../datasets
${similarity_data}/subreddit_comment_terms_10k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200
${similarity_data}/subreddit_comment_terms_30k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200
${similarity_data}/subreddit_comment_terms_30k.feather: ${tfidf_data}/comment_terms_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000
${similarity_data}/subreddit_comment_authors_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000
${similarity_data}/subreddit_comment_authors_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000
${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors-tf_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_terms_100k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors-tf_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000
${tfidf_data}/comment_terms_100k.feather/: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=100000 --outpath=${tfidf_data}/comment_terms_100k.feather
${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=30000 --outpath=${tfidf_data}/comment_terms_30k.feather
${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=10000 --outpath=${tfidf_data}/comment_terms_10k.feather
${tfidf_data}/comment_authors_100k.feather: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=100000 --outpath=${tfidf_data}/comment_authors_100k.feather
${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=10000 --outpath=${tfidf_data}/comment_authors_10k.parquet
${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=30000 --outpath=${tfidf_data}/comment_authors_30k.parquet
${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet
${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_ppnum_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=100000 --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_terms_100k.parquet: weekly_cosine_similarities.py similarities_helper.py ${tfidf_weekly_data}/comment_terms_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
# start_spark_and_run.sh 1 tfidf.py authors_weekly --topN=130000
# /gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
# /gscratch/comdata/output/reddit_similarity/comment_terms.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet
# start_spark_and_run.sh 1 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
# /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet: cosine_similarities.py ${tfidf_weekly_data}/comment_authors.parquet
# start_spark_and_run.sh 1 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet
# /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author-tf --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
../../data/reddit_comments_by_subreddit.parquet:
$(MAKE) -C ../datasets

View File

@@ -1,4 +1,6 @@
#!/usr/bin/bash
source ~/.bashrc
echo $(hostname)
start_spark_cluster.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname):7077 top_subreddits_by_comments.py
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh
spark-submit --verbose --master spark://$(hostname):43015 tfidf.py authors --topN=100000 --inpath=../../data/reddit_ngrams/comment_authors_sorted.parquet --outpath=../../data/reddit_similarity/tfidf/comment_authors_100k.parquet
stop-all.sh

View File

@@ -5,19 +5,20 @@ from similarities_helper import *
#from similarities_helper import similarities, lsi_column_similarities
from functools import partial
inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/"
term_colname='term'
outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI'
n_components=[10,50,100]
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
n_iter=5
random_state=1968
algorithm='arpack'
topN = None
from_date=None
to_date=None
min_df=None
max_df=None
# inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
# term_colname='authors'
# outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_test_compex_LSI'
# n_components=[10,50,100]
# included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
# n_iter=5
# random_state=1968
# algorithm='randomized'
# topN = None
# from_date=None
# to_date=None
# min_df=None
# max_df=None
def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None):
print(n_components,flush=True)
@@ -62,7 +63,7 @@ def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/t
n_components=n_components
)
def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968):
def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,algorithm='arpack',n_components=300,n_iter=5,random_state=1968):
return lsi_similarities(inpath,
'author',
outfile,

View File

@@ -43,7 +43,7 @@ def reindex_tfidf(*args, **kwargs):
new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
new_ids = new_ids.set_index('subreddit_id')
subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
subreddit_names = subreddit_names.drop("subreddit_id",1)
subreddit_names = subreddit_names.drop("subreddit_id",axis=1)
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
return(df, subreddit_names)
@@ -51,8 +51,9 @@ def pull_tfidf(*args, **kwargs):
df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
return df
def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
print(f"loading tfidf {infile}", flush=True)
def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=None, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
print(f"loading tfidf {infile}, week {week}, min_df {min_df}, max_df {max_df}", flush=True)
if week is not None:
tfidf_ds = ds.dataset(infile, partitioning='hive')
else:
@@ -97,20 +98,21 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
'relative_tf':ds.field('relative_tf').cast('float32'),
'tf_idf':ds.field('tf_idf').cast('float32')}
print(projection)
print(projection, flush=True)
print(ds_filter, flush=True)
df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
df = df.to_pandas(split_blocks=True,self_destruct=True)
print("assigning indexes",flush=True)
if reindex:
df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
print("assigning indexes",flush=True)
df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup() + 1
else:
df['subreddit_id_new'] = df['subreddit_id']
if reindex:
grouped = df.groupby(term_id)
df[term_id_new] = grouped.ngroup()
df[term_id_new] = grouped.ngroup() + 1
else:
df[term_id_new] = df[term_id]
@@ -126,17 +128,6 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
return (df, tfidf_ds, ds_filter)
with Pool(cpu_count()) as pool:
chunks = pool.imap_unordered(pull_names,batches)
subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
subreddit_names = subreddit_names.set_index("subreddit_id")
new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
new_ids = new_ids.set_index('subreddit_id')
subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
subreddit_names = subreddit_names.drop("subreddit_id",1)
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
return(df, subreddit_names)
def pull_names(batch):
return(batch.to_pandas().drop_duplicates())
@@ -170,7 +161,7 @@ def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=Non
term_id_new = term + '_id_new'
entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)))
print("loading matrix")
@@ -256,21 +247,20 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196
else:
print("running LSI",flush=True)
svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
mod = svd.fit(tfidfmat.T)
lsimat = mod.transform(tfidfmat.T)
if lsi_model_save is not None:
Path(lsi_model_save).parent.mkdir(exist_ok=True, parents=True)
pickle.dump(mod, open(lsi_model_save,'wb'))
sims_list = []
print(n_components, flush=True)
lsimat = mod.transform(tfidfmat.T)
for n_dims in n_components:
print("computing similarities", flush=True)
sims = column_similarities(lsimat[:,np.arange(n_dims)])
if len(n_components) > 1:
yield (sims, n_dims)
else:
return sims
def column_similarities(mat):
@@ -326,11 +316,11 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
else: # tf_fam = tf_weight.Norm05
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
df = df.repartition(400,'subreddit','week')
df = df.repartition('week')
dfwriter = df.write.partitionBy("week")
return dfwriter
def _calc_tfidf(df, term_colname, tf_family):
def _calc_tfidf(df, term_colname, tf_family, min_df=None, max_df=None):
term = term_colname
term_id = term + '_id'
@@ -348,7 +338,13 @@ def _calc_tfidf(df, term_colname, tf_family):
idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select(term).distinct() # terms are distinct
terms = idf
if min_df is not None:
terms = terms.filter(f.col('count')>=min_df)
if max_df is not None:
terms = terms.filter(f.col('count')<=max_df)
terms = terms.select(term).distinct() # terms are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
# make subreddit ids
@@ -358,12 +354,12 @@ def _calc_tfidf(df, term_colname, tf_family):
df = df.join(subreddits,on='subreddit')
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on=term) # subreddit-term-id is unique
df = df.join(terms,on=term,how='inner') # subreddit-term-id is unique
idf = idf.join(terms,on=term)
idf = idf.join(terms,on=term,how='inner')
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term])
df = df.join(idf, on=[term_id, term],how='inner')
# agg terms by subreddit to make sparse tf/df vectors
if tf_family == tf_weight.MaxTF:
@@ -374,19 +370,19 @@ def _calc_tfidf(df, term_colname, tf_family):
return df
def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05, min_df=None, max_df=None):
term = term_colname
term_id = term + '_id'
# aggregate counts by week. now subreddit-term is distinct
df = df.filter(df.subreddit.isin(include_subs))
df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
df = _calc_tfidf(df, term_colname, tf_family)
df = _calc_tfidf(df, term_colname, tf_family, min_df, max_df)
df = df.repartition('subreddit')
dfwriter = df.write
return dfwriter
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
def select_topN_subreddits(topN, path="../../data/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
rankdf = pd.read_csv(path)
included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
return included_subreddits

View File

@@ -2,9 +2,12 @@ import fire
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from similarities_helper import tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
from functools import partial
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits):
spark = SparkSession.builder.getOrCreate()y
inpath = '/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet'
# include_terms is a path to a parquet file that contains a column of term_colname + '_id' to include.
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=None, min_df=None, max_df=None):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath)
@@ -15,50 +18,71 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
else:
include_subs = select_topN_subreddits(topN)
dfwriter = func(df, include_subs, term_colname)
include_subs = spark.sparkContext.broadcast(include_subs)
# term_id = term_colname + "_id"
if included_terms is not None:
terms_df = spark.read.parquet(included_terms)
terms_df = terms_df.select(term_colname).distinct()
df = df.join(terms_df, on=term_colname, how='left_semi')
dfwriter = func(df, include_subs.value, term_colname)
dfwriter.parquet(outpath,mode='overwrite',compression='snappy')
spark.stop()
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits, min_df, max_df):
tfidf_func = partial(tfidf_dataset, max_df=max_df, min_df=min_df)
return _tfidf_wrapper(tfidf_func, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_weekly(inpath, outpath, static_tfidf_path, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=static_tfidf_path)
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_authors(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=None,
included_subreddits=None):
included_subreddits=None,
min_df=None,
max_df=None):
return tfidf(inpath,
outpath,
topN,
'author',
['[deleted]','AutoModerator'],
included_subreddits=included_subreddits
included_subreddits=included_subreddits,
min_df=min_df,
max_df=max_df
)
def tfidf_terms(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=None,
included_subreddits=None):
included_subreddits=None,
min_df=None,
max_df=None):
return tfidf(inpath,
outpath,
topN,
'term',
[],
included_subreddits=included_subreddits
included_subreddits=included_subreddits,
min_df=min_df,
max_df=max_df
)
def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
topN=None,
included_subreddits=None):
return tfidf_weekly(inpath,
outpath,
static_tfidf_path,
topN,
'author',
['[deleted]','AutoModerator'],
@@ -66,6 +90,7 @@ def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_
)
def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
topN=None,
included_subreddits=None):
@@ -73,6 +98,7 @@ def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_te
return tfidf_weekly(inpath,
outpath,
static_tfidf_path,
topN,
'term',
[],

View File

@@ -1,23 +1,27 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from datetime import datetime
from pathlib import Path
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
submissions = spark.read.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet")
submissions = spark.read.parquet("../../data/reddit_submissions_by_subreddit.parquet")
submissions = submissions.filter(f.col("CreatedAt") <= datetime(2020,4,13))
prop_nsfw = submissions.select(['subreddit','over_18']).groupby('subreddit').agg(f.mean(f.col('over_18').astype('double')).alias('prop_nsfw'))
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
df = spark.read.parquet("../../data/reddit_comments_by_subreddit.parquet")
df = df.filter(f.col("CreatedAt") <= datetime(2020,4,13))
# remove /u/ pages
df = df.filter(~df.subreddit.like("u_%"))
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
df = df.join(prop_nsfw,on='subreddit')
#df = df.filter(df.prop_nsfw < 0.5)
df = df.filter(df.prop_nsfw < 0.5)
win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank', f.rank().over(win))
@@ -26,4 +30,6 @@ df = df.toPandas()
df = df.sort_values("n_comments")
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nsfw.csv', index=False)
outpath = Path("../../data/reddit_similarity/subreddits_by_num_comments_nonsfw.csv")
outpath.parent.mkdir(exist_ok=True, parents=True)
df.to_csv(str(outpath), index=False)

View File

@@ -1,18 +0,0 @@
from similarities_helper import similarities
import numpy as np
import fire
def wang_similarity(mat):
non_zeros = (mat != 0).astype(np.float32)
intersection = non_zeros.T @ non_zeros
return intersection
infile="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet"; outfile="/gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather"; min_df=1; included_subreddits=None; topN=10000; exclude_phrases=False; from_date=None; to_date=None
def wang_overlaps(infile, outfile="/gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather", min_df=1, max_df=None, included_subreddits=None, topN=10000, exclude_phrases=False, from_date=None, to_date=None):
return similarities(infile=infile, simfunc=wang_similarity, term_colname='author', outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases, from_date=from_date, to_date=to_date)
if __name__ == "__main__":
fire.Fire(wang_overlaps)

View File

@@ -1,143 +0,0 @@
#!/usr/bin/env python3
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import pyarrow
import pyarrow.dataset as ds
import pandas as pd
import fire
from itertools import islice, chain
from pathlib import Path
from similarities_helper import pull_tfidf, column_similarities, write_weekly_similarities, lsi_column_similarities
from scipy.sparse import csr_matrix
from multiprocessing import Pool, cpu_count
from functools import partial
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_10k.parquet"
tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
min_df=None
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
max_df = None
topN=100
term_colname='author'
# outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# included_subreddits=None
def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path, subreddit_names, nterms):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
print(f"loading matrix: {week}")
entries = pull_tfidf(infile = tfidf_path,
term_colname=term_colname,
min_df=min_df,
max_df=max_df,
included_subreddits=included_subreddits,
topN=topN,
week=week,
rescale_idf=False)
tfidf_colname='tf_idf'
# if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)),shape=(nterms,subreddit_names.shape[0]))
print('computing similarities')
sims = simfunc(mat)
del mat
sims = pd.DataFrame(sims)
sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['_subreddit'] = subreddit_names.subreddit.values
outfile = str(Path(outdir) / str(week))
write_weekly_similarities(outfile, sims, week, subreddit_names)
def pull_weeks(batch):
return set(batch.to_pandas()['week'])
# This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
def cosine_similarities_weekly_lsi(n_components=100, lsi_model=None, *args, **kwargs):
term_colname= kwargs.get('term_colname')
#lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
# simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm='randomized',lsi_model_load=lsi_model)
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=kwargs.get('n_iter'),random_state=kwargs.get('random_state'),algorithm=kwargs.get('algorithm'),lsi_model_load=lsi_model)
return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500, simfunc=column_similarities):
print(outfile)
# do this step in parallel if we have the memory for it.
# should be doable with pool.map
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(tfidf_path)
# load subreddits + topN
subreddit_names = df.select(['subreddit','subreddit_id']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id")
nterms = df.select(f.max(f.col(term_colname + "_id")).alias('max')).collect()[0].max
weeks = df.select(f.col("week")).distinct().toPandas().week.values
spark.stop()
print(f"computing weekly similarities")
week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN, subreddit_names=subreddit_names,nterms=nterms)
pool = Pool(cpu_count())
list(pool.imap(week_similarities_helper,weeks))
pool.close()
# with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly(infile,
outfile,
'author',
min_df,
max_df,
included_subreddits,
topN)
def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None):
return cosine_similarities_weekly(infile,
outfile,
'term',
min_df,
max_df,
included_subreddits,
topN)
def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=None,n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile,
outfile,
'author',
min_df,
max_df,
included_subreddits,
topN,
n_components=n_components,
lsi_model=lsi_model)
def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500,n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile,
outfile,
'term',
min_df,
max_df,
included_subreddits,
topN,
n_components=n_components,
lsi_model=lsi_model)
if __name__ == "__main__":
fire.Fire({'authors':author_cosine_similarities_weekly,
'terms':term_cosine_similarities_weekly,
'authors-lsi':author_cosine_similarities_weekly_lsi,
'terms-lsi':term_cosine_similarities_weekly
})

21
start_spark_and_run.sh Executable file
View File

@@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Script to start a spark cluster and run a script on klone
source $SPARK_CONF_DIR/spark-env.sh
echo "#!/usr/bin/bash" > job_script.sh
echo "source ~/.bashrc" >> job_script.sh
echo "export PYSPARK_PYTHON=python3" >> job.script.sh
echo "export JAVA_HOME=/gscratch/comdata/local/open-jdk" >> job.script.sh
echo "export SPARK_CONF_DIR=/gscratch/comdata/local/spark_config" >> job.script.sh
echo "echo \$(hostname)" >> job_script.sh
echo "source $SPARK_CONF_DIR/spark-env.sh" >> job.script.sh
echo "start_spark_cluster.sh" >> job_script.sh
echo "spark-submit --verbose --master spark://\$(hostname):$SPARK_MASTER_PORT $2 ${@:3}" >> job_script.sh
echo "stop-all.sh" >> job_script.sh
#echo "singularity instance stop --all" >> job_script.sh
chmod +x job_script.sh
let "cpus = $1 * 40"
salloc -p compute-bigmem -A comdata --nodes=$1 --time=48:00:00 -c 40 --mem=362G --exclusive srun -n1 job_script.sh

26
start_spark_cluster.sh Executable file
View File

@@ -0,0 +1,26 @@
#!/usr/bin/env bash
nodes="$(scontrol show hostnames)"
export SPARK_MASTER_HOST=$(hostname)
echo $SPARK_MASTER_HOST
# singularity instance stop spark-boss
# rm -r $HOME/.singularity/instances/sing/$(hostname)/nathante/spark-boss
# for node in $nodes
# dol
# echo $node
# ssh $node "singularity instance stop --all -F"
# done
# singularity instance start /gscratch/comdata/users/nathante/cdsc_base.sif spark-boss
#apptainer exec /gscratch/comdata/users/nathante/containers/nathante.sif
start-master.sh
for node in $nodes
do
# if [ "$node" != "$SPARK_BOSS" ]
# then
echo $node
ssh -t $node start_spark_worker.sh $SPARK_MASTER_HOST
# fi
done

18
start_spark_worker.sh Executable file
View File

@@ -0,0 +1,18 @@
#!/usr/bin/env bash
# runs on worker node
# instance_name=spark-worker-$(hostname)
# echo $hostname
# instance_url="instance://$instance_name"
# singularity instance list
# singularity instance stop -F "$instance_name"
# singularity instance list
# sleep 5
# ls $HOME/.singularity/instances/sing/$(hostname)/nathante/$instance_name
# rm -r $HOME/.singularity/instances/sing/$(hostname)/nathante/$instance_name
# singularity instance start /gscratch/comdata/users/nathante/cdsc_base.sif $instance_name
source /gscratch/comdata/env/cdsc_klone_bashrc
source $SPARK_CONF_DIR/spark-env.sh
echo $(which python3)
echo $PYSPARK_PYTHON
echo "start-worker.sh spark://$1:$SPARK_MASTER_PORT"
start-worker.sh spark://$1:$SPARK_MASTER_PORT

View File

@@ -1,2 +0,0 @@
from .choose_clusters import load_clusters, load_densities
from .cluster_timeseries import build_cluster_timeseries

View File

@@ -1,96 +0,0 @@
from pyarrow import dataset as ds
import numpy as np
import pandas as pd
import plotnine as pn
random = np.random.RandomState(1968)
def load_densities(term_density_file="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
author_density_file="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather"):
term_density = pd.read_feather(term_density_file)
author_density = pd.read_feather(author_density_file)
term_density.rename({'overlap_density':'term_density','index':'subreddit'},axis='columns',inplace=True)
author_density.rename({'overlap_density':'author_density','index':'subreddit'},axis='columns',inplace=True)
density = term_density.merge(author_density,on='subreddit',how='inner')
return density
def load_clusters(term_clusters_file="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
author_clusters_file="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather"):
term_clusters = pd.read_feather(term_clusters_file)
author_clusters = pd.read_feather(author_clusters_file)
# rename, join and return
term_clusters.rename({'cluster':'term_cluster'},axis='columns',inplace=True)
author_clusters.rename({'cluster':'author_cluster'},axis='columns',inplace=True)
clusters = term_clusters.merge(author_clusters,on='subreddit',how='inner')
return clusters
if __name__ == '__main__':
df = load_densities()
cl = load_clusters()
df['td_rank'] = df.term_density.rank()
df['ad_rank'] = df.author_density.rank()
df['td_percentile'] = df.td_rank / df.shape[0]
df['ad_percentile'] = df.ad_rank / df.shape[0]
df = df.merge(cl, on='subreddit',how='inner')
term_cluster_density = df.groupby('term_cluster').agg({'td_rank':['mean','min','max'],
'ad_rank':['mean','min','max'],
'td_percentile':['mean','min','max'],
'ad_percentile':['mean','min','max'],
'subreddit':['count']})
author_cluster_density = df.groupby('author_cluster').agg({'td_rank':['mean','min','max'],
'ad_rank':['mean','min','max'],
'td_percentile':['mean','min','max'],
'ad_percentile':['mean','min','max'],
'subreddit':['count']})
# which clusters have the most term_density?
term_cluster_density.iloc[term_cluster_density.td_rank['mean'].sort_values().index]
# which clusters have the most author_density?
term_cluster_density.iloc[term_cluster_density.ad_rank['mean'].sort_values(ascending=False).index].loc[term_cluster_density.subreddit['count'] >= 5][0:20]
high_density_term_clusters = term_cluster_density.loc[(term_cluster_density.td_percentile['mean'] > 0.75) & (term_cluster_density.subreddit['count'] > 5)]
# let's just use term density instead of author density for now. We can do a second batch with author density next.
chosen_clusters = high_density_term_clusters.sample(3,random_state=random)
cluster_info = df.loc[df.term_cluster.isin(chosen_clusters.index.values)]
chosen_subreddits = cluster_info.subreddit.values
dataset = ds.dataset("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet",format='parquet')
comments = dataset.to_table(filter=ds.field("subreddit").isin(chosen_subreddits),columns=['id','subreddit','author','CreatedAt'])
comments = comments.to_pandas()
comments['week'] = comments.CreatedAt.dt.date - pd.to_timedelta(comments['CreatedAt'].dt.dayofweek, unit='d')
author_timeseries = comments.loc[:,['subreddit','author','week']].drop_duplicates().groupby(['subreddit','week']).count().reset_index()
for clid in chosen_clusters.index.values:
ts = pd.read_feather(f"data/ts_term_cluster_{clid}.feather")
pn.options.figure_size = (11.7,8.27)
p = pn.ggplot(ts)
p = p + pn.geom_line(pn.aes('week','value',group='subreddit'))
p = p + pn.facet_wrap('~ subreddit')
p.save(f"plots/ts_term_cluster_{clid}.png")
fig, ax = pyplot.subplots(figsize=(11.7,8.27))
g = sns.FacetGrid(ts,row='subreddit')
g.map_dataframe(sns.scatterplot,'week','value',data=ts,ax=ax)

View File

@@ -1,37 +0,0 @@
import pandas as pd
import numpy as np
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from .choose_clusters import load_clusters, load_densities
import fire
from pathlib import Path
def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
output="data/subreddit_timeseries.parquet"):
clusters = load_clusters(term_clusters_path, author_clusters_path)
densities = load_densities(term_densities_path, author_densities_path)
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
df = df.withColumn('week', f.date_trunc('week', f.col("CreatedAt")))
# time of unique authors by series by week
ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
ts = ts.repartition('subreddit')
spk_clusters = spark.createDataFrame(clusters)
ts = ts.join(spk_clusters, on='subreddit', how='inner')
spk_densities = spark.createDataFrame(densities)
ts = ts.join(spk_densities, on='subreddit', how='inner')
ts.write.parquet(output, mode='overwrite')
if __name__ == "__main__":
fire.Fire(build_cluster_timeseries)

View File

@@ -1 +0,0 @@
/annex/objects/SHA256E-s60874--d536adb0ec637fca262c4e1ec908dd8b4a5d1464047b583cd1a99cc6dba87191

View File

@@ -1,11 +0,0 @@
all: subreddit_author_tf_similarities_10000.html #comment_authors_10000.html
# wang_tsne_10000.html
# wang_tsne_10000.html:/gscratch/comdata/output/reddit_tsne/wang_similarity_10000.feather /gscratch/comdata/output/reddit_clustering/wang_similarity_10000.feather tsne_vis.py
# python3 tsne_vis.py --tsne_data=/gscratch/comdata/output/reddit_tsne/wang_similarity_10000.feather --clusters=/gscratch/comdata/output/reddit_clustering/wang_similarity_10000.feather --output=wang_tsne_10000.html
# comment_authors_10000.html:/gscratch/comdata/output/reddit_tsne/comment_authors_10000.feather /gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather tsne_vis.py
# python3 tsne_vis.py --tsne_data=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather --clusters=/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather --output=comment_authors_10000.html
subreddit_author_tf_similarities_10000.html:/gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather /gscratch/comdata/output/reddit_clustering/subreddit_author_tf_similarities_10000.feather tsne_vis.py
start_spark_and_run.sh 1 tsne_vis.py --tsne_data=/gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather --clusters=/gscratch/comdata/output/reddit_clustering/subreddit_author_tf_similarities_10000.feather --output=subreddit_author_tf_similarities_10000.html

View File

@@ -1 +0,0 @@
../../.git/annex/objects/Qk/wG/SHA256E-s145210--14a2ad6660d1e4015437eff556ec349dd10a115a4f96594152a29e83d00aa784/SHA256E-s145210--14a2ad6660d1e4015437eff556ec349dd10a115a4f96594152a29e83d00aa784

View File

@@ -1 +0,0 @@
../../.git/annex/objects/w7/2f/SHA256E-s44458--f1c5247775ecf06514a0ff9e523e944bc8fcd9d0fdb6f214cc1329b759d4354e/SHA256E-s44458--f1c5247775ecf06514a0ff9e523e944bc8fcd9d0fdb6f214cc1329b759d4354e

View File

@@ -1 +0,0 @@
../../.git/annex/objects/WX/v3/SHA256E-s190874--c2aea719f989dde297ca5f13371e156693c574e44acd9a0e313e5e3a3ad4b543/SHA256E-s190874--c2aea719f989dde297ca5f13371e156693c574e44acd9a0e313e5e3a3ad4b543

View File

@@ -1 +0,0 @@
../../.git/annex/objects/mq/2z/SHA256E-s58834--2e7b3ee11f47011fd9b34bddf8f1e788d35ab9c9e0bb6a1301b0b916135400cf/SHA256E-s58834--2e7b3ee11f47011fd9b34bddf8f1e788d35ab9c9e0bb6a1301b0b916135400cf

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,187 +0,0 @@
import pyarrow
import altair as alt
alt.data_transformers.disable_max_rows()
alt.data_transformers.enable('default')
from sklearn.neighbors import NearestNeighbors
import pandas as pd
from numpy import random
import fire
import numpy as np
def base_plot(plot_data):
# base = base.encode(alt.Color(field='color',type='nominal',scale=alt.Scale(scheme='category10')))
cluster_dropdown = alt.binding_select(options=[str(c) for c in sorted(set(plot_data.cluster))])
# subreddit_dropdown = alt.binding_select(options=sorted(plot_data.subreddit))
cluster_click_select = alt.selection_single(on='click',fields=['cluster'], bind=cluster_dropdown, name=' ')
# cluster_select = alt.selection_single(fields=['cluster'], bind=cluster_dropdown, name='cluster')
# cluster_select_and = cluster_click_select & cluster_select
#
# subreddit_select = alt.selection_single(on='click',fields=['subreddit'],bind=subreddit_dropdown,name='subreddit_click')
base_scale = alt.Scale(scheme={"name":'category10',
"extent":[0,100],
"count":10})
color = alt.condition(cluster_click_select ,
alt.Color(field='color',type='nominal',scale=base_scale),
alt.value("lightgray"))
base = alt.Chart(plot_data).mark_text().encode(
alt.X('x',axis=alt.Axis(grid=False),scale=alt.Scale(domain=(-65,65))),
alt.Y('y',axis=alt.Axis(grid=False),scale=alt.Scale(domain=(-65,65))),
color=color,
text='subreddit')
base = base.add_selection(cluster_click_select)
return base
def zoom_plot(plot_data):
chart = base_plot(plot_data)
chart = chart.interactive()
chart = chart.properties(width=1275,height=800)
return chart
def viewport_plot(plot_data):
selector1 = alt.selection_interval(encodings=['x','y'],init={'x':(-65,65),'y':(-65,65)})
selectorx2 = alt.selection_interval(encodings=['x'],init={'x':(30,40)})
selectory2 = alt.selection_interval(encodings=['y'],init={'y':(-20,0)})
base = base_plot(plot_data)
viewport = base.mark_point(fillOpacity=0.2,opacity=0.2).encode(
alt.X('x',axis=alt.Axis(grid=False)),
alt.Y('y',axis=alt.Axis(grid=False)),
)
viewport = viewport.properties(width=600,height=400)
viewport1 = viewport.add_selection(selector1)
viewport2 = viewport.encode(
alt.X('x',axis=alt.Axis(grid=False),scale=alt.Scale(domain=selector1)),
alt.Y('y',axis=alt.Axis(grid=False),scale=alt.Scale(domain=selector1))
)
viewport2 = viewport2.add_selection(selectorx2)
viewport2 = viewport2.add_selection(selectory2)
sr = base.encode(alt.X('x',axis=alt.Axis(grid=False),scale=alt.Scale(domain=selectorx2)),
alt.Y('y',axis=alt.Axis(grid=False),scale=alt.Scale(domain=selectory2))
)
sr = sr.properties(width=1275,height=600)
chart = (viewport1 | viewport2) & sr
return chart
def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
isolate_color = 101
cluster_sizes = clusters.groupby('cluster').count()
singletons = set(cluster_sizes.loc[cluster_sizes.subreddit == 1].reset_index().cluster)
tsne_data = tsne_data.merge(clusters,on='subreddit')
centroids = tsne_data.groupby('cluster').agg({'x':np.mean,'y':np.mean})
color_ids = np.arange(n_colors)
distances = np.empty(shape=(centroids.shape[0],centroids.shape[0]))
groups = tsne_data.groupby('cluster')
points = np.array(tsne_data.loc[:,['x','y']])
centers = np.array(centroids.loc[:,['x','y']])
# point x centroid
point_center_distances = np.linalg.norm((points[:,None,:] - centers[None,:,:]),axis=-1)
# distances is cluster x point
for gid, group in groups:
c_dists = point_center_distances[group.index.values,:].min(axis=0)
distances[group.cluster.values[0],] = c_dists
# nbrs = NearestNeighbors(n_neighbors=n_neighbors).fit(centroids)
# distances, indices = nbrs.kneighbors()
nearest = distances.argpartition(n_neighbors,0)
indices = nearest[:n_neighbors,:].T
# neighbor_distances = np.copy(distances)
# neighbor_distances.sort(0)
# neighbor_distances = neighbor_distances[0:n_neighbors,:]
# nbrs = NearestNeighbors(n_neighbors=n_neighbors,metric='precomputed').fit(distances)
# distances, indices = nbrs.kneighbors()
color_assignments = np.repeat(-1,len(centroids))
for i in range(len(centroids)):
if (centroids.iloc[i].name == -1) or (i in singletons):
color_assignments[i] = isolate_color
else:
knn = indices[i]
knn_colors = color_assignments[knn]
available_colors = color_ids[list(set(color_ids) - set(knn_colors))]
if(len(available_colors) > 0):
color_assignments[i] = available_colors[0]
else:
raise Exception("Can't color this many neighbors with this many colors")
centroids = centroids.reset_index()
colors = centroids.loc[:,['cluster']]
colors['color'] = color_assignments
tsne_data = tsne_data.merge(colors,on='cluster')
return(tsne_data)
def build_visualization(tsne_data, clusters, output):
# tsne_data = "/gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather"
# clusters = "/gscratch/comdata/output/reddit_clustering/subreddit_author_tf_similarities_10000.feather"
tsne_data = pd.read_feather(tsne_data)
tsne_data = tsne_data.rename(columns={'_subreddit':'subreddit'})
clusters = pd.read_feather(clusters)
tsne_data = assign_cluster_colors(tsne_data,clusters,10,8)
sr_per_cluster = tsne_data.groupby('cluster').subreddit.count().reset_index()
sr_per_cluster = sr_per_cluster.rename(columns={'subreddit':'cluster_size'})
tsne_data = tsne_data.merge(sr_per_cluster,on='cluster')
term_zoom_plot = zoom_plot(tsne_data)
term_zoom_plot.save(output)
term_viewport_plot = viewport_plot(tsne_data)
term_viewport_plot.save(output.replace(".html","_viewport.html"))
if __name__ == "__main__":
fire.Fire(build_visualization)
# commenter_data = pd.read_feather("tsne_author_fit.feather")
# clusters = pd.read_feather('author_3000_clusters.feather')
# commenter_data = assign_cluster_colors(commenter_data,clusters,10,8)
# commenter_zoom_plot = zoom_plot(commenter_data)
# commenter_viewport_plot = viewport_plot(commenter_data)
# commenter_zoom_plot.save("subreddit_commenters_tsne_3000.html")
# commenter_viewport_plot.save("subreddit_commenters_tsne_3000_viewport.html")
# chart = chart.properties(width=10000,height=10000)
# chart.save("test_tsne_whole.svg")