#!/usr/bin/env python3
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import pyarrow
import pyarrow.dataset as ds
import pandas as pd
import fire
from itertools import islice, chain
from pathlib import Path
from similarities_helper import pull_tfidf, column_similarities, write_weekly_similarities, lsi_column_similarities
from scipy.sparse import csr_matrix
from multiprocessing import Pool, cpu_count
from functools import partial
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_10k.parquet"
tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
min_df=None
included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
max_df = None
topN=100
term_colname='author'
# outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# included_subreddits=None

def _week_similarities(week, simfunc, tfidf_path, term_colname, min_df, max_df, included_subreddits, topN, outdir: Path, subreddit_names, nterms):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'
    print(f"loading matrix: {week}")
    entries = pull_tfidf(infile=tfidf_path,
                         term_colname=term_colname,
                         min_df=min_df,
                         max_df=max_df,
                         included_subreddits=included_subreddits,
                         topN=topN,
                         week=week,
                         rescale_idf=False)
    tfidf_colname = 'tf_idf'
    # if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
    mat = csr_matrix((entries[tfidf_colname], (entries[term_id_new]-1, entries.subreddit_id_new-1)), shape=(nterms, subreddit_names.shape[0]))
    print('computing similarities')
    sims = simfunc(mat)
    del mat
    # label the columns with subreddit names and add a '_subreddit' column identifying the rows
    sims = pd.DataFrame(sims)
    sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
    sims['_subreddit'] = subreddit_names.subreddit.values
    outfile = str(Path(outdir) / str(week))
    write_weekly_similarities(outfile, sims, week, subreddit_names)
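
# Illustrative sketch (not part of the pipeline): column_similarities from
# similarities_helper is assumed to compute cosine similarities between the
# columns (subreddits) of the sparse tf-idf matrix. Under that assumption, a
# minimal equivalent looks like this:
def _column_cosine_sketch(mat):
    # mat: terms x subreddits sparse matrix
    from scipy.sparse import diags
    norms = np.sqrt(mat.multiply(mat).sum(axis=0)).A1  # L2 norm of each column
    norms[norms == 0] = 1.0                            # guard against empty columns
    normed = mat @ diags(1.0 / norms)                  # scale columns to unit length
    return (normed.T @ normed).toarray()               # subreddits x subreddits cosine matrix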

def pull_weeks(batch):
    return set(batch.to_pandas()['week'])

# This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kwargs):
    term_colname = kwargs.get('term_colname')
    # lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_terms_compex_LSI/1000_term_LSIMOD.pkl"
    # simfunc = partial(lsi_column_similarities, n_components=n_components, n_iter=n_iter, random_state=random_state, algorithm='randomized', lsi_model_load=lsi_model)
    simfunc = partial(lsi_column_similarities, n_components=n_components, n_iter=kwargs.get('n_iter'),
                      random_state=kwargs.get('random_state'), algorithm=kwargs.get('algorithm'),
                      lsi_model_load=lsi_model)
    return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
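
# Illustrative sketch (not part of the pipeline): lsi_column_similarities is
# assumed to project subreddit vectors into a low-rank LSI space and then take
# cosine similarities between them. This sketch fits a fresh truncated SVD,
# whereas the pipeline loads a prefit model via lsi_model_load.
def _lsi_similarity_sketch(mat, n_components=100, random_state=None):
    from sklearn.decomposition import TruncatedSVD
    from sklearn.metrics.pairwise import cosine_similarity
    svd = TruncatedSVD(n_components=n_components, random_state=random_state)
    reduced = svd.fit_transform(mat.T)   # subreddits x n_components
    return cosine_similarity(reduced)    # subreddits x subreddits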
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')

def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, simfunc=column_similarities):
    print(outfile)
    # do this step in parallel if we have the memory for it.
    # should be doable with pool.map
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(tfidf_path)

    # load subreddits + topN
    subreddit_names = df.select(['subreddit', 'subreddit_id']).distinct().toPandas()
    subreddit_names = subreddit_names.sort_values("subreddit_id")
    nterms = df.select(f.max(f.col(term_colname + "_id")).alias('max')).collect()[0].max
    weeks = df.select(f.col("week")).distinct().toPandas().week.values
    spark.stop()

    print("computing weekly similarities")
    week_similarities_helper = partial(_week_similarities, simfunc=simfunc, tfidf_path=tfidf_path,
                                       term_colname=term_colname, outdir=outfile, min_df=min_df,
                                       max_df=max_df, included_subreddits=included_subreddits,
                                       topN=topN, subreddit_names=subreddit_names, nterms=nterms)
    # one worker process per CPU; each week's matrix is loaded and processed independently
    pool = Pool(cpu_count())
    list(pool.imap(week_similarities_helper, weeks))
    pool.close()
    # with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
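
# As the comment above suggests, the pool in cosine_similarities_weekly could
# instead be managed as a context manager so workers are always cleaned up,
# even on error (hypothetical alternative, not what the script does today):
#
#     with Pool(cpu_count()) as pool:
#         list(pool.imap(week_similarities_helper, weeks))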

def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=500):
    return cosine_similarities_weekly(infile,
                                      outfile,
                                      'author',
                                      min_df,
                                      max_df,
                                      included_subreddits,
                                      topN)

def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None):
    return cosine_similarities_weekly(infile,
                                      outfile,
                                      'term',
                                      min_df,
                                      max_df,
                                      included_subreddits,
                                      topN)

def author_cosine_similarities_weekly_lsi(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=None, n_components=100, lsi_model=None):
    return cosine_similarities_weekly_lsi(infile,
                                          outfile,
                                          'author',
                                          min_df,
                                          max_df,
                                          included_subreddits,
                                          topN,
                                          n_components=n_components,
                                          lsi_model=lsi_model)

def term_cosine_similarities_weekly_lsi(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500, n_components=100, lsi_model=None):
    return cosine_similarities_weekly_lsi(infile,
                                          outfile,
                                          'term',
                                          min_df,
                                          max_df,
                                          included_subreddits,
                                          topN,
                                          n_components=n_components,
                                          lsi_model=lsi_model)

if __name__ == "__main__":
    fire.Fire({'authors': author_cosine_similarities_weekly,
               'terms': term_cosine_similarities_weekly,
               'authors-lsi': author_cosine_similarities_weekly_lsi,
               'terms-lsi': term_cosine_similarities_weekly_lsi})
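
# Example invocations via fire (paths below are hypothetical placeholders):
#
#   python3 weekly_cosine_similarities.py authors \
#       --outfile=/path/to/comment_authors_weekly --topN=500
#
#   python3 weekly_cosine_similarities.py terms-lsi \
#       --outfile=/path/to/comment_terms_weekly_lsi \
#       --n_components=100 --lsi_model=/path/to/term_LSIMOD.pkl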