
Updating to support wang-style user overlaps.

Nate E TeBlunthuis 2020-12-24 22:38:04 -08:00
parent 56269deee3
commit 4e20dce188
11 changed files with 193 additions and 70 deletions
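The "wang-style" user overlaps this commit adds are shared-author counts between pairs of subreddits, computed from the author tf-idf matrix (see wang_similarity.py below). A minimal sketch of that computation on a toy author-by-subreddit matrix (hypothetical data, not the production pipeline):

# Minimal sketch of the wang-style overlap computed in wang_similarity.py:
# rows are authors, columns are subreddits; entry (i, j) of the result is
# the number of authors active in both subreddit i and subreddit j.
import numpy as np
from scipy.sparse import csr_matrix

mat = csr_matrix(np.array([[3, 0, 1],    # toy author-by-subreddit tf matrix (hypothetical data)
                           [0, 2, 2],
                           [1, 1, 0]]))
non_zeros = (mat != 0).astype(np.float32)  # indicator of which authors post where
intersection = non_zeros.T @ non_zeros     # shared-author counts per subreddit pair
print(intersection.todense())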


@@ -1,4 +1,10 @@
-srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28'
-affinity/subreddit_comment_authors_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet
-	# $srun_cdsc python3
-	clustering.py /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.feather affinity/subreddit_comment_authors_10000.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.85 --damping=0.85
+#srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28'
+all:/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather /gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather
+
+/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
+	# $srun_cdsc python3
+	./clustering.py /gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather /gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.85 --damping=0.85
+
+/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
+	# $srun_cdsc python3
+	./clustering.py /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather /gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather ---max_iter=1000 --convergence_iter=15 --preference_quantile=0.9 --damping=0.5
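The clustering.py flags in these recipes (--max_iter, --convergence_iter, --preference_quantile, --damping) match scikit-learn's AffinityPropagation parameters. The sketch below shows how such a call might look on the precomputed similarity matrix; this is an assumption about clustering.py's internals (its body is not shown in this commit), and the preference_quantile handling is hypothetical.

# Hedged sketch: cluster a precomputed subreddit similarity matrix with
# affinity propagation, assuming clustering.py wraps sklearn this way.
import numpy as np
import pandas as pd
from sklearn.cluster import AffinityPropagation

df = pd.read_feather("/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather")
mat = np.array(df.drop('subreddit', axis=1))           # square similarity matrix
preference = np.quantile(mat, 0.85)                    # --preference_quantile=0.85 (assumed meaning)
clustering = AffinityPropagation(damping=0.85,         # --damping=0.85
                                 max_iter=400,         # --max_iter=400
                                 convergence_iter=15,  # --convergence_iter=15
                                 preference=preference,
                                 affinity='precomputed').fit(mat)
labels = clustering.labels_                            # cluster id per subreddit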

clustering/clustering.py Normal file → Executable file

density/job_script.sh Executable file

@@ -0,0 +1,4 @@
+#!/usr/bin/bash
+start_spark_cluster.sh
+spark-submit --master spark://$(hostname):18899 overlap_density.py wang_overlaps --inpath=/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet --to_date=2020-04-13
+stop-all.sh


@@ -2,6 +2,14 @@ import pandas as pd
from pandas.core.groupby import DataFrameGroupBy as GroupBy
import fire
import numpy as np
+import sys
+sys.path.append("..")
+sys.path.append("../similarities")
+from similarities.similarities_helper import read_tfidf_matrix, reindex_tfidf, reindex_tfidf_time_interval
+
+# this is the mean of the ratio of the overlap to the focal size.
+# mean shared membership per focal community member
+# the input is the author tf-idf matrix

def overlap_density(inpath, outpath, agg = pd.DataFrame.sum):
    df = pd.read_feather(inpath)
@@ -20,6 +28,16 @@ def overlap_density_weekly(inpath, outpath, agg = GroupBy.sum):
    res.to_feather(outpath)
    return res

+# inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet";
+# min_df=1;
+# included_subreddits=None;
+# topN=10000;
+# outpath="/gscratch/comdata/output/reddit_density/wang_overlaps_10000.feather"
+# to_date=2019-10-28

def author_overlap_density(inpath="/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather",
                           outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather", agg=pd.DataFrame.sum):
    if type(agg) == str:
@@ -54,4 +72,5 @@ if __name__ == "__main__":
    fire.Fire({'authors':author_overlap_density,
               'terms':term_overlap_density,
               'author_weekly':author_overlap_density_weekly,
-               'term_weekly':term_overlap_density_weekly})
+               'term_weekly':term_overlap_density_weekly,
+               'wang_overlaps':wang_overlap_density})
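Per the comments added above, the density step collapses each subreddit's row of the square overlap/similarity matrix into one number (mean shared membership per focal community member, or a plain sum). A hedged sketch of what overlap_density likely does with the feather written by the similarity step; the full function body is not visible in this hunk, and the aggregation shown here is only the default agg=pd.DataFrame.sum case:

# Hedged sketch of the overlap-density computation described by the new comments:
# read the square author-overlap/similarity matrix, aggregate each focal
# subreddit's row, and save one density value per subreddit (default paths above).
import pandas as pd

df = pd.read_feather("/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather")
df = df.set_index('subreddit')                      # square matrix, one row/column per subreddit
density = df.sum(axis=1)                            # agg over each focal subreddit's row
res = density.reset_index().rename(columns={0: 'overlap_density'})
res.to_feather("/gscratch/comdata/output/reddit_density/comment_authors_10000.feather")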


@@ -1,3 +1,11 @@
+all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet
+
+/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet
+	start_spark_and_run.sh 1 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.feather
+
+/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
+	start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.feather
+
/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
	start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.feather


@@ -1,64 +1,21 @@
-from pyspark.sql import functions as f
-from pyspark.sql import SparkSession
import pandas as pd
import fire
from pathlib import Path
-from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, select_topN_subreddits, column_similarities
+from similarities_helper import similarities, column_similarities

+def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
+    return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases, from_date=from_date, to_date=to_date)

-def cosine_similarities(infile, term_colname, outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
-    spark = SparkSession.builder.getOrCreate()
-    conf = spark.sparkContext.getConf()
-    print(outfile)
-    print(exclude_phrases)
-    tfidf = spark.read.parquet(infile)
-    if included_subreddits is None:
-        included_subreddits = select_topN_subreddits(topN)
-    else:
-        included_subreddits = set(open(included_subreddits))
-    if exclude_phrases == True:
-        tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
-    print("creating temporary parquet with matrix indicies")
-    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits)
-    tfidf = spark.read.parquet(tempdir.name)
-    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
-    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
-    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
-    spark.stop()
-    print("loading matrix")
-    mat = read_tfidf_matrix(tempdir.name, term_colname)
-    print('computing similarities')
-    sims = column_similarities(mat)
-    del mat
-    sims = pd.DataFrame(sims.todense())
-    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
-    sims['subreddit'] = subreddit_names.subreddit.values
-    p = Path(outfile)
-    output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
-    output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
-    output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
-    sims.to_feather(outfile)
-    tempdir.cleanup()

-def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+def term_cosine_similarities(outfile, min_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
                               'term',
                               outfile,
                               min_df,
                               included_subreddits,
                               topN,
                               exclude_phrases)

-def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000):
+def author_cosine_similarities(outfile, min_df=2, included_subreddits=None, topN=10000, from_date=None, to_date=None):
    return cosine_similarities('/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
                               'author',
                               outfile,
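This refactor routes both the cosine and wang-style paths through similarities_helper.similarities(), which accepts any simfunc that operates on the sparse term-by-subreddit matrix. A minimal sketch of the column-wise cosine step that column_similarities performs (its normalization appears later in similarities_helper.py; the final mat.T @ mat product is assumed here), on a toy matrix:

# Minimal sketch of column-wise cosine similarity over a sparse
# term-by-subreddit matrix, mirroring column_similarities (toy data,
# not the production pipeline).
import numpy as np
from scipy.sparse import csr_matrix

mat = csr_matrix(np.array([[1.0, 0.0, 2.0],   # rows: terms/authors, columns: subreddits
                           [0.0, 3.0, 1.0],
                           [2.0, 1.0, 0.0]]))
norm = np.matrix(np.power(mat.power(2).sum(axis=0), 0.5, dtype=np.float32))
mat = mat.multiply(1/norm)        # scale each column to unit length
sims = mat.T @ mat                # cosine similarity between subreddit columns (assumed final step)
print(np.asarray(sims.todense()))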


@@ -1,4 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
-spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet
+spark-submit --master spark://$(hostname):18899 wang_similarity.py --infile=/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet --max_df=10 --outfile=/gscratch/comdata/output/reddit_similarity/wang_similarity_1000_max10.feather
stop-all.sh


@@ -1,3 +1,4 @@
+from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
@@ -5,15 +6,108 @@ from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
-from scipy.sparse import csr_matrix
+from scipy.sparse import csr_matrix, issparse
import pandas as pd
import numpy as np
import pathlib
+from datetime import datetime
+from pathlib import Path

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2

+infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet"
+
+def reindex_tfidf_time_interval(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
+    term = term_colname
+    term_id = term + '_id'
+    term_id_new = term + '_id_new'
+
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
+    print(exclude_phrases)
+
+    tfidf_weekly = spark.read.parquet(infile)
+
+    # create the time interval
+    if from_date is not None:
+        if type(from_date) is str:
+            from_date = datetime.fromisoformat(from_date)
+        tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week >= from_date)
+
+    if to_date is not None:
+        if type(to_date) is str:
+            to_date = datetime.fromisoformat(to_date)
+        tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week < to_date)
+
+    tfidf = tfidf_weekly.groupBy(["subreddit","week", term_id, term]).agg(f.sum("tf").alias("tf"))
+    tfidf = _calc_tfidf(tfidf, term_colname, tf_weight.Norm05)
+    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    return(tempdir, subreddit_names)
+
+def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
+    spark = SparkSession.builder.getOrCreate()
+    conf = spark.sparkContext.getConf()
+    print(exclude_phrases)
+
+    tfidf = spark.read.parquet(infile)
+
+    if included_subreddits is None:
+        included_subreddits = select_topN_subreddits(topN)
+    else:
+        included_subreddits = set(open(included_subreddits))
+
+    if exclude_phrases == True:
+        tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
+
+    print("creating temporary parquet with matrix indicies")
+    tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
+    tfidf = spark.read.parquet(tempdir.name)
+    subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
+    subreddit_names = subreddit_names.sort_values("subreddit_id_new")
+    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
+    spark.stop()
+    return (tempdir, subreddit_names)
+
+def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
+    if from_date is not None or to_date is not None:
+        tempdir, subreddit_names = reindex_tfidf_time_interval(infile, term_colname='author', min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False, from_date=from_date, to_date=to_date)
+    else:
+        tempdir, subreddit_names = reindex_tfidf(infile, term_colname='author', min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False)
+
+    print("loading matrix")
+    # mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
+    mat = read_tfidf_matrix(tempdir.name, term_colname)
+    print('computing similarities')
+    sims = simfunc(mat)
+    del mat
+
+    if issparse(sims):
+        sims = sims.todense()
+
+    print(f"shape of sims:{sims.shape}")
+    print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}")
+    sims = pd.DataFrame(sims)
+    sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
+    sims['subreddit'] = subreddit_names.subreddit.values
+
+    p = Path(outfile)
+    output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
+    output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
+    output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
+
+    sims.to_feather(outfile)
+    tempdir.cleanup()

def read_tfidf_matrix_weekly(path, term_colname, week):
    term = term_colname
    term_id = term + '_id'
@@ -33,8 +127,6 @@ def write_weekly_similarities(path, sims, week, names):
    sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
    sims.to_parquet(p / week.isoformat())

def read_tfidf_matrix(path,term_colname):
    term = term_colname
    term_id = term + '_id'
@@ -44,6 +136,15 @@ def read_tfidf_matrix(path,term_colname):
    entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new]).to_pandas()
    return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1))))

+def column_overlaps(mat):
+    non_zeros = (mat != 0).astype('double')
+
+    intersection = non_zeros.T @ non_zeros
+    card1 = non_zeros.sum(axis=0)
+    den = np.add.outer(card1,card1) - intersection
+
+    return intersection / den
+
def column_similarities(mat):
    norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
    mat = mat.multiply(1/norm)
@@ -51,13 +152,16 @@ def column_similarities(mat):
    return(sims)

-def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits):
+def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    if min_df is None:
        min_df = 0.1 * len(included_subreddits)
+    tfidf = tfidf.filter(f.col('count') >= min_df)
+
+    if max_df is not None:
+        tfidf = tfidf.filter(f.col('count') <= max_df)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
@@ -86,19 +190,22 @@ def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits):
    return(tempdir)

-def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
+def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    if min_df is None:
        min_df = 0.1 * len(included_subreddits)
+    tfidf = tfidf.filter(f.col('count') >= min_df)
+
+    if max_df is not None:
+        tfidf = tfidf.filter(f.col('count') <= max_df)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))

    # reset the subreddit ids
    sub_ids = tfidf.select('subreddit_id').distinct()
-    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
+    sub_ids = sub_ids.withColumn("subreddit_id_new", f.row_number().over(Window.orderBy("subreddit_id")))
    tfidf = tfidf.join(sub_ids,'subreddit_id')

    # only use terms in at least min_df included subreddits
@@ -221,15 +328,9 @@ def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weig
    return df

-def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
+def _calc_tfidf(df, term_colname, tf_family):
    term = term_colname
    term_id = term + '_id'
-    # aggregate counts by week. now subreddit-term is distinct
-    df = df.filter(df.subreddit.isin(include_subs))
-    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
@@ -240,9 +341,7 @@ def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm
    # group by term. term is unique
    idf = df.groupby([term]).count()
    N_docs = df.select('subreddit').distinct().count()

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
@@ -272,6 +371,18 @@ def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm
    return df

+def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
+    term = term_colname
+    term_id = term + '_id'
+
+    # aggregate counts by week. now subreddit-term is distinct
+    df = df.filter(df.subreddit.isin(include_subs))
+    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
+
+    df = _calc_tfidf(df, term_colname, tf_family)
+
+    return df
+
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv"):
    rankdf = pd.read_csv(path)
    included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
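The new column_overlaps above computes a Jaccard-style overlap between subreddit columns: shared nonzero entries divided by the size of the union. A small worked example on a toy dense matrix (hypothetical data; the production code runs on the sparse tf-idf matrix):

# Worked toy example of the column_overlaps logic: pairwise Jaccard overlap
# of the sets of authors (rows) active in each subreddit (column).
import numpy as np

mat = np.array([[1, 0, 1],   # author 0 posts in subreddits 0 and 2
                [1, 1, 0],   # author 1 posts in subreddits 0 and 1
                [0, 1, 1]])  # author 2 posts in subreddits 1 and 2

non_zeros = (mat != 0).astype('double')
intersection = non_zeros.T @ non_zeros           # shared authors per pair
card1 = non_zeros.sum(axis=0)                    # authors per subreddit
den = np.add.outer(card1, card1) - intersection  # union size per pair
print(intersection / den)                        # e.g. overlap(0,1) = 1 shared / 3 total ≈ 0.33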


@@ -0,0 +1,18 @@
+from similarities_helper import similarities
+import numpy as np
+import fire
+
+def wang_similarity(mat):
+    non_zeros = (mat != 0).astype(np.float32)
+    intersection = non_zeros.T @ non_zeros
+    return intersection
+
+infile="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet"; outfile="/gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather"; min_df=1; included_subreddits=None; topN=10000; exclude_phrases=False; from_date=None; to_date=None
+
+def wang_overlaps(infile, outfile="/gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather", min_df=1, max_df=None, included_subreddits=None, topN=10000, exclude_phrases=False, from_date=None, to_date=None):
+    return similarities(infile=infile, simfunc=wang_similarity, term_colname='author', outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases, from_date=from_date, to_date=to_date)
+
+if __name__ == "__main__":
+    fire.Fire(wang_overlaps)
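As wired up in the job script earlier in this commit, this module is invoked through fire with --infile, --max_df, and --outfile; --max_df=10 presumably drops authors whose subreddit count exceeds 10, via the new f.col('count') filter in prep_tfidf_entries. A hedged usage sketch of the equivalent direct call:

# Hedged usage sketch: the same call the spark-submit line makes through
# fire, written as a direct function call. Paths mirror the job script.
from wang_similarity import wang_overlaps

wang_overlaps(
    infile="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet",
    outfile="/gscratch/comdata/output/reddit_similarity/wang_similarity_1000_max10.feather",
    max_df=10,   # keep only authors appearing in at most 10 subreddits (assumed meaning of 'count')
)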


@@ -35,7 +35,7 @@ def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None,
    subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
    spark.stop()

-    d weeks = sorted(list(subreddit_names.week.drop_duplicates()))
+    weeks = sorted(list(subreddit_names.week.drop_duplicates()))
    for week in weeks:
        print(f"loading matrix: {week}")
        mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)