15
0

refactor clustring in object oriented style

This commit is contained in:
Nate E TeBlunthuis 2021-05-07 22:33:26 -07:00
parent 8d1df5b26e
commit f05cb962e0
4 changed files with 612 additions and 286 deletions

View File

@ -2,7 +2,8 @@ from sklearn.metrics import silhouette_score
from sklearn.cluster import AffinityPropagation
from functools import partial
from dataclasses import dataclass
from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
from multiprocessing import Pool, cpu_count, Array, Process
from pathlib import Path
from itertools import product, starmap
@ -17,116 +18,158 @@ class affinity_clustering_result(clustering_result):
damping:float
convergence_iter:int
preference_quantile:float
preference:float
max_iter:int
def affinity_clustering(similarities, output, *args, **kwargs):
subreddits, mat = read_similarity_mat(similarities)
clustering = _affinity_clustering(mat, *args, **kwargs)
cluster_data = process_clustering_result(clustering, subreddits)
cluster_data['algorithm'] = 'affinity'
return(cluster_data)
@dataclass
class affinity_clustering_result_lsi(affinity_clustering_result, lsi_result_mixin):
pass
def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
'''
similarities: matrix of similarity scores
preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author.
'''
print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}")
class affinity_job(clustering_job):
def __init__(self, infile, outpath, name, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
super().__init__(infile,
outpath,
name,
call=self._affinity_clustering,
preference_quantile=preference_quantile,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
random_state=1968,
verbose=verbose)
self.damping=damping
self.max_iter=max_iter
self.convergence_iter=convergence_iter
self.preference_quantile=preference_quantile
preference = np.quantile(mat,preference_quantile)
def _affinity_clustering(self, mat, preference_quantile, *args, **kwargs):
mat = 1-mat
preference = np.quantile(mat, preference_quantile)
self.preference = preference
print(f"preference is {preference}")
print("data loaded")
sys.stdout.flush()
clustering = AffinityPropagation(*args,
preference=preference,
affinity='precomputed',
copy=False,
**kwargs).fit(mat)
return clustering
print(f"preference is {preference}")
print("data loaded")
sys.stdout.flush()
clustering = AffinityPropagation(damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
copy=False,
preference=preference,
affinity='precomputed',
verbose=verbose,
random_state=random_state).fit(mat)
def get_info(self):
result = super().get_info()
self.result=affinity_clustering_result(**result.__dict__,
damping=self.damping,
max_iter=self.max_iter,
convergence_iter=self.convergence_iter,
preference_quantile=self.preference_quantile,
preference=self.preference)
cluster_data = process_clustering_result(clustering, subreddits)
output = Path(output)
output.parent.mkdir(parents=True,exist_ok=True)
cluster_data.to_feather(output)
print(f"saved {output}")
return clustering
return self.result
class affinity_lsi_job(affinity_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
outpath.parent.mkdir(parents=True,exist_ok=True)
print(outpath)
clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
cluster_data = process_clustering_result(clustering, subreddits)
mat = sim_to_dist(clustering.affinity_matrix_)
def get_info(self):
result = super().get_info()
self.result = affinity_clustering_result_lsi(**result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
try:
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
except ValueError:
score = None
class affinity_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
try:
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
except ValueError:
alt_score = None
super().__init__(affinity_job,
_afffinity_grid_sweep,
inpath,
outpath,
self.namer,
*args,
**kwargs)
def namer(self,
damping,
max_iter,
convergence_iter,
preference_quantile):
return f"damp-{damping}_maxit-{max_iter}_convit-{convergence_iter}_prefq-{preference_quantile}"
class _affinity_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
self.lsi_dim = lsi_dim
self.jobtype = affinity_lsi_job
super().__init__(self.jobtype,
inpath,
outpath,
self.namer,
self.lsi_dim,
*args,
**kwargs)
def namer(self, *args, **kwargs):
s = affinity_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
class affinity_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
dampings=[0.9],
max_iters=[10000],
convergence_iters=[30],
preference_quantiles=[0.5]):
super().__init__(affinity_lsi_job,
_affinity_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
dampings,
max_iters,
convergence_iters,
preference_quantiles)
res = affinity_clustering_result(outpath=outpath,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
preference_quantile=preference_quantile,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
def test_select_affinity_clustering():
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
outpath = "test_affinity";
dampings=[0.8,0.9]
max_iters=[100000]
convergence_iters=[15]
preference_quantiles=[0.5,0.7]
gs = affinity_lsi_grid_sweep(inpath, 'all', outpath, dampings, max_iters, convergence_iters, preference_quantiles)
gs.run(20)
gs.save("test_affinity/lsi_sweep.csv")
return res
# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
damping = list(map(float,damping))
convergence_iter = convergence_iter = list(map(int,convergence_iter))
preference_quantile = list(map(float,preference_quantile))
if type(outdir) is str:
outdir = Path(outdir)
outdir.mkdir(parents=True,exist_ok=True)
subreddits, mat = read_similarity_mat(similarities,use_threads=True)
if alt_similarities is not None:
alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
else:
alt_mat = None
if J is None:
J = cpu_count()
pool = Pool(J)
# get list of tuples: the combinations of hyperparameters
hyper_grid = product(damping, convergence_iter, preference_quantile)
hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
_do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
# similarities = Array('d', mat)
# call pool.starmap
print("running clustering selection")
clustering_data = pool.starmap(_do_clustering, hyper_grid)
clustering_data = pd.DataFrame(list(clustering_data))
clustering_data.to_csv(outinfo)
return clustering_data
if __name__ == "__main__":
x = fire.Fire(select_affinity_clustering)
fire.Fire{'grid_sweep':affinity_grid_sweep,
'grid_sweep_lsi':affinity_lsi_grid_sweep
'cluster':affinity_job,
'cluster_lsi':affinity_lsi_job}

View File

@ -2,6 +2,9 @@ from pathlib import Path
import numpy as np
import pandas as pd
from dataclasses import dataclass
from sklearn.metrics import silhouette_score, silhouette_samples
from itertools import product, chain
from multiprocessing import Pool, cpu_count
def sim_to_dist(mat):
dist = 1-mat
@ -9,41 +12,147 @@ def sim_to_dist(mat):
np.fill_diagonal(dist,0)
return dist
def process_clustering_result(clustering, subreddits):
class grid_sweep:
def __init__(self, jobtype, inpath, outpath, namer, *args):
self.jobtype = jobtype
self.namer = namer
grid = list(product(*args))
inpath = Path(inpath)
outpath = Path(outpath)
self.hasrun = False
self.grid = [(inpath,outpath,namer(*g)) + g for g in grid]
self.jobs = [jobtype(*g) for g in self.grid]
if hasattr(clustering,'n_iter_'):
print(f"clustering took {clustering.n_iter_} iterations")
def run(self, cores=20):
if cores is not None and cores > 1:
with Pool(cores) as pool:
infos = pool.map(self.jobtype.get_info, self.jobs)
else:
infos = map(self.jobtype.get_info, self.jobs)
clusters = clustering.labels_
self.infos = pd.DataFrame(infos)
self.hasrun = True
print(f"found {len(set(clusters))} clusters")
def save(self, outcsv):
if not self.hasrun:
self.run()
outcsv = Path(outcsv)
outcsv.parent.mkdir(parents=True, exist_ok=True)
self.infos.to_csv(outcsv)
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
class lsi_grid_sweep(grid_sweep):
def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, *args, **kwargs):
self.jobtype = jobtype
self.subsweep = subsweep
inpath = Path(inpath)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*"))
else:
lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
lsi_nums = [p.stem for p in lsi_paths]
self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))
print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True)
# this is meant to be an interface, not created directly
class clustering_job:
def __init__(self, infile, outpath, name, call, *args, **kwargs):
self.outpath = Path(outpath)
self.call = call
self.args = args
self.kwargs = kwargs
self.infile = Path(infile)
self.name = name
self.hasrun = False
return cluster_data
def run(self):
self.subreddits, self.mat = self.read_distance_mat(self.infile)
self.clustering = self.call(self.mat, *self.args, **self.kwargs)
self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
self.score = self.silhouette()
self.outpath.mkdir(parents=True, exist_ok=True)
self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
self.hasrun = True
def get_info(self):
if not self.hasrun:
self.run()
self.result = clustering_result(outpath=str(self.outpath.resolve()),
silhouette_score=self.score,
name=self.name,
n_clusters=self.n_clusters,
n_isolates=self.n_isolates,
silhouette_samples = str(self.silsampout.resolve())
)
return self.result
def silhouette(self):
isolates = self.clustering.labels_ == -1
scoremat = self.mat[~isolates][:,~isolates]
score = silhouette_score(scoremat, self.clustering.labels_[~isolates], metric='precomputed')
silhouette_samp = silhouette_samples(self.mat, self.clustering.labels_, metric='precomputed')
silhouette_samp = pd.DataFrame({'subreddit':self.subreddits,'score':silhouette_samp})
self.outpath.mkdir(parents=True, exist_ok=True)
self.silsampout = self.outpath / ("silhouette_samples-" + self.name + ".feather")
silhouette_samp.to_feather(self.silsampout)
return score
def read_distance_mat(self, similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,1-mat)
def process_clustering(self, clustering, subreddits):
if hasattr(clustering,'n_iter_'):
print(f"clustering took {clustering.n_iter_} iterations")
clusters = clustering.labels_
self.n_clusters = len(set(clusters))
print(f"found {self.n_clusters} clusters")
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
n_isolates1 = (cluster_sizes.subreddit==1).sum()
print(f"{n_isolates1} clusters have 1 member")
n_isolates2 = (cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])
print(f"{n_isolates2} subreddits are in cluster -1",flush=True)
if n_isolates1 == 0:
self.n_isolates = n_isolates2
else:
self.n_isolates = n_isolates1
return cluster_data
class lsi_mixin():
def set_lsi_dims(self, lsi_dims):
self.lsi_dims = lsi_dims
@dataclass
class clustering_result:
outpath:Path
max_iter:int
silhouette_score:float
alt_silhouette_score:float
name:str
n_clusters:int
n_isolates:int
silhouette_samples:str
def read_similarity_mat(similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,mat)
@dataclass
class lsi_result_mixin:
lsi_dimensions:int

