18
0

3 Commits

Author SHA1 Message Date
Nate E TeBlunthuis
f728292461 Merge branch 'charliepatch' of code:cdsc_reddit into charliepatch 2021-05-02 23:56:16 -07:00
Nate E TeBlunthuis
95905cfc8b Merge branch 'excise_reindex' of code:cdsc_reddit into charliepatch 2021-05-02 23:52:52 -07:00
Nate E TeBlunthuis
46623927fe Merge branch 'charliepatch' of code:cdsc_reddit into charliepatch 2021-04-26 13:22:29 -07:00
37 changed files with 561 additions and 1300 deletions

3
.gitmodules vendored
View File

@@ -1,3 +0,0 @@
[submodule "cdsc_ecology_utils"]
path = cdsc_ecology_utils
url = code:cdsc_ecology_utils

View File

@@ -1,2 +0,0 @@
from timeseries import load_clusters, load_densities, build_cluster_timeseries
from cdsc_ecology_utils import similarity_functions

View File

@@ -2,164 +2,41 @@
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
similarity_data=/gscratch/comdata/output/reddit_similarity
clustering_data=/gscratch/comdata/output/reddit_clustering
kmeans_selection_grid=--max_iters=[3000] --n_inits=[10] --n_clusters=[100,500,1000,1250,1500,1750,2000]
hdbscan_selection_grid=--min_cluster_sizes=[2,3,4,5] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf]
affinity_selection_grid=--dampings=[0.5,0.6,0.7,0.8,0.95,0.97,0.99] --preference_quantiles=[0.1,0.3,0.5,0.7,0.9] --convergence_iters=[15]
kmeans_selection_grid="--max_iter=3000 --n_init=[10] --n_clusters=[100,500,1000,1500,2000,2500,3000,2350,3500,3570,4000]"
#selection_grid="--max_iter=3000 --convergence_iter=[15] --preference_quantile=[0.5] --damping=[0.99]"
all:$(clustering_data)/subreddit_comment_authors_10k/kmeans/selection_data.csv $(clustering_data)/subreddit_comment_authors-tf_10k/kmeans/selection_data.csv $(clustering_data)/subreddit_comment_terms_10k/kmeans/selection_data.csv $(clustering_data)/subreddit_comment_terms_10k/affinity/selection_data.csv $(clustering_data)/subreddit_comment_authors_10k/affinity/selection_data.csv $(clustering_data)/subreddit_comment_authors-tf_10k/affinity/selection_data.csv
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS $(clustering_data)/subreddit_authors-tf_similarities_30k.feather/SUCCESS
# $(clustering_data)/subreddit_comment_terms_30k.feather/SUCCESS
authors_10k_input=$(similarity_data)/subreddit_comment_authors_10k.feather
authors_10k_input_lsi=$(similarity_data)/subreddit_comment_authors_10k_LSI
authors_10k_output=$(clustering_data)/subreddit_comment_authors_10k
authors_10k_output_lsi=$(clustering_data)/subreddit_comment_authors_10k_LSI
$(clustering_data)/subreddit_comment_authors_10k/kmeans/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_authors_10k.feather clustering.py
$(srun_singularity) python3 selection.py kmeans $(similarity_data)/subreddit_comment_authors_10k.feather $(clustering_data)/subreddit_comment_authors_10k/kmeans $(clustering_data)/subreddit_comment_authors_10k/kmeans/selection_data.csv $(kmeans_selection_grid)
authors_tf_10k_input=$(similarity_data)/subreddit_comment_authors-tf_10k.feather
authors_tf_10k_input_lsi=$(similarity_data)/subreddit_comment_authors-tf_10k_LSI
authors_tf_10k_output=$(clustering_data)/subreddit_comment_authors-tf_10k
authors_tf_10k_output_lsi=$(clustering_data)/subreddit_comment_authors-tf_10k_LSI
$(clustering_data)/subreddit_comment_terms_10k/kmeans/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_terms_10k.feather clustering.py
$(srun_singularity) python3 selection.py kmeans $(similarity_data)/subreddit_comment_terms_10k.feather $(clustering_data)/subreddit_comment_terms_10k/kmeans $(clustering_data)/subreddit_comment_terms_10k/kmeans/selection_data.csv $(kmeans_selection_grid)
terms_10k_input=$(similarity_data)/subreddit_comment_terms_10k.feather
terms_10k_input_lsi=$(similarity_data)/subreddit_comment_terms_10k_LSI
terms_10k_output=$(clustering_data)/subreddit_comment_terms_10k
terms_10k_output_lsi=$(clustering_data)/subreddit_comment_terms_10k_LSI
all:terms_10k authors_10k authors_tf_10k terms_10k_lsi authors_10k_lsi authors_tf_10k_lsi
terms_10k:${terms_10k_output}/kmeans/selection_data.csv ${terms_10k_output}/affinity/selection_data.csv ${terms_10k_output}/hdbscan/selection_data.csv
authors_10k:${authors_10k_output}/kmeans/selection_data.csv ${authors_10k_output}/hdbscan/selection_data.csv ${authors_10k_output}/affinity/selection_data.csv
authors_tf_10k:${authors_tf_10k_output}/kmeans/selection_data.csv ${authors_tf_10k_output}/hdbscan/selection_data.csv ${authors_tf_10k_output}/affinity/selection_data.csv
terms_10k_lsi:${terms_10k_output_lsi}/kmeans/selection_data.csv ${terms_10k_output_lsi}/affinity/selection_data.csv ${terms_10k_output_lsi}/hdbscan/selection_data.csv
authors_10k_lsi:${authors_10k_output_lsi}/kmeans/selection_data.csv ${authors_10k_output_lsi}/hdbscan/selection_data.csv ${authors_10k_output_lsi}/affinity/selection_data.csv
authors_tf_10k_lsi:${authors_tf_10k_output_lsi}/kmeans/selection_data.csv ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
${authors_10k_output}/kmeans/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/kmeans --savefile=${authors_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${terms_10k_output}/kmeans/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/kmeans --savefile=${terms_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_tf_10k_output}/kmeans/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/kmeans --savefile=${authors_tf_10k_output}/kmeans/selection_data.csv $(kmeans_selection_grid)
${authors_10k_output}/affinity/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/affinity --savefile=${authors_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${terms_10k_output}/affinity/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/affinity --savefile=${terms_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_tf_10k_output}/affinity/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/affinity --savefile=${authors_tf_10k_output}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_10k_output}/hdbscan/selection_data.csv:selection.py ${authors_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${authors_10k_input} --outpath=${authors_10k_output}/hdbscan --savefile=${authors_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output}/hdbscan/selection_data.csv:selection.py ${terms_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${terms_10k_input} --outpath=${terms_10k_output}/hdbscan --savefile=${terms_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${authors_tf_10k_output}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering.py --inpath=${authors_tf_10k_input} --outpath=${authors_tf_10k_output}/hdbscan --savefile=${authors_tf_10k_output}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
$(clustering_data)/subreddit_comment_authors-tf_10k/kmeans/selection_data.csv:clustering.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather
$(srun_singularity) python3 selection.py kmeans $(similarity_data)/subreddit_comment_authors-tf_10k.feather $(clustering_data)/subreddit_comment_authors-tf_10k/kmeans $(clustering_data)/subreddit_comment_authors-tf_10k/kmeans/selection_data.csv $(kmeans_selection_grid)
## LSI Models
${authors_10k_output_lsi}/kmeans/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/kmeans --savefile=${authors_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
affinity_selection_grid="--max_iter=3000 --convergence_iter=[15] --preference_quantile=[0.5] --damping=[0.99]"
$(clustering_data)/subreddit_comment_authors_10k/affinity/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_authors_10k.feather clustering.py
$(srun_singularity) python3 selection.py affinity $(similarity_data)/subreddit_comment_authors_10k.feather $(clustering_data)/subreddit_comment_authors_10k/affinity $(clustering_data)/subreddit_comment_authors_10k/affinity/selection_data.csv $(affinity_selection_grid) -J 20
${terms_10k_output_lsi}/kmeans/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/kmeans --savefile=${terms_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
$(clustering_data)/subreddit_comment_terms_10k/affinity/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_terms_10k.feather clustering.py
$(srun_singularity) python3 selection.py affinity $(similarity_data)/subreddit_comment_terms_10k.feather $(clustering_data)/subreddit_comment_terms_10k/affinity $(clustering_data)/subreddit_comment_terms_10k/affinity/selection_data.csv $(affinity_selection_grid) -J 20
${authors_tf_10k_output_lsi}/kmeans/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py kmeans_clustering.py
$(srun_singularity) python3 kmeans_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/kmeans --savefile=${authors_tf_10k_output_lsi}/kmeans/selection_data.csv $(kmeans_selection_grid)
$(clustering_data)/subreddit_comment_authors-tf_10k/affinity/selection_data.csv:clustering.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather
$(srun_singularity) python3 selection.py affinity $(similarity_data)/subreddit_comment_authors-tf_10k.feather $(clustering_data)/subreddit_comment_authors-tf_10k/affinity $(clustering_data)/subreddit_comment_authors-tf_10k/affinity/selection_data.csv $(affinity_selection_grid) -J 20
${authors_10k_output_lsi}/affinity/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/affinity --savefile=${authors_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
clean:
rm -f $(clustering_data)/subreddit_comment_authors-tf_10k/affinity/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_authors_10k/affinity/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_terms_10k/affinity/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_authors-tf_10k/kmeans/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_authors_10k/kmeans/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_terms_10k/kmeans/selection_data.csv
${terms_10k_output_lsi}/affinity/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/affinity --savefile=${terms_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_tf_10k_output_lsi}/affinity/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py affinity_clustering.py
$(srun_singularity) python3 affinity_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/affinity --savefile=${authors_tf_10k_output_lsi}/affinity/selection_data.csv $(affinity_selection_grid)
${authors_10k_output_lsi}/hdbscan/selection_data.csv:selection.py ${authors_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_10k_input_lsi} --outpath=${authors_10k_output_lsi}/hdbscan --savefile=${authors_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output_lsi}/hdbscan/selection_data.csv:selection.py ${terms_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${terms_10k_input_lsi} --outpath=${terms_10k_output_lsi}/hdbscan --savefile=${terms_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/hdbscan --savefile=${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${terms_10k_output_lsi}/best_hdbscan.feather:${terms_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
${authors_tf_10k_output_lsi}/best_hdbscan.feather:${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
clean_affinity:
rm -f ${authors_10k_output}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output}/affinity/selection_data.csv
rm -f ${terms_10k_output}/affinity/selection_data.csv
clean_kmeans:
rm -f ${authors_10k_output}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output}/kmeans/selection_data.csv
rm -f ${terms_10k_output}/kmeans/selection_data.csv
clean_hdbscan:
rm -f ${authors_10k_output}/hdbscan/selection_data.csv
rm -f ${authors_tf_10k_output}/hdbscan/selection_data.csv
rm -f ${terms_10k_output}/hdbscan/selection_data.csv
clean_authors:
rm -f ${authors_10k_output}/affinity/selection_data.csv
rm -f ${authors_10k_output}/kmeans/selection_data.csv
rm -f ${authors_10k_output}/hdbscan/selection_data.csv
clean_authors_tf:
rm -f ${authors_tf_10k_output}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output}/hdbscan/selection_data.csv
clean_terms:
rm -f ${terms_10k_output}/affinity/selection_data.csv
rm -f ${terms_10k_output}/kmeans/selection_data.csv
rm -f ${terms_10k_output}/hdbscan/selection_data.csv
clean_lsi_affinity:
rm -f ${authors_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
rm -f ${terms_10k_output_lsi}/affinity/selection_data.csv
clean_lsi_kmeans:
rm -f ${authors_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${terms_10k_output_lsi}/kmeans/selection_data.csv
clean_lsi_hdbscan:
rm -f ${authors_10k_output_lsi}/hdbscan/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv
rm -f ${terms_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_authors:
rm -f ${authors_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_authors_tf:
rm -f ${authors_tf_10k_output_lsi}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv
clean_lsi_terms:
rm -f ${terms_10k_output_lsi}/affinity/selection_data.csv
rm -f ${terms_10k_output_lsi}/kmeans/selection_data.csv
rm -f ${terms_10k_output_lsi}/hdbscan/selection_data.csv
clean: clean_affinity clean_kmeans clean_hdbscan
PHONY: clean clean_affinity clean_kmeans clean_hdbscan clean_authors clean_authors_tf clean_terms terms_10k authors_10k authors_tf_10k
PHONY: clean
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_authors_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_30k.feather $(clustering_data)/subreddit_comment_authors_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS

View File

@@ -1,129 +0,0 @@
from sklearn.cluster import AffinityPropagation
from dataclasses import dataclass
from clustering_base import clustering_result, clustering_job
from grid_sweep import grid_sweep
from pathlib import Path
from itertools import product, starmap
import fire
import sys
import numpy as np
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
@dataclass
class affinity_clustering_result(clustering_result):
damping:float
convergence_iter:int
preference_quantile:float
preference:float
max_iter:int
class affinity_job(clustering_job):
def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
super().__init__(infile,
outpath,
name,
call=self._affinity_clustering,
preference_quantile=preference_quantile,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
random_state=1968,
verbose=verbose)
self.damping=damping
self.max_iter=max_iter
self.convergence_iter=convergence_iter
self.preference_quantile=preference_quantile
def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):
mat = 1-mat
preference = np.quantile(mat, preference_quantile)
self.preference = preference
print(f"preference is {preference}")
print("data loaded")
sys.stdout.flush()
clustering = AffinityPropagation(*args,
preference=preference,
affinity='precomputed',
copy=False,
**kwargs).fit(mat)
return clustering
def get_info(self):
result = super().get_info()
self.result=affinity_clustering_result(**result.__dict__,
damping=self.damping,
max_iter=self.max_iter,
convergence_iter=self.convergence_iter,
preference_quantile=self.preference_quantile,
preference=self.preference)
return self.result
class affinity_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
super().__init__(affinity_job,
_afffinity_grid_sweep,
inpath,
outpath,
self.namer,
*args,
**kwargs)
def namer(self,
damping,
max_iter,
convergence_iter,
preference_quantile):
return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
def run_affinity_grid_sweep(savefile, inpath, outpath, dampings=[0.8], max_iters=[3000], convergence_iters=[30], preference_quantiles=[0.5],n_cores=10):
"""Run affinity clustering once or more with different parameters.
Usage:
affinity_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --max_iters=<csv> --dampings=<csv> --preference_quantiles=<csv>
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to feather data containing a labeled matrix of subreddit similarities.
outpath: path to output fit kmeans clusterings.
dampings:one or more numbers in [0.5, 1). damping parameter in affinity propagatin clustering.
preference_quantiles:one or more numbers in (0,1) for selecting the 'preference' parameter.
convergence_iters:one or more integers of number of iterations without improvement before stopping.
max_iters: one or more numbers of different maximum interations.
"""
obj = affinity_grid_sweep(inpath,
outpath,
map(float,dampings),
map(int,max_iters),
map(int,convergence_iters),
map(float,preference_quantiles))
obj.run(n_cores)
obj.save(savefile)
def test_select_affinity_clustering():
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
outpath = "test_affinity";
dampings=[0.8,0.9]
max_iters=[100000]
convergence_iters=[15]
preference_quantiles=[0.5,0.7]
gs = affinity_lsi_grid_sweep(inpath, 'all', outpath, dampings, max_iters, convergence_iters, preference_quantiles)
gs.run(20)
gs.save("test_affinity/lsi_sweep.csv")
if __name__ == "__main__":
fire.Fire(run_affinity_grid_sweep)

View File

@@ -1,99 +0,0 @@
import fire
from affinity_clustering import affinity_clustering_result, affinity_job, affinity_grid_sweep
from grid_sweep import grid_sweep
from lsi_base import lsi_result_mixin, lsi_grid_sweep, lsi_mixin
from dataclasses import dataclass
@dataclass
class affinity_clustering_result_lsi(affinity_clustering_result, lsi_result_mixin):
pass
class affinity_lsi_job(affinity_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
def get_info(self):
result = super().get_info()
self.result = affinity_clustering_result_lsi(**result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
class affinity_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
dampings=[0.9],
max_iters=[10000],
convergence_iters=[30],
preference_quantiles=[0.5]):
super().__init__(affinity_lsi_job,
_affinity_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
dampings,
max_iters,
convergence_iters,
preference_quantiles)
class _affinity_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
self.lsi_dim = lsi_dim
self.jobtype = affinity_lsi_job
super().__init__(self.jobtype,
inpath,
outpath,
self.namer,
[self.lsi_dim],
*args,
**kwargs)
def namer(self, *args, **kwargs):
s = affinity_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
def run_affinity_lsi_grid_sweep(savefile, inpath, outpath, dampings=[0.8], max_iters=[3000], convergence_iters=[30], preference_quantiles=[0.5], lsi_dimensions='all',n_cores=30):
"""Run affinity clustering once or more with different parameters.
Usage:
affinity_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --max_iters=<csv> --dampings=<csv> --preference_quantiles=<csv> --lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to folder containing feather files with LSI similarity labeled matrices of subreddit similarities.
outpath: path to output fit kmeans clusterings.
dampings:one or more numbers in [0.5, 1). damping parameter in affinity propagatin clustering.
preference_quantiles:one or more numbers in (0,1) for selecting the 'preference' parameter.
convergence_iters:one or more integers of number of iterations without improvement before stopping.
max_iters: one or more numbers of different maximum interations.
lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
"""
obj = affinity_lsi_grid_sweep(inpath,
lsi_dimensions,
outpath,
map(float,dampings),
map(int,max_iters),
map(int,convergence_iters),
map(float,preference_quantiles))
obj.run(n_cores)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_affinity_lsi_grid_sweep)

View File

@@ -3,7 +3,7 @@
import sys
import pandas as pd
import numpy as np
from sklearn.cluster import AffinityPropagation
from sklearn.cluster import AffinityPropagation, KMeans
import fire
from pathlib import Path
from multiprocessing import cpu_count
@@ -46,6 +46,24 @@ def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000,
print(f"saved {output}")
return clustering
def kmeans_clustering(similarities, *args, **kwargs):
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
clustering = _kmeans_clustering(mat, *args, **kwargs)
cluster_data = process_clustering_result(clustering, subreddits)
return(cluster_data)
def _kmeans_clustering(mat, output, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
clustering = KMeans(n_clusters=n_clusters,
n_init=n_init,
max_iter=max_iter,
random_state=random_state,
verbose=verbose
).fit(mat)
return clustering
if __name__ == "__main__":

View File

@@ -2,76 +2,21 @@ from pathlib import Path
import numpy as np
import pandas as pd
from dataclasses import dataclass
from sklearn.metrics import silhouette_score, silhouette_samples
from collections import Counter
# this is meant to be an interface, not created directly
class clustering_job:
def __init__(self, infile, outpath, name, call, *args, **kwargs):
self.outpath = Path(outpath)
self.call = call
self.args = args
self.kwargs = kwargs
self.infile = Path(infile)
self.name = name
self.hasrun = False
def sim_to_dist(mat):
dist = 1-mat
dist[dist < 0] = 0
np.fill_diagonal(dist,0)
return dist
def run(self):
self.subreddits, self.mat = self.read_distance_mat(self.infile)
self.clustering = self.call(self.mat, *self.args, **self.kwargs)
self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
self.score = self.silhouette()
self.outpath.mkdir(parents=True, exist_ok=True)
self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
self.hasrun = True
def get_info(self):
if not self.hasrun:
self.run()
self.result = clustering_result(outpath=str(self.outpath.resolve()),
silhouette_score=self.score,
name=self.name,
n_clusters=self.n_clusters,
n_isolates=self.n_isolates,
silhouette_samples = self.silsampout
)
return self.result
def silhouette(self):
counts = Counter(self.clustering.labels_)
singletons = [key for key, value in counts.items() if value == 1]
isolates = (self.clustering.labels_ == -1) | (np.isin(self.clustering.labels_,np.array(singletons)))
scoremat = self.mat[~isolates][:,~isolates]
if self.n_clusters > 1:
score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed')
silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed')
silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp})
self.outpath.mkdir(parents=True, exist_ok=True)
silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather")
self.silsampout = silsampout.resolve()
silhouette_samp.to_feather(self.silsampout)
else:
score = None
self.silsampout = None
return score
def read_distance_mat(self, similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,1-mat)
def process_clustering(self, clustering, subreddits):
def process_clustering_result(clustering, subreddits):
if hasattr(clustering,'n_iter_'):
print(f"clustering took {clustering.n_iter_} iterations")
clusters = clustering.labels_
self.n_clusters = len(set(clusters))
print(f"found {self.n_clusters} clusters")
print(f"found {len(set(clusters))} clusters")
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
@@ -79,27 +24,26 @@ class clustering_job:
print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
n_isolates1 = (cluster_sizes.subreddit==1).sum()
print(f"{n_isolates1} clusters have 1 member")
print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
n_isolates2 = cluster_sizes.loc[cluster_sizes.cluster==-1,:]['subreddit'].to_list()
if len(n_isolates2) > 0:
n_isloates2 = n_isolates2[0]
print(f"{n_isolates2} subreddits are in cluster -1",flush=True)
if n_isolates1 == 0:
self.n_isolates = n_isolates2
else:
self.n_isolates = n_isolates1
print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True)
return cluster_data
@dataclass
class clustering_result:
outpath:Path
max_iter:int
silhouette_score:float
alt_silhouette_score:float
name:str
n_clusters:int
n_isolates:int
silhouette_samples:str
def read_similarity_mat(similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,mat)

View File

@@ -17,7 +17,7 @@ def fit_tsne(similarities, output, learning_rate=750, perplexity=50, n_iter=1000
df = pd.read_feather(similarities)
n = df.shape[0]
mat = np.array(df.drop('_subreddit',1),dtype=np.float64)
mat = np.array(df.drop('subreddit',1),dtype=np.float64)
mat[range(n),range(n)] = 1
mat[mat > 1] = 1
dist = 2*np.arccos(mat)/np.pi
@@ -26,7 +26,7 @@ def fit_tsne(similarities, output, learning_rate=750, perplexity=50, n_iter=1000
tsne_fit_whole = tsne_fit_model.fit_transform(dist)
plot_data = pd.DataFrame({'x':tsne_fit_whole[:,0],'y':tsne_fit_whole[:,1], '_subreddit':df['_subreddit']})
plot_data = pd.DataFrame({'x':tsne_fit_whole[:,0],'y':tsne_fit_whole[:,1], 'subreddit':df.subreddit})
plot_data.to_feather(output)

View File

@@ -1,33 +0,0 @@
from pathlib import Path
from multiprocessing import Pool, cpu_count
from itertools import product, chain
import pandas as pd
class grid_sweep:
def __init__(self, jobtype, inpath, outpath, namer, *args):
self.jobtype = jobtype
self.namer = namer
print(*args)
grid = list(product(*args))
inpath = Path(inpath)
outpath = Path(outpath)
self.hasrun = False
self.grid = [(inpath,outpath,namer(*g)) + g for g in grid]
self.jobs = [jobtype(*g) for g in self.grid]
def run(self, cores=20):
if cores is not None and cores > 1:
with Pool(cores) as pool:
infos = pool.map(self.jobtype.get_info, self.jobs)
else:
infos = map(self.jobtype.get_info, self.jobs)
self.infos = pd.DataFrame(infos)
self.hasrun = True
def save(self, outcsv):
if not self.hasrun:
self.run()
outcsv = Path(outcsv)
outcsv.parent.mkdir(parents=True, exist_ok=True)
self.infos.to_csv(outcsv)

View File

@@ -1,57 +1,32 @@
from clustering_base import clustering_result, clustering_job
from grid_sweep import grid_sweep
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
from dataclasses import dataclass
import hdbscan
from sklearn.neighbors import NearestNeighbors
import plotnine as pn
import numpy as np
from itertools import product, starmap, chain
from itertools import product, starmap
import pandas as pd
from multiprocessing import cpu_count
from sklearn.metrics import silhouette_score, silhouette_samples
from pathlib import Path
from multiprocessing import Pool, cpu_count
import fire
from pyarrow.feather import write_feather
def test_select_hdbscan_clustering():
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI"
select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
"test_hdbscan_author30k",
min_cluster_sizes=[2],
min_samples=[1,2],
cluster_selection_epsilons=[0,0.05,0.1,0.15],
cluster_selection_methods=['eom','leaf'],
lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI"
outpath = "test_hdbscan";
min_cluster_sizes=[2,3,4];
min_samples=[1,2,3];
cluster_selection_epsilons=[0,0.1,0.3,0.5];
cluster_selection_methods=[1];
cluster_selection_methods=['eom'];
lsi_dimensions='all'
gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
gs.run(20)
gs.save("test_hdbscan/lsi_sweep.csv")
# job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom')
# job1.run()
# print(job1.get_info())
# df = pd.read_csv("test_hdbscan/selection_data.csv")
# test_select_hdbscan_clustering()
# check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
# silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
# c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)
class hdbscan_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
super().__init__(hdbscan_job, inpath, outpath, self.namer, *args, **kwargs)
def namer(self,
min_cluster_size,
min_samples,
cluster_selection_epsilon,
cluster_selection_method):
return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}"
@dataclass
class hdbscan_clustering_result(clustering_result):
@@ -59,31 +34,101 @@ class hdbscan_clustering_result(clustering_result):
min_samples:int
cluster_selection_epsilon:float
cluster_selection_method:str
lsi_dimensions:int
n_isolates:int
silhouette_samples:str
class hdbscan_job(clustering_job):
def __init__(self, infile, outpath, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
super().__init__(infile,
def select_hdbscan_clustering(inpath,
outpath,
name,
call=hdbscan_job._hdbscan_clustering,
outfile=None,
min_cluster_sizes=[2],
min_samples=[1],
cluster_selection_epsilons=[0],
cluster_selection_methods=['eom'],
lsi_dimensions='all'
):
inpath = Path(inpath)
outpath = Path(outpath)
outpath.mkdir(exist_ok=True, parents=True)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*"))
else:
lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
lsi_nums = [p.stem for p in lsi_paths]
grid = list(product(lsi_nums,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods))
# fix the output file names
names = list(map(lambda t:'_'.join(map(str,t)),grid))
grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
with Pool(int(cpu_count()/4)) as pool:
mods = starmap(hdbscan_clustering, grid)
res = pd.DataFrame(mods)
if outfile is None:
outfile = outpath / "selection_data.csv"
res.to_csv(outfile)
def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
clustering = _hdbscan_clustering(mat,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
cluster_selection_epsilon=cluster_selection_epsilon,
cluster_selection_method=cluster_selection_method
cluster_selection_method=cluster_selection_method,
metric='precomputed',
core_dist_n_jobs=cpu_count()
)
self.min_cluster_size = min_cluster_size
self.min_samples = min_samples
self.cluster_selection_epsilon = cluster_selection_epsilon
self.cluster_selection_method = cluster_selection_method
# self.mat = 1 - self.mat
cluster_data = process_clustering_result(clustering, subreddits)
isolates = clustering.labels_ == -1
scoremat = mat[~isolates][:,~isolates]
score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
cluster_data.to_feather(output)
def _hdbscan_clustering(mat, *args, **kwargs):
silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
silsampout = output.parent / ("silhouette_samples" + output.name)
silhouette_samp.to_feather(silsampout)
result = hdbscan_clustering_result(outpath=output,
max_iter=None,
silhouette_samples=silsampout,
silhouette_score=score,
alt_silhouette_score=score,
name=name,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
cluster_selection_epsilon=cluster_selection_epsilon,
cluster_selection_method=cluster_selection_method,
lsi_dimensions=lsi_dim,
n_isolates=isolates.sum(),
n_clusters=len(set(clustering.labels_))
)
return(result)
# for all runs we should try cluster_selection_epsilon = None
# for terms we should try cluster_selection_epsilon around 0.56-0.66
# for authors we should try cluster_selection_epsilon around 0.98-0.99
def _hdbscan_clustering(mat, *args, **kwargs):
print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
print(mat)
clusterer = hdbscan.HDBSCAN(metric='precomputed',
core_dist_n_jobs=cpu_count(),
*args,
clusterer = hdbscan.HDBSCAN(*args,
**kwargs,
)
@@ -91,39 +136,6 @@ class hdbscan_job(clustering_job):
return(clustering)
def get_info(self):
result = super().get_info()
self.result = hdbscan_clustering_result(**result.__dict__,
min_cluster_size=self.min_cluster_size,
min_samples=self.min_samples,
cluster_selection_epsilon=self.cluster_selection_epsilon,
cluster_selection_method=self.cluster_selection_method)
return self.result
def run_hdbscan_grid_sweep(savefile, inpath, outpath, min_cluster_sizes=[2], min_samples=[1], cluster_selection_epsilons=[0], cluster_selection_methods=['eom']):
"""Run hdbscan clustering once or more with different parameters.
Usage:
hdbscan_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --min_cluster_sizes=<csv> --min_samples=<csv> --cluster_selection_epsilons=<csv> --cluster_selection_methods=<csv "eom"|"leaf">
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to feather data containing a labeled matrix of subreddit similarities.
outpath: path to output fit kmeans clusterings.
min_cluster_sizes: one or more integers indicating the minumum cluster size
min_samples: one ore more integers indicating the minimum number of samples used in the algorithm
cluster_selection_epsilon: one or more similarity thresholds for transition from dbscan to hdbscan
cluster_selection_method: "eom" or "leaf" eom gives larger clusters.
"""
obj = hdbscan_grid_sweep(inpath,
outpath,
map(int,min_cluster_sizes),
map(int,min_samples),
map(float,cluster_selection_epsilons),
cluster_selection_methods)
obj.run()
obj.save(savefile)
def KNN_distances_plot(mat,outname,k=2):
nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
distances, indices = nbrs.kneighbors(mat)
@@ -153,7 +165,8 @@ def make_KNN_plots():
KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')
if __name__ == "__main__":
fire.Fire(run_hdbscan_grid_sweep)
# test_select_hdbscan_clustering()
#fire.Fire(select_hdbscan_clustering)
df = pd.read_csv("test_hdbscan/selection_data.csv")
test_select_hdbscan_clustering()
check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)

View File

@@ -1,101 +0,0 @@
from hdbscan_clustering import hdbscan_job, hdbscan_grid_sweep, hdbscan_clustering_result
from lsi_base import lsi_grid_sweep, lsi_mixin, lsi_result_mixin
from grid_sweep import grid_sweep
import fire
from dataclasses import dataclass
@dataclass
class hdbscan_clustering_result_lsi(hdbscan_clustering_result, lsi_result_mixin):
pass
class hdbscan_lsi_job(hdbscan_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(
infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
def get_info(self):
partial_result = super().get_info()
self.result = hdbscan_clustering_result_lsi(**partial_result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
class hdbscan_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods
):
super().__init__(hdbscan_lsi_job,
_hdbscan_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods)
class _hdbscan_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
print(args)
print(kwargs)
self.lsi_dim = lsi_dim
self.jobtype = hdbscan_lsi_job
super().__init__(self.jobtype, inpath, outpath, self.namer, [self.lsi_dim], *args, **kwargs)
def namer(self, *args, **kwargs):
s = hdbscan_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
def run_hdbscan_lsi_grid_sweep(savefile, inpath, outpath, min_cluster_sizes=[2], min_samples=[1], cluster_selection_epsilons=[0], cluster_selection_methods=[1],lsi_dimensions='all'):
"""Run hdbscan clustering once or more with different parameters.
Usage:
hdbscan_clustering_lsi --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --min_cluster_sizes=<csv> --min_samples=<csv> --cluster_selection_epsilons=<csv> --cluster_selection_methods=[eom]> --lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to folder containing feather files with LSI similarity labeled matrices of subreddit similarities.
outpath: path to output fit clusterings.
min_cluster_sizes: one or more integers indicating the minumum cluster size
min_samples: one ore more integers indicating the minimum number of samples used in the algorithm
cluster_selection_epsilons: one or more similarity thresholds for transition from dbscan to hdbscan
cluster_selection_methods: one or more of "eom" or "leaf" eom gives larger clusters.
lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
"""
obj = hdbscan_lsi_grid_sweep(inpath,
lsi_dimensions,
outpath,
list(map(int,min_cluster_sizes)),
list(map(int,min_samples)),
list(map(float,cluster_selection_epsilons)),
cluster_selection_methods)
obj.run(10)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_hdbscan_lsi_grid_sweep)

View File

@@ -1,105 +0,0 @@
from sklearn.cluster import KMeans
import fire
from pathlib import Path
from dataclasses import dataclass
from clustering_base import clustering_result, clustering_job
from grid_sweep import grid_sweep
@dataclass
class kmeans_clustering_result(clustering_result):
n_clusters:int
n_init:int
max_iter:int
class kmeans_job(clustering_job):
def __init__(self, infile, outpath, name, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
super().__init__(infile,
outpath,
name,
call=kmeans_job._kmeans_clustering,
n_clusters=n_clusters,
n_init=n_init,
max_iter=max_iter,
random_state=random_state,
verbose=verbose)
self.n_clusters=n_clusters
self.n_init=n_init
self.max_iter=max_iter
def _kmeans_clustering(mat, *args, **kwargs):
clustering = KMeans(*args,
**kwargs,
).fit(mat)
return clustering
def get_info(self):
result = super().get_info()
self.result = kmeans_clustering_result(**result.__dict__,
n_init=self.n_init,
max_iter=self.max_iter)
return self.result
class kmeans_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
super().__init__(kmeans_job, inpath, outpath, self.namer, *args, **kwargs)
def namer(self,
n_clusters,
n_init,
max_iter):
return f"nclusters-{n_clusters}_nit-{n_init}_maxit-{max_iter}"
def test_select_kmeans_clustering():
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
outpath = "test_kmeans";
n_clusters=[200,300,400];
n_init=[1,2,3];
max_iter=[100000]
gs = kmeans_lsi_grid_sweep(inpath, 'all', outpath, n_clusters, n_init, max_iter)
gs.run(1)
cluster_selection_epsilons=[0,0.1,0.3,0.5];
cluster_selection_methods=['eom'];
lsi_dimensions='all'
gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
gs.run(20)
gs.save("test_hdbscan/lsi_sweep.csv")
def run_kmeans_grid_sweep(savefile, inpath, outpath, n_clusters=[500], n_inits=[1], max_iters=[3000]):
"""Run kmeans clustering once or more with different parameters.
Usage:
kmeans_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --n_clusters=<csv number of clusters> --n_inits=<csv> --max_iters=<csv>
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to feather data containing a labeled matrix of subreddit similarities.
outpath: path to output fit kmeans clusterings.
n_clusters: one or more numbers of kmeans clusters to select.
n_inits: one or more numbers of different initializations to use for each clustering.
max_iters: one or more numbers of different maximum interations.
"""
obj = kmeans_grid_sweep(inpath,
outpath,
map(int,n_clusters),
map(int,n_inits),
map(int,max_iters))
obj.run(1)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_kmeans_grid_sweep)

View File

@@ -1,93 +0,0 @@
import fire
from dataclasses import dataclass
from kmeans_clustering import kmeans_job, kmeans_clustering_result, kmeans_grid_sweep
from lsi_base import lsi_mixin, lsi_result_mixin, lsi_grid_sweep
from grid_sweep import grid_sweep
@dataclass
class kmeans_clustering_result_lsi(kmeans_clustering_result, lsi_result_mixin):
pass
class kmeans_lsi_job(kmeans_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
def get_info(self):
result = super().get_info()
self.result = kmeans_clustering_result_lsi(**result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
class _kmeans_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
print(args)
print(kwargs)
self.lsi_dim = lsi_dim
self.jobtype = kmeans_lsi_job
super().__init__(self.jobtype, inpath, outpath, self.namer, [self.lsi_dim], *args, **kwargs)
def namer(self, *args, **kwargs):
s = kmeans_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
class kmeans_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
n_clusters,
n_inits,
max_iters
):
super().__init__(kmeans_lsi_job,
_kmeans_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
n_clusters,
n_inits,
max_iters)
def run_kmeans_lsi_grid_sweep(savefile, inpath, outpath, n_clusters=[500], n_inits=[1], max_iters=[3000], lsi_dimensions="all"):
"""Run kmeans clustering once or more with different parameters.
Usage:
kmeans_clustering_lsi.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH d--lsi_dimensions=<"all"|csv number of LSI dimensions to use> --n_clusters=<csv number of clusters> --n_inits=<csv> --max_iters=<csv>
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to folder containing feather files with LSI similarity labeled matrices of subreddit similarities.
outpath: path to output fit kmeans clusterings.
lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
n_clusters: one or more numbers of kmeans clusters to select.
n_inits: one or more numbers of different initializations to use for each clustering.
max_iters: one or more numbers of different maximum interations.
"""
obj = kmeans_lsi_grid_sweep(inpath,
lsi_dimensions,
outpath,
list(map(int,n_clusters)),
list(map(int,n_inits)),
list(map(int,max_iters))
)
obj.run(1)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_kmeans_lsi_grid_sweep)

View File

@@ -1,29 +0,0 @@
from clustering_base import clustering_job, clustering_result
from grid_sweep import grid_sweep
from dataclasses import dataclass
from itertools import chain
from pathlib import Path
class lsi_mixin():
def set_lsi_dims(self, lsi_dims):
self.lsi_dims = lsi_dims
@dataclass
class lsi_result_mixin:
lsi_dimensions:int
class lsi_grid_sweep(grid_sweep):
def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
self.jobtype = jobtype
self.subsweep = subsweep
inpath = Path(inpath)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*.feather"))
else:
lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]
print(lsi_paths)
lsi_nums = [int(p.stem) for p in lsi_paths]
self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))

View File

@@ -1,33 +0,0 @@
#!/usr/bin/env python3
import fire
import pandas as pd
from pathlib import Path
import shutil
selection_data="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/clustering/comment_authors_compex_LSI/selection_data.csv"
outpath = 'test_best.feather'
min_clusters=50; max_isolates=7500; min_cluster_size=2
# pick the best clustering according to silhouette score subject to contraints
def pick_best_clustering(selection_data, output, min_clusters, max_isolates, min_cluster_size):
df = pd.read_csv(selection_data,index_col=0)
df = df.sort_values("silhouette_score",ascending=False)
# not sure I fixed the bug underlying this fully or not.
df['n_isolates_str'] = df.n_isolates.str.strip("[]")
df['n_isolates_0'] = df['n_isolates_str'].apply(lambda l: len(l) == 0)
df.loc[df.n_isolates_0,'n_isolates'] = 0
df.loc[~df.n_isolates_0,'n_isolates'] = df.loc[~df.n_isolates_0].n_isolates_str.apply(lambda l: int(l))
best_cluster = df[(df.n_isolates <= max_isolates)&(df.n_clusters >= min_clusters)&(df.min_cluster_size==min_cluster_size)]
best_cluster = best_cluster.iloc[0]
best_lsi_dimensions = best_cluster.lsi_dimensions
print(best_cluster.to_dict())
best_path = Path(best_cluster.outpath) / (str(best_cluster['name']) + ".feather")
shutil.copy(best_path,output)
print(f"lsi dimensions:{best_lsi_dimensions}")
if __name__ == "__main__":
fire.Fire(pick_best_clustering)

View File

@@ -0,0 +1,132 @@
from sklearn.metrics import silhouette_score
from sklearn.cluster import AffinityPropagation
from functools import partial
from dataclasses import dataclass
from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
from multiprocessing import Pool, cpu_count, Array, Process
from pathlib import Path
from itertools import product, starmap
import numpy as np
import pandas as pd
import fire
import sys
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
@dataclass
class affinity_clustering_result(clustering_result):
damping:float
convergence_iter:int
preference_quantile:float
def do_affinity_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
outpath.parent.mkdir(parents=True,exist_ok=True)
print(outpath)
clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
cluster_data = process_clustering_result(clustering, subreddits)
mat = sim_to_dist(clustering.affinity_matrix_)
try:
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
except ValueError:
score = None
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
try:
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
except ValueError:
alt_score = None
res = affinity_clustering_result(outpath=outpath,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
preference_quantile=preference_quantile,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
return res
def do_affinity_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
outpath.parent.mkdir(parents=True,exist_ok=True)
print(outpath)
clustering = _affinity_clustering(mat, subreddits, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
mat = sim_to_dist(clustering.affinity_matrix_)
try:
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
except ValueError:
score = None
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
try:
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
except ValueError:
alt_score = None
res = clustering_result(outpath=outpath,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
preference_quantile=preference_quantile,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
return res
# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
damping = list(map(float,damping))
convergence_iter = convergence_iter = list(map(int,convergence_iter))
preference_quantile = list(map(float,preference_quantile))
if type(outdir) is str:
outdir = Path(outdir)
outdir.mkdir(parents=True,exist_ok=True)
subreddits, mat = read_similarity_mat(similarities,use_threads=True)
if alt_similarities is not None:
alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
else:
alt_mat = None
if J is None:
J = cpu_count()
pool = Pool(J)
# get list of tuples: the combinations of hyperparameters
hyper_grid = product(damping, convergence_iter, preference_quantile)
hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
_do_clustering = partial(do_affinity_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
# similarities = Array('d', mat)
# call pool.starmap
print("running clustering selection")
clustering_data = pool.starmap(_do_clustering, hyper_grid)
clustering_data = pd.DataFrame(list(clustering_data))
clustering_data.to_csv(outinfo)
return clustering_data
if __name__ == "__main__":
x = fire.Fire(select_affinity_clustering)

View File

@@ -0,0 +1,92 @@
from sklearn.metrics import silhouette_score
from sklearn.cluster import AffinityPropagation
from functools import partial
from clustering import _kmeans_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count, Array, Process
from pathlib import Path
from itertools import product, starmap
import numpy as np
import pandas as pd
import fire
import sys
@dataclass
class kmeans_clustering_result(clustering_result):
n_clusters:int
n_init:int
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
def do_clustering(n_clusters, n_init, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
print(outpath)
mat = sim_to_dist(mat)
clustering = _kmeans_clustering(mat, outpath, n_clusters, n_init, max_iter, random_state, verbose)
outpath.parent.mkdir(parents=True,exist_ok=True)
cluster_data.to_feather(outpath)
cluster_data = process_clustering_result(clustering, subreddits)
try:
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
except ValueError:
score = None
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
try:
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
except ValueError:
alt_score = None
res = kmeans_clustering_result(outpath=outpath,
max_iter=max_iter,
n_clusters=n_clusters,
n_init = n_init,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
return res
# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
def select_kmeans_clustering(similarities, outdir, outinfo, n_clusters=[1000], max_iter=100000, n_init=10, random_state=1968, verbose=True, alt_similarities=None):
n_clusters = list(map(int,n_clusters))
n_init = list(map(int,n_init))
if type(outdir) is str:
outdir = Path(outdir)
outdir.mkdir(parents=True,exist_ok=True)
subreddits, mat = read_similarity_mat(similarities,use_threads=True)
if alt_similarities is not None:
alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
else:
alt_mat = None
# get list of tuples: the combinations of hyperparameters
hyper_grid = product(n_clusters, n_init)
hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
_do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
# call starmap
print("running clustering selection")
clustering_data = starmap(_do_clustering, hyper_grid)
clustering_data = pd.DataFrame(list(clustering_data))
clustering_data.to_csv(outinfo)
return clustering_data
if __name__ == "__main__":
x = fire.Fire(select_kmeans_clustering)

View File

@@ -1,38 +1,7 @@
import pandas as pd
import plotnine as pn
from pathlib import Path
from clustering.fit_tsne import fit_tsne
from visualization.tsne_vis import build_visualization
df = pd.read_csv("/gscratch/comdata/output/reddit_clustering/subreddit_comment_authors-tf_10k_LSI/hdbscan/selection_data.csv",index_col=0)
# plot silhouette_score as a function of isolates
df = df.sort_values("silhouette_score")
df['n_isolates'] = df.n_isolates.str.split("\n0").apply(lambda rg: int(rg[1]))
p = pn.ggplot(df,pn.aes(x='n_isolates',y='silhouette_score')) + pn.geom_point()
p.save("isolates_x_score.png")
p = pn.ggplot(df,pn.aes(y='n_clusters',x='n_isolates',color='silhouette_score')) + pn.geom_point()
p.save("clusters_x_isolates.png")
# the best result for hdbscan seems like this one: it has a decent number of
# i think I prefer the 'eom' clustering style because larger clusters are less likely to suffer from ommitted variables
best_eom = df[(df.n_isolates <5000)&(df.silhouette_score>0.4)&(df.cluster_selection_method=='eom')&(df.min_cluster_size==2)].iloc[df.shape[1]]
best_lsi = df[(df.n_isolates <5000)&(df.silhouette_score>0.4)&(df.cluster_selection_method=='leaf')&(df.min_cluster_size==2)].iloc[df.shape[1]]
tsne_data = Path("./clustering/authors-tf_lsi850_tsne.feather")
if not tnse_data.exists():
fit_tsne("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/850.feather",
tnse_data)
build_visualization("./clustering/authors-tf_lsi850_tsne.feather",
Path(best_eom.outpath)/(best_eom['name']+'.feather'),
"./authors-tf_lsi850_best_eom.html")
build_visualization("./clustering/authors-tf_lsi850_tsne.feather",
Path(best_leaf.outpath)/(best_leaf['name']+'.feather'),
"./authors-tf_lsi850_best_leaf.html")
import fire
from select_affinity import select_affinity_clustering
from select_kmeans import select_kmeans_clustering
if __name__ == "__main__":
fire.Fire({"kmeans":select_kmeans_clustering,
"affinity":select_affinity_clustering})

View File

@@ -8,9 +8,3 @@ all: /gscratch/comdata/output/reddit_density/comment_terms_10000.feather /gscrat
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/850.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/850.feather
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/850.feather" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/850.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather" --agg=pd.DataFrame.sum

View File

@@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname).hyak.local:7077 overlap_density.py authors --inpath=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/600.feather --outpath=/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10K_LSI/600.feather --agg=pd.DataFrame.sum
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh
spark-submit --master spark://$(hostname):18899 overlap_density.py authors --inpath=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather --outpath=/gscratch/comdata/output/reddit_density/comment_authors_10000.feather --agg=pd.DataFrame.sum
stop-all.sh

View File

@@ -1,12 +1,11 @@
import pandas as pd
from pandas.core.groupby import DataFrameGroupBy as GroupBy
from pathlib import Path
import fire
import numpy as np
import sys
sys.path.append("..")
sys.path.append("../similarities")
from similarities.similarities_helper import reindex_tfidf
from similarities.similarities_helper import reindex_tfidf, reindex_tfidf_time_interval
# this is the mean of the ratio of the overlap to the focal size.
# mean shared membership per focal community member
@@ -14,12 +13,10 @@ from similarities.similarities_helper import reindex_tfidf
def overlap_density(inpath, outpath, agg = pd.DataFrame.sum):
df = pd.read_feather(inpath)
df = df.drop('_subreddit',1)
df = df.drop('subreddit',1)
np.fill_diagonal(df.values,0)
df = agg(df, 0).reset_index()
df = df.rename({0:'overlap_density'},axis='columns')
outpath = Path(outpath)
outpath.parent.mkdir(parents=True, exist_ok = True)
df.to_feather(outpath)
return df
@@ -28,8 +25,6 @@ def overlap_density_weekly(inpath, outpath, agg = GroupBy.sum):
# exclude the diagonal
df = df.loc[df.subreddit != df.variable]
res = agg(df.groupby(['subreddit','week'])).reset_index()
outpath = Path(outpath)
outpath.parent.mkdir(parents=True, exist_ok = True)
res.to_feather(outpath)
return res

View File

@@ -6,7 +6,7 @@ from os import path
import hashlib
shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
#shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
shasums = shasums1 + shasums2
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"

View File

@@ -1,12 +1,14 @@
#!/bin/bash
user_agent='"nathante teblunthuis <nathante@uw.edu>"'
user_agent='nathante teblunthuis <nathante@uw.edu>'
output_dir='/gscratch/comdata/raw_data/reddit_dumps/comments'
base_url='https://files.pushshift.io/reddit/comments/'
wget -r --no-parent -A 'RC_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_201*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_201*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_201*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
# starting in 2020 we use daily dumps not monthly dumps
wget -r --no-parent -A 'RC_202*.gz' -U $user_agent -P $output_dir -nd -nc $base_url/daily/
./check_comments_shas.py

View File

@@ -1,14 +1,14 @@
#!/bin/bash
user_agent='"nathante teblunthuis <nathante@uw.edu>"'
user_agent='nathante teblunthuis <nathante@uw.edu>'
output_dir='/gscratch/comdata/raw_data/reddit_dumps/submissions'
base_url='https://files.pushshift.io/reddit/submissions/'
wget -r --no-parent -A 'RS_20*.bz2' --user-agent=$user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.xz' --user-agent=$user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.zst' --user-agent=$user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.bz2' --user-agent=$user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.xz' --user-agent=$user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.zst' --user-agent=$user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
./check_submission_shas.py

View File

@@ -13,7 +13,10 @@ from nltk.corpus import stopwords
from nltk.util import ngrams
import string
from random import random
from redditcleaner import clean
# remove urls
# taken from https://stackoverflow.com/questions/3809401/what-is-a-good-regular-expression-to-match-a-url
urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)")
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, mwe_pass = 'first'):
@@ -92,8 +95,8 @@ def weekly_tf(partition, mwe_pass = 'first'):
# lowercase
text = text.lower()
# redditcleaner removes reddit markdown(newlines, quotes, bullet points, links, strikethrough, spoiler, code, superscript, table, headings)
text = clean(text)
# remove urls
text = urlregex.sub("", text)
# sentence tokenize
sentences = sent_tokenize(text)
@@ -104,13 +107,14 @@ def weekly_tf(partition, mwe_pass = 'first'):
# remove punctuation
sentences = map(remove_punct, sentences)
# remove sentences with less than 2 words
sentences = filter(lambda sentence: len(sentence) > 2, sentences)
# datta et al. select relatively common phrases from the reddit corpus, but they don't really explain how. We'll try that in a second phase.
# they say that the extract 1-4 grams from 10% of the sentences and then find phrases that appear often relative to the original terms
# here we take a 10 percent sample of sentences
if mwe_pass == 'first':
# remove sentences with less than 2 words
sentences = filter(lambda sentence: len(sentence) > 2, sentences)
sentences = list(sentences)
for sentence in sentences:
if random() <= 0.1:

View File

@@ -1,7 +1,7 @@
#all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
base_data=/gscratch/comdata/output
base_data=/gscratch/comdata/output/
similarity_data=${base_data}/reddit_similarity
tfidf_data=${similarity_data}/tfidf
tfidf_weekly_data=${similarity_data}/tfidf_weekly
@@ -97,7 +97,7 @@ ${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/outpu
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=100000 --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet

View File

@@ -1,16 +1,14 @@
import pandas as pd
import fire
from pathlib import Path
from cdsc_ecology_utils.similarity import similarities, column_similarities
from similarities_helper import similarities, column_similarities
from functools import partial
def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_communities=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
# change so that these take in an input as an optional argument (for speed, but also for idf).
def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
def term_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
return cosine_similarities(infile,

View File

@@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname):7077 top_subreddits_by_comments.py
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname).hyak.local:7077 lsi_similarities.py author --outfile=/gscratch/comdata/output//reddit_similarity/subreddit_comment_authors_10k_LSI.feather --topN=10000
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh

View File

@@ -1,41 +1,20 @@
import pandas as pd
import fire
from pathlib import Path
from cdsc_ecology_utils.similarity.similarity_functions import lsi_column_similarities, similarities,
#from similarities_helper import similarities, lsi_column_similarities
from similarities_helper import similarities, lsi_column_similarities
from functools import partial
inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/"
term_colname='term'
outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI'
n_components=[10,50,100]
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
n_iter=5
random_state=1968
algorithm='arpack'
topN = None
from_date=None
to_date=None
min_df=None
max_df=None
def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None):
def lsi_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack'):
print(n_components,flush=True)
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm)
if lsi_model is None:
if type(n_components) == list:
lsi_model = Path(outfile) / f'{max(n_components)}_{term_colname}_LSIMOD.pkl'
else:
lsi_model = Path(outfile) / f'{n_components}_{term_colname}_LSIMOD.pkl'
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm,lsi_model_save=lsi_model)
return similarities(inpath=inpath, simfunc=simfunc, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_communities=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
return similarities(infile=infile, simfunc=simfunc, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
# change so that these take in an input as an optional argument (for speed, but also for idf).
def term_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',outfile=None, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, algorithm='arpack', n_components=300,n_iter=5,random_state=1968):
def term_lsi_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
res = lsi_similarities(inpath,
return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',
'term',
outfile,
min_df,
@@ -44,13 +23,11 @@ def term_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfi
topN,
from_date,
to_date,
n_components=n_components,
algorithm = algorithm
n_components=n_components
)
return res
def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,algorithm='arpack',n_components=300,n_iter=5,random_state=1968):
return lsi_similarities(inpath,
def author_lsi_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',
'author',
outfile,
min_df,
@@ -62,8 +39,8 @@ def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/t
n_components=n_components
)
def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968):
return lsi_similarities(inpath,
def author_tf_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',
'author',
outfile,
min_df,
@@ -73,8 +50,7 @@ def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tf
from_date=from_date,
to_date=to_date,
tfidf_colname='relative_tf',
n_components=n_components,
algorithm=algorithm
n_components=n_components
)

View File

@@ -15,53 +15,27 @@ import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
import pickle
class tf_weight(Enum):
MaxTF = 1
Norm05 = 2
# infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
# cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"
def termauthor_tfidf(term_tfidf_callable, author_tfidf_callable):
# subreddits missing after this step don't have any terms that have a high enough idf
# try rewriting without merges
# does reindex_tfidf, but without reindexing.
def reindex_tfidf(*args, **kwargs):
df, tfidf_ds, ds_filter = _pull_or_reindex_tfidf(*args, **kwargs, reindex=True)
print("assigning names")
subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
batches = subreddit_names.to_batches()
with Pool(cpu_count()) as pool:
chunks = pool.imap_unordered(pull_names,batches)
subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
subreddit_names = subreddit_names.set_index("subreddit_id")
new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
new_ids = new_ids.set_index('subreddit_id')
subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
subreddit_names = subreddit_names.drop("subreddit_id",1)
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
return(df, subreddit_names)
def pull_tfidf(*args, **kwargs):
df, _, _ = _pull_or_reindex_tfidf(*args, **kwargs, reindex=False)
return df
def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF, reindex=True):
print(f"loading tfidf {infile}", flush=True)
if week is not None:
tfidf_ds = ds.dataset(infile, partitioning='hive')
else:
def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF):
print("loading tfidf", flush=True)
tfidf_ds = ds.dataset(infile)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(map(str.strip,open(included_subreddits)))
included_subreddits = set(map(str.strip,map(str.lower,open(included_subreddits))))
ds_filter = ds.field("subreddit").isin(included_subreddits)
@@ -97,22 +71,15 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
'relative_tf':ds.field('relative_tf').cast('float32'),
'tf_idf':ds.field('tf_idf').cast('float32')}
print(projection)
tfidf_ds = ds.dataset(infile)
df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
df = df.to_pandas(split_blocks=True,self_destruct=True)
print("assigning indexes",flush=True)
if reindex:
df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
else:
df['subreddit_id_new'] = df['subreddit_id']
if reindex:
grouped = df.groupby(term_id)
df[term_id_new] = grouped.ngroup()
else:
df[term_id_new] = df[term_id]
if rescale_idf:
print("computing idf", flush=True)
@@ -124,7 +91,9 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
else: # tf_fam = tf_weight.Norm05
df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
return (df, tfidf_ds, ds_filter)
print("assigning names")
subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
batches = subreddit_names.to_batches()
with Pool(cpu_count()) as pool:
chunks = pool.imap_unordered(pull_names,batches)
@@ -141,7 +110,7 @@ def _pull_or_reindex_tfidf(infile, term_colname, min_df=None, max_df=None, inclu
def pull_names(batch):
return(batch.to_pandas().drop_duplicates())
def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
'''
tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
'''
@@ -161,7 +130,7 @@ def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=Non
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
p.parent.mkdir(exist_ok=True, parents=True)
outfile.parent.mkdir(exist_ok=True, parents=True)
sims.to_feather(outfile)
@@ -169,7 +138,7 @@ def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=Non
term_id = term + '_id'
term_id_new = term + '_id_new'
entries, subreddit_names = reindex_tfidf(inpath, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
print("loading matrix")
@@ -185,7 +154,7 @@ def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=Non
for simmat, name in sims:
proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
else:
proc_sims(sims, outfile)
proc_sims(simmat, outfile)
def write_weekly_similarities(path, sims, week, names):
sims['week'] = week
@@ -238,9 +207,10 @@ def test_lsi_sims():
# if n_components is a list we'll return a list of similarities with different latent dimensionalities
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized',lsi_model_save=None,lsi_model_load=None):
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'):
# first compute the lsi of the matrix
# then take the column similarities
print("running LSI",flush=True)
if type(n_components) is int:
n_components = [n_components]
@@ -248,23 +218,9 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196
n_components = sorted(n_components,reverse=True)
svd_components = n_components[0]
if lsi_model_load is not None and Path(lsi_model_load).exists():
print("loading LSI")
mod = pickle.load(open(lsi_model_load ,'rb'))
lsi_model_save = lsi_model_load
else:
print("running LSI",flush=True)
svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
mod = svd.fit(tfidfmat.T)
lsimat = mod.transform(tfidfmat.T)
if lsi_model_save is not None:
pickle.dump(mod, open(lsi_model_save,'wb'))
sims_list = []
for n_dims in n_components:
sims = column_similarities(lsimat[:,np.arange(n_dims)])
if len(n_components) > 1:
@@ -301,20 +257,20 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
idf = idf.withColumn('idf',f.log(idf.subreddits_in_week) / (1+f.col('count'))+1)
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select([term]).distinct() # terms are distinct
terms = idf.select([term,'week']).distinct() # terms are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct
# make subreddit ids
subreddits = df.select(['subreddit']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))
subreddits = df.select(['subreddit','week']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))
df = df.join(subreddits,on=['subreddit'])
df = df.join(subreddits,on=['subreddit','week'])
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on=[term]) # subreddit-term-id is unique
df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique
idf = idf.join(terms,on=[term])
idf = idf.join(terms,on=[term,'week'])
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term,'week'])
@@ -327,7 +283,7 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
df = df.repartition(400,'subreddit','week')
dfwriter = df.write.partitionBy("week")
dfwriter = df.write.partitionBy("week").sortBy("subreddit")
return dfwriter
def _calc_tfidf(df, term_colname, tf_family):
@@ -374,7 +330,7 @@ def _calc_tfidf(df, term_colname, tf_family):
return df
def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
term = term_colname
term_id = term + '_id'
# aggregate counts by week. now subreddit-term is distinct
@@ -383,7 +339,7 @@ def tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
df = _calc_tfidf(df, term_colname, tf_family)
df = df.repartition('subreddit')
dfwriter = df.write
dfwriter = df.write.sortBy("subreddit","tf")
return dfwriter
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):

View File

@@ -1,8 +1,7 @@
import fire
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from cdsc_ecology_utils.similarity.similarity_functions import tfidf_dataset, \
build_weekly_tfidf_dataset, select_topN_communities
from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits):
spark = SparkSession.builder.getOrCreate()
@@ -12,9 +11,9 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
df = df.filter(~ f.col(term_colname).isin(exclude))
if included_subreddits is not None:
include_subs = set(map(str.strip,open(included_subreddits)))
include_subs = set(map(str.strip,map(str.lower, open(included_subreddits))))
else:
include_subs = select_topN_communities(topN)
include_subs = select_topN_subreddits(topN)
dfwriter = func(df, include_subs, term_colname)
@@ -22,17 +21,16 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
spark.stop()
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_authors(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=None,
def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=25000,
included_subreddits=None):
return tfidf(inpath,
return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath,
topN,
'author',
@@ -40,12 +38,11 @@ def tfidf_authors(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors
included_subreddits=included_subreddits
)
def tfidf_terms(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=None,
def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=25000,
included_subreddits=None):
return tfidf(inpath,
return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath,
topN,
'term',
@@ -53,12 +50,11 @@ def tfidf_terms(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.par
included_subreddits=included_subreddits
)
def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
topN=None,
def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
topN=25000,
included_subreddits=None):
return tfidf_weekly(inpath,
return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath,
topN,
'author',
@@ -66,13 +62,12 @@ def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_
included_subreddits=included_subreddits
)
def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
topN=None,
def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
topN=25000,
included_subreddits=None):
return tfidf_weekly(inpath,
return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath,
topN,
'term',

View File

@@ -17,7 +17,7 @@ df = df.filter(~df.subreddit.like("u_%"))
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
df = df.join(prop_nsfw,on='subreddit')
#df = df.filter(df.prop_nsfw < 0.5)
df = df.filter(df.prop_nsfw < 0.5)
win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank', f.rank().over(win))
@@ -26,4 +26,4 @@ df = df.toPandas()
df = df.sort_values("n_comments")
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nsfw.csv', index=False)
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv', index=False)

114
similarities/weekly_cosine_similarities.py Executable file → Normal file
View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
@@ -9,92 +8,58 @@ import pandas as pd
import fire
from itertools import islice, chain
from pathlib import Path
from similarities_helper import pull_tfidf, column_similarities, write_weekly_similarities, lsi_column_similarities
from scipy.sparse import csr_matrix
from similarities_helper import *
from multiprocessing import Pool, cpu_count
from functools import partial
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_10k.parquet"
tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
min_df=None
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
max_df = None
topN=100
term_colname='author'
# outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# included_subreddits=None
def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path, subreddit_names, nterms):
def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
print(f"loading matrix: {week}")
entries = pull_tfidf(infile = tfidf_path,
entries, subreddit_names = reindex_tfidf(infile = tfidf_path,
term_colname=term_colname,
min_df=min_df,
max_df=max_df,
included_subreddits=included_subreddits,
topN=topN,
week=week,
rescale_idf=False)
tfidf_colname='tf_idf'
# if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)),shape=(nterms,subreddit_names.shape[0]))
week=week)
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
print('computing similarities')
sims = simfunc(mat)
sims = column_similarities(mat)
del mat
sims = pd.DataFrame(sims)
sims = pd.DataFrame(sims.todense())
sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['_subreddit'] = subreddit_names.subreddit.values
sims['_subreddit'] = names.subreddit.values
outfile = str(Path(outdir) / str(week))
write_weekly_similarities(outfile, sims, week, subreddit_names)
write_weekly_similarities(outfile, sims, week, names)
def pull_weeks(batch):
return set(batch.to_pandas()['week'])
# This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
def cosine_similarities_weekly_lsi(n_components=100, lsi_model=None, *args, **kwargs):
term_colname= kwargs.get('term_colname')
#lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
# simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm='randomized',lsi_model_load=lsi_model)
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=kwargs.get('n_iter'),random_state=kwargs.get('random_state'),algorithm=kwargs.get('algorithm'),lsi_model_load=lsi_model)
return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500, simfunc=column_similarities):
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500):
print(outfile)
tfidf_ds = ds.dataset(tfidf_path)
tfidf_ds = tfidf_ds.to_table(columns=["week"])
batches = tfidf_ds.to_batches()
with Pool(cpu_count()) as pool:
weeks = set(chain( * pool.imap_unordered(pull_weeks,batches)))
weeks = sorted(weeks)
# do this step in parallel if we have the memory for it.
# should be doable with pool.map
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(tfidf_path)
# load subreddits + topN
subreddit_names = df.select(['subreddit','subreddit_id']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id")
nterms = df.select(f.max(f.col(term_colname + "_id")).alias('max')).collect()[0].max
weeks = df.select(f.col("week")).distinct().toPandas().week.values
spark.stop()
print(f"computing weekly similarities")
week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN, subreddit_names=subreddit_names,nterms=nterms)
week_similarities_helper = partial(_week_similarities,simfunc=column_similarities, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN)
pool = Pool(cpu_count())
with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
list(pool.map(week_similarities_helper,weeks))
list(pool.imap(week_similarities_helper,weeks))
pool.close()
# with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly(infile,
def author_cosine_similarities_weekly(outfile, min_df=2, max_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
outfile,
'author',
min_df,
@@ -102,8 +67,8 @@ def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/
included_subreddits,
topN)
def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None):
return cosine_similarities_weekly(infile,
def term_cosine_similarities_weekly(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
outfile,
'term',
min_df,
@@ -111,33 +76,6 @@ def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/re
included_subreddits,
topN)
def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=None,n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile,
outfile,
'author',
min_df,
max_df,
included_subreddits,
topN,
n_components=n_components,
lsi_model=lsi_model)
def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500,n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile,
outfile,
'term',
min_df,
max_df,
included_subreddits,
topN,
n_components=n_components,
lsi_model=lsi_model)
if __name__ == "__main__":
fire.Fire({'authors':author_cosine_similarities_weekly,
'terms':term_cosine_similarities_weekly,
'authors-lsi':author_cosine_similarities_weekly_lsi,
'terms-lsi':term_cosine_similarities_weekly
})
'terms':term_cosine_similarities_weekly})

