Refactor tfidf code for code reuse.
This commit is contained in:
parent 772f3a8fbd
commit 5632a971c6

similarities_helper.py  +116  Normal file
@@ -0,0 +1,116 @@
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from pyspark.mllib.linalg.distributed import CoordinateMatrix

class tf_weight(Enum):
    MaxTF = 1
    Norm05 = 2


def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
    term = term_colname
    term_id = term + '_id'
    term_id_new = term + '_id_new'

    if min_df is None:
        min_df = 0.1 * len(included_subreddits)

    tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
    tfidf = tfidf.cache()

    # reset the subreddit ids
    sub_ids = tfidf.select('subreddit_id').distinct()
    sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
    tfidf = tfidf.join(sub_ids,'subreddit_id')

    # only use terms in at least min_df included subreddits
    new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
#    new_count = new_count.filter(f.col('new_count') >= min_df)
    tfidf = tfidf.join(new_count,term_id,how='inner')

    # reset the term ids
    term_ids = tfidf.select([term_id]).distinct()
    term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
    tfidf = tfidf.join(term_ids,term_id)

    tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
    # tfidf = tfidf.withColumnRenamed("idf","idf_old")
    # tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
    tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)

    # step 1: make an rdd of entries
    # sorted by (dense) spark subreddit id
    #    entries = tfidf.filter((f.col('subreddit') == 'asoiaf') | (f.col('subreddit') == 'gameofthrones') | (f.col('subreddit') == 'christianity')).select(f.col("term_id_new")-1,f.col("subreddit_id_new")-1,"tf_idf").rdd

    n_partitions = int(len(included_subreddits)*2 / 5)

    entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)

    # put roughly 10 subreddits in each partition

    # step 2: make it into a distributed.RowMatrix
    coordMat = CoordinateMatrix(entries)

    coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))

    # this needs to be an IndexedRowMatrix()
    mat = coordMat.toRowMatrix()

    # goal: build a matrix of subreddit columns and tf-idf rows
    sim_dist = mat.columnSimilarities(threshold=similarity_threshold)
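    # note: with a positive threshold, columnSimilarities() uses Spark's DIMSUM
    # sampling algorithm, which estimates cosine similarities and is only
    # guaranteed accurate for pairs whose similarity exceeds the threshold;
    # threshold=0 computes all pairs exactly, at higher cost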

    return (sim_dist, tfidf)


def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):

    term = term_colname
    term_id = term + '_id'
    # aggregate counts by week. now subreddit-term is distinct
    df = df.filter(df.subreddit.isin(include_subs))
    df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))

    max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
    max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')

    df = df.join(max_subreddit_terms, on='subreddit')

    df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)

    # group by term. term is unique
    idf = df.groupby([term]).count()

    N_docs = df.select('subreddit').distinct().count()

    # add a little smoothing to the idf
    idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
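    # i.e. idf(t) = log(N_docs / (1 + df(t))) + 1, so even a term that appears
    # in every included subreddit keeps a small positive weight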
|  | 
 | ||||||
|  |     # collect the dictionary to make a pydict of terms to indexes | ||||||
|  |     terms = idf.select(term).distinct() # terms are distinct | ||||||
|  |     terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct | ||||||
|  | 
 | ||||||
|  |     # make subreddit ids | ||||||
|  |     subreddits = df.select(['subreddit']).distinct() | ||||||
|  |     subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit"))) | ||||||
|  | 
 | ||||||
|  |     df = df.join(subreddits,on='subreddit') | ||||||
|  | 
 | ||||||
|  |     # map terms to indexes in the tfs and the idfs | ||||||
|  |     df = df.join(terms,on=term) # subreddit-term-id is unique | ||||||
|  | 
 | ||||||
|  |     idf = idf.join(terms,on=term) | ||||||
|  | 
 | ||||||
|  |     # join on subreddit/term to create tf/dfs indexed by term | ||||||
|  |     df = df.join(idf, on=[term_id, term]) | ||||||
|  | 
 | ||||||
|  |     # agg terms by subreddit to make sparse tf/df vectors | ||||||
|  |      | ||||||
|  |     if tf_family == tf_weight.MaxTF: | ||||||
|  |         df = df.withColumn("tf_idf",  df.relative_tf * df.idf) | ||||||
|  |     else: # tf_fam = tf_weight.Norm05 | ||||||
|  |         df = df.withColumn("tf_idf",  (0.5 + 0.5 * df.relative_tf) * df.idf) | ||||||
|  | 
 | ||||||
|  |     return df | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
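
For context, the two helpers compose into a pipeline: build_tfidf_dataset produces the weighted subreddit-term table, and cosine_similarities turns it into a subreddit-by-subreddit similarity matrix. A minimal sketch of that pipeline (the input path, subreddit list, and threshold below are illustrative assumptions, not part of this commit):

from pyspark.sql import SparkSession
from similarities_helper import build_tfidf_dataset, cosine_similarities

spark = SparkSession.builder.getOrCreate()

# hypothetical input: one row per (subreddit, term) with a raw 'tf' count
df = spark.read.parquet("/path/to/term_counts.parquet")  # assumed path
include_subs = ['askreddit', 'science', 'politics']      # assumed subset

tfidf = build_tfidf_dataset(df, include_subs, 'term')
sim_dist, tfidf = cosine_similarities(tfidf, 'term',
                                      min_df=None,  # defaults to 10% of included subreddits
                                      included_subreddits=include_subs,
                                      similarity_threshold=0.5)  # assumed DIMSUM threshold
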
@@ -1,43 +1,19 @@
-from pyspark.sql import functions as f
 from pyspark.sql import SparkSession
+from similarities_helper import build_tfidf_dataset
 
+## TODO: need to exclude automoderator / bot posts.
+## TODO: need to better handle hyperlinks.
 spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")
-
-max_subreddit_week_authors = df.groupby(['subreddit','week']).max('tf')
-max_subreddit_week_authors = max_subreddit_week_authors.withColumnRenamed('max(tf)','sr_week_max_tf')
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/part-00000-d61007de-9cbe-4970-857f-b9fd4b35b741-c000.snappy.parquet")
 
-df = df.join(max_subreddit_week_authors, ['subreddit','week'])
+include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
+include_subs = {s.strip('\n') for s in include_subs}
+df = df.filter(df.author != '[deleted]')
+df = df.filter(df.author != 'AutoModerator')
 
-df = df.withColumn("relative_tf", df.tf / df.sr_week_max_tf)
+df = build_tfidf_dataset(df, include_subs, 'author')
 
-# group by term / week
-idf = df.groupby(['author','week']).count()
+df.cache()
 
-idf = idf.withColumnRenamed('count','idf')
+df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
-
-# output: term | week | df
-#idf.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
-
-# collect the dictionary to make a pydict of terms to indexes
-authors = idf.select('author').distinct()
-authors = authors.withColumn('author_id',f.monotonically_increasing_id())
-
-
-# map terms to indexes in the tfs and the idfs
-df = df.join(authors,on='author')
-
-idf = idf.join(authors,on='author')
-
-# join on subreddit/term/week to create tf/dfs indexed by term
-df = df.join(idf, on=['author_id','week','author'])
-
-# agg terms by subreddit to make sparse tf/df vectors
-df = df.withColumn("tf_idf",df.relative_tf / df.sr_week_max_tf)
-
-df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('author_id','tf_idf')).alias('tfidf_maps'))
-
-df = df.withColumn('tfidf_vec', f.map_from_entries('tfidf_maps'))
-
-# output: subreddit | week | tf/df
-df.write.json('/gscratch/comdata/users/nathante/test_tfidf_authors.parquet',mode='overwrite',compression='snappy')
@@ -1,6 +1,7 @@
 from pyspark.sql import functions as f
 from pyspark.sql import SparkSession
 from pyspark.sql import Window
+from similarities_helper import build_tfidf_dataset
 
 ## TODO: need to exclude automoderator / bot posts.
 ## TODO: need to better handle hyperlinks.
@@ -11,43 +12,6 @@ df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parq
 include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
 include_subs = {s.strip('\n') for s in include_subs}
 
-# aggregate counts by week. now subreddit-term is distinct
-df = df.filter(df.subreddit.isin(include_subs))
-df = df.groupBy(['subreddit','term']).agg(f.sum('tf').alias('tf'))
-
-max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
-max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
-
-df = df.join(max_subreddit_terms, on='subreddit')
-
-df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
-
-# group by term. term is unique
-idf = df.groupby(['term']).count()
-
-N_docs = df.select('subreddit').distinct().count()
-
-idf = idf.withColumn('idf',f.log(N_docs/f.col('count')))
-
-# collect the dictionary to make a pydict of terms to indexes
-terms = idf.select('term').distinct() # terms are distinct
-terms = terms.withColumn('term_id',f.row_number().over(Window.orderBy("term"))) # term ids are distinct
-
-# make subreddit ids
-subreddits = df.select(['subreddit']).distinct()
-subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))
-
-df = df.join(subreddits,on='subreddit')
-
-# map terms to indexes in the tfs and the idfs
-df = df.join(terms,on='term') # subreddit-term-id is unique
-
-idf = idf.join(terms,on='term')
-
-# join on subreddit/term to create tf/dfs indexed by term
-df = df.join(idf, on=['term_id','term'])
-
-# agg terms by subreddit to make sparse tf/df vectors
-df = df.withColumn("tf_idf", (0.5 + (0.5 * df.relative_tf) * df.idf))
+df = build_tfidf_dataset(df, include_subs, 'term')
 
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
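
As a sanity check on the weighting that the refactor standardizes: the deleted inline version above computed (0.5 + (0.5 * relative_tf) * idf), misplacing a parenthesis, while the shared helper computes (0.5 + 0.5 * relative_tf) * idf against a smoothed idf. A worked example of the helper's Norm05 arithmetic in plain Python (the counts are made up):

import math

N_docs = 25000      # assumed number of included subreddits
df_t = 499          # assumed number of subreddits containing the term
relative_tf = 0.2   # the term's tf divided by its subreddit's max tf

idf = math.log(N_docs / (1 + df_t)) + 1    # log(50) + 1, about 4.912
tf_idf = (0.5 + 0.5 * relative_tf) * idf   # 0.6 * 4.912, about 2.947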