13
0
cdsc_reddit/term_cosine_similarity.py

128 lines
5.3 KiB
Python
Raw Normal View History

2020-11-02 18:40:02 +00:00
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
import numpy as np
import pyarrow
import pandas as pd
import fire
from itertools import islice
from pathlib import Path
from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
import scipy
# outfile='test_similarities_500.feather';
# min_df = None;
# included_subreddits=None; topN=100; exclude_phrases=True;
def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(outfile)
print(exclude_phrases)
2020-11-02 18:40:02 +00:00
tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
if included_subreddits is None:
rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
2020-11-02 18:40:02 +00:00
else:
included_subreddits = set(open(included_subreddits))
if exclude_phrases == True:
tfidf = tfidf.filter(~f.col(term).contains("_"))
2020-11-02 18:40:02 +00:00
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
print("loading matrix")
mat = read_tfidf_matrix(tempdir.name,'term')
print('computing similarities')
sims = column_similarities(mat)
del mat
sims = pd.DataFrame(sims.todense())
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
sims['subreddit'] = subreddit_names.subreddit.values
2020-11-02 18:40:02 +00:00
p = Path(outfile)
2020-11-02 18:40:02 +00:00
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
2020-11-02 18:40:02 +00:00
sims.to_feather(outfile)
tempdir.cleanup()
path = "term_tfidf_entriesaukjy5gv.parquet"
2020-11-02 18:40:02 +00:00
# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
# '''
# Compute similarities between subreddits based on tfi-idf vectors of comment texts
# included_subreddits : string
# Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
# similarity_threshold : double (default = 0)
# set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
# https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
# min_df : int (default = 0.1 * (number of included_subreddits)
# exclude terms that appear in fewer than this number of documents.
# outfile: string
# where to output csv and feather outputs
# '''
# print(outfile)
# print(exclude_phrases)
# tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
# if included_subreddits is None:
# included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
# included_subreddits = {s.strip('\n') for s in included_subreddits}
# else:
# included_subreddits = set(open(included_subreddits))
# if exclude_phrases == True:
# tfidf = tfidf.filter(~f.col(term).contains("_"))
# sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
# p = Path(outfile)
# output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
# output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
# output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
# sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
# #instead of toLocalMatrix() why not read as entries and put strait into numpy
# sim_entries = pd.read_parquet(output_parquet)
# df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
# spark.stop()
# df['subreddit_id_new'] = df['subreddit_id_new'] - 1
# df = df.sort_values('subreddit_id_new').reset_index(drop=True)
# df = df.set_index('subreddit_id_new')
# similarities = sim_entries.join(df, on='i')
# similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
# similarities = similarities.join(df, on='j')
# similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
# similarities.to_feather(output_feather)
# similarities.to_csv(output_csv)
# return similarities
2020-11-02 18:40:02 +00:00
if __name__ == '__main__':
fire.Fire(term_cosine_similarities)