View File

@@ -1,2 +0,0 @@
from .choose_clusters import load_clusters, load_densities
from .cluster_timeseries import build_cluster_timeseries

View File

@@ -2,11 +2,11 @@ import pandas as pd
import numpy as np
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from .choose_clusters import load_clusters, load_densities
from choose_clusters import load_clusters, load_densities
import fire
from pathlib import Path
def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
@@ -34,4 +34,4 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
ts.write.parquet(output, mode='overwrite')
if __name__ == "__main__":
fire.Fire(build_cluster_timeseries)
fire.Fire(main)

View File

@@ -22,12 +22,8 @@ def base_plot(plot_data):
#
# subreddit_select = alt.selection_single(on='click',fields=['subreddit'],bind=subreddit_dropdown,name='subreddit_click')
base_scale = alt.Scale(scheme={"name":'category10',
"extent":[0,100],
"count":10})
color = alt.condition(cluster_click_select ,
alt.Color(field='color',type='nominal',scale=base_scale),
alt.Color(field='color',type='nominal',scale=alt.Scale(scheme='category10')),
alt.value("lightgray"))
@@ -88,11 +84,6 @@ def viewport_plot(plot_data):
return chart
def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
isolate_color = 101
cluster_sizes = clusters.groupby('cluster').count()
singletons = set(cluster_sizes.loc[cluster_sizes.subreddit == 1].reset_index().cluster)
tsne_data = tsne_data.merge(clusters,on='subreddit')
centroids = tsne_data.groupby('cluster').agg({'x':np.mean,'y':np.mean})
@@ -129,9 +120,6 @@ def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
color_assignments = np.repeat(-1,len(centroids))
for i in range(len(centroids)):
if (centroids.iloc[i].name == -1) or (i in singletons):
color_assignments[i] = isolate_color
else:
knn = indices[i]
knn_colors = color_assignments[knn]
available_colors = color_ids[list(set(color_ids) - set(knn_colors))]
@@ -141,6 +129,7 @@ def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
else:
raise Exception("Can't color this many neighbors with this many colors")
centroids = centroids.reset_index()
colors = centroids.loc[:,['cluster']]
colors['color'] = color_assignments
@@ -154,13 +143,12 @@ def build_visualization(tsne_data, clusters, output):
# clusters = "/gscratch/comdata/output/reddit_clustering/subreddit_author_tf_similarities_10000.feather"
tsne_data = pd.read_feather(tsne_data)
tsne_data = tsne_data.rename(columns={'_subreddit':'subreddit'})
clusters = pd.read_feather(clusters)
tsne_data = assign_cluster_colors(tsne_data,clusters,10,8)
sr_per_cluster = tsne_data.groupby('cluster').subreddit.count().reset_index()
sr_per_cluster = sr_per_cluster.rename(columns={'subreddit':'cluster_size'})
# sr_per_cluster = tsne_data.groupby('cluster').subreddit.count().reset_index()
# sr_per_cluster = sr_per_cluster.rename(columns={'subreddit':'cluster_size'})
tsne_data = tsne_data.merge(sr_per_cluster,on='cluster')