13
0

Use Latent semantic indexing and hdbscan

This commit is contained in:
Nate E TeBlunthuis 2021-05-02 23:39:55 -07:00
parent 36b24ee933
commit 7df8436067
14 changed files with 835 additions and 373 deletions

View File

@ -2,20 +2,41 @@
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
similarity_data=/gscratch/comdata/output/reddit_similarity
clustering_data=/gscratch/comdata/output/reddit_clustering
selection_grid="--max_iter=3000 --convergence_iter=15,30,100 --damping=0.5,0.6,0.7,0.8,0.85,0.9,0.95,0.97,0.99, --preference_quantile=0.1,0.3,0.5,0.7,0.9"
kmeans_selection_grid="--max_iter=3000 --n_init=[10] --n_clusters=[100,500,1000,1500,2000,2500,3000,2350,3500,3570,4000]"
#selection_grid="--max_iter=3000 --convergence_iter=[15] --preference_quantile=[0.5] --damping=[0.99]"
all:$(clustering_data)/subreddit_comment_authors_10k/selection_data.csv $(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv $(clustering_data)/subreddit_comment_terms_10k/selection_data.csv
all:$(clustering_data)/subreddit_comment_authors_10k/kmeans/selection_data.csv $(clustering_data)/subreddit_comment_authors-tf_10k/kmeans/selection_data.csv $(clustering_data)/subreddit_comment_terms_10k/kmeans/selection_data.csv $(clustering_data)/subreddit_comment_terms_10k/affinity/selection_data.csv $(clustering_data)/subreddit_comment_authors_10k/affinity/selection_data.csv $(clustering_data)/subreddit_comment_authors-tf_10k/affinity/selection_data.csv
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS $(clustering_data)/subreddit_authors-tf_similarities_30k.feather/SUCCESS
# $(clustering_data)/subreddit_comment_terms_30k.feather/SUCCESS
$(clustering_data)/subreddit_comment_authors_10k/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_authors_10k.feather clustering.py
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_10k.feather $(clustering_data)/subreddit_comment_authors_10k $(clustering_data)/subreddit_comment_authors_10k/selection_data.csv $(selection_grid) -J 20
$(clustering_data)/subreddit_comment_authors_10k/kmeans/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_authors_10k.feather clustering.py
$(srun_singularity) python3 selection.py kmeans $(similarity_data)/subreddit_comment_authors_10k.feather $(clustering_data)/subreddit_comment_authors_10k/kmeans $(clustering_data)/subreddit_comment_authors_10k/kmeans/selection_data.csv $(kmeans_selection_grid)
$(clustering_data)/subreddit_comment_terms_10k/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_terms_10k.feather clustering.py
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_terms_10k.feather $(clustering_data)/subreddit_comment_terms_10k $(clustering_data)/subreddit_comment_terms_10k/selection_data.csv $(selection_grid) -J 20
$(clustering_data)/subreddit_comment_terms_10k/kmeans/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_terms_10k.feather clustering.py
$(srun_singularity) python3 selection.py kmeans $(similarity_data)/subreddit_comment_terms_10k.feather $(clustering_data)/subreddit_comment_terms_10k/kmeans $(clustering_data)/subreddit_comment_terms_10k/kmeans/selection_data.csv $(kmeans_selection_grid)
$(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv:clustering.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather $(clustering_data)/subreddit_comment_authors-tf_10k $(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv $(selection_grid) -J 20
$(clustering_data)/subreddit_comment_authors-tf_10k/kmeans/selection_data.csv:clustering.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather
$(srun_singularity) python3 selection.py kmeans $(similarity_data)/subreddit_comment_authors-tf_10k.feather $(clustering_data)/subreddit_comment_authors-tf_10k/kmeans $(clustering_data)/subreddit_comment_authors-tf_10k/kmeans/selection_data.csv $(kmeans_selection_grid)
affinity_selection_grid="--max_iter=3000 --convergence_iter=[15] --preference_quantile=[0.5] --damping=[0.99]"
$(clustering_data)/subreddit_comment_authors_10k/affinity/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_authors_10k.feather clustering.py
$(srun_singularity) python3 selection.py affinity $(similarity_data)/subreddit_comment_authors_10k.feather $(clustering_data)/subreddit_comment_authors_10k/affinity $(clustering_data)/subreddit_comment_authors_10k/affinity/selection_data.csv $(affinity_selection_grid) -J 20
$(clustering_data)/subreddit_comment_terms_10k/affinity/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_terms_10k.feather clustering.py
$(srun_singularity) python3 selection.py affinity $(similarity_data)/subreddit_comment_terms_10k.feather $(clustering_data)/subreddit_comment_terms_10k/affinity $(clustering_data)/subreddit_comment_terms_10k/affinity/selection_data.csv $(affinity_selection_grid) -J 20
$(clustering_data)/subreddit_comment_authors-tf_10k/affinity/selection_data.csv:clustering.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather
$(srun_singularity) python3 selection.py affinity $(similarity_data)/subreddit_comment_authors-tf_10k.feather $(clustering_data)/subreddit_comment_authors-tf_10k/affinity $(clustering_data)/subreddit_comment_authors-tf_10k/affinity/selection_data.csv $(affinity_selection_grid) -J 20
clean:
rm -f $(clustering_data)/subreddit_comment_authors-tf_10k/affinity/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_authors_10k/affinity/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_terms_10k/affinity/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_authors-tf_10k/kmeans/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_authors_10k/kmeans/selection_data.csv
rm -f $(clustering_data)/subreddit_comment_terms_10k/kmeans/selection_data.csv
PHONY: clean
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_authors_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_30k.feather $(clustering_data)/subreddit_comment_authors_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS

View File

@ -3,24 +3,23 @@
import sys
import pandas as pd
import numpy as np
from sklearn.cluster import AffinityPropagation
from sklearn.cluster import AffinityPropagation, KMeans
import fire
from pathlib import Path
from multiprocessing import cpu_count
from dataclasses import dataclass
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
def read_similarity_mat(similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,mat)
def affinity_clustering(similarities, *args, **kwargs):
def affinity_clustering(similarities, output, *args, **kwargs):
subreddits, mat = read_similarity_mat(similarities)
return _affinity_clustering(mat, subreddits, *args, **kwargs)
clustering = _affinity_clustering(mat, *args, **kwargs)
cluster_data = process_clustering_result(clustering, subreddits)
cluster_data['algorithm'] = 'affinity'
return(cluster_data)
def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
'''
similarities: feather file with a dataframe of similarity scores
similarities: matrix of similarity scores
preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author.
'''
@ -40,25 +39,32 @@ def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000,
verbose=verbose,
random_state=random_state).fit(mat)
print(f"clustering took {clustering.n_iter_} iterations")
clusters = clustering.labels_
print(f"found {len(set(clusters))} clusters")
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
cluster_sizes = cluster_data.groupby("cluster").count()
print(f"the largest cluster has {cluster_sizes.subreddit.max()} members")
print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
sys.stdout.flush()
cluster_data = process_clustering_result(clustering, subreddits)
output = Path(output)
output.parent.mkdir(parents=True,exist_ok=True)
cluster_data.to_feather(output)
print(f"saved {output}")
return clustering
def kmeans_clustering(similarities, *args, **kwargs):
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
clustering = _kmeans_clustering(mat, *args, **kwargs)
cluster_data = process_clustering_result(clustering, subreddits)
return(cluster_data)
def _kmeans_clustering(mat, output, n_clusters, n_init=10, max_iter=100000, random_state=1968, verbose=True):
clustering = KMeans(n_clusters=n_clusters,
n_init=n_init,
max_iter=max_iter,
random_state=random_state,
verbose=verbose
).fit(mat)
return clustering
if __name__ == "__main__":
fire.Fire(affinity_clustering)

View File

@ -0,0 +1,49 @@
from pathlib import Path
import numpy as np
import pandas as pd
from dataclasses import dataclass
def sim_to_dist(mat):
dist = 1-mat
dist[dist < 0] = 0
np.fill_diagonal(dist,0)
return dist
def process_clustering_result(clustering, subreddits):
if hasattr(clustering,'n_iter_'):
print(f"clustering took {clustering.n_iter_} iterations")
clusters = clustering.labels_
print(f"found {len(set(clusters))} clusters")
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
print(f"{(cluster_sizes.loc[cluster_sizes.cluster==-1,['subreddit']])} subreddits are in cluster -1",flush=True)
return cluster_data
@dataclass
class clustering_result:
outpath:Path
max_iter:int
silhouette_score:float
alt_silhouette_score:float
name:str
n_clusters:int
def read_similarity_mat(similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,mat)

View File

@ -0,0 +1,172 @@
from clustering_base import sim_to_dist, process_clustering_result, clustering_result, read_similarity_mat
from dataclasses import dataclass
import hdbscan
from sklearn.neighbors import NearestNeighbors
import plotnine as pn
import numpy as np
from itertools import product, starmap
import pandas as pd
from sklearn.metrics import silhouette_score, silhouette_samples
from pathlib import Path
from multiprocessing import Pool, cpu_count
import fire
from pyarrow.feather import write_feather
def test_select_hdbscan_clustering():
select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
"test_hdbscan_author30k",
min_cluster_sizes=[2],
min_samples=[1,2],
cluster_selection_epsilons=[0,0.05,0.1,0.15],
cluster_selection_methods=['eom','leaf'],
lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI"
outpath = "test_hdbscan";
min_cluster_sizes=[2,3,4];
min_samples=[1,2,3];
cluster_selection_epsilons=[0,0.1,0.3,0.5];
cluster_selection_methods=['eom'];
lsi_dimensions='all'
@dataclass
class hdbscan_clustering_result(clustering_result):
min_cluster_size:int
min_samples:int
cluster_selection_epsilon:float
cluster_selection_method:str
lsi_dimensions:int
n_isolates:int
silhouette_samples:str
def select_hdbscan_clustering(inpath,
outpath,
outfile=None,
min_cluster_sizes=[2],
min_samples=[1],
cluster_selection_epsilons=[0],
cluster_selection_methods=['eom'],
lsi_dimensions='all'
):
inpath = Path(inpath)
outpath = Path(outpath)
outpath.mkdir(exist_ok=True, parents=True)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*"))
else:
lsi_paths = [inpath / (dim + '.feather') for dim in lsi_dimensions]
lsi_nums = [p.stem for p in lsi_paths]
grid = list(product(lsi_nums,
min_cluster_sizes,
min_samples,
cluster_selection_epsilons,
cluster_selection_methods))
# fix the output file names
names = list(map(lambda t:'_'.join(map(str,t)),grid))
grid = [(inpath/(str(t[0])+'.feather'),outpath/(name + '.feather'), t[0], name) + t[1:] for t, name in zip(grid, names)]
with Pool(int(cpu_count()/4)) as pool:
mods = starmap(hdbscan_clustering, grid)
res = pd.DataFrame(mods)
if outfile is None:
outfile = outpath / "selection_data.csv"
res.to_csv(outfile)
def hdbscan_clustering(similarities, output, lsi_dim, name, min_cluster_size=2, min_samples=1, cluster_selection_epsilon=0, cluster_selection_method='eom'):
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
clustering = _hdbscan_clustering(mat,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
cluster_selection_epsilon=cluster_selection_epsilon,
cluster_selection_method=cluster_selection_method,
metric='precomputed',
core_dist_n_jobs=cpu_count()
)
cluster_data = process_clustering_result(clustering, subreddits)
isolates = clustering.labels_ == -1
scoremat = mat[~isolates][:,~isolates]
score = silhouette_score(scoremat, clustering.labels_[~isolates], metric='precomputed')
cluster_data.to_feather(output)
silhouette_samp = silhouette_samples(mat, clustering.labels_, metric='precomputed')
silhouette_samp = pd.DataFrame({'subreddit':subreddits,'score':silhouette_samp})
silsampout = output.parent / ("silhouette_samples" + output.name)
silhouette_samp.to_feather(silsampout)
result = hdbscan_clustering_result(outpath=output,
max_iter=None,
silhouette_samples=silsampout,
silhouette_score=score,
alt_silhouette_score=score,
name=name,
min_cluster_size=min_cluster_size,
min_samples=min_samples,
cluster_selection_epsilon=cluster_selection_epsilon,
cluster_selection_method=cluster_selection_method,
lsi_dimensions=lsi_dim,
n_isolates=isolates.sum(),
n_clusters=len(set(clustering.labels_))
)
return(result)
# for all runs we should try cluster_selection_epsilon = None
# for terms we should try cluster_selection_epsilon around 0.56-0.66
# for authors we should try cluster_selection_epsilon around 0.98-0.99
def _hdbscan_clustering(mat, *args, **kwargs):
print(f"running hdbscan clustering. args:{args}. kwargs:{kwargs}")
print(mat)
clusterer = hdbscan.HDBSCAN(*args,
**kwargs,
)
clustering = clusterer.fit(mat.astype('double'))
return(clustering)
def KNN_distances_plot(mat,outname,k=2):
nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
distances, indices = nbrs.kneighbors(mat)
d2 = distances[:,-1]
df = pd.DataFrame({'dist':d2})
df = df.sort_values("dist",ascending=False)
df['idx'] = np.arange(0,d2.shape[0]) + 1
p = pn.qplot(x='idx',y='dist',data=df,geom='line') + pn.scales.scale_y_continuous(minor_breaks = np.arange(0,50)/50,
breaks = np.arange(0,10)/10)
p.save(outname,width=16,height=10)
def make_KNN_plots():
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='terms_knn_dist2.png')
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='authors_knn_dist2.png')
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')
if __name__ == "__main__":
df = pd.read_csv("test_hdbscan/selection_data.csv")
test_select_hdbscan_clustering()
check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)

