1
0

Refactor and reorganze.

This commit is contained in:
Nate E TeBlunthuis
2020-12-08 17:32:20 -08:00
parent a60747292e
commit e6294b5b90
47 changed files with 731 additions and 313 deletions

View File

@@ -0,0 +1,73 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
import pandas as pd
import fire
from pathlib import Path
from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits
def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(outfile)
print(exclude_phrases)
tfidf = spark.read.parquet(infile)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(open(included_subreddits))
if exclude_phrases == True:
tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
print("loading matrix")
mat = read_tfidf_matrix(tempdir.name, term_colname)
print('computing similarities')
sims = column_similarities(mat)
del mat
sims = pd.DataFrame(sims.todense())
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['subreddit'] = subreddit_names.subreddit.values
p = Path(outfile)
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
sims.to_feather(outfile)
tempdir.cleanup()
def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
'term',
outfile,
min_df,
included_subreddits,
topN,
exclude_phrases)
def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
'author',
outfile,
min_df,
included_subreddits,
topN,
exclude_phrases=False)
if __name__ == "__main__":
fire.Fire({'term':term_cosine_similarities,
'author':author_cosine_similarities})

View File

@@ -0,0 +1,24 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from similarities_helper import build_weekly_tfidf_dataset
import pandas as pd
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
# remove [deleted] and AutoModerator (TODO remove other bots)
# df = df.filter(df.author != '[deleted]')
# df = df.filter(df.author != 'AutoModerator')
df = build_weekly_tfidf_dataset(df, include_subs, 'term')
df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy')
spark.stop()

View File

@@ -0,0 +1 @@
nathante@n2347.hyak.local.31061:1602221800

View File

@@ -0,0 +1 @@
nathante@n2347.hyak.local.31061:1602221800

2
similarities/Makefile Normal file
View File

@@ -0,0 +1,2 @@
/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet

View File

@@ -0,0 +1,73 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
import pandas as pd
import fire
from pathlib import Path
from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits
def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(outfile)
print(exclude_phrases)
tfidf = spark.read.parquet(infile)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(open(included_subreddits))
if exclude_phrases == True:
tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
print("loading matrix")
mat = read_tfidf_matrix(tempdir.name, term_colname)
print('computing similarities')
sims = column_similarities(mat)
del mat
sims = pd.DataFrame(sims.todense())
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['subreddit'] = subreddit_names.subreddit.values
p = Path(outfile)
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
sims.to_feather(outfile)
tempdir.cleanup()
def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
'term',
outfile,
min_df,
included_subreddits,
topN,
exclude_phrases)
def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
'author',
outfile,
min_df,
included_subreddits,
topN,
exclude_phrases=False)
if __name__ == "__main__":
fire.Fire({'term':term_cosine_similarities,
'author':author_cosine_similarities})

4
similarities/job_script.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
stop-all.sh

View File

