diff --git a/similarities/weekly_cosine_similarities.py b/similarities/weekly_cosine_similarities.py index 45327c7..235b4cf 100755 --- a/similarities/weekly_cosine_similarities.py +++ b/similarities/weekly_cosine_similarities.py @@ -3,6 +3,7 @@ from pyspark.sql import functions as f from pyspark.sql import SparkSession from pyspark.sql import Window import numpy as np +import duckdb import pyarrow import pyarrow.dataset as ds import pandas as pd @@ -72,23 +73,21 @@ def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kw return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs) -#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet') +#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_submission_terms_tfidf.parquet') def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=None,max_df=None): print(outfile) # do this step in parallel if we have the memory for it. # should be doable with pool.map + conn = duckdb.connect() + subreddit_names = conn.execute(f"SELECT DISTINCT subreddit, subreddit_id from read_parquet('{tfidf_path}/*/*.parquet') ORDER BY subreddit_id;").df() - spark = SparkSession.builder.getOrCreate() - df = spark.read.parquet(tfidf_path) - - # load subreddits + topN - - subreddit_names = df.select(['subreddit','subreddit_id']).distinct().toPandas() - subreddit_names = subreddit_names.sort_values("subreddit_id") - nterms = df.select(f.max(f.col(term_colname + "_id")).alias('max')).collect()[0].max - weeks = df.select(f.col("week")).distinct().toPandas().week.values - spark.stop() + nterms = conn.execute(f"SELECT MAX({term_colname + '_id'}) as nterms FROM read_parquet('{tfidf_path}/*/*.parquet')").df() + nterms = nterms.nterms.values + weeks = conn.execute(f"SELECT DISTINCT week FROM read_parquet('{tfidf_path}/*/*.parquet')").df() + weeks = weeks.week.values + conn.close() + print(f"computing weekly similarities") week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms)