From d0f37fe33a2ad0d0584de7d2139c9176b99be4aa Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Sat, 11 Jan 2025 19:52:54 -0800
Subject: [PATCH] limit output to only the subreddits in clusters.

---
 similarities/similarities_helper.py        | 14 +++++++++++---
 similarities/weekly_cosine_similarities.py | 17 ++++++++++++-----
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/similarities/similarities_helper.py b/similarities/similarities_helper.py
index f8b2412..a96bfb5 100644
--- a/similarities/similarities_helper.py
+++ b/similarities/similarities_helper.py
@@ -17,6 +17,7 @@ from datetime import datetime
 from pathlib import Path
 import pickle
 import pytz
+import duckdb
 
 class tf_weight(Enum):
     MaxTF = 1
@@ -179,15 +180,22 @@ def similarities(inpath, simfunc, term_colname, outfile, min_df=None, max_df=Non
     else:
         proc_sims(sims, outfile)
 
-def write_weekly_similarities(path, sims, week, names):
+def write_weekly_similarities(path, sims, week, names, clusters=None):
     sims['week'] = week
     p = pathlib.Path(path)
     if not p.is_dir():
         p.mkdir(exist_ok=True,parents=True)
-        
+
     # reformat as a pairwise list
     sims = sims.melt(id_vars=['_subreddit','week'],value_vars=names.subreddit.values)
-    sims.to_parquet(p / week)
+
+    if clusters is not None:
+        # keep only pairs in which both subreddits appear in the cluster pair table
+        cluster_sims = duckdb.sql("SELECT sims.* FROM sims SEMI JOIN clusters ON _subreddit == sr_i AND variable == sr_j").df()
+    else:
+        cluster_sims = sims
+
+    cluster_sims.to_parquet(p / week)
 
 def column_overlaps(mat):
     non_zeros = (mat != 0).astype('double')
diff --git a/similarities/weekly_cosine_similarities.py b/similarities/weekly_cosine_similarities.py
index 9ea6dfb..0963697 100755
--- a/similarities/weekly_cosine_similarities.py
+++ b/similarities/weekly_cosine_similarities.py
@@ -31,7 +31,7 @@ outfile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/simi
 # static_tfidf = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
 # dftest = spark.read.parquet(static_tfidf)
 
-def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir:Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None):
+def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir:Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None, clusters=None):
     term = term_colname
     term_id = term + '_id'
     term_id_new = term + '_id_new'
@@ -57,8 +57,9 @@ def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subredd
     sims = pd.DataFrame(sims)
     sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
     sims['_subreddit'] = subreddit_names.subreddit.values
+
     outfile = str(Path(outdir) / str(week))
-    write_weekly_similarities(outfile, sims, week, subreddit_names)
+    write_weekly_similarities(outfile, sims, week, subreddit_names, clusters=clusters)
 
 def pull_weeks(batch):
     return set(batch.to_pandas()['week'])
@@ -78,7 +79,7 @@ def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kw
     return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
 
 #tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_submission_terms_tfidf.parquet')
-def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None):
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters_path=None):
     print(outfile)
     # do this step in parallel if we have the memory for it.
     # should be doable with pool.map
@@ -98,9 +99,15 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subre
     weeks = conn.execute(f"SELECT DISTINCT CAST(CAST(week AS DATE) AS STRING) AS week FROM read_parquet('{tfidf_path}/*/*.parquet')").df()
     weeks = weeks.week.values
     conn.close()
-    
+
+    # build within-cluster subreddit pairs (cluster -1 is excluded) used to filter the output
+    clusters = None
+    if clusters_path is not None:
+        clusters_raw = pd.read_feather(clusters_path)
+        clusters = duckdb.sql("SELECT A.subreddit AS sr_i, B.subreddit AS sr_j FROM clusters_raw AS A JOIN clusters_raw AS B ON A.cluster == B.cluster WHERE A.cluster != -1 AND B.cluster != -1").df()
+
     print(f"computing weekly similarities")
-    week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms)
+    week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms, clusters=clusters)
     # for week in weeks:
     #     week_similarities_helper(week)
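
Usage sketch (not part of the patch): a minimal, hypothetical call exercising the new
clusters_path argument. The import path and all file locations below are illustrative
assumptions, not taken from the repository; the only structural assumption drawn from
the patch itself is that the feather file has `subreddit` and `cluster` columns and
that rows with cluster -1 are excluded from the output.

    from similarities.weekly_cosine_similarities import cosine_similarities_weekly

    # Hypothetical paths: point these at the weekly tf-idf parquet dataset, an output
    # directory for the per-week parquet files, and the feather file of cluster
    # assignments.
    cosine_similarities_weekly(
        tfidf_path="data/tfidf_weekly/comment_terms.parquet",
        outfile="data/similarity_weekly/comment_terms",
        term_colname="term",
        clusters_path="data/clusters/subreddit_clusters.feather",
    )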