18
0

6 Commits

28 changed files with 789 additions and 310 deletions

View File

@@ -1,8 +1,11 @@
#srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28'
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
srun_singularity=srun -p compute-bigmem -A comdata --time=48:00:00 --mem=362G -c 40
similarity_data=/gscratch/comdata/output/reddit_similarity
clustering_data=/gscratch/comdata/output/reddit_clustering
kmeans_selection_grid=--max_iters=[3000] --n_inits=[10] --n_clusters=[100,500,1000,1250,1500,1750,2000]
umap_hdbscan_selection_grid=--min_cluster_sizes=[2] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf] --n_neighbors=[5,15,25,50,75,100] --learning_rate=[1] --min_dist=[0,0.1,0.25,0.5,0.75,0.9,0.99] --local_connectivity=[1] --densmap=[True,False] --n_components=[2,5,10]
hdbscan_selection_grid=--min_cluster_sizes=[2,3,4,5] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf]
affinity_selection_grid=--dampings=[0.5,0.6,0.7,0.8,0.95,0.97,0.99] --preference_quantiles=[0.1,0.3,0.5,0.7,0.9] --convergence_iters=[15]
@@ -91,12 +94,28 @@ ${terms_10k_output_lsi}/hdbscan/selection_data.csv:selection.py ${terms_10k_inpu
${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/hdbscan --savefile=${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${authors_tf_10k_output_lsi}/umap_hdbscan/selection_data.csv:umap_hdbscan_clustering_lsi.py
$(srun_singularity) python3 umap_hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/umap_hdbscan --savefile=${authors_tf_10k_output_lsi}/umap_hdbscan/selection_data.csv $(umap_hdbscan_selection_grid)
${terms_10k_output_lsi}/best_hdbscan.feather:${terms_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
${authors_tf_10k_output_lsi}/best_hdbscan.feather:${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
${authors_tf_10k_output_lsi}/best_umap_hdbscan_2.feather:${authors_tf_10k_output_lsi}/umap_hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
best_umap_hdbscan.feather:${authors_tf_10k_output_lsi}/best_umap_hdbscan_2.feather
# {'lsi_dimensions': 700, 'outpath': '/gscratch/comdata/output/reddit_clustering/subreddit_comment_authors-tf_10k_LSI/umap_hdbscan', 'silhouette_score': 0.27616957, 'name': 'mcs-2_ms-5_cse-0.05_csm-leaf_nn-15_lr-1.0_md-0.1_lc-1_lsi-700', 'n_clusters': 547, 'n_isolates': 2093, 'silhouette_samples': '/gscratch/comdata/output/reddit_clustering/subreddit_comment_authors-tf_10k_LSI/umap_hdbscan/silhouette_samples-mcs-2_ms-5_cse-0.05_csm-leaf_nn-15_lr-1.0_md-0.1_lc-1_lsi-700.feather', 'min_cluster_size': 2, 'min_samples': 5, 'cluster_selection_epsilon': 0.05, 'cluster_selection_method': 'leaf', 'n_neighbors': 15, 'learning_rate': 1.0, 'min_dist': 0.1, 'local_connectivity': 1, 'n_isolates_str': '2093', 'n_isolates_0': False}
best_umap_grid=--min_cluster_sizes=[2] --min_samples=[5] --cluster_selection_epsilons=[0.05] --cluster_selection_methods=[leaf] --n_neighbors=[15] --learning_rate=[1] --min_dist=[0.1] --local_connectivity=[1] --save_step1=True
umap_hdbscan_coords:
python3 umap_hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/umap_hdbscan --savefile=/dev/null ${best_umap_grid}
clean_affinity:
rm -f ${authors_10k_output}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output}/affinity/selection_data.csv
@@ -159,7 +178,7 @@ clean_lsi_terms:
clean: clean_affinity clean_kmeans clean_hdbscan
PHONY: clean clean_affinity clean_kmeans clean_hdbscan clean_authors clean_authors_tf clean_terms terms_10k authors_10k authors_tf_10k
PHONY: clean clean_affinity clean_kmeans clean_hdbscan clean_authors clean_authors_tf clean_terms terms_10k authors_10k authors_tf_10k best_umap_hdbscan.feather umap_hdbscan_coords
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_authors_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_30k.feather $(clustering_data)/subreddit_comment_authors_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS

View File

@@ -1,3 +1,4 @@
import pickle
from pathlib import Path
import numpy as np
import pandas as pd
@@ -24,6 +25,13 @@ class clustering_job:
self.outpath.mkdir(parents=True, exist_ok=True)
self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
self.hasrun = True
self.cleanup()
def cleanup(self):
self.cluster_data = None
self.mat = None
self.clustering=None
self.subreddits=None
def get_info(self):
if not self.hasrun:
@@ -57,6 +65,7 @@ class clustering_job:
return score
def read_distance_mat(self, similarities, use_threads=True):
print(similarities)
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
@@ -95,6 +104,38 @@ class clustering_job:
return cluster_data
class twoway_clustering_job(clustering_job):
def __init__(self, infile, outpath, name, call1, call2, args1, args2):
self.outpath = Path(outpath)
self.call1 = call1
self.args1 = args1
self.call2 = call2
self.args2 = args2
self.infile = Path(infile)
self.name = name
self.hasrun = False
self.args = args1|args2
def run(self):
self.subreddits, self.mat = self.read_distance_mat(self.infile)
self.step1 = self.call1(self.mat, **self.args1)
self.clustering = self.call2(self.mat, self.step1, **self.args2)
self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
self.hasrun = True
self.after_run()
self.cleanup()
def after_run():
self.score = self.silhouette()
self.outpath.mkdir(parents=True, exist_ok=True)
print(self.outpath/(self.name+".feather"))
self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
def cleanup(self):
super().cleanup()
self.step1 = None
@dataclass
class clustering_result:
outpath:Path

View File

@@ -31,3 +31,19 @@ class grid_sweep:
outcsv = Path(outcsv)
outcsv.parent.mkdir(parents=True, exist_ok=True)
self.infos.to_csv(outcsv)
class twoway_grid_sweep(grid_sweep):
def __init__(self, jobtype, inpath, outpath, namer, args1, args2, *args, **kwargs):
self.jobtype = jobtype
self.namer = namer
prod1 = product(* args1.values())
prod2 = product(* args2.values())
grid1 = [dict(zip(args1.keys(), pargs)) for pargs in prod1]
grid2 = [dict(zip(args2.keys(), pargs)) for pargs in prod2]
grid = product(grid1, grid2)
inpath = Path(inpath)
outpath = Path(outpath)
self.hasrun = False
self.grid = [(inpath,outpath,namer(**(g[0] | g[1])), g[0], g[1], *args) for g in grid]
self.jobs = [jobtype(*g) for g in self.grid]

View File

@@ -1,5 +1,5 @@
from clustering_base import clustering_job, clustering_result
from grid_sweep import grid_sweep
from grid_sweep import grid_sweep, twoway_grid_sweep
from dataclasses import dataclass
from itertools import chain
from pathlib import Path
@@ -27,3 +27,18 @@ class lsi_grid_sweep(grid_sweep):
self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))
class twoway_lsi_grid_sweep(twoway_grid_sweep):
def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, args1, args2):
self.jobtype = jobtype
self.subsweep = subsweep
inpath = Path(inpath)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*.feather"))
else:
lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]
lsi_nums = [int(p.stem) for p in lsi_paths]
self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, args1, args2) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))