View File

@ -1,10 +1,11 @@
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
from dataclasses import dataclass
import hdbscan
from sklearn.neighbors import NearestNeighbors
import plotnine as pn
import numpy as np
from itertools import product, starmap
from itertools import product, starmap, chain
import pandas as pd
from sklearn.metrics import silhouette_score, silhouette_samples
from pathlib import Path
@ -13,27 +14,88 @@ import fire
from pyarrow.feather import write_feather
def test_select_hdbscan_clustering():
select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
"test_hdbscan_author30k",
min_cluster_sizes=[2],
min_samples=[1,2],
cluster_selection_epsilons=[0,0.05,0.1,0.15],
cluster_selection_methods=['eom','leaf'],
lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI"
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
outpath = "test_hdbscan";
min_cluster_sizes=[2,3,4];
min_samples=[1,2,3];
cluster_selection_epsilons=[0,0.1,0.3,0.5];
cluster_selection_methods=['eom'];
lsi_dimensions='all'
gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
gs.run(20)
gs.save("test_hdbscan/lsi_sweep.csv")
# job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom')
# job1.run()
# print(job1.get_info())
df = pd.read_csv("test_hdbscan/selection_data.csv")
test_select_hdbscan_clustering()
check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)
# df = pd.read_csv("test_hdbscan/selection_data.csv")
# test_select_hdbscan_clustering()
# check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
# silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
# c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)
class hdbscan_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods
):
super().__init__(hdbscan_lsi_job,
_hdbscan_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods)
class hdbscan_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
super().__init__(hdbscan_job, inpath, outpath, self.namer, *args, **kwargs)
def namer(self,
min_cluster_size,
min_samples,
cluster_selection_epsilon,
cluster_selection_method):
return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}"
class _hdbscan_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
self.lsi_dim = lsi_dim
self.jobtype = hdbscan_lsi_job
super().__init__(self.jobtype, inpath, outpath, self.namer, self.lsi_dim, *args, **kwargs)
def namer(self, *args, **kwargs):
s = hdbscan_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
@dataclass
class hdbscan_clustering_result(clustering_result):
@ -41,107 +103,166 @@ class hdbscan_clustering_result(clustering_result):
min_samples:int
cluster_selection_epsilon:float
cluster_selection_method:str
lsi_dimensions:int
n_isolates:int
silhouette_samples:str
def select_hdbscan_clustering(inpath,
outpath,
outfile=None,
min_cluster_sizes=[2],
min_samples=[1],
cluster_selection_epsilons=[0],
cluster_selection_methods=['eom'],
lsi_dimensions='all'
):
@dataclass
class hdbscan_clustering_result_lsi(hdbscan_clustering_result, lsi_result_mixin):
pass
inpath = Path(inpath)
outpath = Path(outpath)
outpath.mkdir(exist_ok=True, parents=True)
class hdbscan_job(clustering_job):
def __init__(self, infile, outpath, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
super().__init__(infile,
outpath,
name,
call=hdbscan_job._hdbscan_clustering,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
cluster_selection_epsilon=cluster_selection_epsilon,
cluster_selection_method=cluster_selection_method
)
self.min_cluster_size = min_cluster_size
self.min_samples = min_samples
self.cluster_selection_epsilon = cluster_selection_epsilon
self.cluster_selection_method = cluster_selection_method
# self.mat = 1 - self.mat
def _hdbscan_clustering(mat, *args, **kwargs):
print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
print(mat)
clusterer = hdbscan.HDBSCAN(metric='precomputed',
core_dist_n_jobs=cpu_count(),
*args,
**kwargs,
)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*"))
clustering = clusterer.fit(mat.astype('double'))
return(clustering)
else:
lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
def get_info(self):
result = super().get_info()
self.result = hdbscan_clustering_result(**result.__dict__,
min_cluster_size=self.min_cluster_size,
min_samples=self.min_samples,
cluster_selection_epsilon=self.cluster_selection_epsilon,
cluster_selection_method=self.cluster_selection_method)
return self.result
lsi_nums = [p.stem for p in lsi_paths]
grid = list(product(lsi_nums,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods))
class hdbscan_lsi_job(hdbscan_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(
infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
# fix the output file names
names = list(map(lambda t:'_'.join(map(str,t)),grid))
def get_info(self):
partial_result = super().get_info()
self.result = hdbscan_clustering_result_lsi(**partial_result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
# def select_hdbscan_clustering(inpath,
# outpath,
# outfile=None,
# min_cluster_sizes=[2],
# min_samples=[1],
# cluster_selection_epsilons=[0],
# cluster_selection_methods=['eom'],
# lsi_dimensions='all'
# ):
# inpath = Path(inpath)
# outpath = Path(outpath)
# outpath.mkdir(exist_ok=True, parents=True)
# if lsi_dimensions is None:
# lsi_paths = [inpath]
# elif lsi_dimensions == 'all':
# lsi_paths = list(inpath.glob("*"))
# else:
# lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
# if lsi_dimensions is not None:
# lsi_nums = [p.stem for p in lsi_paths]
# else:
# lsi_nums = [None]
# grid = list(product(lsi_nums,
# min_cluster_sizes,
# min_samples,
# cluster_selection_epsilons,
# cluster_selection_methods))
# # fix the output file names
# names = list(map(lambda t:'_'.join(map(str,t)),grid))
# grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
with Pool(int(cpu_count()/4)) as pool:
mods = starmap(hdbscan_clustering, grid)
# with Pool(int(cpu_count()/4)) as pool:
# mods = starmap(hdbscan_clustering, grid)
res = pd.DataFrame(mods)
if outfile is None:
outfile = outpath / "selection_data.csv"
# res = pd.DataFrame(mods)
# if outfile is None:
# outfile = outpath / "selection_data.csv"
res.to_csv(outfile)
# res.to_csv(outfile)
def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
clustering = _hdbscan_clustering(mat,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
cluster_selection_epsilon=cluster_selection_epsilon,
cluster_selection_method=cluster_selection_method,
metric='precomputed',
core_dist_n_jobs=cpu_count()
)
# def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
# subreddits, mat = read_similarity_mat(similarities)
# mat = sim_to_dist(mat)
# clustering = _hdbscan_clustering(mat,
# min_cluster_size=min_cluster_size,
# min_samples=min_samples,
# cluster_selection_epsilon=cluster_selection_epsilon,
# cluster_selection_method=cluster_selection_method,
# metric='precomputed',
# core_dist_n_jobs=cpu_count()
# )
cluster_data = process_clustering_result(clustering, subreddits)
isolates = clustering.labels_ == -1
scoremat = mat[~isolates][:,~isolates]
score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
cluster_data.to_feather(output)
# cluster_data = process_clustering_result(clustering, subreddits)
# isolates = clustering.labels_ == -1
# scoremat = mat[~isolates][:,~isolates]
# score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
# cluster_data.to_feather(output)
# silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
# silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
# silsampout = output.parent / ("silhouette_samples" + output.name)
# silhouette_samp.to_feather(silsampout)
silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
silsampout = output.parent / ("silhouette_samples" + output.name)
silhouette_samp.to_feather(silsampout)
result = hdbscan_clustering_result(outpath=output,
max_iter=None,
silhouette_samples=silsampout,
silhouette_score=score,
alt_silhouette_score=score,
name=name,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
cluster_selection_epsilon=cluster_selection_epsilon,
cluster_selection_method=cluster_selection_method,
lsi_dimensions=lsi_dim,
n_isolates=isolates.sum(),
n_clusters=len(set(clustering.labels_))
)
# result = hdbscan_clustering_result(outpath=output,
# silhouette_samples=silsampout,
# silhouette_score=score,
# name=name,
# min_cluster_size=min_cluster_size,
# min_samples=min_samples,
# cluster_selection_epsilon=cluster_selection_epsilon,
# cluster_selection_method=cluster_selection_method,
# lsi_dimensions=lsi_dim,
# n_isolates=isolates.sum(),
# n_clusters=len(set(clustering.labels_))
# )
return(result)
# return(result)
# for all runs we should try cluster_selection_epsilon = None
# for terms we should try cluster_selection_epsilon around 0.56-0.66
# for authors we should try cluster_selection_epsilon around 0.98-0.99
def _hdbscan_clustering(mat, *args, **kwargs):
print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
# # for all runs we should try cluster_selection_epsilon = None
# # for terms we should try cluster_selection_epsilon around 0.56-0.66
# # for authors we should try cluster_selection_epsilon around 0.98-0.99
# def _hdbscan_clustering(mat, *args, **kwargs):
# print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
print(mat)
clusterer = hdbscan.HDBSCAN(*args,
**kwargs,
)
# print(mat)
# clusterer = hdbscan.HDBSCAN(*args,
# **kwargs,
# )
clustering = clusterer.fit(mat.astype('double'))
# clustering = clusterer.fit(mat.astype('double'))
return(clustering)
# return(clustering)
def KNN_distances_plot(mat,outname,k=2):
nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
@ -172,4 +293,10 @@ def make_KNN_plots():
KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')
if __name__ == "__main__":
fire.Fire(select_hdbscan_clustering)
fire.Fire{'grid_sweep':hdbscan_grid_sweep,
'grid_sweep_lsi':hdbscan_lsi_grid_sweep
'cluster':hdbscan_job,
'cluster_lsi':hdbscan_lsi_job}
# test_select_hdbscan_clustering()
#fire.Fire(select_hdbscan_clustering)

View File

@ -4,98 +4,145 @@ from pathlib import Path
from multiprocessing import cpu_count
from dataclasses import dataclass
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
from clustering_base import lsi_result_mixin, lsi_mixin, clustering_job, grid_sweep, lsi_grid_sweep
@dataclass
class kmeans_clustering_result(clustering_result):
n_clusters:int
n_init:int
max_iter:int
def kmeans_clustering(similarities, *args, **kwargs):
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
clustering = _kmeans_clustering(mat, *args, **kwargs)
cluster_data = process_clustering_result(clustering, subreddits)
return(cluster_data)
@dataclass
class kmeans_clustering_result_lsi(kmeans_clustering_result, lsi_result_mixin):
pass
def _kmeans_clustering(mat, output, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
class kmeans_job(clustering_job):
def __init__(self, infile, outpath, name, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
super().__init__(infile,
outpath,
name,
call=kmeans_job._kmeans_clustering,
n_clusters=n_clusters,
n_init=n_init,
max_iter=max_iter,
random_state=random_state,
verbose=verbose)
clustering = KMeans(n_clusters=n_clusters,
n_init=n_init,
max_iter=max_iter,
random_state=random_state,
verbose=verbose
).fit(mat)
self.n_clusters=n_clusters
self.n_init=n_init
self.max_iter=max_iter
return clustering
def _kmeans_clustering(mat, *args, **kwargs):
def do_clustering(n_clusters, n_init, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
print(outpath)
mat = sim_to_dist(mat)
clustering = _kmeans_clustering(mat, outpath, n_clusters, n_init, max_iter, random_state, verbose)
clustering = KMeans(*args,
**kwargs,
).fit(mat)
outpath.parent.mkdir(parents=True,exist_ok=True)
cluster_data.to_feather(outpath)
cluster_data = process_clustering_result(clustering, subreddits)
return clustering
try:
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
except ValueError:
score = None
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
try:
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
except ValueError:
alt_score = None
def get_info(self):
result = super().get_info()
self.result = kmeans_clustering_result(**result.__dict__,
n_init=n_init,
max_iter=max_iter)
return self.result
class kmeans_lsi_job(kmeans_job, lsi_mixin):
def __init__(self, infile, outpath, name, lsi_dims, *args, **kwargs):
super().__init__(infile,
outpath,
name,
*args,
**kwargs)
super().set_lsi_dims(lsi_dims)
def get_info(self):
result = super().get_info()
self.result = kmeans_clustering_result_lsi(**result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
res = kmeans_clustering_result(outpath=outpath,
max_iter=max_iter,
n_clusters=n_clusters,
n_init = n_init,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
return res
class kmeans_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
*args,
**kwargs):
super().__init__(kmeans_job, inpath, outpath, self.namer, *args, **kwargs)
def namer(self,
n_clusters,
n_init,
max_iter):
return f"nclusters-{n_clusters}_nit-{n_init}_maxit-{max_iter}"
# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
def select_kmeans_clustering(similarities, outdir, outinfo, n_clusters=[1000], max_iter=100000, n_init=10, random_state=1968, verbose=True, alt_similarities=None):
class _kmeans_lsi_grid_sweep(grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
*args,
**kwargs):
self.lsi_dim = lsi_dim
self.jobtype = kmeans_lsi_job
super().__init__(self.jobtype, inpath, outpath, self.namer, self.lsi_dim, *args, **kwargs)
n_clusters = list(map(int,n_clusters))
n_init = list(map(int,n_init))
def namer(self, *args, **kwargs):
s = kmeans_grid_sweep.namer(self, *args[1:], **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
if type(outdir) is str:
outdir = Path(outdir)
class kmeans_lsi_grid_sweep(lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
n_clusters,
n_inits,
max_iters
):
outdir.mkdir(parents=True,exist_ok=True)
super().__init__(kmeans_lsi_job,
_kmeans_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
n_clusters,
n_inits,
max_iters)
subreddits, mat = read_similarity_mat(similarities,use_threads=True)
def test_select_kmeans_clustering():
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI/"
outpath = "test_kmeans";
n_clusters=[200,300,400];
n_init=[1,2,3];
max_iter=[100000]
if alt_similarities is not None:
alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
else:
alt_mat = None
gs = kmeans_lsi_grid_sweep(inpath, 'all', outpath, n_clusters, n_init, max_iter)
gs.run(1)
# get list of tuples: the combinations of hyperparameters
hyper_grid = product(n_clusters, n_init)
hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
cluster_selection_epsilons=[0,0.1,0.3,0.5];
cluster_selection_methods=['eom'];
lsi_dimensions='all'
gs = hdbscan_lsi_grid_sweep(inpath, "all", outpath, min_cluster_sizes, min_samples, cluster_selection_epsilons, cluster_selection_methods)
gs.run(20)
gs.save("test_hdbscan/lsi_sweep.csv")
_do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
# call starmap
print("running clustering selection")
clustering_data = starmap(_do_clustering, hyper_grid)
clustering_data = pd.DataFrame(list(clustering_data))
clustering_data.to_csv(outinfo)
return clustering_data
if __name__ == "__main__":
x = fire.Fire(select_kmeans_clustering)
fire.Fire{'grid_sweep':kmeans_grid_sweep,
'grid_sweep_lsi':kmeans_lsi_grid_sweep
'cluster':kmeans_job,
'cluster_lsi':kmeans_lsi_job}