18
0

21 Commits

Author SHA1 Message Date
930ee47d2b refactor similarities to use submodule. 2022-01-19 15:05:49 -08:00
98c1317af5 update pushshift dumps. 2021-12-10 21:23:32 -08:00
541e125b28 lsi support for weekly similarities 2021-08-11 22:48:33 -07:00
b7c39a3494 Merge branch 'master' of code:cdsc_reddit into excise_reindex 2021-08-03 15:13:39 -07:00
ce549c6c97 Merge branch 'excise_reindex' of code:cdsc_reddit into excise_reindex 2021-08-03 15:13:21 -07:00
6e43294a41 Updates to similarities code for smap project. 2021-08-03 15:06:48 -07:00
2d21ff1137 Merge branch 'master' of code:cdsc_reddit into excise_reindex 2021-08-03 15:02:08 -07:00
Nate E TeBlunthuis
cf86c7492c update clustering scripts 2021-08-03 14:55:02 -07:00
Nate E TeBlunthuis
87ffaa6858 script for picking the best clustering given constraints 2021-05-14 19:10:36 -07:00
Nate E TeBlunthuis
7b14db67de Merge branch 'excise_reindex' of code:cdsc_reddit into excise_reindex 2021-05-13 22:28:31 -07:00
Nate E TeBlunthuis
0b95bea30e support isolates in visualization 2021-05-13 22:26:58 -07:00
Nate E TeBlunthuis
582cf263ea bug fix in affinity clustering 2021-05-13 22:26:15 -07:00
Nate E TeBlunthuis
8a2248fae1 Merge remote-tracking branch 'origin/excise_reindex' into temp 2021-05-10 18:32:03 -07:00
Nate E TeBlunthuis
47ba04aa97 add script for pulling cluster timeseries 2021-05-10 18:24:22 -07:00
Nate E TeBlunthuis
4cb7eeec80 Refactor to make a decent api. 2021-05-10 13:46:49 -07:00
Nate E TeBlunthuis
f05cb962e0 refactor clustring in object oriented style 2021-05-07 22:33:26 -07:00
Nate E TeBlunthuis
8d1df5b26e refactor clustering.py into method-specific files. 2021-05-03 11:28:48 -07:00
Nate E TeBlunthuis
e1c9d9af6f Remove 'exclude phrases' parameter. 2021-05-03 10:37:09 -07:00
Nate E TeBlunthuis
7df8436067 Use Latent semantic indexing and hdbscan 2021-05-02 23:39:55 -07:00
Nate E TeBlunthuis
36b24ee933 reindex tfidf in memory instead of using spark 2021-04-30 12:48:19 -07:00
Nate E TeBlunthuis
a013f6718b export timeseries functions 2021-03-24 17:18:30 -07:00
36 changed files with 1743 additions and 488 deletions

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "cdsc_ecology_utils"]
path = cdsc_ecology_utils
url = code:cdsc_ecology_utils

2
__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from timeseries import load_clusters, load_densities, build_cluster_timeseries
from cdsc_ecology_utils import similarity_functions

1
cdsc_ecology_utils Submodule

Submodule cdsc_ecology_utils added at 5b5fdbb3c0

View File