@@ -0,0 +1,278 @@
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from scipy.sparse import csr_matrix
import pandas as pd
import numpy as np
import pathlib
class tf_weight(Enum):
MaxTF = 1
Norm05 = 2
def read_tfidf_matrix_weekly(path, term_colname, week):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new],filter=ds.field('week')==week).to_pandas()
return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1))))
def write_weekly_similarities(path, sims, week, names):
sims['week'] = week
p = pathlib.Path(path)
if not p.is_dir():
p.mkdir()
# reformat as a pairwise list
sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
sims.to_parquet(p / week.isoformat())
def read_tfidf_matrix(path,term_colname):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new]).to_pandas()
return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1))))
def column_similarities(mat):
norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
mat = mat.multiply(1/norm)
sims = mat.T @ mat
return(sims)
def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
sub_ids = tfidf.select(['subreddit_id','week']).distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
# only use terms in at least min_df included subreddits in a given week
new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
# reset the term ids
term_ids = tfidf.select([term_id,'week']).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
tfidf = tfidf.join(term_ids,[term_id,'week'])
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf = tfidf.repartition('week')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return(tempdir)
def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return tempdir
# try computing cosine similarities using spark
def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
tfidf = tfidf.cache()
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
# step 1 make an rdd of entires
# sorted by (dense) spark subreddit id
n_partitions = int(len(included_subreddits)*2 / 5)
entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
# put like 10 subredis in each partition
# step 2 make it into a distributed.RowMatrix
coordMat = CoordinateMatrix(entries)
coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))
# this needs to be an IndexedRowMatrix()
mat = coordMat.toRowMatrix()
#goal: build a matrix of subreddit columns and tf-idfs rows
sim_dist = mat.columnSimilarities(threshold=similarity_threshold)
return (sim_dist, tfidf)
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
term = term_colname
term_id = term + '_id'
# aggregate counts by week. now subreddit-term is distinct
df = df.filter(df.subreddit.isin(include_subs))
df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))
max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddits are unique
max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
df = df.join(max_subreddit_terms, on=['subreddit','week'])
df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
# group by term. term is unique
idf = df.groupby([term,'week']).count()
N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))
idf = idf.join(N_docs, on=['week'])
# add a little smoothing to the idf
idf = idf.withColumn('idf',f.log(idf.subreddits_in_week) / (1+f.col('count'))+1)
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select([term,'week']).distinct() # terms are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct
# make subreddit ids
subreddits = df.select(['subreddit','week']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))
df = df.join(subreddits,on=['subreddit','week'])
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique
idf = idf.join(terms,on=[term,'week'])
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term,'week'])
# agg terms by subreddit to make sparse tf/df vectors
if tf_family == tf_weight.MaxTF:
df = df.withColumn("tf_idf", df.relative_tf * df.idf)
else: # tf_fam = tf_weight.Norm05
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
return df
def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
term = term_colname
term_id = term + '_id'
# aggregate counts by week. now subreddit-term is distinct
df = df.filter(df.subreddit.isin(include_subs))
df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
df = df.join(max_subreddit_terms, on='subreddit')
df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
# group by term. term is unique
idf = df.groupby([term]).count()
N_docs = df.select('subreddit').distinct().count()
# add a little smoothing to the idf
idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select(term).distinct() # terms are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
# make subreddit ids
subreddits = df.select(['subreddit']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))
df = df.join(subreddits,on='subreddit')
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on=term) # subreddit-term-id is unique
idf = idf.join(terms,on=term)
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term])
# agg terms by subreddit to make sparse tf/df vectors
if tf_family == tf_weight.MaxTF:
df = df.withColumn("tf_idf", df.relative_tf * df.idf)
else: # tf_fam = tf_weight.Norm05
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
return df
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv"):
rankdf = pd.read_csv(path)
included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
return included_subreddits

73
similarities/tfidf.py Normal file
View File

@@ -0,0 +1,73 @@
import fire
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath)
df = df.filter(~ f.col(term_colname).isin(exclude))
include_subs = select_topN_subreddits(topN)
df = func(df, include_subs, term_colname)
df.write.parquet(outpath,mode='overwrite',compression='snappy')
spark.stop()
def tfidf(inpath, outpath, topN, term_colname, exclude):
return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude)
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude)
def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=25000):
return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath,
topN,
'author',
['[deleted]','AutoModerator']
)
def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=25000):
return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath,
topN,
'term',
[]
)
def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=25000):
return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath,
topN,
'author',
['[deleted]','AutoModerator']
)
def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=25000):
return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath,
topN,
'term',
[]
)
if __name__ == "__main__":
fire.Fire({'authors':tfidf_authors,
'terms':tfidf_terms,
'authors_weekly':tfidf_authors_weekly,
'terms_weekly':tfidf_terms_weekly})

View File

@@ -0,0 +1,22 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
# remove /u/ pages
df = df.filter(~df.subreddit.like("u_%"))
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank', f.rank().over(win))
df = df.toPandas()
df = df.sort_values("n_comments")
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv', index=False)

View File

@@ -0,0 +1,73 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import pyarrow
import pandas as pd
import fire
from itertools import islice
from pathlib import Path
from similarities_helper import *
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(outfile)
tfidf = spark.read.parquet(tfidf_path)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(open(included_subreddits))
print(f"computing weekly similarities for {len(included_subreddits)} subreddits")
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
# the ids can change each week.
subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
weeks = list(subreddit_names.week.drop_duplicates())
for week in weeks:
print(f"loading matrix: {week}")
mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
print('computing similarities')
sims = column_similarities(mat)
del mat
names = subreddit_names.loc[subreddit_names.week == week]
sims = pd.DataFrame(sims.todense())
sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1)
sims['subreddit'] = names.subreddit.values
write_weekly_similarities(outfile, sims, week, names)
def author_cosine_similarities_weekly(outfile, min_df=None , included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
outfile,
'author',
min_df,
included_subreddits,
topN)
def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
outfile,
'term',
min_df,
included_subreddits,
topN)
if __name__ == "__main__":
fire.Fire({'author':author_cosine_similarities_weekly,
'term':term_cosine_similarities_weekly})