support remapping term_ids.
parent 72a4e686ef
commit b4f9ce0ad2
@@ -31,7 +31,7 @@ outfile="/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/simi
 # static_tfidf = "/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf/comment_authors_compex.parquet"
 # dftest = spark.read.parquet(static_tfidf)
 
-def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir:Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None, clusters=None):
+def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subreddits, outdir:Path, subreddit_names, nterms, topN=None, min_df=None, max_df=None, clusters=None, term_ids=None):
 term = term_colname
 term_id = term + '_id'
 term_id_new = term + '_id_new'
@@ -44,6 +44,10 @@ def _week_similarities(week, simfunc, tfidf_path, term_colname, included_subredd
 week=week,
 rescale_idf=False)
 
+if term_ids is not None:
+    entries = duckdb.sql(f"SELECT A.{tfidf_colname}, B.{term_id} AS {term_id_new}, A.subreddit_id_new FROM entries AS A JOIN term_ids AS B ON A.{term_id_new} == B.{term_id_old}").df()
+
+
 tfidf_colname='tf_idf'
 # if the max subreddit id we found is less than the number of subreddit names then we have to fill in 0s
 shape = (nterms,subreddit_names.shape[0])
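The branch added above leans on DuckDB's ability to query pandas DataFrames held in local Python variables by name. A minimal sketch of that remapping pattern, with invented example data and with literal column names (tf_idf, term_id_new, term_id_old) standing in for the f-string placeholders in the real query:

import duckdb
import pandas as pd

# Hypothetical stand-ins for the real tables: `entries` holds sparse tf-idf
# entries keyed by term_id_new, and `term_ids` maps old term ids to new ones.
entries = pd.DataFrame({"tf_idf": [0.5, 1.2, 0.8],
                        "term_id_new": [3, 7, 3],
                        "subreddit_id_new": [0, 1, 2]})
term_ids = pd.DataFrame({"term_id_old": [3, 7],
                         "term_id": [0, 1]})

# DuckDB resolves `entries` and `term_ids` from the surrounding Python scope,
# so the join rewrites each entry's term id through the mapping table.
remapped = duckdb.sql(
    "SELECT A.tf_idf, B.term_id AS term_id_new, A.subreddit_id_new "
    "FROM entries AS A JOIN term_ids AS B ON A.term_id_new = B.term_id_old"
).df()
print(remapped)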
@@ -79,7 +83,7 @@ def cosine_similarities_weekly_lsi(*args, n_components=100, lsi_model=None, **kw
 return cosine_similarities_weekly(*args, simfunc=simfunc, **kwargs)
 
 #tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/competitive_exclusion_reddit/data/tfidf_weekly/comment_submission_terms_tfidf.parquet')
-def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters=None, min_date=None, max_date=None, cores=1):
+def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subreddits = None, topN = None, simfunc=column_similarities, min_df=0, max_df=None, static_tfidf_path=None, clusters=None, min_date=None, max_date=None, cores=1, term_ids=None):
 print(outfile)
 # do this step in parallel if we have the memory for it.
 # should be doable with pool.map
@@ -111,7 +115,7 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subre
 q = q + f" WHERE CAST(week AS DATE) >= CAST('{min_date}' AS DATE)"
 
 weeks = conn.execute(q).df()
 weeks = weeks.week.values
 conn.close()
 
 if clusters is not None:
@@ -119,7 +123,7 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, included_subre
 clusters = duckdb.sql("SELECT A.subreddit AS sr_i, B.subreddit AS sr_j FROM clusters_raw AS A JOIN clusters_raw AS B ON A.cluster == B.cluster WHERE A.cluster != -1 AND B.cluster != -1").df()
 
 print(f"computing weekly similarities")
-week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms, clusters = clusters)
+week_similarities_helper = partial(_week_similarities,simfunc=simfunc, tfidf_path=tfidf_path, term_colname=term_colname, outdir=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=None, subreddit_names=subreddit_names,nterms=nterms, clusters = clusters, term_ids=term_ids)
 
 if cores > 1:
 with Pool(cores) as pool: # maybe it can be done with 128 cores on the huge machine?
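Taken together, the change threads an optional term_ids argument from cosine_similarities_weekly through the partial down to _week_similarities, where each week's term ids are rewritten before the similarity matrix is assembled. A hedged usage sketch, assuming an import path the commit does not show and using placeholder paths plus an invented two-column mapping (term_id_old to the canonical term_id) that would need to match what the surrounding pipeline actually produces:

import pandas as pd
# Assumed module path; the commit page does not name the file being edited.
from similarities.weekly_cosine_similarities import cosine_similarities_weekly

# Hypothetical mapping from each week's old term ids to canonical term ids.
term_ids = pd.DataFrame({"term_id_old": [10, 11, 12],
                         "term_id": [0, 1, 2]})

cosine_similarities_weekly(
    tfidf_path="data/tfidf_weekly/comment_terms.parquet",  # placeholder input
    outfile="data/similarity_weekly/comment_terms",        # placeholder output dir
    term_colname="term",
    min_df=2,
    cores=4,
    term_ids=term_ids,  # new parameter: remap term ids before computing similarities
)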