
Some improvements to run affinity clustering on a larger dataset and compute density.
Nate E TeBlunthuis 2020-12-12 20:42:47 -08:00
parent e6294b5b90
commit 56269deee3
15 changed files with 84 additions and 84 deletions

clustering/Makefile (new file, +4)

@@ -0,0 +1,4 @@
srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28'

affinity/subreddit_comment_authors_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
	# $srun_cdsc python3
	clustering.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.feather affinity/subreddit_comment_authors_10000.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.85 --damping=0.85


@@ -1,12 +1,15 @@
+#!/usr/bin/env python3
import pandas as pd
import numpy as np
from sklearn.cluster import AffinityPropagation
import fire

-def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968):
+def affinity_clustering(similarities, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
    '''
    similarities: feather file with a dataframe of similarity scores
    preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
+    damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author.
    '''

    df = pd.read_feather(similarities)
@@ -16,6 +19,8 @@ def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, conv
    preference = np.quantile(mat,preference_quantile)

+    print(f"preference is {preference}")
    print("data loaded")

    clustering = AffinityPropagation(damping=damping,

@@ -24,6 +29,7 @@ def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, conv
                                     copy=False,
                                     preference=preference,
                                     affinity='precomputed',
+                                     verbose=verbose,
                                     random_state=random_state).fit(mat)
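
To make the new settings concrete, here is a minimal, self-contained sketch of the same AffinityPropagation configuration run on a small random similarity matrix. The toy matrix and its size are invented for illustration; the parameter values mirror the Makefile flags above (--damping=0.85, --max_iter=400, --convergence_iter=15, --preference_quantile=0.85).

# Illustrative sketch only: a tiny random symmetric "similarity" matrix stands in
# for the real 10000x10000 subreddit-by-author similarities read from feather.
import numpy as np
from sklearn.cluster import AffinityPropagation

rng = np.random.default_rng(1968)
mat = rng.uniform(size=(50, 50))
mat = (mat + mat.T) / 2          # make it symmetric, like a similarity matrix
np.fill_diagonal(mat, 1.0)       # self-similarity on the diagonal

preference = np.quantile(mat, 0.85)   # higher quantile -> more clusters

clustering = AffinityPropagation(damping=0.85,
                                 max_iter=400,
                                 convergence_iter=15,
                                 copy=False,
                                 preference=preference,
                                 affinity='precomputed',
                                 verbose=True,
                                 random_state=1968).fit(mat)

print(f"{len(set(clustering.labels_))} clusters found")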

density/Makefile (new file, +7)

@@ -0,0 +1,7 @@
all: /gscratch/comdata/output/reddit_density/comment_terms_10000.feather /gscratch/comdata/output/reddit_density/comment_authors_10000.feather

/gscratch/comdata/output/reddit_density/comment_terms_10000.feather:overlap_density.py /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
	python3 overlap_density.py terms --inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather" --outpath="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather" --agg=pd.DataFrame.sum

/gscratch/comdata/output/reddit_density/comment_authors_10000.feather:overlap_density.py /gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
	python3 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather" --outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather" --agg=pd.DataFrame.sum
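
For illustration only, the first recipe above corresponds roughly to this direct Python call into overlap_density.py (whose contents follow); note that --agg=pd.DataFrame.sum reaches the script as a string and is converted back into a callable with eval() inside it.

# Hypothetical direct equivalent of the Makefile recipe; the pipeline actually
# invokes overlap_density.py through python-fire on the command line.
import overlap_density

overlap_density.term_overlap_density(
    inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather",
    outpath="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
    agg="pd.DataFrame.sum",  # string form, eval()'d inside the function
)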


@@ -0,0 +1,57 @@
import pandas as pd
from pandas.core.groupby import DataFrameGroupBy as GroupBy
import fire
import numpy as np

def overlap_density(inpath, outpath, agg = pd.DataFrame.sum):
    df = pd.read_feather(inpath)
    df = df.drop('subreddit',1)
    np.fill_diagonal(df.values,0)
    df = agg(df, 0).reset_index()
    df = df.rename({0:'overlap_density'},axis='columns')
    df.to_feather(outpath)
    return df

def overlap_density_weekly(inpath, outpath, agg = GroupBy.sum):
    df = pd.read_parquet(inpath)
    # exclude the diagonal
    df = df.loc[df.subreddit != df.variable]
    res = agg(df.groupby(['subreddit','week'])).reset_index()
    res.to_feather(outpath)
    return res

def author_overlap_density(inpath="/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather",
                           outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather", agg=pd.DataFrame.sum):
    if type(agg) == str:
        agg = eval(agg)

    overlap_density(inpath, outpath, agg)

def term_overlap_density(inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather",
                         outpath="/gscratch/comdata/output/reddit_density/comment_term_similarity_10000.feather", agg=pd.DataFrame.sum):
    if type(agg) == str:
        agg = eval(agg)

    overlap_density(inpath, outpath, agg)

def author_overlap_density_weekly(inpath="/gscratch/comdata/output/reddit_similarity/subreddit_authors_10000_weekly.parquet",
                                  outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000_weekly.feather", agg=GroupBy.sum):
    if type(agg) == str:
        agg = eval(agg)

    overlap_density_weekly(inpath, outpath, agg)

def term_overlap_density_weekly(inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet",
                                outpath="/gscratch/comdata/output/reddit_density/comment_terms_10000_weekly.parquet", agg=GroupBy.sum):
    if type(agg) == str:
        agg = eval(agg)

    overlap_density_weekly(inpath, outpath, agg)

if __name__ == "__main__":
    fire.Fire({'authors':author_overlap_density,
               'terms':term_overlap_density,
               'author_weekly':author_overlap_density_weekly,
               'term_weekly':term_overlap_density_weekly})
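
As a quick sanity check on what the density script computes, here is a toy worked example of overlap_density on a 3-subreddit similarity frame; the subreddit names and similarity values are invented.

# Toy example of the overlap_density computation above: drop the subreddit column,
# zero the self-similarity diagonal, then sum each column to get one
# "overlap density" score per subreddit.
import numpy as np
import pandas as pd

df = pd.DataFrame({'subreddit': ['a', 'b', 'c'],
                   'a': [1.0, 0.2, 0.4],
                   'b': [0.2, 1.0, 0.1],
                   'c': [0.4, 0.1, 1.0]})

vals = df.drop(columns='subreddit')
np.fill_diagonal(vals.values, 0)
density = vals.sum(axis=0).reset_index()   # same as pd.DataFrame.sum(vals, 0)
density = density.rename({0: 'overlap_density'}, axis='columns')
print(density)
#   index  overlap_density
# 0     a              0.6
# 1     b              0.3
# 2     c              0.5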


@@ -1 +0,0 @@
nathante@n2347.hyak.local.31061:1602221800


@@ -1 +0,0 @@
nathante@n2347.hyak.local.31061:1602221800


@@ -1,73 +0,0 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
import pandas as pd
import fire
from pathlib import Path
from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits

def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
    spark = SparkSession.builder.getOrCreate()
    conf = spark.sparkContext.getConf()
    print(outfile)
    print(exclude_phrases)

    tfidf = spark.read.parquet(infile)

    if included_subreddits is None:
        included_subreddits = select_topN_subreddits(topN)
    else:
        included_subreddits = set(open(included_subreddits))

    if exclude_phrases == True:
        tfidf = tfidf.filter(~f.col(term_colname).contains("_"))

    print("creating temporary parquet with matrix indicies")
    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
    tfidf = spark.read.parquet(tempdir.name)
    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
    spark.stop()

    print("loading matrix")
    mat = read_tfidf_matrix(tempdir.name, term_colname)
    print('computing similarities')
    sims = column_similarities(mat)
    del mat

    sims = pd.DataFrame(sims.todense())
    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
    sims['subreddit'] = subreddit_names.subreddit.values

    p = Path(outfile)

    output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
    output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
    output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))

    sims.to_feather(outfile)
    tempdir.cleanup()

def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
                               'term',
                               outfile,
                               min_df,
                               included_subreddits,
                               topN,
                               exclude_phrases)

def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
                               'author',
                               outfile,
                               min_df,
                               included_subreddits,
                               topN,
                               exclude_phrases=False)

if __name__ == "__main__":
    fire.Fire({'term':term_cosine_similarities,
               'author':author_cosine_similarities})


@@ -1 +0,0 @@
nathante@n2347.hyak.local.31061:1602221800


@@ -1 +0,0 @@
nathante@n2347.hyak.local.31061:1602221800


@@ -1,2 +1,5 @@
/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
-	start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
+	start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.feather
+
+/gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
+	start_spark_and_run.sh 1 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet


@@ -3,7 +3,7 @@ from pyspark.sql import SparkSession
import pandas as pd
import fire
from pathlib import Path
-from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits
+from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits, column_similarities

def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
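
column_similarities itself is defined in similarities_helper.py and is not shown in this commit. As a rough sketch of what a column-wise cosine similarity over a sparse term-by-subreddit TF-IDF matrix can look like (an assumption, not the project's actual implementation):

# Assumed sketch: cosine similarity between the columns (subreddits) of a sparse
# term-by-subreddit matrix. The real column_similarities in similarities_helper.py
# may differ in its details.
import numpy as np
from scipy.sparse import diags, random as sparse_random

def column_cosine_similarities(mat):
    norms = np.sqrt(np.asarray(mat.multiply(mat).sum(axis=0)).ravel())
    norms[norms == 0] = 1.0                      # guard against empty columns
    normalized = mat @ diags(1.0 / norms)        # scale each column to unit L2 norm
    return (normalized.T @ normalized).tocsr()   # subreddit x subreddit similarities

# Toy input: 1000 "terms" by 20 "subreddits", ~5% nonzero.
mat = sparse_random(1000, 20, density=0.05, format='csc', random_state=1968)
sims = column_cosine_similarities(mat)
print(sims.shape)  # (20, 20)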


@@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
-spark-submit --master spark://$(hostname):18899 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
+spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet
stop-all.sh


@@ -45,7 +45,7 @@ def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/commen
                []
                )

-def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
+def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
                         topN=25000):

    return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",

@@ -55,7 +55,7 @@ def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfi
                ['[deleted]','AutoModerator']
                )

-def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
+def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
                       topN=25000):

    return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",


@@ -35,7 +35,7 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None,
    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
    spark.stop()

-    weeks = list(subreddit_names.week.drop_duplicates())
+    weeks = sorted(list(subreddit_names.week.drop_duplicates()))
    for week in weeks:
        print(f"loading matrix: {week}")
        mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)

visualization/Makefile (new, empty file)