18
0

6 Commits

32 changed files with 796 additions and 322 deletions

3
.gitmodules vendored
View File

@@ -1,3 +0,0 @@
[submodule "cdsc_ecology_utils"]
path = cdsc_ecology_utils
url = code:cdsc_ecology_utils

View File

@@ -1,2 +1,2 @@
from timeseries import load_clusters, load_densities, build_cluster_timeseries from .timeseries import load_clusters, load_densities, build_cluster_timeseries
from cdsc_ecology_utils import similarity_functions

View File

@@ -1,8 +1,11 @@
#srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28' #srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28'
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh srun_singularity=srun -p compute-bigmem -A comdata --time=48:00:00 --mem=362G -c 40
similarity_data=/gscratch/comdata/output/reddit_similarity similarity_data=/gscratch/comdata/output/reddit_similarity
clustering_data=/gscratch/comdata/output/reddit_clustering clustering_data=/gscratch/comdata/output/reddit_clustering
kmeans_selection_grid=--max_iters=[3000] --n_inits=[10] --n_clusters=[100,500,1000,1250,1500,1750,2000] kmeans_selection_grid=--max_iters=[3000] --n_inits=[10] --n_clusters=[100,500,1000,1250,1500,1750,2000]
umap_hdbscan_selection_grid=--min_cluster_sizes=[2] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf] --n_neighbors=[5,15,25,50,75,100] --learning_rate=[1] --min_dist=[0,0.1,0.25,0.5,0.75,0.9,0.99] --local_connectivity=[1] --densmap=[True,False] --n_components=[2,5,10]
hdbscan_selection_grid=--min_cluster_sizes=[2,3,4,5] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf] hdbscan_selection_grid=--min_cluster_sizes=[2,3,4,5] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf]
affinity_selection_grid=--dampings=[0.5,0.6,0.7,0.8,0.95,0.97,0.99] --preference_quantiles=[0.1,0.3,0.5,0.7,0.9] --convergence_iters=[15] affinity_selection_grid=--dampings=[0.5,0.6,0.7,0.8,0.95,0.97,0.99] --preference_quantiles=[0.1,0.3,0.5,0.7,0.9] --convergence_iters=[15]
@@ -91,12 +94,28 @@ ${terms_10k_output_lsi}/hdbscan/selection_data.csv:selection.py ${terms_10k_inpu
${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py hdbscan_clustering.py ${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv:clustering.py ${authors_tf_10k_input_lsi} clustering_base.py hdbscan_clustering.py
$(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/hdbscan --savefile=${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid) $(srun_singularity) python3 hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/hdbscan --savefile=${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv $(hdbscan_selection_grid)
${authors_tf_10k_output_lsi}/umap_hdbscan/selection_data.csv:umap_hdbscan_clustering_lsi.py
$(srun_singularity) python3 umap_hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/umap_hdbscan --savefile=${authors_tf_10k_output_lsi}/umap_hdbscan/selection_data.csv $(umap_hdbscan_selection_grid)
${terms_10k_output_lsi}/best_hdbscan.feather:${terms_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py ${terms_10k_output_lsi}/best_hdbscan.feather:${terms_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2 $(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
${authors_tf_10k_output_lsi}/best_hdbscan.feather:${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py ${authors_tf_10k_output_lsi}/best_hdbscan.feather:${authors_tf_10k_output_lsi}/hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2 $(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
${authors_tf_10k_output_lsi}/best_umap_hdbscan_2.feather:${authors_tf_10k_output_lsi}/umap_hdbscan/selection_data.csv pick_best_clustering.py
$(srun_singularity) python3 pick_best_clustering.py $< $@ --min_clusters=50 --max_isolates=5000 --min_cluster_size=2
best_umap_hdbscan.feather:${authors_tf_10k_output_lsi}/best_umap_hdbscan_2.feather
# {'lsi_dimensions': 700, 'outpath': '/gscratch/comdata/output/reddit_clustering/subreddit_comment_authors-tf_10k_LSI/umap_hdbscan', 'silhouette_score': 0.27616957, 'name': 'mcs-2_ms-5_cse-0.05_csm-leaf_nn-15_lr-1.0_md-0.1_lc-1_lsi-700', 'n_clusters': 547, 'n_isolates': 2093, 'silhouette_samples': '/gscratch/comdata/output/reddit_clustering/subreddit_comment_authors-tf_10k_LSI/umap_hdbscan/silhouette_samples-mcs-2_ms-5_cse-0.05_csm-leaf_nn-15_lr-1.0_md-0.1_lc-1_lsi-700.feather', 'min_cluster_size': 2, 'min_samples': 5, 'cluster_selection_epsilon': 0.05, 'cluster_selection_method': 'leaf', 'n_neighbors': 15, 'learning_rate': 1.0, 'min_dist': 0.1, 'local_connectivity': 1, 'n_isolates_str': '2093', 'n_isolates_0': False}
best_umap_grid=--min_cluster_sizes=[2] --min_samples=[5] --cluster_selection_epsilons=[0.05] --cluster_selection_methods=[leaf] --n_neighbors=[15] --learning_rate=[1] --min_dist=[0.1] --local_connectivity=[1] --save_step1=True
umap_hdbscan_coords:
python3 umap_hdbscan_clustering_lsi.py --inpath=${authors_tf_10k_input_lsi} --outpath=${authors_tf_10k_output_lsi}/umap_hdbscan --savefile=/dev/null ${best_umap_grid}
clean_affinity: clean_affinity:
rm -f ${authors_10k_output}/affinity/selection_data.csv rm -f ${authors_10k_output}/affinity/selection_data.csv
rm -f ${authors_tf_10k_output}/affinity/selection_data.csv rm -f ${authors_tf_10k_output}/affinity/selection_data.csv
@@ -159,7 +178,7 @@ clean_lsi_terms:
clean: clean_affinity clean_kmeans clean_hdbscan clean: clean_affinity clean_kmeans clean_hdbscan
PHONY: clean clean_affinity clean_kmeans clean_hdbscan clean_authors clean_authors_tf clean_terms terms_10k authors_10k authors_tf_10k PHONY: clean clean_affinity clean_kmeans clean_hdbscan clean_authors clean_authors_tf clean_terms terms_10k authors_10k authors_tf_10k best_umap_hdbscan.feather umap_hdbscan_coords
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_authors_30k.feather clustering.py # $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_authors_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_30k.feather $(clustering_data)/subreddit_comment_authors_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS # $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_30k.feather $(clustering_data)/subreddit_comment_authors_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS

View File

@@ -1,3 +1,4 @@
import pickle
from pathlib import Path from pathlib import Path
import numpy as np import numpy as np
import pandas as pd import pandas as pd
@@ -24,6 +25,13 @@ class clustering_job:
self.outpath.mkdir(parents=True, exist_ok=True) self.outpath.mkdir(parents=True, exist_ok=True)
self.cluster_data.to_feather(self.outpath/(self.name + ".feather")) self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
self.hasrun = True self.hasrun = True
self.cleanup()
def cleanup(self):
self.cluster_data = None
self.mat = None
self.clustering=None
self.subreddits=None
def get_info(self): def get_info(self):
if not self.hasrun: if not self.hasrun:
@@ -57,6 +65,7 @@ class clustering_job:
return score return score
def read_distance_mat(self, similarities, use_threads=True): def read_distance_mat(self, similarities, use_threads=True):
print(similarities)
df = pd.read_feather(similarities, use_threads=use_threads) df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1)) mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0] n = mat.shape[0]
@@ -95,6 +104,38 @@ class clustering_job:
return cluster_data return cluster_data
class twoway_clustering_job(clustering_job):
def __init__(self, infile, outpath, name, call1, call2, args1, args2):
self.outpath = Path(outpath)
self.call1 = call1
self.args1 = args1
self.call2 = call2
self.args2 = args2
self.infile = Path(infile)
self.name = name
self.hasrun = False
self.args = args1|args2
def run(self):
self.subreddits, self.mat = self.read_distance_mat(self.infile)
self.step1 = self.call1(self.mat, **self.args1)
self.clustering = self.call2(self.mat, self.step1, **self.args2)
self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
self.hasrun = True
self.after_run()
self.cleanup()
def after_run():
self.score = self.silhouette()
self.outpath.mkdir(parents=True, exist_ok=True)
print(self.outpath/(self.name+".feather"))
self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
def cleanup(self):
super().cleanup()
self.step1 = None
@dataclass @dataclass
class clustering_result: class clustering_result:
outpath:Path outpath:Path

View File

@@ -31,3 +31,19 @@ class grid_sweep:
outcsv = Path(outcsv) outcsv = Path(outcsv)
outcsv.parent.mkdir(parents=True, exist_ok=True) outcsv.parent.mkdir(parents=True, exist_ok=True)
self.infos.to_csv(outcsv) self.infos.to_csv(outcsv)
class twoway_grid_sweep(grid_sweep):
def __init__(self, jobtype, inpath, outpath, namer, args1, args2, *args, **kwargs):
self.jobtype = jobtype
self.namer = namer
prod1 = product(* args1.values())
prod2 = product(* args2.values())
grid1 = [dict(zip(args1.keys(), pargs)) for pargs in prod1]
grid2 = [dict(zip(args2.keys(), pargs)) for pargs in prod2]
grid = product(grid1, grid2)
inpath = Path(inpath)
outpath = Path(outpath)
self.hasrun = False
self.grid = [(inpath,outpath,namer(**(g[0] | g[1])), g[0], g[1], *args) for g in grid]
self.jobs = [jobtype(*g) for g in self.grid]

View File

@@ -1,5 +1,5 @@
from clustering_base import clustering_job, clustering_result from clustering_base import clustering_job, clustering_result
from grid_sweep import grid_sweep from grid_sweep import grid_sweep, twoway_grid_sweep
from dataclasses import dataclass from dataclasses import dataclass
from itertools import chain from itertools import chain
from pathlib import Path from pathlib import Path
@@ -27,3 +27,18 @@ class lsi_grid_sweep(grid_sweep):
self.hasrun = False self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)] self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, *args, **kwargs) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids))) self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))
class twoway_lsi_grid_sweep(twoway_grid_sweep):
def __init__(self, jobtype, subsweep, inpath, lsi_dimensions, outpath, args1, args2):
self.jobtype = jobtype
self.subsweep = subsweep
inpath = Path(inpath)
if lsi_dimensions == 'all':
lsi_paths = list(inpath.glob("*.feather"))
else:
lsi_paths = [inpath / (str(dim) + '.feather') for dim in lsi_dimensions]
lsi_nums = [int(p.stem) for p in lsi_paths]
self.hasrun = False
self.subgrids = [self.subsweep(lsi_path, outpath, lsi_dim, args1, args2) for lsi_dim, lsi_path in zip(lsi_nums, lsi_paths)]
self.jobs = list(chain(*map(lambda gs: gs.jobs, self.subgrids)))