@@ -2,20 +2,164 @@
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
similarity_data=/gscratch/comdata/output/reddit_similarity
clustering_data=/gscratch/comdata/output/reddit_clustering
selection_grid="--max_iter=3000 --convergence_iter=15,30,100 --damping=0.5,0.6,0.7,0.8,0.85,0.9,0.95,0.97,0.99, --preference_quantile=0.1,0.3,0.5,0.7,0.9"
#selection_grid="--max_iter=3000 --convergence_iter=[15] --preference_quantile=[0.5] --damping=[0.99]"
all:$(clustering_data)/subreddit_comment_authors_10k/selection_data.csv $(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv $(clustering_data)/subreddit_comment_terms_10k/selection_data.csv
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS $(clustering_data)/subreddit_authors-tf_similarities_30k.feather/SUCCESS
# $(clustering_data)/subreddit_comment_terms_30k.feather/SUCCESS
kmeans_selection_grid=--max_iters=[3000] --n_inits=[10] --n_clusters=[100,500,1000,1250,1500,1750,2000]
hdbscan_selection_grid=--min_cluster_sizes=[2,3,4,5] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf]
affinity_selection_grid=--dampings=[0.5,0.6,0.7,0.8,0.95,0.97,0.99] --preference_quantiles=[0.1,0.3,0.5,0.7,0.9] --convergence_iters=[15]
$(clustering_data)/subreddit_comment_authors_10k/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_authors_10k.feather clustering.py
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_10k.feather $(clustering_data)/subreddit_comment_authors_10k $(clustering_data)/subreddit_comment_authors_10k/selection_data.csv $(selection_grid) -J 20
authors_10k_input=$(similarity_data)/subreddit_comment_authors_10k.feather
authors_10k_input_lsi=$(similarity_data)/subreddit_comment_authors_10k_LSI
authors_10k_output=$(clustering_data)/subreddit_comment_authors_10k
authors_10k_output_lsi=$(clustering_data)/subreddit_comment_authors_10k_LSI
$(clustering_data)/subreddit_comment_terms_10k/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_terms_10k.feather clustering.py
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_terms_10k.feather $(clustering_data)/subreddit_comment_terms_10k $(clustering_data)/subreddit_comment_terms_10k/selection_data.csv $(selection_grid) -J 20
authors_tf_10k_input=$(similarity_data)/subreddit_comment_authors-tf_10k.feather
authors_tf_10k_input_lsi=$(similarity_data)/subreddit_comment_authors-tf_10k_LSI
authors_tf_10k_output=$(clustering_data)/subreddit_comment_authors-tf_10k
authors_tf_10k_output_lsi=$(clustering_data)/subreddit_comment_authors-tf_10k_LSI
$(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv:clustering.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather $(clustering_data)/subreddit_comment_authors-tf_10k $(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv $(selection_grid) -J 20
terms_10k_input=$(similarity_data)/subreddit_comment_terms_10k.feather
terms_10k_input_lsi=$(similarity_data)/subreddit_comment_terms_10k_LSI
terms_10k_output=$(clustering_data)/subreddit_comment_terms_10k
terms_10k_output_lsi=$(clustering_data)/subreddit_comment_terms_10k_LSI
all:terms_10k authors_10k authors_tf_10k terms_10k_lsi authors_10k_lsi authors_tf_10k_lsi
terms_10k:${terms_10k_output}/kmeans/selection_data.csv ${terms_10k_output}/affinity/selection_data.csv ${terms_10k_output}/hdbscan/selection_data.csv
authors_10k:${authors_10k_output}/kmeans/selection_data.csv ${authors_10k_output}/hdbscan/selection_data.csv ${authors_10k_output}/affinity/selection_data.csv
authors_tf_10k:${authors_tf_10k_output}/kmeans/selection_data.csv ${authors_tf_10k_output}/hdbscan/selection_data.csv ${authors_tf_10k_output}/affinity/selection_data.csv
terms_10k_lsi:${terms_10k_output_lsi}/kmeans/selection_data.csv ${terms_10k_output_lsi}/affinity/selection_data.csv ${terms_10k_output_lsi}/hdbscan/selection_data.csv
authors_10k_lsi:${authors_10k_output_lsi}/kmeans/selection_data.csv ${authors_10k_output_lsi}/hdbscan/selection_data.csv ${authors_10k_output_lsi}/affinity/selection_data.csv
authors_tf_10k_lsi:${authors_tf_10k_output_lsi}/kmeans/selection_data.csv ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
${authors_10k_output}/kmeans/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/kmeans --savefile=${authors_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${terms_10k_output}/kmeans/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/kmeans --savefile=${terms_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_tf_10k_output}/kmeans/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/kmeans --savefile=${authors_tf_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_10k_output}/affinity/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/affinity --savefile=${authors_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${terms_10k_output}/affinity/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/affinity --savefile=${terms_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_tf_10k_output}/affinity/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/affinity --savefile=${authors_tf_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_10k_output}/hdbscan/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/hdbscan --savefile=${authors_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output}/hdbscan/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/hdbscan --savefile=${terms_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${authors_tf_10k_output}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/hdbscan --savefile=${authors_tf_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
## LSI Models
${authors_10k_output_lsi}/kmeans/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/kmeans --savefile=${authors_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
${terms_10k_output_lsi}/kmeans/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/kmeans --savefile=${terms_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_tf_10k_output_lsi}/kmeans/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/kmeans --savefile=${authors_tf_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_10k_output_lsi}/affinity/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/affinity --savefile=${authors_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
${terms_10k_output_lsi}/affinity/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/affinity --savefile=${terms_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_tf_10k_output_lsi}/affinity/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/affinity --savefile=${authors_tf_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_10k_output_lsi}/hdbscan/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/hdbscan --savefile=${authors_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output_lsi}/hdbscan/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/hdbscan --savefile=${terms_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/hdbscan --savefile=${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output_lsi}/best_hdbscan.feather:${terms_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
${authors_tf_10k_output_lsi}/best_hdbscan.feather:${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
clean_affinity:
rm -f ${authors_10k_output}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output}/affinity/selection_data.csv
rm -f ${terms_10k_output}/affinity/selection_data.csv
clean_kmeans:
rm -f ${authors_10k_output}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output}/kmeans/selection_data.csv
rm -f ${terms_10k_output}/kmeans/selection_data.csv
clean_hdbscan:
rm -f ${authors_10k_output}/hdbscan/selection_data.csv
rm -f ${authors_tf_10k_output}/hdbscan/selection_data.csv
rm -f ${terms_10k_output}/hdbscan/selection_data.csv
clean_authors:
rm -f ${authors_10k_output}/affinity/selection_data.csv
rm -f ${authors_10k_output}/kmeans/selection_data.csv
rm -f ${authors_10k_output}/hdbscan/selection_data.csv
clean_authors_tf:
rm -f ${authors_tf_10k_output}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output}/hdbscan/selection_data.csv
clean_terms:
rm -f ${terms_10k_output}/affinity/selection_data.csv
rm -f ${terms_10k_output}/kmeans/selection_data.csv
rm -f ${terms_10k_output}/hdbscan/selection_data.csv
clean_lsi_affinity:
rm -f ${authors_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
rm -f ${terms_10k_output_lsi}/affinity/selection_data.csv
clean_lsi_kmeans:
rm -f ${authors_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${terms_10k_output_lsi}/kmeans/selection_data.csv
clean_lsi_hdbscan:
rm -f ${authors_10k_output_lsi}/hdbscan/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv
rm -f ${terms_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_authors:
rm -f ${authors_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_authors_tf:
rm -f ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_terms:
rm -f ${terms_10k_output_lsi}/affinity/selection_data.csv
rm -f ${terms_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${terms_10k_output_lsi}/hdbscan/selection_data.csv
clean: clean_affinity clean_kmeans clean_hdbscan
PHONY: clean clean_affinity clean_kmeans clean_hdbscan clean_authors clean_authors_tf clean_terms terms_10k authors_10k authors_tf_10k
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_authors_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_30k.feather $(clustering_data)/subreddit_comment_authors_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS

View File

@@ -0,0 +1,129 @@
from sklearn.cluster import AffinityPropagation
from dataclasses import dataclass
from clustering_base import clustering_result, clustering_job
from grid_sweep import grid_sweep
from pathlib import Path
from itertools import product, starmap
import fire
import sys
import numpy as np
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
@dataclass
class affinity_clustering_result(clustering_result):
damping:float
convergence_iter:int
preference_quantile:float
preference:float
max_iter:int
class affinity_job(clustering_job):
def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
super().__init__(infile,
outpath,
name,
call=self._affinity_clustering,
preference_quantile=preference_quantile,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
random_state=1968,
verbose=verbose)
self.damping=damping
self.max_iter=max_iter
self.convergence_iter=convergence_iter
self.preference_quantile=preference_quantile
def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):
mat = 1-mat
preference = np.quantile(mat, preference_quantile)
self.preference = preference
print(f"preference is {preference}")
print("data loaded")
sys.stdout.flush()
clustering = AffinityPropagation(*args,
preference=preference,
affinity='precomputed',
copy=False,
**kwargs).fit(mat)
return clustering
def get_info(self):
result = super().get_info()
self.result=affinity_clustering_result(**result.__dict__,
damping=self.damping,
max_iter=self.max_iter,
convergence_iter=self.convergence_iter,
preference_quantile=self.preference_quantile,
preference=self.preference)
return self.result
class affinity_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
super().__init__(affinity_job,
_afffinity_grid_sweep,
inpath,
outpath,
self.namer,
*args,
**kwargs)
def namer(self,
damping,
max_iter,
convergence_iter,
preference_quantile):
return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
def run_affinity_grid_sweep(savefile, inpath, outpath, dampings=[0.8], max_iters=[3000], convergence_iters=[30], preference_quantiles=[0.5],n_cores=10):
"""Run affinity clustering once or more with different parameters.
Usage:
affinity_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --max_iters=<csv> --dampings=<csv> --preference_quantiles=<csv>
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to feather data containing a labeled matrix of subreddit similarities.
outpath: path to output fit kmeans clusterings.
dampings:one or more numbers in [0.5, 1). damping parameter in affinity propagatin clustering.
preference_quantiles:one or more numbers in (0,1) for selecting the 'preference' parameter.
convergence_iters:one or more integers of number of iterations without improvement before stopping.
max_iters: one or more numbers of different maximum interations.
"""
obj = affinity_grid_sweep(inpath,
outpath,
map(float,dampings),
map(int,max_iters),
map(int,convergence_iters),
map(float,preference_quantiles))
obj.run(n_cores)
obj.save(savefile)
def test_select_affinity_clustering():
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
outpath = "test_affinity";
dampings=[0.8,0.9]
max_iters=[100000]
convergence_iters=[15]
preference_quantiles=[0.5,0.7]
gs = affinity_lsi_grid_sweep(inpath, 'all', outpath, dampings, max_iters, convergence_iters, preference_quantiles)
gs.run(20)
gs.save("test_affinity/lsi_sweep.csv")
if __name__ == "__main__":
fire.Fire(run_affinity_grid_sweep)

View File

@@ -0,0 +1,99 @@
import fire
from affinity_clustering import affinity_clustering_result, affinity_job, affinity_grid_sweep
from grid_sweep import grid_sweep
from lsi_base import lsi_result_mixin, lsi_grid_sweep, lsi_mixin
from dataclasses import dataclass
@dataclass
class affinity_clustering_result_lsi(affinity_clustering_result, lsi_result_mixin):
pass
class affinity_lsi_job(affinity_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
def get_info(self):
result = super().get_info()
self.result = affinity_clustering_result_lsi(**result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
class affinity_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
dampings=[0.9],
max_iters=[10000],
convergence_iters=[30],
preference_quantiles=[0.5]):
super().__init__(affinity_lsi_job,
_affinity_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
dampings,
max_iters,
convergence_iters,
preference_quantiles)
class _affinity_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
self.lsi_dim = lsi_dim
self.jobtype = affinity_lsi_job
super().__init__(self.jobtype,
inpath,
outpath,
self.namer,
[self.lsi_dim],
*args,
**kwargs)
def namer(self, *args, **kwargs):
s = affinity_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
def run_affinity_lsi_grid_sweep(savefile, inpath, outpath, dampings=[0.8], max_iters=[3000], convergence_iters=[30], preference_quantiles=[0.5], lsi_dimensions='all',n_cores=30):
"""Run affinity clustering once or more with different parameters.
Usage:
affinity_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --max_iters=<csv> --dampings=<csv> --preference_quantiles=<csv> --lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to folder containing feather files with LSI similarity labeled matrices of subreddit similarities.
outpath: path to output fit kmeans clusterings.
dampings:one or more numbers in [0.5, 1). damping parameter in affinity propagatin clustering.
preference_quantiles:one or more numbers in (0,1) for selecting the 'preference' parameter.
convergence_iters:one or more integers of number of iterations without improvement before stopping.
max_iters: one or more numbers of different maximum interations.
lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
"""
obj = affinity_lsi_grid_sweep(inpath,
lsi_dimensions,
outpath,
map(float,dampings),
map(int,max_iters),
map(int,convergence_iters),
map(float,preference_quantiles))
obj.run(n_cores)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_affinity_lsi_grid_sweep)

View File

@@ -6,21 +6,20 @@ import numpy as np
from sklearn.cluster import AffinityPropagation
import fire
from pathlib import Path
from multiprocessing import cpu_count
from dataclasses import dataclass
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
def read_similarity_mat(similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,mat)
def affinity_clustering(similarities, *args, **kwargs):
def affinity_clustering(similarities, output, *args, **kwargs):
subreddits, mat = read_similarity_mat(similarities)
return _affinity_clustering(mat, subreddits, *args, **kwargs)
clustering = _affinity_clustering(mat, *args, **kwargs)
cluster_data = process_clustering_result(clustering, subreddits)
cluster_data['algorithm'] = 'affinity'
return(cluster_data)
def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
'''
similarities: feather file with a dataframe of similarity scores
similarities: matrix of similarity scores
preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author.
'''
@@ -40,25 +39,14 @@ def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000,
verbose=verbose,
random_state=random_state).fit(mat)
print(f"clustering took {clustering.n_iter_} iterations")
clusters = clustering.labels_
print(f"found {len(set(clusters))} clusters")
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
cluster_sizes = cluster_data.groupby("cluster").count()
print(f"the largest cluster has {cluster_sizes.subreddit.max()} members")
print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
sys.stdout.flush()
cluster_data = process_clustering_result(clustering, subreddits)
output = Path(output)
output.parent.mkdir(parents=True,exist_ok=True)
cluster_data.to_feather(output)
print(f"saved {output}")
return clustering
if __name__ == "__main__":
fire.Fire(affinity_clustering)

View File

@@ -0,0 +1,105 @@
from pathlib import Path
import numpy as np
import pandas as pd
from dataclasses import dataclass
from sklearn.metrics import silhouette_score, silhouette_samples
from collections import Counter
# this is meant to be an interface, not created directly
class clustering_job:
def __init__(self, infile, outpath, name, call, *args, **kwargs):
self.outpath = Path(outpath)
self.call = call
self.args = args
self.kwargs = kwargs
self.infile = Path(infile)
self.name = name
self.hasrun = False
def run(self):
self.subreddits, self.mat = self.read_distance_mat(self.infile)
self.clustering = self.call(self.mat, *self.args, **self.kwargs)
self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
self.score = self.silhouette()
self.outpath.mkdir(parents=True, exist_ok=True)
self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
self.hasrun = True
def get_info(self):
if not self.hasrun:
self.run()
self.result = clustering_result(outpath=str(self.outpath.resolve()),
silhouette_score=self.score,
name=self.name,
n_clusters=self.n_clusters,
n_isolates=self.n_isolates,
silhouette_samples = self.silsampout
)
return self.result
def silhouette(self):
counts = Counter(self.clustering.labels_)
singletons = [key for key, value in counts.items() if value == 1]
isolates = (self.clustering.labels_ == -1) | (np.isin(self.clustering.labels_,np.array(singletons)))
scoremat = self.mat[~isolates][:,~isolates]
if self.n_clusters > 1:
score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed')
silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed')
silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp})
self.outpath.mkdir(parents=True, exist_ok=True)
silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather")
self.silsampout = silsampout.resolve()
silhouette_samp.to_feather(self.silsampout)
else:
score = None
self.silsampout = None
return score
def read_distance_mat(self, similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,1-mat)
def process_clustering(self, clustering, subreddits):
if hasattr(clustering,'n_iter_'):
print(f"clustering took {clustering.n_iter_} iterations")
clusters = clustering.labels_
self.n_clusters = len(set(clusters))
print(f"found {self.n_clusters} clusters")
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
n_isolates1 = (cluster_sizes.subreddit==1).sum()
print(f"{n_isolates1} clusters have 1 member")
n_isolates2 = cluster_sizes.loc[cluster_sizes.cluster==-1,:]['subreddit'].to_list()
if len(n_isolates2) > 0:
n_isloates2 = n_isolates2[0]
print(f"{n_isolates2} subreddits are in cluster -1",flush=True)
if n_isolates1 == 0:
self.n_isolates = n_isolates2
else:
self.n_isolates = n_isolates1
return cluster_data
@dataclass
class clustering_result:
outpath:Path
silhouette_score:float
name:str
n_clusters:int
n_isolates:int
silhouette_samples:str

View File

@@ -17,7 +17,7 @@ def fit_tsne(similarities, output, learning_rate=750, perplexity=50, n_iter=1000
df = pd.read_feather(similarities)
n = df.shape[0]
mat = np.array(df.drop('subreddit',1),dtype=np.float64)
mat = np.array(df.drop('_subreddit',1),dtype=np.float64)
mat[range(n),range(n)] = 1
mat[mat > 1] = 1
dist = 2*np.arccos(mat)/np.pi
@@ -26,7 +26,7 @@ def fit_tsne(similarities, output, learning_rate=750, perplexity=50, n_iter=1000
tsne_fit_whole = tsne_fit_model.fit_transform(dist)
plot_data = pd.DataFrame({'x':tsne_fit_whole[:,0],'y':tsne_fit_whole[:,1], 'subreddit':df.subreddit})
plot_data = pd.DataFrame({'x':tsne_fit_whole[:,0],'y':tsne_fit_whole[:,1], '_subreddit':df['_subreddit']})
plot_data.to_feather(output)

33
clustering/grid_sweep.py Normal file
View File

@@ -0,0 +1,33 @@
from pathlib import Path
from multiprocessing import Pool, cpu_count
from itertools import product, chain
import pandas as pd
class grid_sweep:
def __init__(self, jobtype, inpath, outpath, namer, *args):
self.jobtype = jobtype
self.namer = namer
print(*args)
grid = list(product(*args))
inpath = Path(inpath)
outpath = Path(outpath)
self.hasrun = False
self.grid = [(inpath,outpath,namer(*g)) + g for g in grid]
self.jobs = [jobtype(*g) for g in self.grid]
def run(self, cores=20):
if cores is not None and cores > 1:
with Pool(cores) as pool:
infos = pool.map(self.jobtype.get_info, self.jobs)
else:
infos = map(self.jobtype.get_info, self.jobs)
self.infos = pd.DataFrame(infos)
self.hasrun = True
def save(self, outcsv):
if not self.hasrun:
self.run()
outcsv = Path(outcsv)
outcsv.parent.mkdir(parents=True, exist_ok=True)
self.infos.to_csv(outcsv)

View File

@@ -0,0 +1,159 @@
from clustering_base import clustering_result, clustering_job
from grid_sweep import grid_sweep
from dataclasses import dataclass
import hdbscan
from sklearn.neighbors import NearestNeighbors
import plotnine as pn
import numpy as np
from itertools import product, starmap, chain
import pandas as pd
from multiprocessing import cpu_count
import fire
def test_select_hdbscan_clustering():
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI"
outpath = "test_hdbscan";
min_cluster_sizes=[2,3,4];
min_samples=[1,2,3];
cluster_selection_epsilons=[0,0.1,0.3,0.5];
cluster_selection_methods=[1];
lsi_dimensions='all'
gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
gs.run(20)
gs.save("test_hdbscan/lsi_sweep.csv")
# job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom')
# job1.run()
# print(job1.get_info())
# df = pd.read_csv("test_hdbscan/selection_data.csv")
# test_select_hdbscan_clustering()
# check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
# silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
# c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)
class hdbscan_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
super().__init__(hdbscan_job, inpath, outpath, self.namer, *args, **kwargs)
def namer(self,
min_cluster_size,
min_samples,
cluster_selection_epsilon,
cluster_selection_method):
return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}"
@dataclass
class hdbscan_clustering_result(clustering_result):
min_cluster_size:int
min_samples:int
cluster_selection_epsilon:float
cluster_selection_method:str
class hdbscan_job(clustering_job):
def __init__(self, infile, outpath, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
super().__init__(infile,
outpath,
name,
call=hdbscan_job._hdbscan_clustering,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
cluster_selection_epsilon=cluster_selection_epsilon,
cluster_selection_method=cluster_selection_method
)
self.min_cluster_size = min_cluster_size
self.min_samples = min_samples
self.cluster_selection_epsilon = cluster_selection_epsilon
self.cluster_selection_method = cluster_selection_method
# self.mat = 1 - self.mat
def _hdbscan_clustering(mat, *args, **kwargs):
print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
print(mat)
clusterer = hdbscan.HDBSCAN(metric='precomputed',
core_dist_n_jobs=cpu_count(),
*args,
**kwargs,
)
clustering = clusterer.fit(mat.astype('double'))
return(clustering)
def get_info(self):
result = super().get_info()
self.result = hdbscan_clustering_result(**result.__dict__,
min_cluster_size=self.min_cluster_size,
min_samples=self.min_samples,
cluster_selection_epsilon=self.cluster_selection_epsilon,
cluster_selection_method=self.cluster_selection_method)
return self.result
def run_hdbscan_grid_sweep(savefile, inpath, outpath, min_cluster_sizes=[2], min_samples=[1], cluster_selection_epsilons=[0], cluster_selection_methods=['eom']):
"""Run hdbscan clustering once or more with different parameters.
Usage:
hdbscan_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --min_cluster_sizes=<csv> --min_samples=<csv> --cluster_selection_epsilons=<csv> --cluster_selection_methods=<csv "eom"|"leaf">
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to feather data containing a labeled matrix of subreddit similarities.
outpath: path to output fit kmeans clusterings.
min_cluster_sizes: one or more integers indicating the minumum cluster size
min_samples: one ore more integers indicating the minimum number of samples used in the algorithm
cluster_selection_epsilon: one or more similarity thresholds for transition from dbscan to hdbscan
cluster_selection_method: "eom" or "leaf" eom gives larger clusters.
"""
obj = hdbscan_grid_sweep(inpath,
outpath,
map(int,min_cluster_sizes),
map(int,min_samples),
map(float,cluster_selection_epsilons),
cluster_selection_methods)
obj.run()
obj.save(savefile)
def KNN_distances_plot(mat,outname,k=2):
nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
distances, indices = nbrs.kneighbors(mat)
d2 = distances[:,-1]
df = pd.DataFrame({'dist':d2})
df = df.sort_values("dist",ascending=False)
df['idx'] = np.arange(0,d2.shape[0]) + 1
p = pn.qplot(x='idx',y='dist',data=df,geom='line') + pn.scales.scale_y_continuous(minor_breaks = np.arange(0,50)/50,
breaks = np.arange(0,10)/10)
p.save(outname,width=16,height=10)
def make_KNN_plots():
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='terms_knn_dist2.png')
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='authors_knn_dist2.png')
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')
if __name__ == "__main__":
fire.Fire(run_hdbscan_grid_sweep)
# test_select_hdbscan_clustering()
#fire.Fire(select_hdbscan_clustering)

View File

@@ -0,0 +1,101 @@
from hdbscan_clustering import hdbscan_job, hdbscan_grid_sweep, hdbscan_clustering_result
from lsi_base import lsi_grid_sweep, lsi_mixin, lsi_result_mixin
from grid_sweep import grid_sweep
import fire
from dataclasses import dataclass
@dataclass
class hdbscan_clustering_result_lsi(hdbscan_clustering_result, lsi_result_mixin):
pass
class hdbscan_lsi_job(hdbscan_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(
infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
def get_info(self):
partial_result = super().get_info()
self.result = hdbscan_clustering_result_lsi(**partial_result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
class hdbscan_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods
):
super().__init__(hdbscan_lsi_job,
_hdbscan_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods)
class _hdbscan_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
print(args)
print(kwargs)
self.lsi_dim = lsi_dim
self.jobtype = hdbscan_lsi_job
super().__init__(self.jobtype, inpath, outpath, self.namer, [self.lsi_dim], *args, **kwargs)
def namer(self, *args, **kwargs):
s = hdbscan_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
def run_hdbscan_lsi_grid_sweep(savefile, inpath, outpath, min_cluster_sizes=[2], min_samples=[1], cluster_selection_epsilons=[0], cluster_selection_methods=[1],lsi_dimensions='all'):
"""Run hdbscan clustering once or more with different parameters.
Usage:
hdbscan_clustering_lsi --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --min_cluster_sizes=<csv> --min_samples=<csv> --cluster_selection_epsilons=<csv> --cluster_selection_methods=[eom]> --lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to folder containing feather files with LSI similarity labeled matrices of subreddit similarities.
outpath: path to output fit clusterings.
min_cluster_sizes: one or more integers indicating the minumum cluster size
min_samples: one ore more integers indicating the minimum number of samples used in the algorithm
cluster_selection_epsilons: one or more similarity thresholds for transition from dbscan to hdbscan
cluster_selection_methods: one or more of "eom" or "leaf" eom gives larger clusters.
lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
"""
obj = hdbscan_lsi_grid_sweep(inpath,
lsi_dimensions,
outpath,
list(map(int,min_cluster_sizes)),
list(map(int,min_samples)),
list(map(float,cluster_selection_epsilons)),
cluster_selection_methods)
obj.run(10)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_hdbscan_lsi_grid_sweep)

View File

@@ -0,0 +1,105 @@
from sklearn.cluster import KMeans
import fire
from pathlib import Path
from dataclasses import dataclass
from clustering_base import clustering_result, clustering_job
from grid_sweep import grid_sweep
@dataclass
class kmeans_clustering_result(clustering_result):
n_clusters:int
n_init:int
max_iter:int
class kmeans_job(clustering_job):
def __init__(self, infile, outpath, name, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
super().__init__(infile,
outpath,
name,
call=kmeans_job._kmeans_clustering,
n_clusters=n_clusters,
n_init=n_init,
max_iter=max_iter,
random_state=random_state,
verbose=verbose)
self.n_clusters=n_clusters
self.n_init=n_init
self.max_iter=max_iter
def _kmeans_clustering(mat, *args, **kwargs):
clustering = KMeans(*args,
**kwargs,
).fit(mat)
return clustering
def get_info(self):
result = super().get_info()
self.result = kmeans_clustering_result(**result.__dict__,
n_init=self.n_init,
max_iter=self.max_iter)
return self.result
class kmeans_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
super().__init__(kmeans_job, inpath, outpath, self.namer, *args, **kwargs)
def namer(self,
n_clusters,
n_init,
max_iter):
return f"nclusters-{n_clusters}_nit-{n_init}_maxit-{max_iter}"
def test_select_kmeans_clustering():
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
outpath = "test_kmeans";
n_clusters=[200,300,400];
n_init=[1,2,3];
max_iter=[100000]
gs = kmeans_lsi_grid_sweep(inpath, 'all', outpath, n_clusters, n_init, max_iter)
gs.run(1)
cluster_selection_epsilons=[0,0.1,0.3,0.5];
cluster_selection_methods=['eom'];
lsi_dimensions='all'
gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
gs.run(20)
gs.save("test_hdbscan/lsi_sweep.csv")
def run_kmeans_grid_sweep(savefile, inpath, outpath, n_clusters=[500], n_inits=[1], max_iters=[3000]):
"""Run kmeans clustering once or more with different parameters.
Usage:
kmeans_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --n_clusters=<csv number of clusters> --n_inits=<csv> --max_iters=<csv>
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to feather data containing a labeled matrix of subreddit similarities.
outpath: path to output fit kmeans clusterings.
n_clusters: one or more numbers of kmeans clusters to select.
n_inits: one or more numbers of different initializations to use for each clustering.
max_iters: one or more numbers of different maximum interations.
"""
obj = kmeans_grid_sweep(inpath,
outpath,
map(int,n_clusters),
map(int,n_inits),
map(int,max_iters))
obj.run(1)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_kmeans_grid_sweep)

View File

@@ -0,0 +1,93 @@
import fire
from dataclasses import dataclass
from kmeans_clustering import kmeans_job, kmeans_clustering_result, kmeans_grid_sweep
from lsi_base import lsi_mixin, lsi_result_mixin, lsi_grid_sweep
from grid_sweep import grid_sweep
@dataclass
class kmeans_clustering_result_lsi(kmeans_clustering_result, lsi_result_mixin):
pass
class kmeans_lsi_job(kmeans_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
def get_info(self):
result = super().get_info()
self.result = kmeans_clustering_result_lsi(**result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
class _kmeans_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
print(args)
print(kwargs)
self.lsi_dim = lsi_dim
self.jobtype = kmeans_lsi_job
super().__init__(self.jobtype, inpath, outpath, self.namer, [self.lsi_dim], *args, **kwargs)
def namer(self, *args, **kwargs):
s = kmeans_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
class kmeans_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
n_clusters,
n_inits,
max_iters
):
super().__init__(kmeans_lsi_job,
_kmeans_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
n_clusters,
n_inits,
max_iters)
def run_kmeans_lsi_grid_sweep(savefile, inpath, outpath, n_clusters=[500], n_inits=[1], max_iters=[3000], lsi_dimensions="all"):
"""Run kmeans clustering once or more with different parameters.
Usage:
kmeans_clustering_lsi.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH d--lsi_dimensions=<"all"|csv number of LSI dimensions to use> --n_clusters=<csv number of clusters> --n_inits=<csv> --max_iters=<csv>
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to folder containing feather files with LSI similarity labeled matrices of subreddit similarities.
outpath: path to output fit kmeans clusterings.
lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
n_clusters: one or more numbers of kmeans clusters to select.
n_inits: one or more numbers of different initializations to use for each clustering.
max_iters: one or more numbers of different maximum interations.
"""
obj = kmeans_lsi_grid_sweep(inpath,
lsi_dimensions,
outpath,
list(map(int,n_clusters)),
list(map(int,n_inits)),
list(map(int,max_iters))
)
obj.run(1)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_kmeans_lsi_grid_sweep)

29
clustering/lsi_base.py Normal file
View File

@@ -0,0 +1,29 @@
from clustering_base import clustering_job, clustering_result
from grid_sweep import grid_sweep
from dataclasses import dataclass
from itertools import chain
from pathlib import Path
class lsi_mixin():
def set_lsi_dims(self, lsi_dims):
self.lsi_dims = lsi_dims
@dataclass
class lsi_result_mixin:
lsi_dimensions:int
class lsi_grid_sweep(grid_sweep):
def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
self.jobtype = jobtype
self.subsweep = subsweep
inpath = Path(inpath)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*.feather"))
else:
lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]
print(lsi_paths)
lsi_nums = [int(p.stem) for p in lsi_paths]
self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))

View File

@@ -0,0 +1,33 @@
#!/usr/bin/env python3
import fire
import pandas as pd
from pathlib import Path
import shutil
selection_data="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/clustering/comment_authors_compex_LSI/selection_data.csv"
outpath = 'test_best.feather'
min_clusters=50; max_isolates=7500; min_cluster_size=2
# pick the best clustering according to silhouette score subject to contraints
def pick_best_clustering(selection_data, output, min_clusters, max_isolates, min_cluster_size):
df = pd.read_csv(selection_data,index_col=0)
df = df.sort_values("silhouette_score",ascending=False)
# not sure I fixed the bug underlying this fully or not.
df['n_isolates_str'] = df.n_isolates.str.strip("[]")
df['n_isolates_0'] = df['n_isolates_str'].apply(lambda l: len(l) == 0)
df.loc[df.n_isolates_0,'n_isolates'] = 0
df.loc[~df.n_isolates_0,'n_isolates'] = df.loc[~df.n_isolates_0].n_isolates_str.apply(lambda l: int(l))
best_cluster = df[(df.n_isolates <= max_isolates)&(df.n_clusters >= min_clusters)&(df.min_cluster_size==min_cluster_size)]
best_cluster = best_cluster.iloc[0]
best_lsi_dimensions = best_cluster.lsi_dimensions
print(best_cluster.to_dict())
best_path = Path(best_cluster.outpath) / (str(best_cluster['name']) + ".feather")
shutil.copy(best_path,output)
print(f"lsi dimensions:{best_lsi_dimensions}")
if __name__ == "__main__":
fire.Fire(pick_best_clustering)

View File

@@ -1,101 +1,38 @@
from sklearn.metrics import silhouette_score
from sklearn.cluster import AffinityPropagation
from functools import partial
from clustering import _affinity_clustering, read_similarity_mat
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count, Array, Process
from pathlib import Path
from itertools import product, starmap
import numpy as np
import pandas as pd
import fire
import sys
import plotnine as pn
from pathlib import Path
from clustering.fit_tsne import fit_tsne
from visualization.tsne_vis import build_visualization
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
df = pd.read_csv("/gscratch/comdata/output/reddit_clustering/subreddit_comment_authors-tf_10k_LSI/hdbscan/selection_data.csv",index_col=0)
@dataclass
class clustering_result:
outpath:Path
damping:float
max_iter:int
convergence_iter:int
preference_quantile:float
silhouette_score:float
alt_silhouette_score:float
name:str
# plot silhouette_score as a function of isolates
df = df.sort_values("silhouette_score")
df['n_isolates'] = df.n_isolates.str.split("\n0").apply(lambda rg: int(rg[1]))
p = pn.ggplot(df,pn.aes(x='n_isolates',y='silhouette_score')) + pn.geom_point()
p.save("isolates_x_score.png")
def sim_to_dist(mat):
dist = 1-mat
dist[dist < 0] = 0
np.fill_diagonal(dist,0)
return dist
p = pn.ggplot(df,pn.aes(y='n_clusters',x='n_isolates',color='silhouette_score')) + pn.geom_point()
p.save("clusters_x_isolates.png")
def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
print(outpath)
clustering = _affinity_clustering(mat, subreddits, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
mat = sim_to_dist(clustering.affinity_matrix_)
# the best result for hdbscan seems like this one: it has a decent number of
# i think I prefer the 'eom' clustering style because larger clusters are less likely to suffer from ommitted variables
best_eom = df[(df.n_isolates <5000)&(df.silhouette_score>0.4)&(df.cluster_selection_method=='eom')&(df.min_cluster_size==2)].iloc[df.shape[1]]
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
best_lsi = df[(df.n_isolates <5000)&(df.silhouette_score>0.4)&(df.cluster_selection_method=='leaf')&(df.min_cluster_size==2)].iloc[df.shape[1]]
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
tsne_data = Path("./clustering/authors-tf_lsi850_tsne.feather")
res = clustering_result(outpath=outpath,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
preference_quantile=preference_quantile,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
if not tnse_data.exists():
fit_tsne("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/850.feather",
tnse_data)
return res
build_visualization("./clustering/authors-tf_lsi850_tsne.feather",
Path(best_eom.outpath)/(best_eom['name']+'.feather'),
"./authors-tf_lsi850_best_eom.html")
# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
build_visualization("./clustering/authors-tf_lsi850_tsne.feather",
Path(best_leaf.outpath)/(best_leaf['name']+'.feather'),
"./authors-tf_lsi850_best_leaf.html")
def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
damping = list(map(float,damping))
convergence_iter = convergence_iter = list(map(int,convergence_iter))
preference_quantile = list(map(float,preference_quantile))
if type(outdir) is str:
outdir = Path(outdir)
outdir.mkdir(parents=True,exist_ok=True)
subreddits, mat = read_similarity_mat(similarities,use_threads=True)
if alt_similarities is not None:
alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
else:
alt_mat = None
if J is None:
J = cpu_count()
pool = Pool(J)
# get list of tuples: the combinations of hyperparameters
hyper_grid = product(damping, convergence_iter, preference_quantile)
hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
_do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
# similarities = Array('d', mat)
# call pool.starmap
print("running clustering selection")
clustering_data = pool.starmap(_do_clustering, hyper_grid)
clustering_data = pd.DataFrame(list(clustering_data))
clustering_data.to_csv(outinfo)
return clustering_data
if __name__ == "__main__":
x = fire.Fire(select_affinity_clustering)

View File

@@ -8,3 +8,9 @@ all: /gscratch/comdata/output/reddit_density/comment_terms_10000.feather /gscrat
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/850.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/850.feather
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/850.feather" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/850.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather" --agg=pd.DataFrame.sum

View File

@@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 overlap_density.py authors --inpath=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather --outpath=/gscratch/comdata/output/reddit_density/comment_authors_10000.feather --agg=pd.DataFrame.sum
stop-all.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname).hyak.local:7077 overlap_density.py authors --inpath=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather --outpath=/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather --agg=pd.DataFrame.sum
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh

View File

@@ -1,11 +1,12 @@
import pandas as pd
from pandas.core.groupby import DataFrameGroupBy as GroupBy
from pathlib import Path
import fire
import numpy as np
import sys
sys.path.append("..")
sys.path.append("../similarities")
from similarities.similarities_helper import reindex_tfidf, reindex_tfidf_time_interval
from similarities.similarities_helper import reindex_tfidf
# this is the mean of the ratio of the overlap to the focal size.
# mean shared membership per focal community member
@@ -13,10 +14,12 @@ from similarities.similarities_helper import reindex_tfidf, reindex_tfidf_time_i
def overlap_density(inpath, outpath, agg = pd.DataFrame.sum):
df = pd.read_feather(inpath)
df = df.drop('subreddit',1)
df = df.drop('_subreddit',1)
np.fill_diagonal(df.values,0)
df = agg(df, 0).reset_index()
df = df.rename({0:'overlap_density'},axis='columns')
outpath = Path(outpath)
outpath.parent.mkdir(parents=True, exist_ok = True)
df.to_feather(outpath)
return df
@@ -25,6 +28,8 @@ def overlap_density_weekly(inpath, outpath, agg = GroupBy.sum):
# exclude the diagonal
df = df.loc[df.subreddit != df.variable]
res = agg(df.groupby(['subreddit','week'])).reset_index()
outpath = Path(outpath)
outpath.parent.mkdir(parents=True, exist_ok = True)
res.to_feather(outpath)
return res

View File

@@ -6,7 +6,7 @@ from os import path
import hashlib
shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
#shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
shasums = shasums1 + shasums2
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"

View File

@@ -1,12 +1,12 @@
#!/bin/bash
user_agent='nathante teblunthuis <nathante@uw.edu>'
user_agent='"nathante teblunthuis <nathante@uw.edu>"'
output_dir='/gscratch/comdata/raw_data/reddit_dumps/comments'
base_url='https://files.pushshift.io/reddit/comments/'
wget -r --no-parent -A 'RC_201*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_201*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_201*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
./check_comments_shas.py

View File

@@ -1,14 +1,14 @@
#!/bin/bash
user_agent='nathante teblunthuis <nathante@uw.edu>'
user_agent='"nathante teblunthuis <nathante@uw.edu>"'
output_dir='/gscratch/comdata/raw_data/reddit_dumps/submissions'
base_url='https://files.pushshift.io/reddit/submissions/'
wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.bz2' --user-agent=$user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.xz' --user-agent=$user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.zst' --user-agent=$user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.bz2' --user-agent=$user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.xz' --user-agent=$user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.zst' --user-agent=$user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
./check_submission_shas.py

View File

@@ -13,10 +13,7 @@ from nltk.corpus import stopwords
from nltk.util import ngrams
import string
from random import random
# remove urls
# taken from https://stackoverflow.com/questions/3809401/what-is-a-good-regular-expression-to-match-a-url
urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)")
from redditcleaner import clean
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, mwe_pass = 'first'):
@@ -95,8 +92,8 @@ def weekly_tf(partition, mwe_pass = 'first'):
# lowercase
text = text.lower()
# remove urls
text = urlregex.sub("", text)
# redditcleaner removes reddit markdown(newlines, quotes, bullet points, links, strikethrough, spoiler, code, superscript, table, headings)
text = clean(text)
# sentence tokenize
sentences = sent_tokenize(text)
@@ -107,14 +104,13 @@ def weekly_tf(partition, mwe_pass = 'first'):
# remove punctuation
sentences = map(remove_punct, sentences)
# remove sentences with less than 2 words
sentences = filter(lambda sentence: len(sentence) > 2, sentences)
# datta et al. select relatively common phrases from the reddit corpus, but they don't really explain how. We'll try that in a second phase.
# they say that the extract 1-4 grams from 10% of the sentences and then find phrases that appear often relative to the original terms
# here we take a 10 percent sample of sentences
if mwe_pass == 'first':
# remove sentences with less than 2 words
sentences = filter(lambda sentence: len(sentence) > 2, sentences)
sentences = list(sentences)
for sentence in sentences:
if random() <= 0.1:

View File

@@ -1,25 +1,130 @@
all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms.parquet
#all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
base_data=/gscratch/comdata/output
similarity_data=${base_data}/reddit_similarity
tfidf_data=${similarity_data}/tfidf
tfidf_weekly_data=${similarity_data}/tfidf_weekly
similarity_weekly_data=${similarity_data}/weekly
lsi_components=[10,50,100,200,300,400,500,600,700,850,1000,1500]
lsi_similarities: ${similarity_data}/subreddit_comment_terms_10k_LSI ${similarity_data}/subreddit_comment_authors-tf_10k_LSI ${similarity_data}/subreddit_comment_authors_10k_LSI ${similarity_data}/subreddit_comment_terms_30k_LSI ${similarity_data}/subreddit_comment_authors-tf_30k_LSI ${similarity_data}/subreddit_comment_authors_30k_LSI
all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
#${tfidf_weekly_data}/comment_terms_100k.parquet ${tfidf_weekly_data}/comment_authors_100k.parquet ${tfidf_weekly_data}/comment_terms_30k.parquet ${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet ${similarity_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_authors_30k.parquet
# /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_weekly_130k.parquet
# all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet
${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet
# /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.feather
${similarity_data}/subreddit_comment_terms_10k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000
/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
start_spark_and_run.sh 1 tfidf.py terms --topN=10000
${similarity_data}/subreddit_comment_terms_10k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200
/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
start_spark_and_run.sh 1 tfidf.py authors --topN=10000
${similarity_data}/subreddit_comment_terms_30k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200
/gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
${similarity_data}/subreddit_comment_terms_30k.feather: ${tfidf_data}/comment_terms_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000
/gscratch/comdata/output/reddit_similarity/comment_terms.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet
start_spark_and_run.sh 1 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
${similarity_data}/subreddit_comment_authors_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000
# /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet
${similarity_data}/subreddit_comment_authors_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000
${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors-tf_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_terms_100k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors-tf_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000
${tfidf_data}/comment_terms_100k.feather/: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=100000 --outpath=${tfidf_data}/comment_terms_100k.feather
${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=30000 --outpath=${tfidf_data}/comment_terms_30k.feather
${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=10000 --outpath=${tfidf_data}/comment_terms_10k.feather
${tfidf_data}/comment_authors_100k.feather: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=100000 --outpath=${tfidf_data}/comment_authors_100k.feather
${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=10000 --outpath=${tfidf_data}/comment_authors_10k.parquet
${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=30000 --outpath=${tfidf_data}/comment_authors_30k.parquet
${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet
${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_ppnum_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=100000 --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_terms_100k.parquet: weekly_cosine_similarities.py similarities_helper.py ${tfidf_weekly_data}/comment_terms_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
# start_spark_and_run.sh 1 tfidf.py authors_weekly --topN=130000
# /gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
# /gscratch/comdata/output/reddit_similarity/comment_terms.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet
# start_spark_and_run.sh 1 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
# /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet: cosine_similarities.py ${tfidf_weekly_data}/comment_authors.parquet
# start_spark_and_run.sh 1 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet
/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
start_spark_and_run.sh 1 cosine_similarities.py author-tf --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
# /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author-tf --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet

View File

@@ -1 +0,0 @@
Try normalizing tf by the mean or std instead of the max to avoid penalizing subreddits with very active users.

View File

@@ -1,12 +1,15 @@
import pandas as pd
import fire
from pathlib import Path
from similarities_helper import similarities, column_similarities
from cdsc_ecology_utils.similarity import similarities, column_similarities
from functools import partial
def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_communities=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
# change so that these take in an input as an optional argument (for speed, but also for idf).
def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
def term_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):

View File

@@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
stop-all.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname):7077 top_subreddits_by_comments.py
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh

View File

@@ -0,0 +1,85 @@
import pandas as pd
import fire
from pathlib import Path
from cdsc_ecology_utils.similarity.similarity_functions import lsi_column_similarities, similarities,
#from similarities_helper import similarities, lsi_column_similarities
from functools import partial
inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/"
term_colname='term'
outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI'
n_components=[10,50,100]
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
n_iter=5
random_state=1968
algorithm='arpack'
topN = None
from_date=None
to_date=None
min_df=None
max_df=None
def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None):
print(n_components,flush=True)
if lsi_model is None:
if type(n_components) == list:
lsi_model = Path(outfile) / f'{max(n_components)}_{term_colname}_LSIMOD.pkl'
else:
lsi_model = Path(outfile) / f'{n_components}_{term_colname}_LSIMOD.pkl'
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm,lsi_model_save=lsi_model)
return similarities(inpath=inpath, simfunc=simfunc, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_communities=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
# change so that these take in an input as an optional argument (for speed, but also for idf).
def term_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',outfile=None, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, algorithm='arpack', n_components=300,n_iter=5,random_state=1968):
res = lsi_similarities(inpath,
'term',
outfile,
min_df,
max_df,
included_subreddits,
topN,
from_date,
to_date,
n_components=n_components,
algorithm = algorithm
)
return res
def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,algorithm='arpack',n_components=300,n_iter=5,random_state=1968):
return lsi_similarities(inpath,
'author',
outfile,
min_df,
max_df,
included_subreddits,
topN,
from_date=from_date,
to_date=to_date,
n_components=n_components
)
def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968):
return lsi_similarities(inpath,
'author',
outfile,
min_df,
max_df,
included_subreddits,
topN,
from_date=from_date,
to_date=to_date,
tfidf_colname='relative_tf',
n_components=n_components,
algorithm=algorithm
)
if __name__ == "__main__":
fire.Fire({'term':term_lsi_similarities,
'author':author_lsi_similarities,
'author-tf':author_tf_similarities})

View File

@@ -2,143 +2,199 @@ from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from sklearn.metrics import pairwise_distances
from scipy.sparse import csr_matrix, issparse
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
import pickle
class tf_weight(Enum):
MaxTF = 1
Norm05 = 2
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet"
# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"
def reindex_tfidf_time_interval(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
# subreddits missing after this step don't have any terms that have a high enough idf
# try rewriting without merges
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(exclude_phrases)
tfidf_weekly = spark.read.parquet(infile)
# does reindex_tfidf, but without reindexing.
def reindex_tfidf(*args, **kwargs):
df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)
# create the time interval
if from_date is not None:
if type(from_date) is str:
from_date = datetime.fromisoformat(from_date)
print("assigning names")
subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
batches = subreddit_names.to_batches()
tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week >= from_date)
with Pool(cpu_count()) as pool:
chunks = pool.imap_unordered(pull_names,batches)
subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
subreddit_names = subreddit_names.set_index("subreddit_id")
if to_date is not None:
if type(to_date) is str:
to_date = datetime.fromisoformat(to_date)
tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week < to_date)
tfidf = tfidf_weekly.groupBy(["subreddit","week", term_id, term]).agg(f.sum("tf").alias("tf"))
tfidf = _calc_tfidf(tfidf, term_colname, tf_weight.Norm05)
tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
tfidf = spark.read_parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
new_ids = new_ids.set_index('subreddit_id')
subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
subreddit_names = subreddit_names.drop("subreddit_id",1)
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
return(tempdir, subreddit_names)
return(df, subreddit_names)
def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(exclude_phrases)
def pull_tfidf(*args, **kwargs):
df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
return df
tfidf = spark.read.parquet(infile)
def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
print(f"loading tfidf {infile}", flush=True)
if week is not None:
tfidf_ds = ds.dataset(infile, partitioning='hive')
else:
tfidf_ds = ds.dataset(infile)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(map(str.strip,map(str.lower,open(included_subreddits))))
included_subreddits = set(map(str.strip,open(included_subreddits)))
if exclude_phrases == True:
tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
ds_filter = ds.field("subreddit").isin(included_subreddits)
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
if min_df is not None:
ds_filter &= ds.field("count") >= min_df
tfidf = spark.read.parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
if max_df is not None:
ds_filter &= ds.field("count") <= max_df
if week is not None:
ds_filter &= ds.field("week") == week
if from_date is not None:
ds_filter &= ds.field("week") >= from_date
if to_date is not None:
ds_filter &= ds.field("week") <= to_date
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
projection = {
'subreddit_id':ds.field('subreddit_id'),
term_id:ds.field(term_id),
'relative_tf':ds.field("relative_tf").cast('float32')
}
if not rescale_idf:
projection = {
'subreddit_id':ds.field('subreddit_id'),
term_id:ds.field(term_id),
'relative_tf':ds.field('relative_tf').cast('float32'),
'tf_idf':ds.field('tf_idf').cast('float32')}
print(projection)
df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
df = df.to_pandas(split_blocks=True,self_destruct=True)
print("assigning indexes",flush=True)
if reindex:
df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
else:
df['subreddit_id_new'] = df['subreddit_id']
if reindex:
grouped = df.groupby(term_id)
df[term_id_new] = grouped.ngroup()
else:
df[term_id_new] = df[term_id]
if rescale_idf:
print("computing idf", flush=True)
df['new_count'] = grouped[term_id].transform('count')
N_docs = df.subreddit_id_new.max() + 1
df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
if tf_family == tf_weight.MaxTF:
df["tf_idf"] = df.relative_tf * df.idf
else: # tf_fam = tf_weight.Norm05
df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
return (df, tfidf_ds, ds_filter)
with Pool(cpu_count()) as pool:
chunks = pool.imap_unordered(pull_names,batches)
subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
subreddit_names = subreddit_names.set_index("subreddit_id")
new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
new_ids = new_ids.set_index('subreddit_id')
subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
subreddit_names = subreddit_names.drop("subreddit_id",1)
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
return (tempdir, subreddit_names)
return(df, subreddit_names)
def pull_names(batch):
return(batch.to_pandas().drop_duplicates())
def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
'''
tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
'''
if from_date is not None or to_date is not None:
tempdir, subreddit_names = reindex_tfidf_time_interval(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False, from_date=from_date, to_date=to_date)
else:
tempdir, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False)
print("loading matrix")
# mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
mat = read_tfidf_matrix(tempdir.name, term_colname, tfidf_colname)
print(f'computing similarities on mat. mat.shape:{mat.shape}')
print(f"size of mat is:{mat.data.nbytes}")
sims = simfunc(mat)
del mat
def proc_sims(sims, outfile):
if issparse(sims):
sims = sims.todense()
print(f"shape of sims:{sims.shape}")
print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}")
print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
sims = pd.DataFrame(sims)
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['subreddit'] = subreddit_names.subreddit.values
sims['_subreddit'] = subreddit_names.subreddit.values
p = Path(outfile)
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
p.parent.mkdir(exist_ok=True, parents=True)
sims.to_feather(outfile)
tempdir.cleanup()
def read_tfidf_matrix_weekly(path, term_colname, week, tfidf_colname='tf_idf'):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
entries = dataset.to_table(columns=[tfidf_colname,'subreddit_id_new', term_id_new],filter=ds.field('week')==week).to_pandas()
return(csr_matrix((entries[tfidf_colname], (entries[term_id_new]-1, entries.subreddit_id_new-1))))
entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
def read_tfidf_matrix(path, term_colname, tfidf_colname='tf_idf'):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
print(f"tfidf_colname:{tfidf_colname}")
entries = dataset.to_table(columns=[tfidf_colname, 'subreddit_id_new',term_id_new]).to_pandas()
return(csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1))))
print("loading matrix")
# mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
print(f'computing similarities on mat. mat.shape:{mat.shape}')
print(f"size of mat is:{mat.data.nbytes}",flush=True)
sims = simfunc(mat)
del mat
if hasattr(sims,'__next__'):
for simmat, name in sims:
proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
else:
proc_sims(sims, outfile)
def write_weekly_similarities(path, sims, week, names):
sims['week'] = week
p = pathlib.Path(path)
if not p.is_dir():
p.mkdir()
p.mkdir(exist_ok=True,parents=True)
# reformat as a pairwise list
sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
sims.to_parquet(p / week.isoformat())
def column_overlaps(mat):
@@ -150,136 +206,75 @@ def column_overlaps(mat):
return intersection / den
def test_lsi_sims():
term = "term"
term_id = term + '_id'
term_id_new = term + '_id_new'
t1 = time.perf_counter()
entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
term_colname='term',
min_df=2000,
topN=10000
)
t2 = time.perf_counter()
print(f"first load took:{t2 - t1}s")
entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
term_colname='term',
min_df=2000,
topN=10000
)
t3=time.perf_counter()
print(f"second load took:{t3 - t2}s")
mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
sims = list(lsi_column_similarities(mat, [10,50]))
sims_og = sims
sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))
# n_components is the latent dimensionality. sklearn recommends 100. More might be better
# if n_components is a list we'll return a list of similarities with different latent dimensionalities
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
# first compute the lsi of the matrix
# then take the column similarities
if type(n_components) is int:
n_components = [n_components]
n_components = sorted(n_components,reverse=True)
svd_components = n_components[0]
if lsi_model_load is not None and Path(lsi_model_load).exists():
print("loading LSI")
mod = pickle.load(open(lsi_model_load ,'rb'))
lsi_model_save = lsi_model_load
else:
print("running LSI",flush=True)
svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
mod = svd.fit(tfidfmat.T)
lsimat = mod.transform(tfidfmat.T)
if lsi_model_save is not None:
pickle.dump(mod, open(lsi_model_save,'wb'))
sims_list = []
for n_dims in n_components:
sims = column_similarities(lsimat[:,np.arange(n_dims)])
if len(n_components) > 1:
yield (sims, n_dims)
else:
return sims
def column_similarities(mat):
norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
mat = mat.multiply(1/norm)
sims = mat.T @ mat
return(sims)
def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col('count') >= min_df)
if max_df is not None:
tfidf = tfidf.filter(f.col('count') <= max_df)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
sub_ids = tfidf.select(['subreddit_id','week']).distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
# only use terms in at least min_df included subreddits in a given week
new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
# reset the term ids
term_ids = tfidf.select([term_id,'week']).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
tfidf = tfidf.join(term_ids,[term_id,'week'])
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf = tfidf.repartition('week')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return(tempdir)
def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col('count') >= min_df)
if max_df is not None:
tfidf = tfidf.filter(f.col('count') <= max_df)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new", f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return tempdir
# try computing cosine similarities using spark
def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
tfidf = tfidf.cache()
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
# step 1 make an rdd of entires
# sorted by (dense) spark subreddit id
n_partitions = int(len(included_subreddits)*2 / 5)
entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
# put like 10 subredis in each partition
# step 2 make it into a distributed.RowMatrix
coordMat = CoordinateMatrix(entries)
coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))
# this needs to be an IndexedRowMatrix()
mat = coordMat.toRowMatrix()
#goal: build a matrix of subreddit columns and tf-idfs rows
sim_dist = mat.columnSimilarities(threshold=similarity_threshold)
return (sim_dist, tfidf)
return 1 - pairwise_distances(mat,metric='cosine')
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
@@ -306,20 +301,20 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
idf = idf.withColumn('idf',f.log(idf.subreddits_in_week) / (1+f.col('count'))+1)
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select([term,'week']).distinct() # terms are distinct
terms = idf.select([term]).distinct() # terms are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
# make subreddit ids
subreddits = df.select(['subreddit','week']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))
subreddits = df.select(['subreddit']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))
df = df.join(subreddits,on=['subreddit','week'])
df = df.join(subreddits,on=['subreddit'])
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique
df = df.join(terms,on=[term]) # subreddit-term-id is unique
idf = idf.join(terms,on=[term,'week'])
idf = idf.join(terms,on=[term])
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term,'week'])
@@ -331,7 +326,9 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
else: # tf_fam = tf_weight.Norm05
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
return df
df = df.repartition(400,'subreddit','week')
dfwriter = df.write.partitionBy("week")
return dfwriter
def _calc_tfidf(df, term_colname, tf_family):
term = term_colname
@@ -342,7 +339,7 @@ def _calc_tfidf(df, term_colname, tf_family):
df = df.join(max_subreddit_terms, on='subreddit')
df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))
# group by term. term is unique
idf = df.groupby([term]).count()
@@ -377,7 +374,7 @@ def _calc_tfidf(df, term_colname, tf_family):
return df
def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
term = term_colname
term_id = term + '_id'
# aggregate counts by week. now subreddit-term is distinct
@@ -385,10 +382,28 @@ def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm
df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
df = _calc_tfidf(df, term_colname, tf_family)
return df
df = df.repartition('subreddit')
dfwriter = df.write
return dfwriter
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
rankdf = pd.read_csv(path)
included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
return included_subreddits
def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath)
df = df.repartition(400,'subreddit')
df.write.parquet(outpath,mode='overwrite')
def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath)
df = df.repartition(400,'subreddit','week')
dfwriter = df.write.partitionBy("week")
dfwriter.parquet(outpath,mode='overwrite')

