#!/usr/bin/env python3
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import duckdb
import pyarrow
import pyarrow.dataset as ds
import pandas as pd
import fire
from itertools import islice, chain
from pathlib import Path
from similarities_helper import pull_tfidf, column_similarities, write_weekly_similarities, lsi_column_similarities
from scipy.sparse import csr_matrix
from multiprocessing import Pool, cpu_count
from functools import partial
import pickle
import pytz

# tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors_tfidf.parquet"
# #tfidf_path = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data//comment_authors_compex.parquet"
# min_df=2
# included_subreddits="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
# max_df = None
# topN=100
# term_colname='author'
# # outfile = '/gscratch/comdata/output/reddit_similarity/weekly/comment_authors_test.parquet'
# # included_subreddits=None

# Module-level values left over from interactive test runs; the CLI entry points below
# take their own arguments and do not read these.
outfile = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity_weekly/comment_authors.parquet"
infile = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_authors_tfidf.parquet"
included_subreddits = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/included_subreddits.txt"
lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/2000_authors_LSIMOD.pkl"
n_components = 1500
algorithm = "randomized"
term_colname = 'author'
tfidf_path = infile
random_state = 1968

# static_tfidf = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
# dftest = spark.read.parquet(static_tfidf)

def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir: Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None, clusters=None):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'
    print(f"loading matrix: {week}")

    entries = pull_tfidf(infile=tfidf_path,
                         term_colname=term_colname,
                         included_subreddits=included_subreddits,
                         topN=topN,
                         week=week,
                         rescale_idf=False)

    tfidf_colname = 'tf_idf'

    # if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
    shape = (nterms, subreddit_names.shape[0])
    print(shape)
    mat = csr_matrix((entries[tfidf_colname], (entries[term_id_new] - 1, entries.subreddit_id_new - 1)), shape=shape)

    print('computing similarities')
    print(simfunc)
    sims = simfunc(mat)
    del mat
    sims = next(sims)[0]
    sims = pd.DataFrame(sims)
    sims = sims.rename({i: sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
    sims['_subreddit'] = subreddit_names.subreddit.values
    outfile = str(Path(outdir) / str(week))
    write_weekly_similarities(outfile, sims, week, subreddit_names, clusters=clusters)

def pull_weeks(batch):
    return set(batch.to_pandas()['week'])
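# Illustrative sketch, not used by the pipeline: how _week_similarities assembles the
# term-by-subreddit matrix. pull_tfidf appears to return 1-indexed *_id_new columns, so
# the ids are shifted by -1 to become 0-based row/column indices of the sparse matrix.
# The column names below assume term_colname='author'.
def _csr_construction_example():
    example = pd.DataFrame({'tf_idf': [0.5, 1.2, 0.3],
                            'author_id_new': [1, 2, 2],
                            'subreddit_id_new': [1, 1, 3]})
    shape = (2, 3)  # (nterms, number of subreddits)
    mat = csr_matrix((example['tf_idf'],
                      (example['author_id_new'] - 1, example['subreddit_id_new'] - 1)),
                     shape=shape)
    return mat.todense()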
# This requires a prefit LSI model, since we shouldn't fit different LSI models for every week.
def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kwargs):
    print(args)
    print(kwargs)
    term_colname = kwargs.get('term_colname')
    # lsi_model = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/similarity/comment_authors_compex_LSI/1000_author_LSIMOD.pkl"
    # simfunc = partial(lsi_column_similarities,n_components=n_components,random_state=random_state,algorithm='randomized',lsi_model=lsi_model)

    if isinstance(lsi_model, str):
        lsi_model = pickle.load(open(lsi_model, 'rb'))

    simfunc = partial(lsi_column_similarities, n_components=n_components, random_state=kwargs.get('random_state'), lsi_model=lsi_model)

    return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)

#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_submission_terms_tfidf.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits=None, topN=None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters=None, min_date=None, max_date=None, cores=1):
    print(outfile)
    # do this step in parallel if we have the memory for it.
    # should be doable with pool.map
    conn = duckdb.connect()
    subreddit_names = conn.execute(f"SELECT DISTINCT subreddit, subreddit_id from read_parquet('{tfidf_path}/*/*.parquet') ORDER BY subreddit_id;").df()

    if static_tfidf_path is not None:
        q = f"SELECT COUNT(DISTINCT({term_colname + '_id'})) as nterms FROM read_parquet('{static_tfidf_path}/*.parquet')"
        if min_df is not None and max_df is not None:
            q = q + f" WHERE count >= {min_df} AND count <= {max_df}"
        else:
            if min_df is not None:
                q = q + f" WHERE count >= {min_df}"
            if max_df is not None:
                q = q + f" WHERE count <= {max_df}"
        nterms = conn.execute(q).df()
    else:
        nterms = conn.execute(f"SELECT MAX({term_colname + '_id'}) as nterms FROM read_parquet('{tfidf_path}/*/*.parquet')").df()

    nterms = int(nterms.nterms.values[0])

    q = f"SELECT DISTINCT CAST(CAST(week AS DATE) AS STRING) AS week FROM read_parquet('{tfidf_path}/*/*.parquet')"
    if min_date is not None and max_date is not None:
        q = q + f" WHERE CAST(week AS DATE) >= CAST('{min_date}' AS DATE) AND CAST(week AS DATE) <= CAST('{max_date}' AS DATE)"
    elif max_date is not None:
        q = q + f" WHERE CAST(week AS DATE) <= CAST('{max_date}' AS DATE)"
    elif min_date is not None:
        q = q + f" WHERE CAST(week AS DATE) >= CAST('{min_date}' AS DATE)"

    weeks = conn.execute(q).df()
    weeks = weeks.week.values
    conn.close()

    if clusters is not None:
        clusters_raw = pd.read_feather(clusters)
        clusters = duckdb.sql("SELECT A.subreddit AS sr_i, B.subreddit AS sr_j FROM clusters_raw AS A JOIN clusters_raw AS B ON A.cluster == B.cluster WHERE A.cluster != -1 AND B.cluster != -1").df()
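    # Assumed input format: the clusters feather file has one row per subreddit with at
    # least a 'subreddit' and a 'cluster' column, where cluster == -1 marks unclustered
    # subreddits. The self-join above keeps only within-cluster subreddit pairs, which
    # write_weekly_similarities presumably uses to restrict which pairs are written.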
    print("computing weekly similarities")
    week_similarities_helper = partial(_week_similarities, simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names, nterms=nterms, clusters=clusters)

    if cores > 1:
        # maybe it can be done with 128 cores on the huge machine?
        with Pool(cores) as pool:
            list(pool.imap_unordered(week_similarities_helper, weeks, 5))
    else:
        for week in weeks:
            week_similarities_helper(week)

def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=500, static_tfidf_path=None, clusters=None):
    return cosine_similarities_weekly(infile,
                                      outfile,
                                      'author',
                                      included_subreddits=included_subreddits,
                                      topN=topN,
                                      min_df=min_df,
                                      max_df=max_df,
                                      static_tfidf_path=static_tfidf_path,
                                      clusters=clusters)

def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', min_df=None, max_df=None, included_subreddits=None, topN=None, static_tfidf_path=None, clusters=None):
    return cosine_similarities_weekly(infile,
                                      outfile,
                                      'term',
                                      included_subreddits=included_subreddits,
                                      topN=topN,
                                      min_df=min_df,
                                      max_df=max_df,
                                      static_tfidf_path=static_tfidf_path,
                                      clusters=clusters)

def author_cosine_similarities_weekly_lsi(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100, lsi_model=None, static_tfidf_path=None, min_df=None, clusters=None, min_date=None, max_date=None, cores=1):
    return cosine_similarities_weekly_lsi(infile,
                                          outfile,
                                          'author',
                                          min_df=min_df,
                                          included_subreddits=included_subreddits,
                                          n_components=n_components,
                                          lsi_model=lsi_model,
                                          static_tfidf_path=static_tfidf_path,
                                          clusters=clusters,
                                          min_date=min_date,
                                          max_date=max_date,
                                          cores=cores)

def term_cosine_similarities_weekly_lsi(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100, lsi_model=None, static_tfidf_path=None, clusters=None, min_date=None, max_date=None, cores=1):
    return cosine_similarities_weekly_lsi(infile,
                                          outfile,
                                          'term',
                                          included_subreddits=included_subreddits,
                                          n_components=n_components,
                                          lsi_model=lsi_model,
                                          static_tfidf_path=static_tfidf_path,
                                          clusters=clusters,
                                          min_date=min_date,
                                          max_date=max_date,
                                          cores=cores)

if __name__ == "__main__":
    fire.Fire({'authors': author_cosine_similarities_weekly,
               'terms': term_cosine_similarities_weekly,
               'authors-lsi': author_cosine_similarities_weekly_lsi,
               'terms-lsi': term_cosine_similarities_weekly_lsi})
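# Example invocation via the fire CLI (illustrative only; the file name and all paths
# below are placeholders, not values taken from this repository):
#
#   python3 weekly_cosine_similarities.py authors-lsi \
#       --outfile=/path/to/similarity_weekly/comment_authors \
#       --infile=/path/to/tfidf_weekly/comment_authors_tfidf.parquet \
#       --included_subreddits=/path/to/included_subreddits.txt \
#       --static_tfidf_path=/path/to/tfidf/comment_authors.parquet \
#       --lsi_model=/path/to/prefit_LSIMOD.pkl \
#       --n_components=100 --cores=8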