View File

@ -1,8 +1,8 @@
from sklearn.metrics import silhouette_score
from sklearn.cluster import AffinityPropagation
from functools import partial
from clustering import _affinity_clustering, read_similarity_mat
from dataclasses import dataclass
from clustering import _affinity_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
from multiprocessing import Pool, cpu_count, Array, Process
from pathlib import Path
from itertools import product, starmap
@ -12,40 +12,69 @@ import fire
import sys
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
@dataclass
class clustering_result:
outpath:Path
class affinity_clustering_result(clustering_result):
damping:float
max_iter:int
convergence_iter:int
preference_quantile:float
silhouette_score:float
alt_silhouette_score:float
name:str
def sim_to_dist(mat):
dist = 1-mat
dist[dist < 0] = 0
np.fill_diagonal(dist,0)
return dist
def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
def do_affinity_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
outpath.parent.mkdir(parents=True,exist_ok=True)
print(outpath)
clustering = _affinity_clustering(mat, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
cluster_data = process_clustering_result(clustering, subreddits)
mat = sim_to_dist(clustering.affinity_matrix_)
try:
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
except ValueError:
score = None
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
try:
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
except ValueError:
alt_score = None
res = affinity_clustering_result(outpath=outpath,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
preference_quantile=preference_quantile,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
return res
def do_affinity_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
outpath.parent.mkdir(parents=True,exist_ok=True)
print(outpath)
clustering = _affinity_clustering(mat, subreddits, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
mat = sim_to_dist(clustering.affinity_matrix_)
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
try:
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
except ValueError:
score = None
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
try:
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
except ValueError:
alt_score = None
res = clustering_result(outpath=outpath,
damping=damping,
@ -58,6 +87,7 @@ def do_clustering(damping, convergence_iter, preference_quantile, name, mat, sub
return res
# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
@ -86,7 +116,7 @@ def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max
hyper_grid = product(damping, convergence_iter, preference_quantile)
hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
_do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
_do_clustering = partial(do_affinity_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
# similarities = Array('d', mat)
# call pool.starmap
@ -94,6 +124,7 @@ def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max
clustering_data = pool.starmap(_do_clustering, hyper_grid)
clustering_data = pd.DataFrame(list(clustering_data))
clustering_data.to_csv(outinfo)
return clustering_data

View File

@ -0,0 +1,92 @@
from sklearn.metrics import silhouette_score
from sklearn.cluster import AffinityPropagation
from functools import partial
from clustering import _kmeans_clustering, read_similarity_mat, sim_to_dist, process_clustering_result, clustering_result
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count, Array, Process
from pathlib import Path
from itertools import product, starmap
import numpy as np
import pandas as pd
import fire
import sys
@dataclass
class kmeans_clustering_result(clustering_result):
n_clusters:int
n_init:int
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
def do_clustering(n_clusters, n_init, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
print(outpath)
mat = sim_to_dist(mat)
clustering = _kmeans_clustering(mat, outpath, n_clusters, n_init, max_iter, random_state, verbose)
outpath.parent.mkdir(parents=True,exist_ok=True)
cluster_data.to_feather(outpath)
cluster_data = process_clustering_result(clustering, subreddits)
try:
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
except ValueError:
score = None
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
try:
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
except ValueError:
alt_score = None
res = kmeans_clustering_result(outpath=outpath,
max_iter=max_iter,
n_clusters=n_clusters,
n_init = n_init,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
return res
# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
def select_kmeans_clustering(similarities, outdir, outinfo, n_clusters=[1000], max_iter=100000, n_init=10, random_state=1968, verbose=True, alt_similarities=None):
n_clusters = list(map(int,n_clusters))
n_init = list(map(int,n_init))
if type(outdir) is str:
outdir = Path(outdir)
outdir.mkdir(parents=True,exist_ok=True)
subreddits, mat = read_similarity_mat(similarities,use_threads=True)
if alt_similarities is not None:
alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
else:
alt_mat = None
# get list of tuples: the combinations of hyperparameters
hyper_grid = product(n_clusters, n_init)
hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
_do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
# call starmap
print("running clustering selection")
clustering_data = starmap(_do_clustering, hyper_grid)
clustering_data = pd.DataFrame(list(clustering_data))
clustering_data.to_csv(outinfo)
return clustering_data
if __name__ == "__main__":
x = fire.Fire(select_kmeans_clustering)

7
clustering/selection.py Normal file
View File

@ -0,0 +1,7 @@
import fire
from select_affinity import select_affinity_clustering
from select_kmeans import select_kmeans_clustering
if __name__ == "__main__":
fire.Fire({"kmeans":select_kmeans_clustering,
"affinity":select_affinity_clustering})

View File

@ -1,25 +1,130 @@
all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms.parquet
#all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
base_data=/gscratch/comdata/output/
similarity_data=${base_data}/reddit_similarity
tfidf_data=${similarity_data}/tfidf
tfidf_weekly_data=${similarity_data}/tfidf_weekly
similarity_weekly_data=${similarity_data}/weekly
lsi_components=[10,50,100,200,300,400,500,600,700,850,1000,1500]
lsi_similarities: ${similarity_data}/subreddit_comment_terms_10k_LSI ${similarity_data}/subreddit_comment_authors-tf_10k_LSI ${similarity_data}/subreddit_comment_authors_10k_LSI ${similarity_data}/subreddit_comment_terms_30k_LSI ${similarity_data}/subreddit_comment_authors-tf_30k_LSI ${similarity_data}/subreddit_comment_authors_30k_LSI
all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
#${tfidf_weekly_data}/comment_terms_100k.parquet ${tfidf_weekly_data}/comment_authors_100k.parquet ${tfidf_weekly_data}/comment_terms_30k.parquet ${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet ${similarity_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_authors_30k.parquet
# /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_130k.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_weekly_130k.parquet
# all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet
${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet
# /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.feather
${similarity_data}/subreddit_comment_terms_10k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000
/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
start_spark_and_run.sh 1 tfidf.py terms --topN=10000
${similarity_data}/subreddit_comment_terms_10k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200
/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
start_spark_and_run.sh 1 tfidf.py authors --topN=10000
${similarity_data}/subreddit_comment_terms_30k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200
/gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
${similarity_data}/subreddit_comment_terms_30k.feather: ${tfidf_data}/comment_terms_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000
/gscratch/comdata/output/reddit_similarity/comment_terms.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet
start_spark_and_run.sh 1 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
${similarity_data}/subreddit_comment_authors_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000
# /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet
${similarity_data}/subreddit_comment_authors_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000
${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_authors-tf_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2
${similarity_data}/subreddit_comment_terms_100k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors-tf_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000
${tfidf_data}/comment_terms_100k.feather/: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=100000 --outpath=${tfidf_data}/comment_terms_100k.feather
${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=30000 --outpath=${tfidf_data}/comment_terms_30k.feather
${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=10000 --outpath=${tfidf_data}/comment_terms_10k.feather
${tfidf_data}/comment_authors_100k.feather: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=100000 --outpath=${tfidf_data}/comment_authors_100k.feather
${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=10000 --outpath=${tfidf_data}/comment_authors_10k.parquet
${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv
mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=30000 --outpath=${tfidf_data}/comment_authors_30k.parquet
${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet
${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_ppnum_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=100000 --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_terms_100k.parquet: weekly_cosine_similarities.py similarities_helper.py ${tfidf_weekly_data}/comment_terms_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
# start_spark_and_run.sh 1 tfidf.py authors_weekly --topN=130000
# /gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
# /gscratch/comdata/output/reddit_similarity/comment_terms.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet
# start_spark_and_run.sh 1 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
# /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet: cosine_similarities.py ${tfidf_weekly_data}/comment_authors.parquet
# start_spark_and_run.sh 1 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet
/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
start_spark_and_run.sh 1 cosine_similarities.py author-tf --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
# /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author-tf --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet

View File

@ -2,12 +2,13 @@ import pandas as pd
import fire
from pathlib import Path
from similarities_helper import similarities, column_similarities
from functools import partial
def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
# change so that these take in an input as an optional argument (for speed, but also for idf).
def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',

View File

@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
stop-all.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname).hyak.local:7077 lsi_similarities.py author --outfile=/gscratch/comdata/output//reddit_similarity/subreddit_comment_authors_10k_LSI.feather --topN=10000
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh

View File

@ -0,0 +1,61 @@
import pandas as pd
import fire
from pathlib import Path
from similarities_helper import similarities, lsi_column_similarities
from functools import partial
def lsi_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack'):
print(n_components,flush=True)
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm)
return similarities(infile=infile, simfunc=simfunc, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
# change so that these take in an input as an optional argument (for speed, but also for idf).
def term_lsi_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',
'term',
outfile,
min_df,
max_df,
included_subreddits,
topN,
from_date,
to_date,
n_components=n_components
)
def author_lsi_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',
'author',
outfile,
min_df,
max_df,
included_subreddits,
topN,
from_date=from_date,
to_date=to_date,
n_components=n_components
)
def author_tf_similarities(outfile, min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
return lsi_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',
'author',
outfile,
min_df,
max_df,
included_subreddits,
topN,
from_date=from_date,
to_date=to_date,
tfidf_colname='relative_tf',
n_components=n_components
)
if __name__ == "__main__":
fire.Fire({'term':term_lsi_similarities,
'author':author_lsi_similarities,
'author-tf':author_tf_similarities})

View File

@ -2,6 +2,7 @@ from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from multiprocessing import cpu_count, Pool
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
@ -19,46 +20,16 @@ class tf_weight(Enum):
MaxTF = 1
Norm05 = 2
infile = "/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet"
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet"
cache_file = "/gscratch/comdata/users/nathante/cdsc_reddit/similarities/term_tfidf_entries_bak.parquet"
def reindex_tfidf_time_interval(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(exclude_phrases)
tfidf_weekly = spark.read.parquet(infile)
# create the time interval
if from_date is not None:
if type(from_date) is str:
from_date = datetime.fromisoformat(from_date)
tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week >= from_date)
if to_date is not None:
if type(to_date) is str:
to_date = datetime.fromisoformat(to_date)
tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week < to_date)
tfidf = tfidf_weekly.groupBy(["subreddit","week", term_id, term]).agg(f.sum("tf").alias("tf"))
tfidf = _calc_tfidf(tfidf, term_colname, tf_weight.Norm05)
tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
tfidf = spark.read_parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
return(tempdir, subreddit_names)
def termauthor_tfidf(term_tfidf_callable, author_tfidf_callable):
# subreddits missing after this step don't have any terms that have a high enough idf
def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, tf_family=tf_weight.MaxTF):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(exclude_phrases)
# try rewriting without merges
def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, week=None, from_date=None, to_date=None, rescale_idf=True, tf_family=tf_weight.MaxTF):
print("loading tfidf", flush=True)
tfidf_ds = ds.dataset(infile)
if included_subreddits is None:
@ -74,94 +45,116 @@ def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subre
if max_df is not None:
ds_filter &= ds.field("count") <= max_df
if week is not None:
ds_filter &= ds.field("week") == week
if from_date is not None:
ds_filter &= ds.field("week") >= from_date
if to_date is not None:
ds_filter &= ds.field("week") <= to_date
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
projection = {
'subreddit_id':ds.field('subreddit_id'),
term_id:ds.field(term_id),
'relative_tf':ds.field("relative_tf").cast('float32')
}
if not rescale_idf:
projection = {
'subreddit_id':ds.field('subreddit_id'),
term_id:ds.field(term_id),
'relative_tf':ds.field('relative_tf').cast('float32'),
'tf_idf':ds.field('tf_idf').cast('float32')}
tfidf_ds = ds.dataset(infile)
df = tfidf_ds.to_table(filter=ds_filter,columns=projection)
df = df.to_pandas(split_blocks=True,self_destruct=True)
print("assigning indexes",flush=True)
df['subreddit_id_new'] = df.groupby("subreddit_id").ngroup()
grouped = df.groupby(term_id)
df[term_id_new] = grouped.ngroup()
if rescale_idf:
print("computing idf", flush=True)
df['new_count'] = grouped[term_id].transform('count')
N_docs = df.subreddit_id_new.max() + 1
df['idf'] = np.log(N_docs/(1+df.new_count),dtype='float32') + 1
if tf_family == tf_weight.MaxTF:
df["tf_idf"] = df.relative_tf * df.idf
else: # tf_fam = tf_weight.Norm05
df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
print("assigning names")
subreddit_names = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id'])
batches = subreddit_names.to_batches()
with Pool(cpu_count()) as pool:
chunks = pool.imap_unordered(pull_names,batches)
subreddit_names = pd.concat(chunks,copy=False).drop_duplicates()
subreddit_names = subreddit_names.set_index("subreddit_id")
new_ids = df.loc[:,['subreddit_id','subreddit_id_new']].drop_duplicates()
new_ids = new_ids.set_index('subreddit_id')
subreddit_names = subreddit_names.join(new_ids,on='subreddit_id').reset_index()
subreddit_names = subreddit_names.drop("subreddit_id",1)
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
return(df, subreddit_names)
def pull_names(batch):
return(batch.to_pandas().drop_duplicates())
def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, from_date=None, to_date=None, tfidf_colname='tf_idf'):
'''
tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
'''
def proc_sims(sims, outfile):
if issparse(sims):
sims = sims.todense()
print(f"shape of sims:{sims.shape}")
print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}",flush=True)
sims = pd.DataFrame(sims)
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['_subreddit'] = subreddit_names.subreddit.values
p = Path(outfile)
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
outfile.parent.mkdir(exist_ok=True, parents=True)
sims.to_feather(outfile)
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
df = tfidf_ds.to_table(filter=ds_filter,columns=['subreddit','subreddit_id',term_id,'relative_tf']).to_pandas()
sub_ids = df.subreddit_id.drop_duplicates()
new_sub_ids = pd.DataFrame({'subreddit_id':old,'subreddit_id_new':new} for new, old in enumerate(sorted(sub_ids)))
df = df.merge(new_sub_ids,on='subreddit_id',how='inner',validate='many_to_one')
new_count = df.groupby(term_id)[term_id].aggregate(new_count='count').reset_index()
df = df.merge(new_count,on=term_id,how='inner',validate='many_to_one')
term_ids = df[term_id].drop_duplicates()
new_term_ids = pd.DataFrame({term_id:old,term_id_new:new} for new, old in enumerate(sorted(term_ids)))
df = df.merge(new_term_ids, on=term_id, validate='many_to_one')
N_docs = sub_ids.shape[0]
df['idf'] = np.log(N_docs/(1+df.new_count)) + 1
# agg terms by subreddit to make sparse tf/df vectors
if tf_family == tf_weight.MaxTF:
df["tf_idf"] = df.relative_tf * df.idf
else: # tf_fam = tf_weight.Norm05
df["tf_idf"] = (0.5 + 0.5 * df.relative_tf) * df.idf
subreddit_names = df.loc[:,['subreddit','subreddit_id_new']].drop_duplicates()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
return(df, subreddit_names)
def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
'''
tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
'''
if from_date is not None or to_date is not None:
tempdir, subreddit_names = reindex_tfidf_time_interval(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False, from_date=from_date, to_date=to_date)
mat = read_tfidf_matrix(tempdir.name, term_colname, tfidf_colname)
else:
entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False)
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)))
entries, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN,from_date=from_date,to_date=to_date)
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
print("loading matrix")
# mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
print(f'computing similarities on mat. mat.shape:{mat.shape}')
print(f"size of mat is:{mat.data.nbytes}")
print(f"size of mat is:{mat.data.nbytes}",flush=True)
sims = simfunc(mat)
del mat
if issparse(sims):
sims = sims.todense()
print(f"shape of sims:{sims.shape}")
print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}")
sims = pd.DataFrame(sims)
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['_subreddit'] = subreddit_names.subreddit.values
p = Path(outfile)
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
sims.to_feather(outfile)
# tempdir.cleanup()
def read_tfidf_matrix_weekly(path, term_colname, week, tfidf_colname='tf_idf'):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
entries = dataset.to_table(columns=[tfidf_colname,'subreddit_id_new', term_id_new],filter=ds.field('week')==week).to_pandas()
return(csr_matrix((entries[tfidf_colname], (entries[term_id_new]-1, entries.subreddit_id_new-1))))
def read_tfidf_matrix(path, term_colname, tfidf_colname='tf_idf'):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
print(f"tfidf_colname:{tfidf_colname}")
entries = dataset.to_table(columns=[tfidf_colname, 'subreddit_id_new',term_id_new]).to_pandas()
return(csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1))))
if hasattr(sims,'__next__'):
for simmat, name in sims:
proc_sims(simmat, Path(outfile)/(str(name) + ".feather"))
else:
proc_sims(simmat, outfile)
def write_weekly_similarities(path, sims, week, names):
sims['week'] = week
@ -182,155 +175,62 @@ def column_overlaps(mat):
return intersection / den
def test_lsi_sims():
term = "term"
term_id = term + '_id'
term_id_new = term + '_id_new'
t1 = time.perf_counter()
entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet",
term_colname='term',
min_df=2000,
topN=10000
)
t2 = time.perf_counter()
print(f"first load took:{t2 - t1}s")
entries, subreddit_names = reindex_tfidf("/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
term_colname='term',
min_df=2000,
topN=10000
)
t3=time.perf_counter()
print(f"second load took:{t3 - t2}s")
mat = csr_matrix((entries['tf_idf'],(entries[term_id_new], entries.subreddit_id_new)))
sims = list(lsi_column_similarities(mat, [10,50]))
sims_og = sims
sims_test = list(lsi_column_similarities(mat,[10,50],algorithm='randomized',n_iter=10))
# n_components is the latent dimensionality. sklearn recommends 100. More might be better
# if algorithm is 'random' instead of 'arpack' then n_iter gives the number of iterations.
# if n_components is a list we'll return a list of similarities with different latent dimensionalities
# if algorithm is 'randomized' instead of 'arpack' then n_iter gives the number of iterations.
# this function takes the svd and then the column similarities of it
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=5,random_state=1968,algorithm='arpack'):
def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=1968,algorithm='randomized'):
# first compute the lsi of the matrix
# then take the column similarities
svd = TruncatedSVD(n_components=n_components,random_state=random_state,algorithm='arpack')
print("running LSI",flush=True)
if type(n_components) is int:
n_components = [n_components]
n_components = sorted(n_components,reverse=True)
svd_components = n_components[0]
svd = TruncatedSVD(n_components=svd_components,random_state=random_state,algorithm=algorithm,n_iter=n_iter)
mod = svd.fit(tfidfmat.T)
lsimat = mod.transform(tfidfmat.T)
sims = column_similarities(lsimat)
return sims
for n_dims in n_components:
sims = column_similarities(lsimat[:,np.arange(n_dims)])
if len(n_components) > 1:
yield (sims, n_dims)
else:
return sims
def column_similarities(mat):
return 1 - pairwise_distances(mat,metric='cosine')
# if issparse(mat):
# norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
# mat = mat.multiply(1/norm)
# else:
# norm = np.matrix(np.power(np.power(mat,2).sum(axis=0),0.5,dtype=np.float32))
# mat = np.multiply(mat,1/norm)
# sims = mat.T @ mat
# return(sims)
def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col('count') >= min_df)
if max_df is not None:
tfidf = tfidf.filter(f.col('count') <= max_df)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
sub_ids = tfidf.select(['subreddit_id','week']).distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
# only use terms in at least min_df included subreddits in a given week
new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
# reset the term ids
term_ids = tfidf.select([term_id,'week']).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
tfidf = tfidf.join(term_ids,[term_id,'week'])
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf = tfidf.repartition('week')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return(tempdir)
def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col('count') >= min_df)
if max_df is not None:
tfidf = tfidf.filter(f.col('count') <= max_df)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new", f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return tempdir
# try computing cosine similarities using spark
def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
tfidf = tfidf.cache()
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
# step 1 make an rdd of entires
# sorted by (dense) spark subreddit id
n_partitions = int(len(included_subreddits)*2 / 5)
entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
# put like 10 subredis in each partition
# step 2 make it into a distributed.RowMatrix
coordMat = CoordinateMatrix(entries)
coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))
# this needs to be an IndexedRowMatrix()
mat = coordMat.toRowMatrix()
#goal: build a matrix of subreddit columns and tf-idfs rows
sim_dist = mat.columnSimilarities(threshold=similarity_threshold)
return (sim_dist, tfidf)
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
@ -382,7 +282,9 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
else: # tf_fam = tf_weight.Norm05
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
return df
df = df.repartition(400,'subreddit','week')
dfwriter = df.write.partitionBy("week").sortBy("subreddit")
return dfwriter
def _calc_tfidf(df, term_colname, tf_family):
term = term_colname
@ -393,7 +295,7 @@ def _calc_tfidf(df, term_colname, tf_family):
df = df.join(max_subreddit_terms, on='subreddit')
df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
df = df.withColumn("relative_tf", (df.tf / df.sr_max_tf))
# group by term. term is unique
idf = df.groupby([term]).count()
@ -436,8 +338,9 @@ def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm
df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
df = _calc_tfidf(df, term_colname, tf_family)
return df
df = df.repartition('subreddit')
dfwriter = df.write.sortBy("subreddit","tf")
return dfwriter
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
rankdf = pd.read_csv(path)
@ -445,3 +348,18 @@ def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarit
return included_subreddits
def repartition_tfidf(inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet",
outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k_repartitioned.parquet"):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath)
df = df.repartition(400,'subreddit')
df.write.parquet(outpath,mode='overwrite')
def repartition_tfidf_weekly(inpath="/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet",
outpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_repartitioned.parquet"):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath)
df = df.repartition(400,'subreddit','week')
dfwriter = df.write.partitionBy("week")
dfwriter.parquet(outpath,mode='overwrite')

View File

@ -15,10 +15,9 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
else:
include_subs = select_topN_subreddits(topN)
df = func(df, include_subs, term_colname)
df.write.parquet(outpath,mode='overwrite',compression='snappy')
dfwriter = func(df, include_subs, term_colname)
dfwriter.parquet(outpath,mode='overwrite',compression='snappy')
spark.stop()
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):

View File

@ -3,78 +3,78 @@ from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import pyarrow
import pyarrow.dataset as ds
import pandas as pd
import fire
from itertools import islice
from itertools import islice, chain
from pathlib import Path
from similarities_helper import *
from multiprocessing import Pool, cpu_count
from functools import partial
def _week_similarities(tempdir, term_colname, week):
print(f"loading matrix: {week}")
mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
print('computing similarities')
sims = column_similarities(mat)
del mat
names = subreddit_names.loc[subreddit_names.week == week]
sims = pd.DataFrame(sims.todense())
def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
print(f"loading matrix: {week}")
entries, subreddit_names = reindex_tfidf(infile = tfidf_path,
term_colname=term_colname,
min_df=min_df,
max_df=max_df,
included_subreddits=included_subreddits,
topN=topN,
week=week)
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new], entries.subreddit_id_new)))
print('computing similarities')
sims = column_similarities(mat)
del mat
sims = pd.DataFrame(sims.todense())
sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['_subreddit'] = names.subreddit.values
outfile = str(Path(outdir) / str(week))
write_weekly_similarities(outfile, sims, week, names)
sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1)
sims['_subreddit'] = names.subreddit.values
write_weekly_similarities(outfile, sims, week, names)
def pull_weeks(batch):
return set(batch.to_pandas()['week'])
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500):
print(outfile)
tfidf = spark.read.parquet(tfidf_path)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(open(included_subreddits))
tfidf_ds = ds.dataset(tfidf_path)
tfidf_ds = tfidf_ds.to_table(columns=["week"])
batches = tfidf_ds.to_batches()
print(f"computing weekly similarities for {len(included_subreddits)} subreddits")
with Pool(cpu_count()) as pool:
weeks = set(chain( * pool.imap_unordered(pull_weeks,batches)))
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df=None, included_subreddits=included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
# the ids can change each week.
subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
weeks = sorted(list(subreddit_names.week.drop_duplicates()))
weeks = sorted(weeks)
# do this step in parallel if we have the memory for it.
# should be doable with pool.map
def week_similarities_helper(week):
_week_similarities(tempdir, term_colname, week)
print(f"computing weekly similarities")
week_similarities_helper = partial(_week_similarities,simfunc=column_similarities, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN)
with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
list(pool.map(week_similarities_helper,weeks))
def author_cosine_similarities_weekly(outfile, min_df=2 , included_subreddits=None, topN=500):
def author_cosine_similarities_weekly(outfile, min_df=2, max_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
outfile,
'author',
min_df,
max_df,
included_subreddits,
topN)
def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
outfile,
'term',
min_df,
included_subreddits,
topN)
def term_cosine_similarities_weekly(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
outfile,
'term',
min_df,
max_df,
included_subreddits,
topN)
if __name__ == "__main__":
fire.Fire({'authors':author_cosine_similarities_weekly,