View File

@@ -0,0 +1,230 @@
from clustering_base import clustering_result, clustering_job, twoway_clustering_job
from hdbscan_clustering import hdbscan_clustering_result
import umap
from grid_sweep import twoway_grid_sweep
from dataclasses import dataclass
import hdbscan
from sklearn.neighbors import NearestNeighbors
import plotnine as pn
import numpy as np
from itertools import product, starmap, chain
import pandas as pd
from multiprocessing import cpu_count
import fire
def test_select_hdbscan_clustering():
# select_hdbscan_clustering("/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_30k_LSI",
# "test_hdbscan_author30k",
# min_cluster_sizes=[2],
# min_samples=[1,2],
# cluster_selection_epsilons=[0,0.05,0.1,0.15],
# cluster_selection_methods=['eom','leaf'],
# lsi_dimensions='all')
inpath = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k_LSI"
outpath = "test_umap_hdbscan_lsi"
min_cluster_sizes=[2,3,4]
min_samples=[1,2,3]
cluster_selection_epsilons=[0,0.1,0.3,0.5]
cluster_selection_methods=[1]
lsi_dimensions='all'
n_neighbors = [5,10,15,25,35,70,100]
learning_rate = [0.1,0.5,1,2]
min_dist = [0.5,1,1.5,2]
local_connectivity = [1,2,3,4,5]
hdbscan_params = {"min_cluster_sizes":min_cluster_sizes, "min_samples":min_samples, "cluster_selection_epsilons":cluster_selection_epsilons, "cluster_selection_methods":cluster_selection_methods}
umap_params = {"n_neighbors":n_neighbors, "learning_rate":learning_rate, "min_dist":min_dist, "local_connectivity":local_connectivity}
gs = umap_hdbscan_grid_sweep(inpath, "all", outpath, hdbscan_params,umap_params)
# gs.run(20)
# gs.save("test_hdbscan/lsi_sweep.csv")
# job1 = hdbscan_lsi_job(infile=inpath, outpath=outpath, name="test", lsi_dims=500, min_cluster_size=2, min_samples=1,cluster_selection_epsilon=0,cluster_selection_method='eom')
# job1.run()
# print(job1.get_info())
# df = pd.read_csv("test_hdbscan/selection_data.csv")
# test_select_hdbscan_clustering()
# check_clusters = pd.read_feather("test_hdbscan/500_2_2_0.1_eom.feather")
# silscores = pd.read_feather("test_hdbscan/silhouette_samples500_2_2_0.1_eom.feather")
# c = check_clusters.merge(silscores,on='subreddit')# fire.Fire(select_hdbscan_clustering)
class umap_hdbscan_grid_sweep(twoway_grid_sweep):
def __init__(self,
inpath,
outpath,
umap_params,
hdbscan_params):
super().__init__(umap_hdbscan_job, inpath, outpath, self.namer, umap_params, hdbscan_params)
def namer(self,
min_cluster_size,
min_samples,
cluster_selection_epsilon,
cluster_selection_method,
n_components,
n_neighbors,
learning_rate,
min_dist,
local_connectivity,
densmap
):
return f"mcs-{min_cluster_size}_ms-{min_samples}_cse-{cluster_selection_epsilon}_csm-{cluster_selection_method}_nc-{n_components}_nn-{n_neighbors}_lr-{learning_rate}_md-{min_dist}_lc-{local_connectivity}_dm-{densmap}"
@dataclass
class umap_hdbscan_clustering_result(hdbscan_clustering_result):
n_components:int
n_neighbors:int
learning_rate:float
min_dist:float
local_connectivity:int
densmap:bool
class umap_hdbscan_job(twoway_clustering_job):
def __init__(self, infile, outpath, name,
umap_args = {"n_components":2,"n_neighbors":15, "learning_rate":1, "min_dist":1, "local_connectivity":1,'densmap':False},
hdbscan_args = {"min_cluster_size":2, "min_samples":1, "cluster_selection_epsilon":0, "cluster_selection_method":'eom'},
*args,
**kwargs):
super().__init__(infile,
outpath,
name,
call1=umap_hdbscan_job._umap_embedding,
call2=umap_hdbscan_job._hdbscan_clustering,
args1=umap_args,
args2=hdbscan_args,
*args,
**kwargs
)
self.n_components = umap_args['n_components']
self.n_neighbors = umap_args['n_neighbors']
self.learning_rate = umap_args['learning_rate']
self.min_dist = umap_args['min_dist']
self.local_connectivity = umap_args['local_connectivity']
self.densmap = umap_args['densmap']
self.min_cluster_size = hdbscan_args['min_cluster_size']
self.min_samples = hdbscan_args['min_samples']
self.cluster_selection_epsilon = hdbscan_args['cluster_selection_epsilon']
self.cluster_selection_method = hdbscan_args['cluster_selection_method']
def after_run(self):
coords = self.step1.emedding_
self.cluster_data['x'] = coords[:,0]
self.cluster_data['y'] = coords[:,1]
super().after_run()
def _umap_embedding(mat, **umap_args):
print(f"running umap embedding. umap_args:{umap_args}")
umapmodel = umap.UMAP(metric='precomputed', **umap_args)
umapmodel = umapmodel.fit(mat)
return umapmodel
def _hdbscan_clustering(mat, umapmodel, **hdbscan_args):
print(f"running hdbascan clustering. hdbscan_args:{hdbscan_args}")
umap_coords = umapmodel.transform(mat)
clusterer = hdbscan.HDBSCAN(metric='euclidean',
core_dist_n_jobs=cpu_count(),
**hdbscan_args
)
clustering = clusterer.fit(umap_coords)
return(clustering)
def get_info(self):
result = super().get_info()
self.result = umap_hdbscan_clustering_result(**result.__dict__,
min_cluster_size=self.min_cluster_size,
min_samples=self.min_samples,
cluster_selection_epsilon=self.cluster_selection_epsilon,
cluster_selection_method=self.cluster_selection_method,
n_components = self.n_components,
n_neighbors = self.n_neighbors,
learning_rate = self.learning_rate,
min_dist = self.min_dist,
local_connectivity=self.local_connectivity,
densmap=self.densmap
)
return self.result
def run_umap_hdbscan_grid_sweep(savefile, inpath, outpath, n_neighbors = [15], n_components=[2], learning_rate=[1], min_dist=[1], local_connectivity=[1],
densmap=[False],
min_cluster_sizes=[2], min_samples=[1], cluster_selection_epsilons=[0], cluster_selection_methods=['eom']):
"""Run umap + hdbscan clustering once or more with different parameters.
Usage:
umap_hdbscan_clustering.py --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --n_neighbors=<csv> --learning_rate=<csv> --min_dist=<csv> --local_connectivity=<csv> --min_cluster_sizes=<csv> --min_samples=<csv> --cluster_selection_epsilons=<csv> --cluster_selection_methods=<csv "eom"|"leaf">
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to feather data containing a labeled matrix of subreddit similarities.
outpath: path to output fit kmeans clusterings.
n_neighbors: umap parameter takes integers greater than 1
learning_rate: umap parameter takes positive real values
min_dist: umap parameter takes positive real values
local_connectivity: umap parameter takes positive integers
min_cluster_sizes: one or more integers indicating the minumum cluster size
min_samples: one ore more integers indicating the minimum number of samples used in the algorithm
cluster_selection_epsilon: one or more similarity thresholds for transition from dbscan to hdbscan
cluster_selection_method: "eom" or "leaf" eom gives larger clusters.
"""
umap_args = {'n_neighbors':list(map(int, n_neighbors)),
'learning_rate':list(map(float,learning_rate)),
'min_dist':list(map(float,min_dist)),
'local_connectivity':list(map(int,local_connectivity)),
'n_components':list(map(int, n_components)),
'densmap':list(map(bool,densmap))
}
hdbscan_args = {'min_cluster_size':list(map(int,min_cluster_sizes)),
'min_samples':list(map(int,min_samples)),
'cluster_selection_epsilon':list(map(float,cluster_selection_epsilons)),
'cluster_selection_method':cluster_selection_methods}
obj = umap_hdbscan_grid_sweep(inpath,
outpath,
umap_args,
hdbscan_args)
obj.run(cores=10)
obj.save(savefile)
def KNN_distances_plot(mat,outname,k=2):
nbrs = NearestNeighbors(n_neighbors=k,algorithm='auto',metric='precomputed').fit(mat)
distances, indices = nbrs.kneighbors(mat)
d2 = distances[:,-1]
df = pd.DataFrame({'dist':d2})
df = df.sort_values("dist",ascending=False)
df['idx'] = np.arange(0,d2.shape[0]) + 1
p = pn.qplot(x='idx',y='dist',data=df,geom='line') + pn.scales.scale_y_continuous(minor_breaks = np.arange(0,50)/50,
breaks = np.arange(0,10)/10)
p.save(outname,width=16,height=10)
def make_KNN_plots():
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='terms_knn_dist2.png')
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='authors_knn_dist2.png')
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors-tf_10k.feather"
subreddits, mat = read_similarity_mat(similarities)
mat = sim_to_dist(mat)
KNN_distances_plot(mat,k=2,outname='authors-tf_knn_dist2.png')
if __name__ == "__main__":
fire.Fire(run_umap_hdbscan_grid_sweep)
# test_select_hdbscan_clustering()
#fire.Fire(select_hdbscan_clustering)