View File

@@ -0,0 +1,230 @@
from clustering_base import clustering_result, clustering_job, twoway_clustering_job
from hdbscan_clustering import hdbscan_clustering_result
import umap
from grid_sweep import twoway_grid_sweep
from dataclasses import dataclass
import hdbscan
from sklearn.neighbors import NearestNeighbors
import plotnine as pn
import numpy as np
from itertools import product, starmap, chain
import pandas as pd
from multiprocessing import cpu_count
import fire
def test_select_hdbscan_clustering():
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI"
outpath = "test_umap_hdbscan_lsi"
min_cluster_sizes=[2,3,4]
min_samples=[1,2,3]
cluster_selection_epsilons=[0,0.1,0.3,0.5]
cluster_selection_methods=[1]
lsi_dimensions='all'
n_neighbors = [5,10,15,25,35,70,100]
learning_rate = [0.1,0.5,1,2]
min_dist = [0.5,1,1.5,2]
local_connectivity = [1,2,3,4,5]
hdbscan_params = {"min_cluster_sizes":min_cluster_sizes, "min_samples":min_samples, "cluster_selection_epsilons":cluster_selection_epsilons, "cluster_selection_methods":cluster_selection_methods}
umap_params = {"n_neighbors":n_neighbors, "learning_rate":learning_rate, "min_dist":min_dist, "local_connectivity":local_connectivity}
gs = umap_hdbscan_grid_sweep(inpath, "all", outpath, hdbscan_params,umap_params)
# gs.run(20)
# gs.save("test_hdbscan/lsi_sweep.csv")
# job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom')
# job1.run()
# print(job1.get_info())
# df = pd.read_csv("test_hdbscan/selection_data.csv")
# test_select_hdbscan_clustering()
# check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
# silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
# c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)
class umap_hdbscan_grid_sweep(twoway_grid_sweep):
def __init__(self,
inpath,
outpath,
umap_params,
hdbscan_params):
super().__init__(umap_hdbscan_job, inpath, outpath, self.namer, umap_params, hdbscan_params)
def namer(self,
min_cluster_size,
min_samples,
cluster_selection_epsilon,
cluster_selection_method,
n_components,
n_neighbors,
learning_rate,
min_dist,
local_connectivity,
densmap
):
return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}_nc-{n_components}_nn-{n_neighbors}_lr-{learning_rate}_md-{min_dist}_lc-{local_connectivity}_dm-{densmap}"
@dataclass
class umap_hdbscan_clustering_result(hdbscan_clustering_result):
n_components:int
n_neighbors:int
learning_rate:float
min_dist:float
local_connectivity:int
densmap:bool
class umap_hdbscan_job(twoway_clustering_job):
def __init__(self, infile, outpath, name,
umap_args = {"n_components":2,"n_neighbors":15, "learning_rate":1, "min_dist":1, "local_connectivity":1,'densmap':False},
hdbscan_args = {"min_cluster_size":2, "min_samples":1, "cluster_selection_epsilon":0, "cluster_selection_method":'eom'},
*args,
**kwargs):
super().__init__(infile,
outpath,
name,
call1=umap_hdbscan_job._umap_embedding,
call2=umap_hdbscan_job._hdbscan_clustering,
args1=umap_args,
args2=hdbscan_args,
*args,
**kwargs
)
self.n_components = umap_args['n_components']
self.n_neighbors = umap_args['n_neighbors']
self.learning_rate = umap_args['learning_rate']
self.min_dist = umap_args['min_dist']
self.local_connectivity = umap_args['local_connectivity']
self.densmap = umap_args['densmap']
self.min_cluster_size = hdbscan_args['min_cluster_size']
self.min_samples = hdbscan_args['min_samples']
self.cluster_selection_epsilon = hdbscan_args['cluster_selection_epsilon']
self.cluster_selection_method = hdbscan_args['cluster_selection_method']
def after_run(self):
coords = self.step1.emedding_
self.cluster_data['x'] = coords[:,0]
self.cluster_data['y'] = coords[:,1]
super().after_run()
def _umap_embedding(mat, **umap_args):
print(f"running umap embedding. umap_args:{umap_args}")
umapmodel = umap.UMAP(metric='precomputed', **umap_args)
umapmodel = umapmodel.fit(mat)
return umapmodel
def _hdbscan_clustering(mat, umapmodel, **hdbscan_args):
print(f"running hdbascan clustering. hdbscan_args:{hdbscan_args}")
umap_coords = umapmodel.transform(mat)
clusterer = hdbscan.HDBSCAN(metric='euclidean',
core_dist_n_jobs=cpu_count(),
**hdbscan_args
)
clustering = clusterer.fit(umap_coords)
return(clustering)
def get_info(self):
result = super().get_info()
self.result = umap_hdbscan_clustering_result(**result.__dict__,
min_cluster_size=self.min_cluster_size,
min_samples=self.min_samples,
cluster_selection_epsilon=self.cluster_selection_epsilon,
cluster_selection_method=self.cluster_selection_method,
n_components = self.n_components,
n_neighbors = self.n_neighbors,
learning_rate = self.learning_rate,
min_dist = self.min_dist,
local_connectivity=self.local_connectivity,
densmap=self.densmap
)
return self.result
def run_umap_hdbscan_grid_sweep(savefile, inpath, outpath, n_neighbors = [15], n_components=[2], learning_rate=[1], min_dist=[1], local_connectivity=[1],
densmap=[False],
min_cluster_sizes=[2], min_samples=[1], cluster_selection_epsilons=[0], cluster_selection_methods=['eom']):
"""Run umap + hdbscan clustering once or more with different parameters.
Usage:
umap_hdbscan_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --n_neighbors=<csv> --learning_rate=<csv> --min_dist=<csv> --local_connectivity=<csv> --min_cluster_sizes=<csv> --min_samples=<csv> --cluster_selection_epsilons=<csv> --cluster_selection_methods=<csv "eom"|"leaf">
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to feather data containing a labeled matrix of subreddit similarities.
outpath: path to output fit kmeans clusterings.
n_neighbors: umap parameter takes integers greater than 1
learning_rate: umap parameter takes positive real values
min_dist: umap parameter takes positive real values
local_connectivity: umap parameter takes positive integers
min_cluster_sizes: one or more integers indicating the minumum cluster size
min_samples: one ore more integers indicating the minimum number of samples used in the algorithm
cluster_selection_epsilon: one or more similarity thresholds for transition from dbscan to hdbscan
cluster_selection_method: "eom" or "leaf" eom gives larger clusters.
"""
umap_args = {'n_neighbors':list(map(int, n_neighbors)),
'learning_rate':list(map(float,learning_rate)),
'min_dist':list(map(float,min_dist)),
'local_connectivity':list(map(int,local_connectivity)),
'n_components':list(map(int, n_components)),
'densmap':list(map(bool,densmap))
}
hdbscan_args = {'min_cluster_size':list(map(int,min_cluster_sizes)),
'min_samples':list(map(int,min_samples)),
'cluster_selection_epsilon':list(map(float,cluster_selection_epsilons)),
'cluster_selection_method':cluster_selection_methods}
obj = umap_hdbscan_grid_sweep(inpath,
outpath,
umap_args,
hdbscan_args)
obj.run(cores=10)
obj.save(savefile)
def KNN_distances_plot(mat,outname,k=2):
nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
distances, indices = nbrs.kneighbors(mat)
d2 = distances[:,-1]
df = pd.DataFrame({'dist':d2})
df = df.sort_values("dist",ascending=False)
df['idx'] = np.arange(0,d2.shape[0]) + 1
p = pn.qplot(x='idx',y='dist',data=df,geom='line') + pn.scales.scale_y_continuous(minor_breaks = np.arange(0,50)/50,
breaks = np.arange(0,10)/10)
p.save(outname,width=16,height=10)
def make_KNN_plots():
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='terms_knn_dist2.png')
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='authors_knn_dist2.png')
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')
if __name__ == "__main__":
fire.Fire(run_umap_hdbscan_grid_sweep)
# test_select_hdbscan_clustering()
#fire.Fire(select_hdbscan_clustering)

