1
0

commit changes from smap project.

This commit is contained in:
2022-01-19 13:57:02 -08:00
parent 541e125b28
commit 7b130a30af
6 changed files with 160 additions and 122 deletions

View File

@@ -12,10 +12,6 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
output="data/subreddit_timeseries.parquet"):
clusters = load_clusters(term_clusters_path, author_clusters_path)
densities = load_densities(term_densities_path, author_densities_path)
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
@@ -26,11 +22,15 @@ def build_cluster_timeseries(term_clusters_path="/gscratch/comdata/output/reddit
ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
ts = ts.repartition('subreddit')
spk_clusters = spark.createDataFrame(clusters)
if term_densities_path is not None and author_densities_path is not None:
densities = load_densities(term_densities_path, author_densities_path)
spk_densities = spark.createDataFrame(densities)
ts = ts.join(spk_densities, on='subreddit', how='inner')
clusters = load_clusters(term_clusters_path, author_clusters_path)
spk_clusters = spark.createDataFrame(clusters)
ts = ts.join(spk_clusters, on='subreddit', how='inner')
spk_densities = spark.createDataFrame(densities)
ts = ts.join(spk_densities, on='subreddit', how='inner')
ts.write.parquet(output, mode='overwrite')
if __name__ == "__main__":