View File

@@ -0,0 +1,113 @@
from umap_hdbscan_clustering import umap_hdbscan_job, umap_hdbscan_grid_sweep, umap_hdbscan_clustering_result
from lsi_base import twoway_lsi_grid_sweep, lsi_mixin, lsi_result_mixin
from grid_sweep import twoway_grid_sweep
import fire
from dataclasses import dataclass
@dataclass
class umap_hdbscan_clustering_result_lsi(umap_hdbscan_clustering_result, lsi_result_mixin):
pass
class umap_hdbscan_lsi_job(umap_hdbscan_job, lsi_mixin):
def __init__(self, infile, outpath, name, umap_args, hdbscan_args, lsi_dims):
super().__init__(
infile,
outpath,
name,
umap_args,
hdbscan_args
)
super().set_lsi_dims(lsi_dims)
def get_info(self):
partial_result = super().get_info()
self.result = umap_hdbscan_clustering_result_lsi(**partial_result.__dict__,
lsi_dimensions=self.lsi_dims)
return self.result
class umap_hdbscan_lsi_grid_sweep(twoway_lsi_grid_sweep):
def __init__(self,
inpath,
lsi_dims,
outpath,
umap_args,
hdbscan_args
):
super().__init__(umap_hdbscan_lsi_job,
_umap_hdbscan_lsi_grid_sweep,
inpath,
lsi_dims,
outpath,
umap_args,
hdbscan_args
)
class _umap_hdbscan_lsi_grid_sweep(twoway_grid_sweep):
def __init__(self,
inpath,
outpath,
lsi_dim,
umap_args,
hdbscan_args,
):
self.lsi_dim = lsi_dim
self.jobtype = umap_hdbscan_lsi_job
super().__init__(self.jobtype, inpath, outpath, self.namer, umap_args, hdbscan_args, lsi_dim)
def namer(self, *args, **kwargs):
s = umap_hdbscan_grid_sweep.namer(self, *args, **kwargs)
s += f"_lsi-{self.lsi_dim}"
return s
def run_umap_hdbscan_lsi_grid_sweep(savefile, inpath, outpath, n_neighbors = [15], n_components=[2], learning_rate=[1], min_dist=[1], local_connectivity=[1],
densmap=[False],
min_cluster_sizes=[2], min_samples=[1], cluster_selection_epsilons=[0], cluster_selection_methods=['eom'], lsi_dimensions='all'):
"""Run hdbscan clustering once or more with different parameters.
Usage:
hdbscan_clustering_lsi --savefile=SAVEFILE --inpath=INPATH --outpath=OUTPATH --min_cluster_sizes=<csv> --min_samples=<csv> --cluster_selection_epsilons=<csv> --cluster_selection_methods=[eom]> --lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
Keword arguments:
savefile: path to save the metadata and diagnostics
inpath: path to folder containing feather files with LSI similarity labeled matrices of subreddit similarities.
outpath: path to output fit clusterings.
min_cluster_sizes: one or more integers indicating the minumum cluster size
min_samples: one ore more integers indicating the minimum number of samples used in the algorithm
cluster_selection_epsilons: one or more similarity thresholds for transition from dbscan to hdbscan
cluster_selection_methods: one or more of "eom" or "leaf" eom gives larger clusters.
lsi_dimensions: either "all" or one or more available lsi similarity dimensions at INPATH.
"""
umap_args = {'n_neighbors':list(map(int, n_neighbors)),
'learning_rate':list(map(float,learning_rate)),
'min_dist':list(map(float,min_dist)),
'local_connectivity':list(map(int,local_connectivity)),
'n_components':list(map(int, n_components)),
'densmap':list(map(bool,densmap))
}
hdbscan_args = {'min_cluster_size':list(map(int,min_cluster_sizes)),
'min_samples':list(map(int,min_samples)),
'cluster_selection_epsilon':list(map(float,cluster_selection_epsilons)),
'cluster_selection_method':cluster_selection_methods}
obj = umap_hdbscan_lsi_grid_sweep(inpath,
lsi_dimensions,
outpath,
umap_args,
hdbscan_args
)
obj.run(10)
obj.save(savefile)
if __name__ == "__main__":
fire.Fire(run_umap_hdbscan_lsi_grid_sweep)

View File

@@ -1,26 +0,0 @@
#!/bin/bash
## parallel_sql_job.sh
#SBATCH --job-name=tf_subreddit_comments
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=12:00:00
## Memory per node
#SBATCH --mem=32G
#SBATCH --cpus-per-task=4
#SBATCH --ntasks=1
#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
source ./bin/activate
module load parallel_sql
echo $(which perl)
conda list pyarrow
which python3
#Put here commands to load other modules (e.g. matlab etc.)
#Below command means that parallel_sql will get tasks from the database
#and run them on the node (in parallel). So a 16 core node will have
#16 tasks running at one time.
parallel-sql --sql -a parallel --exit-on-term --jobs 4

View File