View File

@@ -1,7 +1,8 @@
import fire
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
from cdsc_ecology_utils.similarity.similarity_functions import tfidf_dataset, \
build_weekly_tfidf_dataset, select_topN_communities
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits):
spark = SparkSession.builder.getOrCreate()
@@ -11,27 +12,27 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
df = df.filter(~ f.col(term_colname).isin(exclude))
if included_subreddits is not None:
include_subs = set(map(str.strip,map(str.lower, open(included_subreddits))))
include_subs = set(map(str.strip,open(included_subreddits)))
else:
include_subs = select_topN_subreddits(topN)
include_subs = select_topN_communities(topN)
df = func(df, include_subs, term_colname)
df.write.parquet(outpath,mode='overwrite',compression='snappy')
dfwriter = func(df, include_subs, term_colname)
dfwriter.parquet(outpath,mode='overwrite',compression='snappy')
spark.stop()
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
return _tfidf_wrapper(tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=25000,
def tfidf_authors(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=None,
included_subreddits=None):
return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
return tfidf(inpath,
outpath,
topN,
'author',
@@ -39,11 +40,12 @@ def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comm
included_subreddits=included_subreddits
)
def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=25000,
def tfidf_terms(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=None,
included_subreddits=None):
return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
return tfidf(inpath,
outpath,
topN,
'term',
@@ -51,11 +53,12 @@ def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/commen
included_subreddits=included_subreddits
)
def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
topN=25000,
def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
topN=None,
included_subreddits=None):
return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
return tfidf_weekly(inpath,
outpath,
topN,
'author',
@@ -63,12 +66,13 @@ def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfi
included_subreddits=included_subreddits
)
def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
topN=25000,
def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
topN=None,
included_subreddits=None):
return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
return tfidf_weekly(inpath,
outpath,
topN,
'term',

View File

@@ -17,7 +17,7 @@ df = df.filter(~df.subreddit.like("u_%"))
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
df = df.join(prop_nsfw,on='subreddit')
df = df.filter(df.prop_nsfw < 0.5)
#df = df.filter(df.prop_nsfw < 0.5)
win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank', f.rank().over(win))
@@ -26,4 +26,4 @@ df = df.toPandas()
df = df.sort_values("n_comments")
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv', index=False)
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nsfw.csv', index=False)

148
similarities/weekly_cosine_similarities.py Normal file → Executable file
View File

@@ -1,81 +1,143 @@
#!/usr/bin/env python3
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import pyarrow
import pyarrow.dataset as ds
import pandas as pd
import fire
from itertools import islice
from itertools import islice, chain
from pathlib import Path
from similarities_helper import *
from similarities_helper import pull_tfidf, column_similarities, write_weekly_similarities, lsi_column_similarities
from scipy.sparse import csr_matrix
from multiprocessing import Pool, cpu_count
from functools import partial
def _week_similarities(tempdir, term_colname, week):
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_10k.parquet"
tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
min_df=None
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
max_df = None
topN=100
term_colname='author'
# outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# included_subreddits=None
def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path, subreddit_names, nterms):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
print(f"loading matrix: {week}")
mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
entries = pull_tfidf(infile = tfidf_path,
term_colname=term_colname,
min_df=min_df,
max_df=max_df,
included_subreddits=included_subreddits,
topN=topN,
week=week,
rescale_idf=False)
tfidf_colname='tf_idf'
# if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)),shape=(nterms,subreddit_names.shape[0]))
print('computing similarities')
sims = column_similarities(mat)
sims = simfunc(mat)
del mat
sims = pd.DataFrame(sims)
sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['_subreddit'] = subreddit_names.subreddit.values
outfile = str(Path(outdir) / str(week))
write_weekly_similarities(outfile, sims, week, subreddit_names)
names = subreddit_names.loc[subreddit_names.week == week]
sims = pd.DataFrame(sims.todense())
def pull_weeks(batch):
return set(batch.to_pandas()['week'])
sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1)
sims['_subreddit'] = names.subreddit.values
# This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
def cosine_similarities_weekly_lsi(n_components=100, lsi_model=None, *args, **kwargs):
term_colname= kwargs.get('term_colname')
#lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
write_weekly_similarities(outfile, sims, week, names)
# simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm='randomized',lsi_model_load=lsi_model)
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=kwargs.get('n_iter'),random_state=kwargs.get('random_state'),algorithm=kwargs.get('algorithm'),lsi_model_load=lsi_model)
return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500, simfunc=column_similarities):
print(outfile)
tfidf = spark.read.parquet(tfidf_path)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(open(included_subreddits))
print(f"computing weekly similarities for {len(included_subreddits)} subreddits")
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df=None, included_subreddits=included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
# the ids can change each week.
subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
weeks = sorted(list(subreddit_names.week.drop_duplicates()))
# do this step in parallel if we have the memory for it.
# should be doable with pool.map
def week_similarities_helper(week):
_week_similarities(tempdir, term_colname, week)
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(tfidf_path)
with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
list(pool.map(week_similarities_helper,weeks))
# load subreddits + topN
def author_cosine_similarities_weekly(outfile, min_df=2 , included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
subreddit_names = df.select(['subreddit','subreddit_id']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id")
nterms = df.select(f.max(f.col(term_colname + "_id")).alias('max')).collect()[0].max
weeks = df.select(f.col("week")).distinct().toPandas().week.values
spark.stop()
print(f"computing weekly similarities")
week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN, subreddit_names=subreddit_names,nterms=nterms)
pool = Pool(cpu_count())
list(pool.imap(week_similarities_helper,weeks))
pool.close()
# with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly(infile,
outfile,
'author',
min_df,
max_df,
included_subreddits,
topN)
def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None):
return cosine_similarities_weekly(infile,
outfile,
'term',
min_df,
max_df,
included_subreddits,
topN)
def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=None,n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile,
outfile,
'author',
min_df,
max_df,
included_subreddits,
topN,
n_components=n_components,
lsi_model=lsi_model)
def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500,n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile,
outfile,
'term',
min_df,
max_df,
included_subreddits,
topN,
n_components=n_components,
lsi_model=lsi_model)
if __name__ == "__main__":
fire.Fire({'authors':author_cosine_similarities_weekly,
'terms':term_cosine_similarities_weekly})
'terms':term_cosine_similarities_weekly,
'authors-lsi':author_cosine_similarities_weekly_lsi,
'terms-lsi':term_cosine_similarities_weekly
})