View File

@@ -0,0 +1,113 @@
from umap_hdbscan_clustering import umap_hdbscan_job, umap_hdbscan_grid_sweep, umap_hdbscan_clustering_result
from lsi_base import twoway_lsi_grid_sweep, lsi_mixin, lsi_result_mixin
from grid_sweep import twoway_grid_sweep
import fire
from dataclasses import dataclass
@dataclass
class umap_hdbscan_clustering_result_lsi(umap_hdbscan_clustering_result, lsi_result_mixin):
pass
class umap_hdbscan_lsi_job(umap_hdbscan_job, lsi_mixin):
def __init__(self, infile, outpath, name, umap_args, hdbscan_args, lsi_dims):
super().__init__(
infile,
outpath,
name,
umap_args,
hdbscan_args
)
super().set_lsi_dims(lsi_dims)
def get_info(self):
partial_result = super().get_info()
self.result = umap_hdbscan_clustering_result_lsi(**partial_result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
class umap_hdbscan_lsi_grid_sweep(twoway_lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
umap_args,
hdbscan_args
):
super().__init__(umap_hdbscan_lsi_job,
_umap_hdbscan_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
umap_args,
hdbscan_args
)
class _umap_hdbscan_lsi_grid_sweep(twoway_grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
umap_args,
hdbscan_args,
):
self.lsi_dim = lsi_dim
self.jobtype = umap_hdbscan_lsi_job
super().__init__(self.jobtype, inpath, outpath, self.namer, umap_args, hdbscan_args, lsi_dim)
def namer(self, *args, **kwargs):
s = umap_hdbscan_grid_sweep.namer(self, *args, **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
def run_umap_hdbscan_lsi_grid_sweep(savefile, inpath, outpath, n_neighbors = [15], n_components=[2], learning_rate=[1], min_dist=[1], local_connectivity=[1],
densmap=[False],
min_cluster_sizes=[2], min_samples=[1], cluster_selection_epsilons=[0], cluster_selection_methods=['eom'], lsi_dimensions='all'):
"""Run hdbscan clustering once or more with different parameters.
Usage:
hdbscan_clustering_lsi --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --min_cluster_sizes=<csv> --min_samples=<csv> --cluster_selection_epsilons=<csv> --cluster_selection_methods=[eom]> --lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to folder containing feather files with LSI similarity labeled matrices of subreddit similarities.
outpath: path to output fit clusterings.
min_cluster_sizes: one or more integers indicating the minumum cluster size
min_samples: one ore more integers indicating the minimum number of samples used in the algorithm
cluster_selection_epsilons: one or more similarity thresholds for transition from dbscan to hdbscan
cluster_selection_methods: one or more of "eom" or "leaf" eom gives larger clusters.
lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
"""
umap_args = {'n_neighbors':list(map(int, n_neighbors)),
'learning_rate':list(map(float,learning_rate)),
'min_dist':list(map(float,min_dist)),
'local_connectivity':list(map(int,local_connectivity)),
'n_components':list(map(int, n_components)),
'densmap':list(map(bool,densmap))
}
hdbscan_args = {'min_cluster_size':list(map(int,min_cluster_sizes)),
'min_samples':list(map(int,min_samples)),
'cluster_selection_epsilon':list(map(float,cluster_selection_epsilons)),
'cluster_selection_method':cluster_selection_methods}
obj = umap_hdbscan_lsi_grid_sweep(inpath,
lsi_dimensions,
outpath,
umap_args,
hdbscan_args
)
obj.run(10)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_umap_hdbscan_lsi_grid_sweep)

View File

@@ -1,26 +0,0 @@
#!/bin/bash
## parallel_sql_job.sh
#SBATCH --job-name=tf_subreddit_comments
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=12:00:00
## Memory per node
#SBATCH --mem=32G
#SBATCH --cpus-per-task=4
#SBATCH --ntasks=1
#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
source ./bin/activate
module load parallel_sql
echo $(which perl)
conda list pyarrow
which python3
#Put here commands to load other modules (e.g. matlab etc.)
#Below command means that parallel_sql will get tasks from the database
#and run them on the node (in parallel). So a 16 core node will have
#16 tasks running at one time.
parallel-sql --sql -a parallel --exit-on-term --jobs 4

View File

@@ -1,10 +1,10 @@
#!/usr/bin/env bash
## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
#!/usr/bin/env bash
echo "#!/usr/bin/bash" > job_script.sh
#echo "source $(pwd)/../bin/activate" >> job_script.sh
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 --pty job_script.sh
start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py

View File

@@ -1,12 +1,15 @@
#!/usr/bin/env python3
import os
import json
from datetime import datetime
from multiprocessing import Pool
from itertools import islice
from helper import find_dumps, open_fileset
from helper import open_input_file, find_dumps
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
import fire
def parse_comment(comment, names= None):
if names is None:
@@ -46,17 +49,12 @@ def parse_comment(comment, names= None):
# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
def parse_dump(partition):
files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
dumpdir = f"/gscratch/comdata/raw_data/reddit_dumps/comments/{partition}"
pool = Pool(28)
stream = open_fileset(files)
N = int(1e4)
rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
stream = open_input_file(dumpdir)
rows = map(parse_comment, stream)
schema = pa.schema([
pa.field('id', pa.string(), nullable=True),
@@ -78,33 +76,16 @@ schema = pa.schema([
pa.field('error', pa.string(), nullable=True),
])
from pathlib import Path
p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
p = Path("/gscratch/comdata/output/temp/reddit_comments.parquet")
p.mkdir(exist_ok=True,parents=True)
if not p.is_dir():
if p.exists():
p.unlink()
p.mkdir()
else:
list(map(Path.unlink,p.glob('*')))
part_size = int(1e7)
part = 1
n_output = 0
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
N=10000
with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet",
schema=schema,
compression='snappy',
flavor='spark') as writer:
while True:
if n_output > part_size:
if part > 1:
writer.close()
part = part + 1
n_output = 0
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
n_output += N
chunk = islice(rows,N)
pddf = pd.DataFrame(chunk, columns=schema.names)
table = pa.Table.from_pandas(pddf,schema=schema)
@@ -112,4 +93,19 @@ while True:
break
writer.write_table(table)
writer.close()
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/comments", overwrite=True):
files = list(find_dumps(dumpdir,base_pattern="RC_20*.*"))
with open("comments_task_list.sh",'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
if (not Path(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n')
if __name__ == '__main__':
fire.Fire({'parse_dump':parse_dump,
'gen_task_list':gen_task_list})

View File

@@ -2,12 +2,19 @@
# spark script to make sorted, and partitioned parquet files
import pyspark
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
conf = conf.set("spark.sql.shuffle.partitions",2000)
conf = conf.set('spark.sql.crossJoin.enabled',"true")
conf = conf.set('spark.debug.maxToStringFields',200)
sc = spark.sparkContext
df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_comments.parquet",compression='snappy')
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
@@ -21,9 +28,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
df = df.repartition('subreddit')
df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
df2.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
df = df.repartition('author')
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
df3.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')

View File

@@ -24,8 +24,7 @@ def open_fileset(files):
for fh in files:
print(fh)
lines = open_input_file(fh)
for line in lines:
yield line
yield from lines
def open_input_file(input_filename):
if re.match(r'.*\.7z$', input_filename):
@@ -39,7 +38,7 @@ def open_input_file(input_filename):
elif re.match(r'.*\.xz', input_filename):
cmd = ["xzcat",'-dk', '-T 20',input_filename]
elif re.match(r'.*\.zst',input_filename):
cmd = ['zstd','-dck', input_filename]
cmd = ['/kloneusr/bin/zstd','-dck', input_filename, '--memory=2048MB --stdout']
elif re.match(r'.*\.gz',input_filename):
cmd = ['gzip','-dc', input_filename]
try:

View File

@@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000
stop-all.sh
singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 comments_2_parquet_part2.py
singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh

4
datasets/submissions_2_parquet.sh Normal file → Executable file
View File

@@ -1,8 +1,8 @@
#!/usr/bin/env bash
## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
#!/usr/bin/env bash
./parse_submissions.sh
srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 python3 $(pwd)/submissions_2_parquet_part1.py gen_task_list
start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py

View File

@@ -3,26 +3,23 @@
# two stages:
# 1. from gz to arrow parquet (this script)
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
from datetime import datetime
from multiprocessing import Pool
from pathlib import Path
from itertools import islice
from helper import find_dumps, open_fileset
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import simdjson
import fire
import os
parser = simdjson.Parser()
import json
def parse_submission(post, names = None):
if names is None:
names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
try:
post = parser.parse(post)
post = json.loads(post)
except (ValueError) as e:
# print(e)
# print(post)
@@ -92,8 +89,7 @@ def parse_dump(partition):
pa.field('quarantine',pa.bool_(),nullable=True),
pa.field('error',pa.string(),nullable=True)])
if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
Path("/gscratch/comdata/output/temp/reddit_submissions.parquet/").mkdir(exist_ok=True,parents=True)
with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
while True:
@@ -108,7 +104,7 @@ def parse_dump(partition):
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
with open("parse_submissions_task_list",'w') as of:
with open("submissions_task_list.sh",'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')

View File

@@ -4,9 +4,9 @@ from pathlib import Path
import fire
import numpy as np
import sys
sys.path.append("..")
sys.path.append("../similarities")
from similarities.similarities_helper import reindex_tfidf
# sys.path.append("..")
# sys.path.append("../similarities")
# from similarities.similarities_helper import pull_tfidf
# this is the mean of the ratio of the overlap to the focal size.
# mean shared membership per focal community member

View File

@@ -8,7 +8,7 @@ import hashlib
shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
#shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
shasums = shasums1 + shasums2
shasums = shasums1
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
for l in shasums.strip().split('\n'):

View File

@@ -1,8 +1,6 @@
#!/usr/bin/env bash
module load parallel_sql
source ./bin/activate
python3 tf_comments.py gen_task_list
psu --del --Y
cat tf_task_list | psu --load
for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done;

View File

@@ -2,12 +2,17 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
import fire
def main(inparquet, outparquet, colname):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/")
df = spark.read.parquet(inparquet)
df = df.repartition(2000,'term')
df = df.sort(['term','week','subreddit'])
df = df.sortWithinPartitions(['term','week','subreddit'])
df = df.repartition(2000,colname)
df = df.sort([colname,'week','subreddit'])
df = df.sortWithinPartitions([colname,'week','subreddit'])
df.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
df.write.parquet(outparquet,mode='overwrite',compression='snappy')
if __name__ == '__main__':
fire.Fire(main)

View File

@@ -14,21 +14,29 @@ from nltk.util import ngrams
import string
from random import random
from redditcleaner import clean
from pathlib import Path
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, mwe_pass = 'first'):
dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")
dataset = ds.dataset(Path(input_dir)/partition, format='parquet')
outputdir = Path(outputdir)
samppath = outputdir / "reddit_comment_ngrams_10p_sample"
if not samppath.exists():
samppath.mkdir(parents=True, exist_ok=True)
ngram_output = partition.replace("parquet","txt")
if excluded_users is not None:
excluded_users = set(map(str.strip,open(excluded_users)))
df = df.filter(~ (f.col("author").isin(excluded_users)))
ngram_path = samppath / ngram_output
if mwe_pass == 'first':
if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
if ngram_path.exists():
ngram_path.unlink()
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
@@ -62,8 +70,10 @@ def weekly_tf(partition, mwe_pass = 'first'):
subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
mwe_path = outputdir / "multiword_expressions.feather"
if mwe_pass != 'first':
mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather')
mwe_dataset = pd.read_feather(mwe_path)
mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
mwe_phrases = list(mwe_dataset.phrase)
mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
@@ -115,7 +125,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
for sentence in sentences:
if random() <= 0.1:
grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
with open(ngram_path,'a') as gram_file:
for ng in grams:
gram_file.write(' '.join(ng) + '\n')
for token in sentence:
@@ -150,7 +160,14 @@ def weekly_tf(partition, mwe_pass = 'first'):
outchunksize = 10000
with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
termtf_outputdir = (outputdir / "comment_terms")
termtf_outputdir.mkdir(parents=True, exist_ok=True)
authortf_outputdir = (outputdir / "comment_authors")
authortf_outputdir.mkdir(parents=True, exist_ok=True)
termtf_path = termtf_outputdir / partition
authortf_path = authortf_outputdir / partition
with pq.ParquetWriter(termtf_path, schema=schema, compression='snappy', flavor='spark') as writer, \
pq.ParquetWriter(authortf_path, schema=author_schema, compression='snappy', flavor='spark') as author_writer:
while True:
@@ -179,12 +196,12 @@ def weekly_tf(partition, mwe_pass = 'first'):
author_writer.close()
def gen_task_list(mwe_pass='first'):
def gen_task_list(mwe_pass='first', outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
with open("tf_task_list",'w') as outfile:
with open(tf_task_list,'w') as outfile:
for f in files:
if f.endswith(".parquet"):
outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
if __name__ == "__main__":
fire.Fire({"gen_task_list":gen_task_list,

27
ngrams/top_comment_phrases.py Normal file → Executable file
View File

@@ -1,10 +1,17 @@
#!/usr/bin/env python3
from pyspark.sql import functions as f
from pyspark.sql import Window
from pyspark.sql import SparkSession
import numpy as np
import fire
from pathlib import Path
def main(ngram_dir="/gscratch/comdata/output/reddit_ngrams"):
spark = SparkSession.builder.getOrCreate()
df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
ngram_dir = Path(ngram_dir)
ngram_sample = ngram_dir / "reddit_comment_ngrams_10p_sample"
df = spark.read.text(str(ngram_sample))
df = df.withColumnRenamed("value","phrase")
@@ -13,7 +20,6 @@ phrases = df.groupby('phrase').count()
phrases = phrases.withColumnRenamed('count','phraseCount')
phrases = phrases.filter(phrases.phraseCount > 10)
# count overall
N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount
@@ -41,18 +47,23 @@ df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI'])
df = df.sort(['phrasePWMI'],descending=True)
df = df.sortWithinPartitions(['phrasePWMI'],descending=True)
df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy')
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/")
pwmi_dir = ngram_dir / "reddit_comment_ngrams_pwmi.parquet/"
df.write.parquet(str(pwmi_dir), mode='overwrite', compression='snappy')
df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none')
df = spark.read.parquet(str(pwmi_dir))
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet")
df.write.csv(str(ngram_dir / "reddit_comment_ngrams_pwmi.csv/"),mode='overwrite',compression='none')
df = spark.read.parquet(str(pwmi_dir))
df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
# choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions.
#
df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
df = df.toPandas()
df.to_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather")
df.to_csv("/gscratch/comdata/users/nathante/reddit_multiword_expressions.csv")
df.to_feather(ngram_dir / "multiword_expressions.feather")
df.to_csv(ngram_dir / "multiword_expressions.csv")
if __name__ == '__main__':
fire.Fire(main)

View File

@@ -1,8 +1,10 @@
#all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
base_data=/gscratch/comdata/output
similarity_data=${base_data}/reddit_similarity
# srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
# srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
srun=srun -p compute-bigmem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40
srun_huge=srun -p compute-hugemem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40
similarity_data=/gscratch/scrubbed/comdata/reddit_similarity
tfidf_data=${similarity_data}/tfidf
tfidf_weekly_data=${similarity_data}/tfidf_weekly
similarity_weekly_data=${similarity_data}/weekly
@@ -10,7 +12,10 @@ lsi_components=[10,50,100,200,300,400,500,600,700,850,1000,1500]
lsi_similarities: ${similarity_data}/subreddit_comment_terms_10k_LSI ${similarity_data}/subreddit_comment_authors-tf_10k_LSI ${similarity_data}/subreddit_comment_authors_10k_LSI ${similarity_data}/subreddit_comment_terms_30k_LSI ${similarity_data}/subreddit_comment_authors-tf_30k_LSI ${similarity_data}/subreddit_comment_authors_30k_LSI
all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
all: ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather
#all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
#${tfidf_weekly_data}/comment_terms_100k.parquet ${tfidf_weekly_data}/comment_authors_100k.parquet ${tfidf_weekly_data}/comment_terms_30k.parquet ${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet ${similarity_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_authors_30k.parquet
@@ -18,103 +23,106 @@ all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.pa
# all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet
${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet
${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_terms.parquet
${srun} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet
${similarity_data}/subreddit_comment_terms_10k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000
${srun} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000
${similarity_data}/subreddit_comment_terms_10k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200
${srun_huge} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200
${similarity_data}/subreddit_comment_terms_30k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200
${srun_huge} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200 --inpath=$<
${similarity_data}/subreddit_comment_terms_30k.feather: ${tfidf_data}/comment_terms_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000
${srun_huge} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000 --inpath=$<
${similarity_data}/subreddit_comment_authors_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000
${srun_huge} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000 --inpath=$<
${similarity_data}/subreddit_comment_authors_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000
${srun_huge} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000 --inpath=$<
${similarity_data}/subreddit_comment_authors_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
${srun_huge} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$<
${similarity_data}/subreddit_comment_authors_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
${srun_huge} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=10 --inpath=$<
${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000
${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000 --inpath=$<
${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
${srun_huge} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$<
${similarity_data}/subreddit_comment_authors-tf_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
${srun_huge} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=10 --inpath=$<
${similarity_data}/subreddit_comment_terms_100k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000
${srun} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000
${srun} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors-tf_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000
${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000
${tfidf_data}/comment_terms_100k.feather/: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=100000 --outpath=${tfidf_data}/comment_terms_100k.feather
${similarity_data}/subreddits_by_num_comments_nonsfw.csv:
start_spark_and_run.sh 3 top_subreddits_by_comments.py
${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=30000 --outpath=${tfidf_data}/comment_terms_30k.feather
${tfidf_data}/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
# mkdir -p ${tfidf_data}/
start_spark_and_run.sh 3 tfidf.py terms --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_terms_100k.parquet
${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=10000 --outpath=${tfidf_data}/comment_terms_10k.feather
${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
# mkdir -p ${tfidf_data}/
start_spark_and_run.sh 3 tfidf.py terms --topN=30000 --inpath=$< --outpath=${tfidf_data}/comment_terms_30k.feather
${tfidf_data}/comment_authors_100k.feather: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=100000 --outpath=${tfidf_data}/comment_authors_100k.feather
${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
# mkdir -p ${tfidf_data}/
start_spark_and_run.sh 3 tfidf.py terms --topN=10000 --inpath=$< --outpath=${tfidf_data}/comment_terms_10k.feather
${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=10000 --outpath=${tfidf_data}/comment_authors_10k.parquet
${tfidf_data}/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
# mkdir -p ${tfidf_data}/
start_spark_and_run.sh 3 tfidf.py authors --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_authors_100k.parquet
${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=30000 --outpath=${tfidf_data}/comment_authors_30k.parquet
${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
# mkdir -p ${tfidf_data}/
start_spark_and_run.sh 3 tfidf.py authors --topN=10000 --inpath=$< --outpath=${tfidf_data}/comment_authors_10k.parquet
${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet
${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
# mkdir -p ${tfidf_data}/
start_spark_and_run.sh 3 tfidf.py authors --topN=30000 --inpath=$< --outpath=${tfidf_data}/comment_authors_30k.parquet
${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
start_spark_and_run.sh 3 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet
${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_ppnum_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=100000 --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
start_spark_and_run.sh 3 tfidf.py authors_weekly --topN=100000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
start_spark_and_run.sh 3 tfidf.py authors_weekly --topN=30000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_terms_100k.parquet: weekly_cosine_similarities.py similarities_helper.py ${tfidf_weekly_data}/comment_terms_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${srun} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_terms_100k.parquet
${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_authors_100k.parquet
${srun} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_terms_30k.parquet
${srun} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
,${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_authors_30k.parquet
${srun} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv
# start_spark_and_run.sh 1 tfidf.py authors_weekly --topN=130000
# /gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet

View File

@@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname):7077 top_subreddits_by_comments.py
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh
singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 tfidf.py authors --topN=100000 --inpath=/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet --outpath=/gscratch/scrubbed/comdata/reddit_similarity/tfidf/comment_authors_100k.parquet
singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh

View File

@@ -5,19 +5,20 @@ from similarities_helper import *
#from similarities_helper import similarities, lsi_column_similarities
from functools import partial
inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/"
term_colname='term'
outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI'
n_components=[10,50,100]
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
n_iter=5
random_state=1968
algorithm='arpack'
topN = None
from_date=None
to_date=None
min_df=None
max_df=None
# inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
# term_colname='authors'
# outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_test_compex_LSI'
# n_components=[10,50,100]
# included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
# n_iter=5
# random_state=1968
# algorithm='randomized'
# topN = None
# from_date=None
# to_date=None
# min_df=None
# max_df=None
def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None):
print(n_components,flush=True)
@@ -62,7 +63,7 @@ def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/t
n_components=n_components
)
def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968):
def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,algorithm='arpack',n_components=300,n_iter=5,random_state=1968):
return lsi_similarities(inpath,
'author',
outfile,

View File

@@ -262,6 +262,7 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196
lsimat = mod.transform(tfidfmat.T)
if lsi_model_save is not None:
Path(lsi_model_save).parent.mkdir(exist_ok=True, parents=True)
pickle.dump(mod, open(lsi_model_save,'wb'))
sims_list = []

View File

@@ -2,9 +2,12 @@ import fire
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from similarities_helper import tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
from functools import partial
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits):
spark = SparkSession.builder.getOrCreate()y
inpath = '/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet'
# include_terms is a path to a parquet file that contains a column of term_colname + '_id' to include.
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=None, min_df=None, max_df=None):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath)
@@ -15,50 +18,71 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
else:
include_subs = select_topN_subreddits(topN)
dfwriter = func(df, include_subs, term_colname)
include_subs = spark.sparkContext.broadcast(include_subs)
# term_id = term_colname + "_id"
if included_terms is not None:
terms_df = spark.read.parquet(included_terms)
terms_df = terms_df.select(term_colname).distinct()
df = df.join(terms_df, on=term_colname, how='left_semi')
dfwriter = func(df, include_subs.value, term_colname)
dfwriter.parquet(outpath,mode='overwrite',compression='snappy')
spark.stop()
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits, min_df, max_df):
tfidf_func = partial(tfidf_dataset, max_df=max_df, min_df=min_df)
return _tfidf_wrapper(tfidf_func, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_weekly(inpath, outpath, static_tfidf_path, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=static_tfidf_path)
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_authors(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=None,
included_subreddits=None):
included_subreddits=None,
min_df=None,
max_df=None):
return tfidf(inpath,
outpath,
topN,
'author',
['[deleted]','AutoModerator'],
included_subreddits=included_subreddits
included_subreddits=included_subreddits,
min_df=min_df,
max_df=max_df
)
def tfidf_terms(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=None,
included_subreddits=None):
included_subreddits=None,
min_df=None,
max_df=None):
return tfidf(inpath,
outpath,
topN,
'term',
[],
included_subreddits=included_subreddits
included_subreddits=included_subreddits,
min_df=min_df,
max_df=max_df
)
def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
topN=None,
included_subreddits=None):
return tfidf_weekly(inpath,
outpath,
static_tfidf_path,
topN,
'author',
['[deleted]','AutoModerator'],
@@ -66,6 +90,7 @@ def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_
)
def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
topN=None,
included_subreddits=None):
@@ -73,6 +98,7 @@ def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_te
return tfidf_weekly(inpath,
outpath,
static_tfidf_path,
topN,
'term',
[],

View File

@@ -17,7 +17,7 @@ df = df.filter(~df.subreddit.like("u_%"))
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
df = df.join(prop_nsfw,on='subreddit')
#df = df.filter(df.prop_nsfw < 0.5)
df = df.filter(df.prop_nsfw < 0.5)
win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank', f.rank().over(win))
@@ -26,4 +26,4 @@ df = df.toPandas()
df = df.sort_values("n_comments")
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nsfw.csv', index=False)
df.to_csv('/gscratch/scrubbed/comdata/reddit_similarity/subreddits_by_num_comments_nonsfw.csv', index=False)

View File

@@ -13,18 +13,23 @@ from similarities_helper import pull_tfidf, column_similarities, write_weekly_si
from scipy.sparse import csr_matrix
from multiprocessing import Pool, cpu_count
from functools import partial
import pickle
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_10k.parquet"
tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
min_df=None
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
max_df = None
topN=100
term_colname='author'
# outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# included_subreddits=None
# tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors_tfidf.parquet"
# #tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data//comment_authors_compex.parquet"
# min_df=2
# included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
# max_df = None
# topN=100
# term_colname='author'
# # outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# # included_subreddits=None
outfile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors.parquet"; infile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_authors_tfidf.parquet"; included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"; lsi_model="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/2000_authors_LSIMOD.pkl"; n_components=1500; algorithm="randomized"; term_colname='author'; tfidf_path=infile; random_state=1968;
def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path, subreddit_names, nterms):
# static_tfidf = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
# dftest = spark.read.parquet(static_tfidf)
def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir:Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
@@ -32,20 +37,19 @@ def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df,
entries = pull_tfidf(infile = tfidf_path,
term_colname=term_colname,
min_df=min_df,
max_df=max_df,
included_subreddits=included_subreddits,
topN=topN,
week=week,
week=week.isoformat(),
rescale_idf=False)
tfidf_colname='tf_idf'
# if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)),shape=(nterms,subreddit_names.shape[0]))
print('computing similarities')
print(simfunc)
sims = simfunc(mat)
del mat
sims = next(sims)[0]
sims = pd.DataFrame(sims)
sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['_subreddit'] = subreddit_names.subreddit.values
@@ -56,18 +60,20 @@ def pull_weeks(batch):
return set(batch.to_pandas()['week'])
# This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
def cosine_similarities_weekly_lsi(n_components=100, lsi_model=None, *args, **kwargs):
def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kwargs):
print(args)
print(kwargs)
term_colname= kwargs.get('term_colname')
#lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
# lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/1000_author_LSIMOD.pkl"
# simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm='randomized',lsi_model_load=lsi_model)
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=kwargs.get('n_iter'),random_state=kwargs.get('random_state'),algorithm=kwargs.get('algorithm'),lsi_model_load=lsi_model)
lsi_model = pickle.load(open(lsi_model,'rb'))
#simfunc = partial(lsi_column_similarities,n_components=n_components,random_state=random_state,algorithm='randomized',lsi_model=lsi_model)
simfunc = partial(lsi_column_similarities,n_components=n_components,random_state=kwargs.get('random_state'),lsi_model=lsi_model)
return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500, simfunc=column_similarities):
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=None,max_df=None):
print(outfile)
# do this step in parallel if we have the memory for it.
# should be doable with pool.map
@@ -84,12 +90,14 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None,
spark.stop()
print(f"computing weekly similarities")
week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN, subreddit_names=subreddit_names,nterms=nterms)
week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms)
pool = Pool(cpu_count())
for week in weeks:
week_similarities_helper(week)
# pool = Pool(cpu_count())
list(pool.imap(week_similarities_helper,weeks))
pool.close()
# list(pool.imap(week_similarities_helper, weeks))
# pool.close()
# with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
@@ -97,10 +105,11 @@ def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/
return cosine_similarities_weekly(infile,
outfile,
'author',
min_df,
max_df,
included_subreddits,
topN)
topN,
min_df=2
)
def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None):
return cosine_similarities_weekly(infile,
@@ -112,32 +121,29 @@ def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/re
topN)
def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=None,n_components=100,lsi_model=None):
def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile,
outfile,
'author',
min_df,
max_df,
included_subreddits,
topN,
included_subreddits=included_subreddits,
n_components=n_components,
lsi_model=lsi_model)
lsi_model=lsi_model
)
def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500,n_components=100,lsi_model=None):
def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile,
outfile,
'term',
min_df,
max_df,
included_subreddits,
topN,
included_subreddits=included_subreddits,
n_components=n_components,
lsi_model=lsi_model)
lsi_model=lsi_model,
)
if __name__ == "__main__":
fire.Fire({'authors':author_cosine_similarities_weekly,
'terms':term_cosine_similarities_weekly,
'authors-lsi':author_cosine_similarities_weekly_lsi,
'terms-lsi':term_cosine_similarities_weekly
'terms-lsi':term_cosine_similarities_weekly_lsi
})

View File

@@ -12,10 +12,6 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
output="data/subreddit_timeseries.parquet"):
clusters = load_clusters(term_clusters_path, author_clusters_path)
densities = load_densities(term_densities_path, author_densities_path)
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
@@ -26,11 +22,15 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
ts = ts.repartition('subreddit')
spk_clusters = spark.createDataFrame(clusters)
ts = ts.join(spk_clusters, on='subreddit', how='inner')
if term_densities_path is not None and author_densities_path is not None:
densities = load_densities(term_densities_path, author_densities_path)
spk_densities = spark.createDataFrame(densities)
ts = ts.join(spk_densities, on='subreddit', how='inner')
clusters = load_clusters(term_clusters_path, author_clusters_path)
spk_clusters = spark.createDataFrame(clusters)
ts = ts.join(spk_clusters, on='subreddit', how='inner')
ts.write.parquet(output, mode='overwrite')
if __name__ == "__main__":