@@ -1,10 +1,10 @@
#!/usr/bin/env bash
## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete ## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
#!/usr/bin/env bash
echo "#!/usr/bin/bash" > job_script.sh echo "#!/usr/bin/bash" > job_script.sh
#echo "source $(pwd)/../bin/activate" >> job_script.sh #echo "source $(pwd)/../bin/activate" >> job_script.sh
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 --pty job_script.sh
start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py

View File

@@ -1,12 +1,15 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os
import json import json
from datetime import datetime from datetime import datetime
from multiprocessing import Pool from multiprocessing import Pool
from itertools import islice from itertools import islice
from helper import find_dumps, open_fileset from helper import open_input_file, find_dumps
import pandas as pd import pandas as pd
import pyarrow as pa import pyarrow as pa
import pyarrow.parquet as pq import pyarrow.parquet as pq
from pathlib import Path
import fire
def parse_comment(comment, names= None): def parse_comment(comment, names= None):
if names is None: if names is None:
@@ -46,19 +49,14 @@ def parse_comment(comment, names= None):
# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')]) # conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/" def parse_dump(partition):
files = list(find_dumps(dumpdir, base_pattern="RC_20*")) dumpdir = f"/gscratch/comdata/raw_data/reddit_dumps/comments/{partition}"
pool = Pool(28) stream = open_input_file(dumpdir)
rows = map(parse_comment, stream)
stream = open_fileset(files) schema = pa.schema([
N = int(1e4)
rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
schema = pa.schema([
pa.field('id', pa.string(), nullable=True), pa.field('id', pa.string(), nullable=True),
pa.field('subreddit', pa.string(), nullable=True), pa.field('subreddit', pa.string(), nullable=True),
pa.field('link_id', pa.string(), nullable=True), pa.field('link_id', pa.string(), nullable=True),
@@ -76,35 +74,18 @@ schema = pa.schema([
pa.field('is_submitter', pa.bool_(), nullable=True), pa.field('is_submitter', pa.bool_(), nullable=True),
pa.field('body', pa.string(), nullable=True), pa.field('body', pa.string(), nullable=True),
pa.field('error', pa.string(), nullable=True), pa.field('error', pa.string(), nullable=True),
]) ])
from pathlib import Path p = Path("/gscratch/comdata/output/temp/reddit_comments.parquet")
p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2") p.mkdir(exist_ok=True,parents=True)
if not p.is_dir(): N=10000
if p.exists(): with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet",
p.unlink() schema=schema,
p.mkdir() compression='snappy',
flavor='spark') as writer:
else: while True:
list(map(Path.unlink,p.glob('*')))
part_size = int(1e7)
part = 1
n_output = 0
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
while True:
if n_output > part_size:
if part > 1:
writer.close()
part = part + 1
n_output = 0
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
n_output += N
chunk = islice(rows,N) chunk = islice(rows,N)
pddf = pd.DataFrame(chunk, columns=schema.names) pddf = pd.DataFrame(chunk, columns=schema.names)
table = pa.Table.from_pandas(pddf,schema=schema) table = pa.Table.from_pandas(pddf,schema=schema)
@@ -112,4 +93,19 @@ while True:
break break
writer.write_table(table) writer.write_table(table)
writer.close()
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/comments", overwrite=True):
files = list(find_dumps(dumpdir,base_pattern="RC_20*.*"))
with open("comments_task_list.sh",'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
if (not Path(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n')
if __name__ == '__main__':
fire.Fire({'parse_dump':parse_dump,
'gen_task_list':gen_task_list})

View File

@@ -2,12 +2,19 @@
# spark script to make sorted, and partitioned parquet files # spark script to make sorted, and partitioned parquet files
import pyspark
from pyspark.sql import functions as f from pyspark.sql import functions as f
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy') conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
conf = conf.set("spark.sql.shuffle.partitions",2000)
conf = conf.set('spark.sql.crossJoin.enabled',"true")
conf = conf.set('spark.debug.maxToStringFields',200)
sc = spark.sparkContext
df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_comments.parquet",compression='snappy')
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit') df = df.drop('subreddit')
@@ -21,9 +28,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
df = df.repartition('subreddit') df = df.repartition('subreddit')
df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True) df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True) df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy') df2.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
df = df.repartition('author') df = df.repartition('author')
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True) df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True) df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy') df3.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')

View File

@@ -24,8 +24,7 @@ def open_fileset(files):
for fh in files: for fh in files:
print(fh) print(fh)
lines = open_input_file(fh) lines = open_input_file(fh)
for line in lines: yield from lines
yield line
def open_input_file(input_filename): def open_input_file(input_filename):
if re.match(r'.*\.7z$', input_filename): if re.match(r'.*\.7z$', input_filename):
@@ -39,7 +38,7 @@ def open_input_file(input_filename):
elif re.match(r'.*\.xz', input_filename): elif re.match(r'.*\.xz', input_filename):
cmd = ["xzcat",'-dk', '-T 20',input_filename] cmd = ["xzcat",'-dk', '-T 20',input_filename]
elif re.match(r'.*\.zst',input_filename): elif re.match(r'.*\.zst',input_filename):
cmd = ['zstd','-dck', input_filename] cmd = ['/kloneusr/bin/zstd','-dck', input_filename, '--memory=2048MB --stdout']
elif re.match(r'.*\.gz',input_filename): elif re.match(r'.*\.gz',input_filename):
cmd = ['gzip','-dc', input_filename] cmd = ['gzip','-dc', input_filename]
try: try:

View File

@@ -1,4 +1,4 @@
#!/usr/bin/bash #!/usr/bin/bash
start_spark_cluster.sh start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000 singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 comments_2_parquet_part2.py
stop-all.sh singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh

4
datasets/submissions_2_parquet.sh Normal file → Executable file
View File

@@ -1,8 +1,8 @@
#!/usr/bin/env bash
## this should be run manually since we don't have a nice way to wait on parallel_sql jobs ## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
#!/usr/bin/env bash
./parse_submissions.sh srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 python3 $(pwd)/submissions_2_parquet_part1.py gen_task_list
start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py

View File

@@ -3,26 +3,23 @@
# two stages: # two stages:
# 1. from gz to arrow parquet (this script) # 1. from gz to arrow parquet (this script)
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py) # 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
from datetime import datetime from datetime import datetime
from multiprocessing import Pool from pathlib import Path
from itertools import islice from itertools import islice
from helper import find_dumps, open_fileset from helper import find_dumps, open_fileset
import pandas as pd import pandas as pd
import pyarrow as pa import pyarrow as pa
import pyarrow.parquet as pq import pyarrow.parquet as pq
import simdjson
import fire import fire
import os import os
import json
parser = simdjson.Parser()
def parse_submission(post, names = None): def parse_submission(post, names = None):
if names is None: if names is None:
names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error'] names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
try: try:
post = parser.parse(post) post = json.loads(post)
except (ValueError) as e: except (ValueError) as e:
# print(e) # print(e)
# print(post) # print(post)
@@ -92,8 +89,7 @@ def parse_dump(partition):
pa.field('quarantine',pa.bool_(),nullable=True), pa.field('quarantine',pa.bool_(),nullable=True),
pa.field('error',pa.string(),nullable=True)]) pa.field('error',pa.string(),nullable=True)])
if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"): Path("/gscratch/comdata/output/temp/reddit_submissions.parquet/").mkdir(exist_ok=True,parents=True)
os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer: with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
while True: while True:
@@ -108,7 +104,7 @@ def parse_dump(partition):
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"): def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
files = list(find_dumps(dumpdir,base_pattern="RS_20*.*")) files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
with open("parse_submissions_task_list",'w') as of: with open("submissions_task_list.sh",'w') as of:
for fpath in files: for fpath in files:
partition = os.path.split(fpath)[1] partition = os.path.split(fpath)[1]
of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n') of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')

View File

@@ -4,9 +4,9 @@ from pathlib import Path
import fire import fire
import numpy as np import numpy as np
import sys import sys
sys.path.append("..") # sys.path.append("..")
sys.path.append("../similarities") # sys.path.append("../similarities")
from similarities.similarities_helper import reindex_tfidf # from similarities.similarities_helper import pull_tfidf
# this is the mean of the ratio of the overlap to the focal size. # this is the mean of the ratio of the overlap to the focal size.
# mean shared membership per focal community member # mean shared membership per focal community member

View File

@@ -8,7 +8,7 @@ import hashlib
shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
#shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text #shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
shasums = shasums1 + shasums2 shasums = shasums1
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments" dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
for l in shasums.strip().split('\n'): for l in shasums.strip().split('\n'):

View File

