From 561a6704a30d6208ad6c81b0a5e01684951c8a54 Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Sat, 11 Jan 2025 21:21:53 -0800
Subject: [PATCH] make multiproc configurable

---
 similarities/weekly_cosine_similarities.py | 44 ++++++++++++++--------
 1 file changed, 29 insertions(+), 15 deletions(-)

diff --git a/similarities/weekly_cosine_similarities.py b/similarities/weekly_cosine_similarities.py
index ebd6453..90b8e60 100755
--- a/similarities/weekly_cosine_similarities.py
+++ b/similarities/weekly_cosine_similarities.py
@@ -79,7 +79,7 @@ def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kw
     return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
 
 #tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_submission_terms_tfidf.parquet')
-def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters=None):
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters=None, min_date=None, max_date=None, cores=1):
     print(outfile)
     # do this step in parallel if we have the memory for it.
     # should be doable with pool.map
@@ -91,12 +91,20 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subre
         if max_df is not None:
             q = q + f"AND count <= {max_df}"
         nterms = conn.execute(q).df()
-
     else:
         nterms = conn.execute(f"SELECT MAX({term_colname + '_id'}) as nterms FROM read_parquet('{tfidf_path}/*/*.parquet')").df()
     nterms = nterms.nterms.values
     nterms = int(nterms[0])
-    weeks = conn.execute(f"SELECT DISTINCT CAST(CAST(week AS DATE) AS STRING) AS week FROM read_parquet('{tfidf_path}/*/*.parquet')").df()
+    q = f"SELECT DISTINCT CAST(CAST(week AS DATE) AS STRING) AS week FROM read_parquet('{tfidf_path}/*/*.parquet')"
+
+    if min_date is not None and max_date is not None:
+        q = q + f" WHERE CAST(week AS DATE) >= CAST('{min_date}' AS DATE) AND CAST(week AS DATE) <= CAST('{max_date}' AS DATE)"
+    elif max_date is not None:
+        q = q + f" WHERE CAST(week AS DATE) <= CAST('{max_date}' AS DATE)"
+    elif min_date is not None:
+        q = q + f" WHERE CAST(week AS DATE) >= CAST('{min_date}' AS DATE)"
+
+    weeks = conn.execute(q).df()
     weeks = weeks.week.values
     conn.close()
 
@@ -107,13 +115,13 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subre
 
     print(f"computing weekly similarities")
     week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms, clusters = clusters)
-    # for week in weeks:
-    #     week_similarities_helper(week)
-
-    with Pool(128) as pool: # maybe it can be done with 128 cores on the huge machine?
-        list(pool.imap_unordered(week_similarities_helper, weeks, 5))
-        pool.close()
-
+    if cores > 1:
+        with Pool(cores) as pool:
+            list(pool.imap_unordered(week_similarities_helper, weeks, 5))
+            pool.close()
+    else:
+        for week in weeks:
+            week_similarities_helper(week)
 
 
 def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=500, static_tfidf_path=None, clusters=None):
@@ -141,20 +149,23 @@ def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/re
                                       clusters=clusters)
 
 
-def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100,lsi_model=None,static_tfidf_path=None, min_df=2, clusters=None):
+def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100,lsi_model=None,static_tfidf_path=None, min_df=2, clusters=None, min_date=None, max_date=None, cores=1):
     return cosine_similarities_weekly_lsi(infile,
                                           outfile,
                                           'author',
+                                          min_df=min_df,
                                           included_subreddits=included_subreddits,
                                           n_components=n_components,
                                           lsi_model=lsi_model,
                                           static_tfidf_path=static_tfidf_path,
-                                          min_df=min_df,
-                                          clusters=clusters
+                                          clusters=clusters,
+                                          min_date=min_date,
+                                          max_date=max_date,
+                                          cores=cores
                                           )
 
 
-def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100,lsi_model=None,static_tfidf_path=None,clusters=None):
+def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100,lsi_model=None,static_tfidf_path=None,clusters=None, min_date=None, max_date=None, cores=1):
     return cosine_similarities_weekly_lsi(infile,
                                           outfile,
                                           'term',
@@ -162,7 +173,10 @@ def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/out
                                           included_subreddits=included_subreddits,
                                           n_components=n_components,
                                           lsi_model=lsi_model,
                                           static_tfidf_path=static_tfidf_path,
-                                          clusters=clusters
+                                          clusters=clusters,
+                                          min_date=min_date,
+                                          max_date=max_date,
+                                          cores=cores
                                           )
 
 if __name__ == "__main__":
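-- 
Usage sketch (illustrative only, not part of the patch): one way the new
parameters might be driven, assuming similarities/ is importable as a
package; the output path and date range below are placeholder values.

    from similarities.weekly_cosine_similarities import term_cosine_similarities_weekly_lsi

    # cores=1 keeps the serial for-loop over weeks; cores > 1 fans the weeks
    # out over a multiprocessing Pool of that size.
    # min_date/max_date are ISO date strings; the DuckDB query casts them to
    # DATE to restrict which weeks get similarity matrices.
    term_cosine_similarities_weekly_lsi(
        outfile='output/comment_terms_weekly_lsi',  # placeholder output path
        n_components=100,
        min_date='2020-01-01',                      # placeholder date range
        max_date='2020-12-31',
        cores=8,
    )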