1
0

make multiproc configurable

This commit is contained in:
Nathan TeBlunthuis 2025-01-11 21:21:53 -08:00
parent b2f1c1342f
commit 561a6704a3

View File

@ -79,7 +79,7 @@ def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kw
return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_submission_terms_tfidf.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters=None):
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters=None, min_date=None, max_date=None, cores=1):
print(outfile)
# do this step in parallel if we have the memory for it.
# should be doable with pool.map
@ -91,12 +91,20 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subre
if max_df is not None:
q = q + f"AND count <= {max_df}"
nterms = conn.execute(q).df()
else:
nterms = conn.execute(f"SELECT MAX({term_colname + '_id'}) as nterms FROM read_parquet('{tfidf_path}/*/*.parquet')").df()
nterms = nterms.nterms.values
nterms = int(nterms[0])
weeks = conn.execute(f"SELECT DISTINCT CAST(CAST(week AS DATE) AS STRING) AS week FROM read_parquet('{tfidf_path}/*/*.parquet')").df()
q = f"SELECT DISTINCT CAST(CAST(week AS DATE) AS STRING) AS week FROM read_parquet('{tfidf_path}/*/*.parquet')"
if min_date is not None and max_date is not None:
q = q + " WHERE CAST(week AS DATE) >= CAST({min_date} AS DATE) AND CAST(week AS DATE) <= CAST({max_date} AS DATE)"
elif max_date is not None:
q = q + "WHERE CAST(week AS DATE) <= CAST({max_date} AS DATE)"
elif min_date is not None:
q = q + "WHERE CAST(week AS DATE) >= CAST({min_date} AS DATE)"
weeks = conn.execute(q).df()
weeks = weeks.week.values
conn.close()
@ -107,13 +115,13 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subre
print(f"computing weekly similarities")
week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms, clusters = clusters)
# for week in weeks:
# week_similarities_helper(week)
with Pool(128) as pool: # maybe it can be done with 128 cores on the huge machine?
list(pool.imap_unordered(week_similarities_helper, weeks, 5))
pool.close()
if cores > 1:
with Pool(cores) as pool: # maybe it can be done with 128 cores on the huge machine?
list(pool.imap_unordered(week_similarities_helper, weeks, 5))
pool.close()
else:
for week in weeks:
week_similarities_helper(week)
def author_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', min_df=2, max_df=None, included_subreddits=None, topN=500, static_tfidf_path=None, clusters=None):
@ -141,20 +149,23 @@ def term_cosine_similarities_weekly(outfile, infile='/gscratch/comdata/output/re
clusters=clusters)
def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100,lsi_model=None,static_tfidf_path=None, min_df=2, clusters=None):
def author_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors_test.parquet', included_subreddits=None, n_components=100,lsi_model=None,static_tfidf_path=None, min_df=2, clusters=None, min_date=None, max_date=None,cores=1):
return cosine_similarities_weekly_lsi(infile,
outfile,
'author',
min_df=min_df,
included_subreddits=included_subreddits,
n_components=n_components,
lsi_model=lsi_model,
static_tfidf_path=static_tfidf_path,
min_df=min_df,
clusters=clusters
clusters=clusters,
min_date=min_date,
max_date=max_date,
cores=cores
)
def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100,lsi_model=None,static_tfidf_path=None,clusters=None):
def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', included_subreddits=None, n_components=100,lsi_model=None,static_tfidf_path=None,clusters=None,cores=1):
return cosine_similarities_weekly_lsi(infile,
outfile,
'term',
@ -162,7 +173,10 @@ def term_cosine_similarities_weekly_lsi(outfile, infile = '/gscratch/comdata/out
n_components=n_components,
lsi_model=lsi_model,
static_tfidf_path=static_tfidf_path,
clusters=clusters
clusters=clusters,
min_date=min_date,
max_date=max_date,
cores=cores
)
if __name__ == "__main__":