@@ -1,8 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
module load parallel_sql
source ./bin/activate source ./bin/activate
python3 tf_comments.py gen_task_list python3 tf_comments.py gen_task_list
psu --del --Y
cat tf_task_list | psu --load
for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done; for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done;

View File

@@ -2,12 +2,17 @@
from pyspark.sql import functions as f from pyspark.sql import functions as f
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
import fire
spark = SparkSession.builder.getOrCreate() def main(inparquet, outparquet, colname):
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/") spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inparquet)
df = df.repartition(2000,'term') df = df.repartition(2000,colname)
df = df.sort(['term','week','subreddit']) df = df.sort([colname,'week','subreddit'])
df = df.sortWithinPartitions(['term','week','subreddit']) df = df.sortWithinPartitions([colname,'week','subreddit'])
df.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy') df.write.parquet(outparquet,mode='overwrite',compression='snappy')
if __name__ == '__main__':
fire.Fire(main)

View File

@@ -14,21 +14,29 @@ from nltk.util import ngrams
import string import string
from random import random from random import random
from redditcleaner import clean from redditcleaner import clean
from pathlib import Path
# compute term frequencies for comments in each subreddit by week # compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, mwe_pass = 'first'): def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"): dataset = ds.dataset(Path(input_dir)/partition, format='parquet')
os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/") outputdir = Path(outputdir)
samppath = outputdir / "reddit_comment_ngrams_10p_sample"
if not samppath.exists():
samppath.mkdir(parents=True, exist_ok=True)
ngram_output = partition.replace("parquet","txt") ngram_output = partition.replace("parquet","txt")
if excluded_users is not None:
excluded_users = set(map(str.strip,open(excluded_users)))
df = df.filter(~ (f.col("author").isin(excluded_users)))
ngram_path = samppath / ngram_output
if mwe_pass == 'first': if mwe_pass == 'first':
if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"): if ngram_path.exists():
os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}") ngram_path.unlink()
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author']) batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
@@ -62,8 +70,10 @@ def weekly_tf(partition, mwe_pass = 'first'):
subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week)) subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
mwe_path = outputdir / "multiword_expressions.feather"
if mwe_pass != 'first': if mwe_pass != 'first':
mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather') mwe_dataset = pd.read_feather(mwe_path)
mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False) mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
mwe_phrases = list(mwe_dataset.phrase) mwe_phrases = list(mwe_dataset.phrase)
mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases] mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
@@ -115,7 +125,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
for sentence in sentences: for sentence in sentences:
if random() <= 0.1: if random() <= 0.1:
grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4)))) grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file: with open(ngram_path,'a') as gram_file:
for ng in grams: for ng in grams:
gram_file.write(' '.join(ng) + '\n') gram_file.write(' '.join(ng) + '\n')
for token in sentence: for token in sentence:
@@ -150,7 +160,14 @@ def weekly_tf(partition, mwe_pass = 'first'):
outchunksize = 10000 outchunksize = 10000
with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer: termtf_outputdir = (outputdir / "comment_terms")
termtf_outputdir.mkdir(parents=True, exist_ok=True)
authortf_outputdir = (outputdir / "comment_authors")
authortf_outputdir.mkdir(parents=True, exist_ok=True)
termtf_path = termtf_outputdir / partition
authortf_path = authortf_outputdir / partition
with pq.ParquetWriter(termtf_path, schema=schema, compression='snappy', flavor='spark') as writer, \
pq.ParquetWriter(authortf_path, schema=author_schema, compression='snappy', flavor='spark') as author_writer:
while True: while True:
@@ -179,12 +196,12 @@ def weekly_tf(partition, mwe_pass = 'first'):
author_writer.close() author_writer.close()
def gen_task_list(mwe_pass='first'): def gen_task_list(mwe_pass='first', outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/") files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
with open("tf_task_list",'w') as outfile: with open(tf_task_list,'w') as outfile:
for f in files: for f in files:
if f.endswith(".parquet"): if f.endswith(".parquet"):
outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n") outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
if __name__ == "__main__": if __name__ == "__main__":
fire.Fire({"gen_task_list":gen_task_list, fire.Fire({"gen_task_list":gen_task_list,

91
ngrams/top_comment_phrases.py Normal file → Executable file
View File

@@ -1,58 +1,69 @@
#!/usr/bin/env python3
from pyspark.sql import functions as f from pyspark.sql import functions as f
from pyspark.sql import Window from pyspark.sql import Window
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
import numpy as np import numpy as np
import fire
spark = SparkSession.builder.getOrCreate() from pathlib import Path
df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
df = df.withColumnRenamed("value","phrase")
# count phrase occurrances
phrases = df.groupby('phrase').count()
phrases = phrases.withColumnRenamed('count','phraseCount')
phrases = phrases.filter(phrases.phraseCount > 10)
# count overall def main(ngram_dir="/gscratch/comdata/output/reddit_ngrams"):
N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount spark = SparkSession.builder.getOrCreate()
ngram_dir = Path(ngram_dir)
ngram_sample = ngram_dir / "reddit_comment_ngrams_10p_sample"
df = spark.read.text(str(ngram_sample))
print(f'analyzing PMI on a sample of {N} phrases') df = df.withColumnRenamed("value","phrase")
logN = np.log(N)
phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)
# count term occurrances # count phrase occurrances
phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' ')) phrases = df.groupby('phrase').count()
terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')]) phrases = phrases.withColumnRenamed('count','phraseCount')
phrases = phrases.filter(phrases.phraseCount > 10)
win = Window.partitionBy('term') # count overall
terms = terms.withColumn('termCount',f.sum('phraseCount').over(win)) N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount
terms = terms.withColumnRenamed('count','termCount')
terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN)
terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb') print(f'analyzing PMI on a sample of {N} phrases')
terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb') logN = np.log(N)
terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb')) phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)
# join phrases to term counts # count term occurrances
phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' '))
terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')])
win = Window.partitionBy('term')
terms = terms.withColumn('termCount',f.sum('phraseCount').over(win))
terms = terms.withColumnRenamed('count','termCount')
terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN)
terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb')
terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb')
terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb'))
# join phrases to term counts
df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI']) df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI'])
df = df.sort(['phrasePWMI'],descending=True) df = df.sort(['phrasePWMI'],descending=True)
df = df.sortWithinPartitions(['phrasePWMI'],descending=True) df = df.sortWithinPartitions(['phrasePWMI'],descending=True)
df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy')
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/") pwmi_dir = ngram_dir / "reddit_comment_ngrams_pwmi.parquet/"
df.write.parquet(str(pwmi_dir), mode='overwrite', compression='snappy')
df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none') df = spark.read.parquet(str(pwmi_dir))
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet") df.write.csv(str(ngram_dir / "reddit_comment_ngrams_pwmi.csv/"),mode='overwrite',compression='none')
df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
# choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions. df = spark.read.parquet(str(pwmi_dir))
# df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
df = df.toPandas() # choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions.
df.to_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather") #
df.to_csv("/gscratch/comdata/users/nathante/reddit_multiword_expressions.csv") df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
df = df.toPandas()
df.to_feather(ngram_dir / "multiword_expressions.feather")
df.to_csv(ngram_dir / "multiword_expressions.csv")
if __name__ == '__main__':
fire.Fire(main)

View File

