
Update code for building similarity matrices.

This commit is contained in:
Nate E TeBlunthuis 2020-11-17 12:52:48 -08:00
parent e794214653
commit 82d184d9c6
3 changed files with 175 additions and 41 deletions

View File

@@ -2,11 +2,67 @@ from pyspark.sql import Window
 from pyspark.sql import functions as f
 from enum import Enum
 from pyspark.mllib.linalg.distributed import CoordinateMatrix
+from tempfile import TemporaryDirectory
+import pyarrow
+import pyarrow.dataset as ds
+from scipy.sparse import csr_matrix
+import pandas as pd
+import numpy as np
 
 class tf_weight(Enum):
     MaxTF = 1
     Norm05 = 2
 
+def read_tfidf_matrix(path,term_colname):
+    term = term_colname
+    term_id = term + '_id'
+    term_id_new = term + '_id_new'
+    dataset = ds.dataset(path,format='parquet')
+    entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new]).to_pandas()
+    return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1))))
+
+def column_similarities(mat):
+    norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
+    mat = mat.multiply(1/norm)
+    sims = mat.T @ mat
+    return(sims)
+
+def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
+    term = term_colname
+    term_id = term + '_id'
+    term_id_new = term + '_id_new'
+
+    if min_df is None:
+        min_df = 0.1 * len(included_subreddits)
+
+    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
+
+    # reset the subreddit ids
+    sub_ids = tfidf.select('subreddit_id').distinct()
+    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
+    tfidf = tfidf.join(sub_ids,'subreddit_id')
+
+    # only use terms in at least min_df included subreddits
+    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
+    # new_count = new_count.filter(f.col('new_count') >= min_df)
+    tfidf = tfidf.join(new_count,term_id,how='inner')
+
+    # reset the term ids
+    term_ids = tfidf.select([term_id]).distinct()
+    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
+    tfidf = tfidf.join(term_ids,term_id)
+
+    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
+    # tfidf = tfidf.withColumnRenamed("idf","idf_old")
+    # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
+    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
+
+    tempdir = TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
+    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
+    return tempdir
+
 def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
     term = term_colname
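
A note on the new helpers: read_tfidf_matrix loads the re-indexed tf-idf entries into a terms-by-subreddits scipy csr_matrix, and column_similarities turns that into a subreddit-by-subreddit cosine matrix by L2-normalizing each column and taking the cross-product of the normalized matrix with itself. Below is a minimal standalone sketch of that computation on a toy matrix; the toy numbers are illustrative, and the helper is restated only so the snippet runs on its own.

import numpy as np
from scipy.sparse import csr_matrix

def column_similarities(mat):
    # L2 norm of each column, then scale each column to unit length
    norm = np.matrix(np.power(mat.power(2).sum(axis=0), 0.5, dtype=np.float32))
    mat = mat.multiply(1 / norm)
    # cosine similarity between columns is the cross-product of the normalized matrix
    return mat.T @ mat

# toy tf-idf matrix: 3 terms (rows) x 2 subreddits (columns), illustrative values
toy = csr_matrix(np.array([[1.0, 0.0],
                           [2.0, 2.0],
                           [0.0, 1.0]]))
sims = column_similarities(toy)
print(sims.todense())  # symmetric 2x2 matrix, ones on the diagonal, 0.8 off-diagonal

Because read_tfidf_matrix puts terms on rows and subreddits on columns (both shifted to 0-based indices), the result is indexed by subreddit_id_new - 1 in both dimensions.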

View File

@@ -8,38 +8,23 @@ import pandas as pd
 import fire
 from itertools import islice
 from pathlib import Path
-from similarities_helper import cosine_similarities
+from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
+import scipy
 
-spark = SparkSession.builder.getOrCreate()
-conf = spark.sparkContext.getConf()
+# outfile='test_similarities_500.feather';
+# min_df = None;
+# included_subreddits=None; topN=100; exclude_phrases=True;
 
-# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
-    '''
-    Compute similarities between subreddits based on tfi-idf vectors of comment texts
-
-    included_subreddits : string
-        Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
-
-    similarity_threshold : double (default = 0)
-        set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
-https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
-
-    min_df : int (default = 0.1 * (number of included_subreddits)
-        exclude terms that appear in fewer than this number of documents.
-
-    outfile: string
-        where to output csv and feather outputs
-    '''
+def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
     print(outfile)
     print(exclude_phrases)
 
     tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
 
     if included_subreddits is None:
-        included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
-        included_subreddits = {s.strip('\n') for s in included_subreddits}
+        rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
+        included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
 
     else:
         included_subreddits = set(open(included_subreddits))
@@ -47,7 +32,23 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     if exclude_phrases == True:
         tfidf = tfidf.filter(~f.col(term).contains("_"))
 
-    sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name,'term')
+
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
 
     p = Path(outfile)
@@ -55,25 +56,72 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
     output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
 
-    sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+    path = "term_tfidf_entriesaukjy5gv.parquet"
 
-    #instead of toLocalMatrix() why not read as entries and put strait into numpy
-    sim_entries = pd.read_parquet(output_parquet)
-    df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
-    spark.stop()
-    df['subreddit_id_new'] = df['subreddit_id_new'] - 1
-    df = df.sort_values('subreddit_id_new').reset_index(drop=True)
-    df = df.set_index('subreddit_id_new')
-
-    similarities = sim_entries.join(df, on='i')
-    similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
-    similarities = similarities.join(df, on='j')
-    similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
-
-    similarities.to_feather(output_feather)
-    similarities.to_csv(output_csv)
-    return similarities
+# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
+# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
+# '''
+# Compute similarities between subreddits based on tfi-idf vectors of comment texts
+# included_subreddits : string
+# Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
+# similarity_threshold : double (default = 0)
+# set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
+# https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
+# min_df : int (default = 0.1 * (number of included_subreddits)
+# exclude terms that appear in fewer than this number of documents.
+# outfile: string
+# where to output csv and feather outputs
+# '''
+# print(outfile)
+# print(exclude_phrases)
+# tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
+# if included_subreddits is None:
+# included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
+# included_subreddits = {s.strip('\n') for s in included_subreddits}
+# else:
+# included_subreddits = set(open(included_subreddits))
+# if exclude_phrases == True:
+# tfidf = tfidf.filter(~f.col(term).contains("_"))
+# sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
+# p = Path(outfile)
+# output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
+# output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
+# output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
+# sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
+# #instead of toLocalMatrix() why not read as entries and put strait into numpy
+# sim_entries = pd.read_parquet(output_parquet)
+# df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+# spark.stop()
+# df['subreddit_id_new'] = df['subreddit_id_new'] - 1
+# df = df.sort_values('subreddit_id_new').reset_index(drop=True)
+# df = df.set_index('subreddit_id_new')
+# similarities = sim_entries.join(df, on='i')
+# similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
+# similarities = similarities.join(df, on='j')
+# similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
+# similarities.to_feather(output_feather)
+# similarities.to_csv(output_csv)
+# return similarities
 
 if __name__ == '__main__':
     fire.Fire(term_cosine_similarities)
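
Since the rewritten entry point is still exposed through fire.Fire, it can be driven from the command line as before, or imported and called directly. Below is a hedged sketch of a direct call; the module name term_comment_similarity is an assumption about the script's filename, and the output path is illustrative, not one used in the repository.

from term_comment_similarity import term_cosine_similarities  # assumed module name

term_cosine_similarities(
    outfile='./test_term_similarities.feather',  # illustrative output path
    min_df=None,               # None -> 0.1 * number of included subreddits
    included_subreddits=None,  # None -> top `topN` subreddits from subreddits_by_num_comments.csv
    topN=500,
    exclude_phrases=False,
)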

View File

@@ -0,0 +1,30 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+from pyspark.sql import Window
+from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
+import numpy as np
+import pyarrow
+import pandas as pd
+import fire
+from itertools import islice
+from pathlib import Path
+from similarities_helper import cosine_similarities
+
+spark = SparkSession.builder.getOrCreate()
+conf = spark.sparkContext.getConf()
+
+df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
+
+# remove /u/ pages
+df = df.filter(~df.subreddit.like("u_%"))
+
+df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
+
+win = Window.orderBy(f.col('n_comments').desc())
+df = df.withColumn('comments_rank',f.rank().over(win))
+
+df = df.toPandas()
+df = df.sort_values("n_comments")
+
+df.to_csv('/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv',index=False)
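
The CSV written by this new script is what the similarity script above reads to pick its subreddit sample. A short sketch of that downstream selection is below; the relative path is illustrative, while the actual script reads the file from /gscratch/comdata/users/nathante/cdsc-reddit/.

import pandas as pd

# load the comment-count ranking produced by the script above
rankdf = pd.read_csv('subreddits_by_num_comments.csv')  # illustrative local path

# keep the topN subreddits by comment volume, mirroring the check in the similarity script
topN = 500
included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN, 'subreddit'].values)
print(len(included_subreddits))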