
Merge remote-tracking branch 'refs/remotes/origin/master' into master

Nathan TeBlunthuis 2020-11-17 16:33:14 -08:00
commit 13eb95b3b0
10 changed files with 247 additions and 65 deletions


@@ -71,8 +71,8 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     similarities = similarities.join(df, on='j')
     similarities = similarities.rename(columns={'subreddit':"subreddit_j"})

-    similarities.write_feather(output_feather)
-    similarities.write_csv(output_csv)
+    similarities.to_feather(output_feather)
+    similarities.to_csv(output_csv)
     return similarities

 if __name__ == '__main__':

clustering.py (new file, 45 lines)

@@ -0,0 +1,45 @@
import pandas as pd
import numpy as np
from sklearn.cluster import AffinityPropagation
import fire

def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968):
    '''
    similarities: feather file with a dataframe of similarity scores
    preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
    '''
    df = pd.read_feather(similarities)
    n = df.shape[0]
    mat = np.array(df.drop('subreddit',1))
    mat[range(n),range(n)] = 1
    preference = np.quantile(mat,preference_quantile)

    clustering = AffinityPropagation(damping=damping,
                                     max_iter=max_iter,
                                     convergence_iter=convergence_iter,
                                     copy=False,
                                     preference=preference,
                                     affinity='precomputed',
                                     random_state=random_state).fit(mat)

    print(f"clustering took {clustering.n_iter_} iterations")
    clusters = clustering.labels_
    print(f"found {len(set(clusters))} clusters")

    cluster_data = pd.DataFrame({'subreddit': df.subreddit, 'cluster':clustering.labels_})
    cluster_sizes = cluster_data.groupby("cluster").count()
    print(f"the largest cluster has {cluster_sizes.subreddit.max()} members")
    print(f"the median cluster has {cluster_sizes.subreddit.median()} members")
    print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")

    cluster_data.to_feather(output)

if __name__ == "__main__":
    fire.Fire(affinity_clustering)
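Because the new clustering.py exposes affinity_clustering through fire, it can be driven from Python as well as from the command line. A minimal sketch, not part of the commit; the input and output file names below are assumptions, and the input must be a square subreddit-by-subreddit similarity matrix with a 'subreddit' column like the one the similarity scripts in this commit produce:

from clustering import affinity_clustering

# higher preference_quantile yields more, smaller clusters
affinity_clustering(similarities="term_similarities_10000.feather",
                    output="subreddit_clusters.feather",
                    preference_quantile=0.85)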


@@ -1,35 +1,34 @@
+import fire
 import pyarrow
 import pandas as pd
 from numpy import random
 import numpy as np
 from sklearn.manifold import TSNE

-df = pd.read_feather("reddit_term_similarity_3000.feather")
-df = df.sort_values(['i','j'])
-n = max(df.i.max(),df.j.max())
-
-def zero_pad(grp):
-    p = grp.shape[0]
-    grp = grp.sort_values('j')
-    return np.concatenate([np.zeros(n-p),np.ones(1),np.array(grp.value)])
-
-col_names = df.sort_values('j').loc[:,['subreddit_j']].drop_duplicates()
-first_name = list(set(df.subreddit_i) - set(df.subreddit_j))[0]
-col_names = [first_name] + list(col_names.subreddit_j)
-mat = df.groupby('i').apply(zero_pad)
-mat.loc[n] = np.concatenate([np.zeros(n),np.ones(1)])
-mat = np.stack(mat)
-mat = mat + np.tril(mat.transpose(),k=-1)
-dist = 2*np.arccos(mat)/np.pi
-tsne_model = TSNE(2,learning_rate=750,perplexity=50,n_iter=10000,metric='precomputed',early_exaggeration=20,n_jobs=-1)
-tsne_fit_model = tsne_model.fit(dist)
-tsne_fit_whole = tsne_fit_model.fit_transform(dist)
-plot_data = pd.DataFrame({'x':tsne_fit_whole[:,0],'y':tsne_fit_whole[:,1], 'subreddit':col_names})
-plot_data.to_feather("tsne_subreddit_fit.feather")
+similarities = "term_similarities_10000.feather"
+
+def fit_tsne(similarities, output, learning_rate=750, perplexity=50, n_iter=10000, early_exaggeration=20):
+    '''
+    similarities: feather file with a dataframe of similarity scores
+    learning_rate: parameter controlling how fast the model converges. Too low and you get outliers. Too high and you get a ball.
+    perplexity: number of neighbors to use. the default of 50 is often good.
+    '''
+    df = pd.read_feather(similarities)
+
+    n = df.shape[0]
+    mat = np.array(df.drop('subreddit',1),dtype=np.float64)
+    mat[range(n),range(n)] = 1
+    mat[mat > 1] = 1
+
+    dist = 2*np.arccos(mat)/np.pi
+    tsne_model = TSNE(2,learning_rate=learning_rate,perplexity=perplexity,n_iter=n_iter,metric='precomputed',early_exaggeration=early_exaggeration,n_jobs=-1)
+    tsne_fit_model = tsne_model.fit(dist)
+    tsne_fit_whole = tsne_fit_model.fit_transform(dist)
+
+    plot_data = pd.DataFrame({'x':tsne_fit_whole[:,0],'y':tsne_fit_whole[:,1], 'subreddit':df.subreddit})
+    plot_data.to_feather(output)

+if __name__ == "__main__":
+    fire.Fire(fit_tsne)
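The dist = 2*np.arccos(mat)/np.pi step converts cosine similarities into angular distances before they reach TSNE with metric='precomputed'. A quick numerical check of that mapping, not part of the commit:

import numpy as np

sims = np.array([1.0, 0.5, 0.0, -1.0])
print(2 * np.arccos(sims) / np.pi)
# -> [0.     0.6667 1.     2.    ]
# identical vectors end up at distance 0, orthogonal ones at 1, opposite ones at 2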


@@ -2,11 +2,67 @@ from pyspark.sql import Window
 from pyspark.sql import functions as f
 from enum import Enum
 from pyspark.mllib.linalg.distributed import CoordinateMatrix
+from tempfile import TemporaryDirectory
+import pyarrow
+import pyarrow.dataset as ds
+from scipy.sparse import csr_matrix
+import pandas as pd
+import numpy as np

 class tf_weight(Enum):
     MaxTF = 1
     Norm05 = 2

+def read_tfidf_matrix(path,term_colname):
+    term = term_colname
+    term_id = term + '_id'
+    term_id_new = term + '_id_new'
+    dataset = ds.dataset(path,format='parquet')
+    entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new]).to_pandas()
+    return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1))))
+
+def column_similarities(mat):
+    norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
+    mat = mat.multiply(1/norm)
+    sims = mat.T @ mat
+    return(sims)
+
+def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
+    term = term_colname
+    term_id = term + '_id'
+    term_id_new = term + '_id_new'
+
+    if min_df is None:
+        min_df = 0.1 * len(included_subreddits)
+
+    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
+
+    # reset the subreddit ids
+    sub_ids = tfidf.select('subreddit_id').distinct()
+    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
+    tfidf = tfidf.join(sub_ids,'subreddit_id')
+
+    # only use terms in at least min_df included subreddits
+    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
+    # new_count = new_count.filter(f.col('new_count') >= min_df)
+    tfidf = tfidf.join(new_count,term_id,how='inner')
+
+    # reset the term ids
+    term_ids = tfidf.select([term_id]).distinct()
+    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
+    tfidf = tfidf.join(term_ids,term_id)
+
+    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
+    # tfidf = tfidf.withColumnRenamed("idf","idf_old")
+    # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
+    tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
+
+    tempdir = TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
+    tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
+    return tempdir
+
 def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
     term = term_colname
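For reference, column_similarities L2-normalizes each column of the sparse term-by-subreddit matrix and then takes mat.T @ mat, so entry (i, j) is the cosine similarity between subreddits i and j. A self-contained toy check that mirrors the function, not part of the commit:

import numpy as np
from scipy.sparse import csr_matrix

# two terms (rows) by three subreddits (columns)
mat = csr_matrix(np.array([[1.0, 1.0, 0.0],
                           [0.0, 1.0, 1.0]]))
norm = np.matrix(np.power(mat.power(2).sum(axis=0), 0.5))  # column L2 norms
normed = mat.multiply(1/norm)
sims = normed.T @ normed
print(np.round(sims.todense(), 3))
# diagonal entries are 1.0; off-diagonal entries are the column cosines (~0.707 here)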


@@ -8,38 +8,23 @@ import pandas as pd
 import fire
 from itertools import islice
 from pathlib import Path
-from similarities_helper import cosine_similarities
-
-spark = SparkSession.builder.getOrCreate()
-conf = spark.sparkContext.getConf()
-
-# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
-    '''
-    Compute similarities between subreddits based on tfi-idf vectors of comment texts
-
-    included_subreddits : string
-        Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
-
-    similarity_threshold : double (default = 0)
-        set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
-        https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
-
-    min_df : int (default = 0.1 * (number of included_subreddits)
-        exclude terms that appear in fewer than this number of documents.
-
-    outfile: string
-        where to output csv and feather outputs
-    '''
+from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
+import scipy
+
+# outfile='test_similarities_500.feather';
+# min_df = None;
+# included_subreddits=None; topN=100; exclude_phrases=True;
+
+def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
     print(outfile)
     print(exclude_phrases)

     tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')

     if included_subreddits is None:
-        included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
-        included_subreddits = {s.strip('\n') for s in included_subreddits}
+        rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
+        included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)

     else:
         included_subreddits = set(open(included_subreddits))
@@ -47,7 +32,23 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     if exclude_phrases == True:
         tfidf = tfidf.filter(~f.col(term).contains("_"))

-    sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+
+    print("loading matrix")
+    mat = read_tfidf_matrix(tempdir.name,'term')
+    print('computing similarities')
+    sims = column_similarities(mat)
+    del mat
+
+    sims = pd.DataFrame(sims.todense())
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values

     p = Path(outfile)
@@ -55,25 +56,72 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
     output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))

-    sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-
-    #instead of toLocalMatrix() why not read as entries and put strait into numpy
-    sim_entries = pd.read_parquet(output_parquet)
-    df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
-    spark.stop()
-    df['subreddit_id_new'] = df['subreddit_id_new'] - 1
-    df = df.sort_values('subreddit_id_new').reset_index(drop=True)
-    df = df.set_index('subreddit_id_new')
-
-    similarities = sim_entries.join(df, on='i')
-    similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
-    similarities = similarities.join(df, on='j')
-    similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
-
-    similarities.write_feather(output_feather)
-    similarities.write_csv(output_csv)
-    return similarities
+    sims.to_feather(outfile)
+    tempdir.cleanup()
+    path = "term_tfidf_entriesaukjy5gv.parquet"
+
+# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
+# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
+#     '''
+#     Compute similarities between subreddits based on tfi-idf vectors of comment texts
+#     included_subreddits : string
+#         Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
+#     similarity_threshold : double (default = 0)
+#         set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
+#         https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
+#     min_df : int (default = 0.1 * (number of included_subreddits)
+#         exclude terms that appear in fewer than this number of documents.
+#     outfile: string
+#         where to output csv and feather outputs
+#     '''
+#     print(outfile)
+#     print(exclude_phrases)
+#     tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
+#     if included_subreddits is None:
+#         included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
+#         included_subreddits = {s.strip('\n') for s in included_subreddits}
+#     else:
+#         included_subreddits = set(open(included_subreddits))
+#     if exclude_phrases == True:
+#         tfidf = tfidf.filter(~f.col(term).contains("_"))
+#     sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
+#     p = Path(outfile)
+#     output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
+#     output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
+#     output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
+#     sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
+#     #instead of toLocalMatrix() why not read as entries and put strait into numpy
+#     sim_entries = pd.read_parquet(output_parquet)
+#     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+#     spark.stop()
+#     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
+#     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
+#     df = df.set_index('subreddit_id_new')
+#     similarities = sim_entries.join(df, on='i')
+#     similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
+#     similarities = similarities.join(df, on='j')
+#     similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
+#     similarities.to_feather(output_feather)
+#     similarities.to_csv(output_csv)
+#     return similarities

 if __name__ == '__main__':
     fire.Fire(term_cosine_similarities)
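A hypothetical look at what the updated term_cosine_similarities writes (the output file name and example subreddit below are assumptions): the feather file holds a wide dataframe with one column per included subreddit plus a 'subreddit' column labeling the rows.

import pandas as pd

sims = pd.read_feather("term_similarities_10000.feather").set_index('subreddit')
# the ten subreddits whose comment-term tf-idf vectors are closest to the example subreddit
print(sims['askreddit'].sort_values(ascending=False).head(10))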


@@ -0,0 +1,30 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
import numpy as np
import pyarrow
import pandas as pd
import fire
from itertools import islice
from pathlib import Path
from similarities_helper import cosine_similarities
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
# remove /u/ pages
df = df.filter(~df.subreddit.like("u_%"))
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank',f.rank().over(win))
df = df.toPandas()
df = df.sort_values("n_comments")
df.to_csv('/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv',index=False)
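The CSV written above is what the updated term_cosine_similarities reads to pick its topN subreddits. A minimal sketch of that selection, not part of the commit:

import pandas as pd

rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
top_subreddits = set(rankdf.loc[rankdf.comments_rank <= 500, 'subreddit'].values)
print(len(top_subreddits))  # 500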


@@ -0,0 +1 @@
../../.git/annex/objects/Qk/wG/SHA256E-s145210--14a2ad6660d1e4015437eff556ec349dd10a115a4f96594152a29e83d00aa784/SHA256E-s145210--14a2ad6660d1e4015437eff556ec349dd10a115a4f96594152a29e83d00aa784


@@ -0,0 +1 @@
../../.git/annex/objects/w7/2f/SHA256E-s44458--f1c5247775ecf06514a0ff9e523e944bc8fcd9d0fdb6f214cc1329b759d4354e/SHA256E-s44458--f1c5247775ecf06514a0ff9e523e944bc8fcd9d0fdb6f214cc1329b759d4354e


@@ -0,0 +1 @@
../../.git/annex/objects/WX/v3/SHA256E-s190874--c2aea719f989dde297ca5f13371e156693c574e44acd9a0e313e5e3a3ad4b543/SHA256E-s190874--c2aea719f989dde297ca5f13371e156693c574e44acd9a0e313e5e3a3ad4b543


@@ -0,0 +1 @@
../../.git/annex/objects/mq/2z/SHA256E-s58834--2e7b3ee11f47011fd9b34bddf8f1e788d35ab9c9e0bb6a1301b0b916135400cf/SHA256E-s58834--2e7b3ee11f47011fd9b34bddf8f1e788d35ab9c9e0bb6a1301b0b916135400cf