@@ -1,8 +1,10 @@
#all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet #all: /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms_130k.parquet /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_130k.parquet
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh # srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh # srun_singularity_huge=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity_huge.sh
base_data=/gscratch/comdata/output srun=srun -p compute-bigmem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40
similarity_data=${base_data}/reddit_similarity srun_huge=srun -p compute-hugemem -A comdata --mem-per-cpu=9g --time=200:00:00 -c 40
similarity_data=/gscratch/scrubbed/comdata/reddit_similarity
tfidf_data=${similarity_data}/tfidf tfidf_data=${similarity_data}/tfidf
tfidf_weekly_data=${similarity_data}/tfidf_weekly tfidf_weekly_data=${similarity_data}/tfidf_weekly
similarity_weekly_data=${similarity_data}/weekly similarity_weekly_data=${similarity_data}/weekly
@@ -10,7 +12,10 @@ lsi_components=[10,50,100,200,300,400,500,600,700,850,1000,1500]
lsi_similarities: ${similarity_data}/subreddit_comment_terms_10k_LSI ${similarity_data}/subreddit_comment_authors-tf_10k_LSI ${similarity_data}/subreddit_comment_authors_10k_LSI ${similarity_data}/subreddit_comment_terms_30k_LSI ${similarity_data}/subreddit_comment_authors-tf_30k_LSI ${similarity_data}/subreddit_comment_authors_30k_LSI lsi_similarities: ${similarity_data}/subreddit_comment_terms_10k_LSI ${similarity_data}/subreddit_comment_authors-tf_10k_LSI ${similarity_data}/subreddit_comment_authors_10k_LSI ${similarity_data}/subreddit_comment_terms_30k_LSI ${similarity_data}/subreddit_comment_authors-tf_30k_LSI ${similarity_data}/subreddit_comment_authors_30k_LSI
all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
all: ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather
#all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.parquet ${tfidf_data}/comment_terms_10k.parquet ${tfidf_data}/comment_authors_100k.parquet ${tfidf_data}/comment_authors_30k.parquet ${tfidf_data}/comment_authors_10k.parquet ${similarity_data}/subreddit_comment_authors_30k.feather ${similarity_data}/subreddit_comment_authors_10k.feather ${similarity_data}/subreddit_comment_terms_10k.feather ${similarity_data}/subreddit_comment_terms_30k.feather ${similarity_data}/subreddit_comment_authors-tf_30k.feather ${similarity_data}/subreddit_comment_authors-tf_10k.feather ${similarity_data}/subreddit_comment_terms_100k.feather ${similarity_data}/subreddit_comment_authors_100k.feather ${similarity_data}/subreddit_comment_authors-tf_100k.feather ${similarity_weekly_data}/comment_terms.parquet
#${tfidf_weekly_data}/comment_terms_100k.parquet ${tfidf_weekly_data}/comment_authors_100k.parquet ${tfidf_weekly_data}/comment_terms_30k.parquet ${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet ${similarity_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_authors_30k.parquet #${tfidf_weekly_data}/comment_terms_100k.parquet ${tfidf_weekly_data}/comment_authors_100k.parquet ${tfidf_weekly_data}/comment_terms_30k.parquet ${tfidf_weekly_data}/comment_authors_30k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet ${similarity_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_authors_30k.parquet
@@ -18,103 +23,106 @@ all: ${tfidf_data}/comment_terms_100k.parquet ${tfidf_data}/comment_terms_30k.pa
# all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet # all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet
${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms.parquet ${similarity_weekly_data}/comment_terms.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_terms.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet ${srun} python3 weekly_cosine_similarities.py terms --topN=10000 --outfile=${similarity_weekly_data}/comment_terms.parquet
${similarity_data}/subreddit_comment_terms_10k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_terms_10k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000 ${srun} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k.feather --topN=10000
${similarity_data}/subreddit_comment_terms_10k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_terms_10k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200 ${srun_huge} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=200
${similarity_data}/subreddit_comment_terms_30k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_terms_30k_LSI: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200 ${srun_huge} python3 lsi_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=200 --inpath=$<
${similarity_data}/subreddit_comment_terms_30k.feather: ${tfidf_data}/comment_terms_30k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_terms_30k.feather: ${tfidf_data}/comment_terms_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000 ${srun_huge} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_30k.feather --topN=30000 --inpath=$<
${similarity_data}/subreddit_comment_authors_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000 ${srun_huge} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k.feather --topN=30000 --inpath=$<
${similarity_data}/subreddit_comment_authors_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000 ${srun_huge} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k.feather --topN=10000 --inpath=$<
${similarity_data}/subreddit_comment_authors_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2 ${srun_huge} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$<
${similarity_data}/subreddit_comment_authors_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2 ${srun_huge} python3 lsi_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=10 --inpath=$<
${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_30k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors-tf_30k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000 ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k.feather --topN=30000 --inpath=$<
${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_10k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors-tf_10k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000 ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k.feather --topN=10000
${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors-tf_10k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=2 ${srun_huge} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_10k_LSI --topN=10000 --n_components=${lsi_components} --min_df=10 --inpath=$<
${similarity_data}/subreddit_comment_authors-tf_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors-tf_30k_LSI: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=2 ${srun_huge} python3 lsi_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_30k_LSI --topN=30000 --n_components=${lsi_components} --min_df=10 --inpath=$<
${similarity_data}/subreddit_comment_terms_100k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_terms_100k.feather: ${tfidf_data}/comment_terms_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000 ${srun} python3 cosine_similarities.py term --outfile=${similarity_data}/subreddit_comment_terms_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000 ${srun} python3 cosine_similarities.py author --outfile=${similarity_data}/subreddit_comment_authors_100k.feather --topN=100000
${similarity_data}/subreddit_comment_authors-tf_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py ${similarity_data}/subreddit_comment_authors-tf_100k.feather: ${tfidf_data}/comment_authors_100k.parquet similarities_helper.py
${srun_singularity} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000 ${srun} python3 cosine_similarities.py author-tf --outfile=${similarity_data}/subreddit_comment_authors-tf_100k.feather --topN=100000
${tfidf_data}/comment_terms_100k.feather/: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${similarity_data}/subreddits_by_num_comments_nonsfw.csv:
mkdir -p ${tfidf_data}/ start_spark_and_run.sh 3 top_subreddits_by_comments.py
start_spark_and_run.sh 4 tfidf.py terms --topN=100000 --outpath=${tfidf_data}/comment_terms_100k.feather
${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_data}/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
mkdir -p ${tfidf_data}/ # mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=30000 --outpath=${tfidf_data}/comment_terms_30k.feather start_spark_and_run.sh 3 tfidf.py terms --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_terms_100k.parquet
${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_data}/comment_terms_30k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
mkdir -p ${tfidf_data}/ # mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py terms --topN=10000 --outpath=${tfidf_data}/comment_terms_10k.feather start_spark_and_run.sh 3 tfidf.py terms --topN=30000 --inpath=$< --outpath=${tfidf_data}/comment_terms_30k.feather
${tfidf_data}/comment_authors_100k.feather: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_data}/comment_terms_10k.feather: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
mkdir -p ${tfidf_data}/ # mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=100000 --outpath=${tfidf_data}/comment_authors_100k.feather start_spark_and_run.sh 3 tfidf.py terms --topN=10000 --inpath=$< --outpath=${tfidf_data}/comment_terms_10k.feather
${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_data}/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
mkdir -p ${tfidf_data}/ # mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=10000 --outpath=${tfidf_data}/comment_authors_10k.parquet start_spark_and_run.sh 3 tfidf.py authors --topN=100000 --inpath=$< --outpath=${tfidf_data}/comment_authors_100k.parquet
${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_data}/comment_authors_10k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
mkdir -p ${tfidf_data}/ # mkdir -p ${tfidf_data}/
start_spark_and_run.sh 4 tfidf.py authors --topN=30000 --outpath=${tfidf_data}/comment_authors_30k.parquet start_spark_and_run.sh 3 tfidf.py authors --topN=10000 --inpath=$< --outpath=${tfidf_data}/comment_authors_10k.parquet
${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
start_spark_and_run.sh 4 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet # mkdir -p ${tfidf_data}/
start_spark_and_run.sh 3 tfidf.py authors --topN=30000 --inpath=$< --outpath=${tfidf_data}/comment_authors_30k.parquet
${tfidf_data}/tfidf_weekly/comment_terms_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
start_spark_and_run.sh 3 tfidf.py terms_weekly --topN=100000 --outpath=${similarity_data}/tfidf_weekly/comment_authors_100k.parquet
${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_ppnum_comments.csv ${tfidf_data}/tfidf_weekly/comment_authors_100k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_ppnum_comments.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=100000 --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet start_spark_and_run.sh 3 tfidf.py authors_weekly --topN=100000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_100k.parquet
${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet start_spark_and_run.sh 2 tfidf.py terms_weekly --topN=30000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_30k.parquet: /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv
start_spark_and_run.sh 4 tfidf.py authors_weekly --topN=30000 --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet start_spark_and_run.sh 3 tfidf.py authors_weekly --topN=30000 --inpath=$< --outpath=${tfidf_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_terms_100k.parquet: weekly_cosine_similarities.py similarities_helper.py ${tfidf_weekly_data}/comment_terms_100k.parquet ${similarity_weekly_data}/comment_terms_100k.parquet: weekly_cosine_similarities.py similarities_helper.py ${tfidf_weekly_data}/comment_terms_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet ${srun} python3 weekly_cosine_similarities.py terms --topN=100000 --outfile=${similarity_weekly_data}/comment_terms_100k.parquet
${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_100k.parquet ${similarity_weekly_data}/comment_authors_100k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_authors_100k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet ${srun} python3 weekly_cosine_similarities.py authors --topN=100000 --outfile=${similarity_weekly_data}/comment_authors_100k.parquet
${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_terms_30k.parquet ${similarity_weekly_data}/comment_terms_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_terms_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet ${srun} python3 weekly_cosine_similarities.py terms --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments.csv ${tfidf_weekly_data}/comment_authors_30k.parquet ,${similarity_weekly_data}/comment_authors_30k.parquet: weekly_cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet ${similarity_data}/subreddits_by_num_comments_nonsfw.csv ${tfidf_weekly_data}/comment_authors_30k.parquet
${srun_singularity} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet ${srun} python3 weekly_cosine_similarities.py authors --topN=30000 --outfile=${similarity_weekly_data}/comment_authors_30k.parquet
# ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv # ${tfidf_weekly_data}/comment_authors_130k.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv
# start_spark_and_run.sh 1 tfidf.py authors_weekly --topN=130000 # start_spark_and_run.sh 1 tfidf.py authors_weekly --topN=130000
# /gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet # /gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet

View File

@@ -1,12 +1,12 @@
import pandas as pd import pandas as pd
import fire import fire
from pathlib import Path from pathlib import Path
from cdsc_ecology_utils.similarity import similarities, column_similarities from similarities_helper import similarities, column_similarities
from functools import partial from functools import partial
def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'): def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_communities=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname) return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
# change so that these take in an input as an optional argument (for speed, but also for idf). # change so that these take in an input as an optional argument (for speed, but also for idf).
def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None): def term_cosine_similarities(outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):

View File

@@ -1,4 +1,4 @@
#!/usr/bin/bash #!/usr/bin/bash
start_spark_cluster.sh start_spark_cluster.sh
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif spark-submit --master spark://$(hostname):7077 top_subreddits_by_comments.py singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 tfidf.py authors --topN=100000 --inpath=/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet --outpath=/gscratch/scrubbed/comdata/reddit_similarity/tfidf/comment_authors_100k.parquet
singularity exec /gscratch/comdata/users/nathante/cdsc_base.sif stop-all.sh singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh

View File

@@ -1,23 +1,24 @@
import pandas as pd import pandas as pd
import fire import fire
from pathlib import Path from pathlib import Path
from cdsc_ecology_utils.similarity.similarity_functions import lsi_column_similarities, similarities, from similarities_helper import *
#from similarities_helper import similarities, lsi_column_similarities #from similarities_helper import similarities, lsi_column_similarities
from functools import partial from functools import partial
inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_terms_compex.parquet/" # inpath = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
term_colname='term' # term_colname='authors'
outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI' # outfile='/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_test_compex_LSI'
n_components=[10,50,100] # n_components=[10,50,100]
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt" # included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
n_iter=5 # n_iter=5
random_state=1968 # random_state=1968
algorithm='arpack' # algorithm='randomized'
topN = None # topN = None
from_date=None # from_date=None
to_date=None # to_date=None
min_df=None # min_df=None
max_df=None # max_df=None
def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None): def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, tfidf_colname='tf_idf',n_components=100,n_iter=5,random_state=1968,algorithm='arpack',lsi_model=None):
print(n_components,flush=True) print(n_components,flush=True)
@@ -30,7 +31,7 @@ def lsi_similarities(inpath, term_colname, outfile, min_df=None, max_df=None, in
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm,lsi_model_save=lsi_model) simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm=algorithm,lsi_model_save=lsi_model)
return similarities(inpath=inpath, simfunc=simfunc, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_communities=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname) return similarities(inpath=inpath, simfunc=simfunc, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
# change so that these take in an input as an optional argument (for speed, but also for idf). # change so that these take in an input as an optional argument (for speed, but also for idf).
def term_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',outfile=None, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, algorithm='arpack', n_components=300,n_iter=5,random_state=1968): def term_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet',outfile=None, min_df=None, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None, algorithm='arpack', n_components=300,n_iter=5,random_state=1968):
@@ -62,7 +63,7 @@ def author_lsi_similarities(inpath='/gscratch/comdata/output/reddit_similarity/t
n_components=n_components n_components=n_components
) )
def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,n_components=300,n_iter=5,random_state=1968): def author_tf_similarities(inpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet',outfile=None, min_df=2, max_df=None, included_subreddits=None, topN=None, from_date=None, to_date=None,algorithm='arpack',n_components=300,n_iter=5,random_state=1968):
return lsi_similarities(inpath, return lsi_similarities(inpath,
'author', 'author',
outfile, outfile,

View File

@@ -262,6 +262,7 @@ def lsi_column_similarities(tfidfmat,n_components=300,n_iter=10,random_state=196
lsimat = mod.transform(tfidfmat.T) lsimat = mod.transform(tfidfmat.T)
if lsi_model_save is not None: if lsi_model_save is not None:
Path(lsi_model_save).parent.mkdir(exist_ok=True, parents=True)
pickle.dump(mod, open(lsi_model_save,'wb')) pickle.dump(mod, open(lsi_model_save,'wb'))
sims_list = [] sims_list = []

View File

@@ -1,10 +1,12 @@
import fire import fire
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from pyspark.sql import functions as f from pyspark.sql import functions as f
from cdsc_ecology_utils.similarity.similarity_functions import tfidf_dataset, \ from similarities_helper import tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
build_weekly_tfidf_dataset, select_topN_communities from functools import partial
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits): inpath = '/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet'
# include_terms is a path to a parquet file that contains a column of term_colname + '_id' to include.
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=None, min_df=None, max_df=None):
spark = SparkSession.builder.getOrCreate() spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath) df = spark.read.parquet(inpath)
@@ -14,52 +16,73 @@ def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_
if included_subreddits is not None: if included_subreddits is not None:
include_subs = set(map(str.strip,open(included_subreddits))) include_subs = set(map(str.strip,open(included_subreddits)))
else: else:
include_subs = select_topN_communities(topN) include_subs = select_topN_subreddits(topN)
dfwriter = func(df, include_subs, term_colname) include_subs = spark.sparkContext.broadcast(include_subs)
# term_id = term_colname + "_id"
if included_terms is not None:
terms_df = spark.read.parquet(included_terms)
terms_df = terms_df.select(term_colname).distinct()
df = df.join(terms_df, on=term_colname, how='left_semi')
dfwriter = func(df, include_subs.value, term_colname)
dfwriter.parquet(outpath,mode='overwrite',compression='snappy') dfwriter.parquet(outpath,mode='overwrite',compression='snappy')
spark.stop() spark.stop()
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits): def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits, min_df, max_df):
return _tfidf_wrapper(tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits) tfidf_func = partial(tfidf_dataset, max_df=max_df, min_df=min_df)
return _tfidf_wrapper(tfidf_func, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_weekly(inpath, outpath, static_tfidf_path, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits, included_terms=static_tfidf_path)
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_authors(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet", def tfidf_authors(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet', outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=None, topN=None,
included_subreddits=None): included_subreddits=None,
min_df=None,
max_df=None):
return tfidf(inpath, return tfidf(inpath,
outpath, outpath,
topN, topN,
'author', 'author',
['[deleted]','AutoModerator'], ['[deleted]','AutoModerator'],
included_subreddits=included_subreddits included_subreddits=included_subreddits,
min_df=min_df,
max_df=max_df
) )
def tfidf_terms(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet", def tfidf_terms(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet', outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=None, topN=None,
included_subreddits=None): included_subreddits=None,
min_df=None,
max_df=None):
return tfidf(inpath, return tfidf(inpath,
outpath, outpath,
topN, topN,
'term', 'term',
[], [],
included_subreddits=included_subreddits included_subreddits=included_subreddits,
min_df=min_df,
max_df=max_df
) )
def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet", def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet', outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
topN=None, topN=None,
included_subreddits=None): included_subreddits=None):
return tfidf_weekly(inpath, return tfidf_weekly(inpath,
outpath, outpath,
static_tfidf_path,
topN, topN,
'author', 'author',
['[deleted]','AutoModerator'], ['[deleted]','AutoModerator'],
@@ -67,6 +90,7 @@ def tfidf_authors_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_
) )
def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet", def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
static_tfidf_path="/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet",
outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
topN=None, topN=None,
included_subreddits=None): included_subreddits=None):
@@ -74,6 +98,7 @@ def tfidf_terms_weekly(inpath="/gscratch/comdata/output/reddit_ngrams/comment_te
return tfidf_weekly(inpath, return tfidf_weekly(inpath,
outpath, outpath,
static_tfidf_path,
topN, topN,
'term', 'term',
[], [],

View File

@@ -17,7 +17,7 @@ df = df.filter(~df.subreddit.like("u_%"))
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments")) df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
df = df.join(prop_nsfw,on='subreddit') df = df.join(prop_nsfw,on='subreddit')
#df = df.filter(df.prop_nsfw < 0.5) df = df.filter(df.prop_nsfw < 0.5)
win = Window.orderBy(f.col('n_comments').desc()) win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank', f.rank().over(win)) df = df.withColumn('comments_rank', f.rank().over(win))
@@ -26,4 +26,4 @@ df = df.toPandas()
df = df.sort_values("n_comments") df = df.sort_values("n_comments")
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nsfw.csv', index=False) df.to_csv('/gscratch/scrubbed/comdata/reddit_similarity/subreddits_by_num_comments_nonsfw.csv', index=False)

View File

@@ -13,18 +13,23 @@ from similarities_helper import pull_tfidf, column_similarities, write_weekly_si
from scipy.sparse import csr_matrix from scipy.sparse import csr_matrix
from multiprocessing import Pool, cpu_count from multiprocessing import Pool, cpu_count
from functools import partial from functools import partial
import pickle
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_10k.parquet" # tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors_tfidf.parquet"
tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet" # #tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data//comment_authors_compex.parquet"
min_df=None # min_df=2
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt" # included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
max_df = None # max_df = None
topN=100 # topN=100
term_colname='author' # term_colname='author'
# outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet' # # outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# included_subreddits=None # # included_subreddits=None
outfile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors.parquet"; infile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_authors_tfidf.parquet"; included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"; lsi_model="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/2000_authors_LSIMOD.pkl"; n_components=1500; algorithm="randomized"; term_colname='author'; tfidf_path=infile; random_state=1968;
def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir:Path, subreddit_names, nterms): # static_tfidf = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
# dftest = spark.read.parquet(static_tfidf)
def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir:Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None):
term = term_colname term = term_colname
term_id = term + '_id' term_id = term + '_id'
term_id_new = term + '_id_new' term_id_new = term + '_id_new'
@@ -32,20 +37,19 @@ def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df,
entries = pull_tfidf(infile = tfidf_path, entries = pull_tfidf(infile = tfidf_path,
term_colname=term_colname, term_colname=term_colname,
min_df=min_df,
max_df=max_df,
included_subreddits=included_subreddits, included_subreddits=included_subreddits,
topN=topN, topN=topN,
week=week, week=week.isoformat(),
rescale_idf=False) rescale_idf=False)
tfidf_colname='tf_idf' tfidf_colname='tf_idf'
# if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s # if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)),shape=(nterms,subreddit_names.shape[0])) mat = csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1)),shape=(nterms,subreddit_names.shape[0]))
print('computing similarities') print('computing similarities')
print(simfunc)
sims = simfunc(mat) sims = simfunc(mat)
del mat del mat
sims = next(sims)[0]
sims = pd.DataFrame(sims) sims = pd.DataFrame(sims)
sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1) sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['_subreddit'] = subreddit_names.subreddit.values sims['_subreddit'] = subreddit_names.subreddit.values
@@ -56,18 +60,20 @@ def pull_weeks(batch):
return set(batch.to_pandas()['week']) return set(batch.to_pandas()['week'])
# This requires a prefit LSI model, since we shouldn't fit different LSI models for every week. # This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
def cosine_similarities_weekly_lsi(n_components=100, lsi_model=None, *args, **kwargs): def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kwargs):
print(args)
print(kwargs)
term_colname= kwargs.get('term_colname') term_colname= kwargs.get('term_colname')
#lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl" # lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/1000_author_LSIMOD.pkl"
# simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=n_iter,random_state=random_state,algorithm='randomized',lsi_model_load=lsi_model) lsi_model = pickle.load(open(lsi_model,'rb'))
#simfunc = partial(lsi_column_similarities,n_components=n_components,random_state=random_state,algorithm='randomized',lsi_model=lsi_model)
simfunc = partial(lsi_column_similarities,n_components=n_components,n_iter=kwargs.get('n_iter'),random_state=kwargs.get('random_state'),algorithm=kwargs.get('algorithm'),lsi_model_load=lsi_model) simfunc = partial(lsi_column_similarities,n_components=n_components,random_state=kwargs.get('random_state'),lsi_model=lsi_model)
return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs) return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet') #tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, max_df=None, included_subreddits = None, topN = 500, simfunc=column_similarities): def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=None,max_df=None):
print(outfile) print(outfile)
# do this step in parallel if we have the memory for it. # do this step in parallel if we have the memory for it.
# should be doable with pool.map # should be doable with pool.map
@@ -84,12 +90,14 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None,
spark.stop() spark.stop()
print(f"computing weekly similarities") print(f"computing weekly similarities")
week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df,max_df=max_df,included_subreddits=included_subreddits,topN=topN, subreddit_names=subreddit_names,nterms=nterms) week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms)
pool = Pool(cpu_count()) for week in weeks:
week_similarities_helper(week)
# pool = Pool(cpu_count())
list(pool.imap(week_similarities_helper,weeks)) # list(pool.imap(week_similarities_helper, weeks))
pool.close() # pool.close()
# with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine? # with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
@@ -97,10 +105,11 @@ def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/
return cosine_similarities_weekly(infile, return cosine_similarities_weekly(infile,
outfile, outfile,
'author', 'author',
min_df,
max_df, max_df,
included_subreddits, included_subreddits,
topN) topN,
min_df=2
)
def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None): def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None):
return cosine_similarities_weekly(infile, return cosine_similarities_weekly(infile,
@@ -112,32 +121,29 @@ def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/re
topN) topN)
def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=None,n_components=100,lsi_model=None): def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile, return cosine_similarities_weekly_lsi(infile,
outfile, outfile,
'author', 'author',
min_df, included_subreddits=included_subreddits,
max_df,
included_subreddits,
topN,
n_components=n_components, n_components=n_components,
lsi_model=lsi_model) lsi_model=lsi_model
)
def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500,n_components=100,lsi_model=None): def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100,lsi_model=None):
return cosine_similarities_weekly_lsi(infile, return cosine_similarities_weekly_lsi(infile,
outfile, outfile,
'term', 'term',
min_df, included_subreddits=included_subreddits,
max_df,
included_subreddits,
topN,
n_components=n_components, n_components=n_components,
lsi_model=lsi_model) lsi_model=lsi_model,
)
if __name__ == "__main__": if __name__ == "__main__":
fire.Fire({'authors':author_cosine_similarities_weekly, fire.Fire({'authors':author_cosine_similarities_weekly,
'terms':term_cosine_similarities_weekly, 'terms':term_cosine_similarities_weekly,
'authors-lsi':author_cosine_similarities_weekly_lsi, 'authors-lsi':author_cosine_similarities_weekly_lsi,
'terms-lsi':term_cosine_similarities_weekly 'terms-lsi':term_cosine_similarities_weekly_lsi
}) })