2
timeseries/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from .choose_clusters import load_clusters, load_densities
from .cluster_timeseries import build_cluster_timeseries

View File

@@ -2,11 +2,11 @@ import pandas as pd
import numpy as np
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from choose_clusters import load_clusters, load_densities
from .choose_clusters import load_clusters, load_densities
import fire
from pathlib import Path
def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
@@ -34,4 +34,4 @@ def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_
ts.write.parquet(output, mode='overwrite')
if __name__ == "__main__":
fire.Fire(main)
fire.Fire(build_cluster_timeseries)

View File

@@ -22,8 +22,12 @@ def base_plot(plot_data):
#
# subreddit_select = alt.selection_single(on='click',fields=['subreddit'],bind=subreddit_dropdown,name='subreddit_click')
base_scale = alt.Scale(scheme={"name":'category10',
"extent":[0,100],
"count":10})
color = alt.condition(cluster_click_select ,
alt.Color(field='color',type='nominal',scale=alt.Scale(scheme='category10')),
alt.Color(field='color',type='nominal',scale=base_scale),
alt.value("lightgray"))
@@ -84,6 +88,11 @@ def viewport_plot(plot_data):
return chart
def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
isolate_color = 101
cluster_sizes = clusters.groupby('cluster').count()
singletons = set(cluster_sizes.loc[cluster_sizes.subreddit == 1].reset_index().cluster)
tsne_data = tsne_data.merge(clusters,on='subreddit')
centroids = tsne_data.groupby('cluster').agg({'x':np.mean,'y':np.mean})
@@ -120,6 +129,9 @@ def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
color_assignments = np.repeat(-1,len(centroids))
for i in range(len(centroids)):
if (centroids.iloc[i].name == -1) or (i in singletons):
color_assignments[i] = isolate_color
else:
knn = indices[i]
knn_colors = color_assignments[knn]
available_colors = color_ids[list(set(color_ids) - set(knn_colors))]
@@ -129,7 +141,6 @@ def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
else:
raise Exception("Can't color this many neighbors with this many colors")
centroids = centroids.reset_index()
colors = centroids.loc[:,['cluster']]
colors['color'] = color_assignments
@@ -143,12 +154,13 @@ def build_visualization(tsne_data, clusters, output):
# clusters = "/gscratch/comdata/output/reddit_clustering/subreddit_author_tf_similarities_10000.feather"
tsne_data = pd.read_feather(tsne_data)
tsne_data = tsne_data.rename(columns={'_subreddit':'subreddit'})
clusters = pd.read_feather(clusters)
tsne_data = assign_cluster_colors(tsne_data,clusters,10,8)
# sr_per_cluster = tsne_data.groupby('cluster').subreddit.count().reset_index()
# sr_per_cluster = sr_per_cluster.rename(columns={'subreddit':'cluster_size'})
sr_per_cluster = tsne_data.groupby('cluster').subreddit.count().reset_index()
sr_per_cluster = sr_per_cluster.rename(columns={'subreddit':'cluster_size'})
tsne_data = tsne_data.merge(sr_per_cluster,on='cluster')