cdsc_reddit/similarities/weekly_cosine_similarities.py
#!/usr/bin/env python3
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import duckdb
import pyarrow
import pyarrow.dataset as ds
import pandas as pd
import fire
from itertools import islice, chain
from pathlib import Path
from similarities_helper import pull_tfidf, column_similarities, write_weekly_similarities, lsi_column_similarities
from scipy.sparse import csr_matrix
from multiprocessing import Pool, cpu_count
from functools import partial
import pickle
import pytz
# tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors_tfidf.parquet"
# #tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data//comment_authors_compex.parquet"
# min_df=2
# included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
# max_df = None
# topN=100
# term_colname='author'
# # outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# # included_subreddits=None
outfile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors.parquet"; infile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_authors_tfidf.parquet"; included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"; lsi_model="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/2000_authors_LSIMOD.pkl"; n_components=1500; algorithm="randomized"; term_colname='author'; tfidf_path=infile; random_state=1968;
# static_tfidf = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
# dftest = spark.read.parquet(static_tfidf)

def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir: Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None, clusters=None, term_ids=None):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'
    print(f"loading matrix: {week}")
    entries = pull_tfidf(infile=tfidf_path,
                         term_colname=term_colname,
                         included_subreddits=included_subreddits,
                         topN=topN,
                         week=week,
                         rescale_idf=False)

    tfidf_colname = 'tf_idf'

    if term_ids is not None:
        # remap this week's term ids onto the term ids used in the static tfidf dataset
        entries = duckdb.sql(f"SELECT A.{tfidf_colname}, B.{term_id} AS {term_id_new}, A.subreddit_id_new FROM entries AS A JOIN read_parquet('{term_ids}') AS B ON A.{term_id_new} == B.{'old_' + term_id}").df()
        nterms = duckdb.sql(f"SELECT MAX({term_colname}_id) AS nterms FROM read_parquet('{term_ids}')").df()
        nterms = list(nterms.nterms.values)[0]

    # if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
    shape = (nterms, subreddit_names.shape[0])
    print(shape)
    mat = csr_matrix((entries[tfidf_colname], (entries[term_id_new] - 1, entries.subreddit_id_new - 1)), shape=shape)
    print('computing similarities')
    print(simfunc)
    sims = simfunc(mat)
    del mat
    sims = next(sims)[0]
    sims = pd.DataFrame(sims)
    sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
    sims['_subreddit'] = subreddit_names.subreddit.values
    outfile = str(Path(outdir) / str(week))
    write_weekly_similarities(outfile, sims, week, subreddit_names, clusters=clusters)
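
# A minimal sketch (not part of the pipeline) of how the csr_matrix call above assembles the
# term-by-subreddit matrix from the long-format tfidf entries. The column values below are
# hypothetical stand-ins for the real pull_tfidf output:
#
#   import pandas as pd
#   from scipy.sparse import csr_matrix
#   toy = pd.DataFrame({'tf_idf': [0.5, 1.2, 0.3],
#                       'author_id_new': [1, 2, 2],      # 1-indexed term ids
#                       'subreddit_id_new': [1, 1, 2]})  # 1-indexed subreddit ids
#   toy_mat = csr_matrix((toy.tf_idf, (toy.author_id_new - 1, toy.subreddit_id_new - 1)),
#                        shape=(2, 2))
#   # toy_mat.toarray() -> [[0.5, 0. ], [1.2, 0.3]]; columns are subreddits, so the
#   # similarity function compares subreddit column vectors.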

def pull_weeks(batch):
    return set(batch.to_pandas()['week'])

# This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kwargs):
    print(args)
    print(kwargs)
    term_colname = kwargs.get('term_colname')
    # pop random_state so it configures the LSI step but isn't forwarded to cosine_similarities_weekly
    random_state = kwargs.pop('random_state', None)
    # lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/1000_author_LSIMOD.pkl"
    # simfunc = partial(lsi_column_similarities, n_components=n_components, random_state=random_state, algorithm='randomized', lsi_model=lsi_model)

    if isinstance(lsi_model, str):
        lsi_model = pickle.load(open(lsi_model, 'rb'))

    simfunc = partial(lsi_column_similarities, n_components=n_components, random_state=random_state, lsi_model=lsi_model)
    return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
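
# Conceptual sketch only: lsi_column_similarities lives in similarities_helper, so its exact
# behavior isn't shown here. The rough idea, assuming the pickled lsi_model behaves like a
# fitted sklearn TruncatedSVD, is to project subreddit column vectors into a low-rank space
# before taking cosine similarities, e.g.:
#
#   from sklearn.decomposition import TruncatedSVD
#   from sklearn.metrics.pairwise import cosine_similarity
#   svd = TruncatedSVD(n_components=100, algorithm='randomized', random_state=1968)
#   reduced = svd.fit_transform(mat.T)   # one row per subreddit
#   sims = cosine_similarity(reduced)    # subreddit x subreddit similarity matrix
#
# The real helper also supports reusing a single prefit model across weeks, which is why a
# pickled model path is accepted here.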

# tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_submission_terms_tfidf.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits=None, topN=None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters=None, min_date=None, max_date=None, cores=1, term_ids=None):
    print(outfile)
    # do this step in parallel if we have the memory for it.
    # should be doable with pool.map
    conn = duckdb.connect()
    subreddit_names = conn.execute(f"SELECT DISTINCT subreddit, subreddit_id FROM read_parquet('{tfidf_path}/*/*.parquet') ORDER BY subreddit_id;").df()

    if static_tfidf_path is not None:
        q = f"SELECT COUNT(DISTINCT({term_colname + '_id'})) AS nterms FROM read_parquet('{static_tfidf_path}/*.parquet')"
        if min_df is not None and max_df is not None:
            q = q + f" WHERE count >= {min_df} AND count <= {max_df}"
        else:
            if min_df is not None:
                q = q + f" WHERE count >= {min_df}"
            if max_df is not None:
                q = q + f" WHERE count <= {max_df}"
        nterms = conn.execute(q).df()
    else:
        nterms = conn.execute(f"SELECT MAX({term_colname + '_id'}) AS nterms FROM read_parquet('{tfidf_path}/*/*.parquet')").df()

    nterms = nterms.nterms.values
    nterms = int(nterms[0])

    q = f"SELECT DISTINCT CAST(CAST(week AS DATE) AS STRING) AS week FROM read_parquet('{tfidf_path}/*/*.parquet')"
    if min_date is not None and max_date is not None:
        q = q + f" WHERE CAST(week AS DATE) >= CAST('{min_date}' AS DATE) AND CAST(week AS DATE) <= CAST('{max_date}' AS DATE)"
    elif max_date is not None:
        q = q + f" WHERE CAST(week AS DATE) <= CAST('{max_date}' AS DATE)"
    elif min_date is not None:
        q = q + f" WHERE CAST(week AS DATE) >= CAST('{min_date}' AS DATE)"

    weeks = conn.execute(q).df()
    weeks = weeks.week.values
    conn.close()

    if clusters is not None:
        clusters_raw = pd.read_feather(clusters)
        clusters = duckdb.sql("SELECT A.subreddit AS sr_i, B.subreddit AS sr_j FROM clusters_raw AS A JOIN clusters_raw AS B ON A.cluster == B.cluster WHERE A.cluster != -1 AND B.cluster != -1").df()

    print("computing weekly similarities")
    week_similarities_helper = partial(_week_similarities, simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names, nterms=nterms, clusters=clusters, term_ids=term_ids)

    if cores > 1:
        with Pool(cores) as pool:  # maybe it can be done with 128 cores on the huge machine?
            list(pool.imap_unordered(week_similarities_helper, weeks, 5))
            pool.close()
    else:
        for week in weeks:
            week_similarities_helper(week)
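
# Example of driving this function directly from Python rather than the CLI below; the paths
# are placeholders, not real datasets, and the column layout is assumed to match what the
# weekly tfidf job writes (week / subreddit / subreddit_id / <term>_id columns):
#
#   cosine_similarities_weekly(tfidf_path="data/tfidf_weekly/comment_authors_tfidf.parquet",
#                              outfile="data/similarity_weekly/comment_authors.parquet",
#                              term_colname='author',
#                              min_date='2020-01-01',
#                              max_date='2020-06-30',
#                              cores=4)
#
# With cores > 1 each week is handled by a separate worker process via imap_unordered, so
# memory use scales with the number of weekly tfidf matrices held in flight at once.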

def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=500, static_tfidf_path=None, clusters=None):
    return cosine_similarities_weekly(infile,
                                      outfile,
                                      'author',
                                      included_subreddits=included_subreddits,
                                      topN=topN,
                                      min_df=min_df,
                                      max_df=max_df,
                                      static_tfidf_path=static_tfidf_path,
                                      clusters=clusters)

def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None, static_tfidf_path=None, clusters=None, term_ids=None):
    return cosine_similarities_weekly(infile,
                                      outfile,
                                      'term',
                                      min_df=min_df,
                                      max_df=max_df,
                                      included_subreddits=included_subreddits,
                                      topN=topN,
                                      static_tfidf_path=static_tfidf_path,
                                      clusters=clusters,
                                      term_ids=term_ids)

def author_cosine_similarities_weekly_lsi(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100, lsi_model=None, static_tfidf_path=None, min_df=None, clusters=None, min_date=None, max_date=None, cores=1, term_ids=None):
    return cosine_similarities_weekly_lsi(infile,
                                          outfile,
                                          'author',
                                          min_df=min_df,
                                          included_subreddits=included_subreddits,
                                          n_components=n_components,
                                          lsi_model=lsi_model,
                                          static_tfidf_path=static_tfidf_path,
                                          clusters=clusters,
                                          min_date=min_date,
                                          max_date=max_date,
                                          cores=cores,
                                          term_ids=term_ids)

def term_cosine_similarities_weekly_lsi(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100, lsi_model=None, static_tfidf_path=None, clusters=None, min_date=None, max_date=None, cores=1, term_ids=None):
    return cosine_similarities_weekly_lsi(infile,
                                          outfile,
                                          'term',
                                          included_subreddits=included_subreddits,
                                          n_components=n_components,
                                          lsi_model=lsi_model,
                                          static_tfidf_path=static_tfidf_path,
                                          clusters=clusters,
                                          min_date=min_date,
                                          max_date=max_date,
                                          cores=cores,
                                          term_ids=term_ids)

if __name__ == "__main__":
    fire.Fire({'authors': author_cosine_similarities_weekly,
               'terms': term_cosine_similarities_weekly,
               'authors-lsi': author_cosine_similarities_weekly_lsi,
               'terms-lsi': term_cosine_similarities_weekly_lsi
               })
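
# Example invocations (paths are illustrative, not real data). python-fire maps each function's
# parameters onto command-line flags, so for instance:
#
#   ./weekly_cosine_similarities.py authors --outfile=data/similarity_weekly/comment_authors.parquet \
#       --infile=data/tfidf_weekly/comment_authors_tfidf.parquet --topN=500
#
#   ./weekly_cosine_similarities.py terms-lsi --outfile=data/similarity_weekly/comment_terms_lsi.parquet \
#       --lsi_model=data/similarity/comment_terms_LSI/terms_LSIMOD.pkl --n_components=1500 --cores=4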