View File

@@ -12,10 +12,6 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather", author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
output="data/subreddit_timeseries.parquet"): output="data/subreddit_timeseries.parquet"):
clusters = load_clusters(term_clusters_path, author_clusters_path)
densities = load_densities(term_densities_path, author_densities_path)
spark = SparkSession.builder.getOrCreate() spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet") df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
@@ -26,11 +22,15 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count() ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
ts = ts.repartition('subreddit') ts = ts.repartition('subreddit')
spk_clusters = spark.createDataFrame(clusters)
ts = ts.join(spk_clusters, on='subreddit', how='inner') if term_densities_path is not None and author_densities_path is not None:
densities = load_densities(term_densities_path, author_densities_path)
spk_densities = spark.createDataFrame(densities) spk_densities = spark.createDataFrame(densities)
ts = ts.join(spk_densities, on='subreddit', how='inner') ts = ts.join(spk_densities, on='subreddit', how='inner')
clusters = load_clusters(term_clusters_path, author_clusters_path)
spk_clusters = spark.createDataFrame(clusters)
ts = ts.join(spk_clusters, on='subreddit', how='inner')
ts.write.parquet(output, mode='overwrite') ts.write.parquet(output, mode='overwrite')
if __name__ == "__main__": if __name__ == "__main__":