From 811a0d87c4d394c2c7849a613f6aec2d81e49138 Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Thu, 18 May 2023 10:29:08 -0700
Subject: [PATCH] Tune clustering selection grid and Spark dataset
 partitioning; fix after_run() and embedding_ typos

Expand the UMAP n_components selection grid, compute and report the
silhouette score inside process_clustering(), make silhouette() return
its score, and fix the after_run() signature and the step1.embedding_
typo. In the dataset scripts, raise the shuffle partition count, switch
to explicit repartition(n, cols...) plus per-partition sorts before
writing parquet, update the raw-data paths, and run the submissions job
directly from job_script.sh.
---
 clustering/Makefile                     |  2 +-
 clustering/clustering_base.py           | 11 ++++++++---
 clustering/umap_hdbscan_clustering.py   |  2 +-
 datasets/comments_2_parquet_part2.py    | 19 ++++++++++---------
 datasets/job_script.sh                  |  6 ++++--
 datasets/submissions_2_parquet_part1.py |  4 ++--
 datasets/submissions_2_parquet_part2.py |  8 ++++----
 7 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/clustering/Makefile b/clustering/Makefile
index 559a85c..6f25a7d 100644
--- a/clustering/Makefile
+++ b/clustering/Makefile
@@ -4,7 +4,7 @@ similarity_data=/gscratch/comdata/output/reddit_similarity
 clustering_data=/gscratch/comdata/output/reddit_clustering
 kmeans_selection_grid=--max_iters=[3000] --n_inits=[10] --n_clusters=[100,500,1000,1250,1500,1750,2000]

-umap_hdbscan_selection_grid=--min_cluster_sizes=[2] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf] --n_neighbors=[5,15,25,50,75,100] --learning_rate=[1] --min_dist=[0,0.1,0.25,0.5,0.75,0.9,0.99] --local_connectivity=[1] --densmap=[True,False] --n_components=[2,5,10]
+umap_hdbscan_selection_grid=--min_cluster_sizes=[2] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf] --n_neighbors=[5,15,25,50,75,100] --learning_rate=[1] --min_dist=[0,0.1,0.25,0.5,0.75,0.9,0.99] --local_connectivity=[1] --densmap=[True,False] --n_components=[2,5,10,15,25]

 hdbscan_selection_grid=--min_cluster_sizes=[2,3,4,5] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf]
 affinity_selection_grid=--dampings=[0.5,0.6,0.7,0.8,0.95,0.97,0.99] --preference_quantiles=[0.1,0.3,0.5,0.7,0.9] --convergence_iters=[15]
diff --git a/clustering/clustering_base.py b/clustering/clustering_base.py
index ced627d..98a260e 100644
--- a/clustering/clustering_base.py
+++ b/clustering/clustering_base.py
@@ -21,9 +21,9 @@ class clustering_job:
         self.subreddits, self.mat = self.read_distance_mat(self.infile)
         self.clustering = self.call(self.mat, *self.args, **self.kwargs)
         self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
-        self.score = self.silhouette()
         self.outpath.mkdir(parents=True, exist_ok=True)
         self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
+        self.hasrun = True
         self.cleanup()


@@ -62,6 +62,7 @@ class clustering_job:
         else:
             score = None
             self.silsampout = None
+        return score


     def read_distance_mat(self, similarities, use_threads=True):
@@ -81,9 +82,13 @@
         self.n_clusters = len(set(clusters))

         print(f"found {self.n_clusters} clusters")
-
         cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
+
+        self.score = self.silhouette()
+        print(f"silhouette_score:{self.score}")
+
+
         cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
         print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")


@@ -125,7 +130,7 @@ class twoway_clustering_job(clustering_job):
         self.after_run()
         self.cleanup()

-    def after_run():
+    def after_run(self):
         self.score = self.silhouette()
         self.outpath.mkdir(parents=True, exist_ok=True)
         print(self.outpath/(self.name+".feather"))
diff --git a/clustering/umap_hdbscan_clustering.py b/clustering/umap_hdbscan_clustering.py
index 5633d77..cf4acbb 100644
--- a/clustering/umap_hdbscan_clustering.py
+++ b/clustering/umap_hdbscan_clustering.py
@@ -110,7 +110,7 @@ class umap_hdbscan_job(twoway_clustering_job):
         self.cluster_selection_method = hdbscan_args['cluster_selection_method']

     def after_run(self):
-        coords = self.step1.emedding_
+        coords = self.step1.embedding_
         self.cluster_data['x'] = coords[:,0]
         self.cluster_data['y'] = coords[:,1]
         super().after_run()
diff --git a/datasets/comments_2_parquet_part2.py b/datasets/comments_2_parquet_part2.py
index 1031c68..5b9a131 100755
--- a/datasets/comments_2_parquet_part2.py
+++ b/datasets/comments_2_parquet_part2.py
@@ -9,7 +9,7 @@ from pyspark.sql import SparkSession
 spark = SparkSession.builder.getOrCreate()

 conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
-conf = conf.set("spark.sql.shuffle.partitions",2000)
+conf = conf.set("spark.sql.shuffle.partitions",2400)
 conf = conf.set('spark.sql.crossJoin.enabled',"true")
 conf = conf.set('spark.debug.maxToStringFields',200)
 sc = spark.sparkContext
@@ -25,12 +25,13 @@ df = df.withColumn("Month",f.month(f.col("CreatedAt")))
 df = df.withColumn("Year",f.year(f.col("CreatedAt")))
 df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))

-df = df.repartition('subreddit')
-df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
+# df = df.repartition(1200,'subreddit')
+# df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
+# df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
+# df2.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')

-df = df.repartition('author')
-df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
+#df = spark.read.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet")
+df = df.repartition(2400,'author','subreddit',"Year","Month","Day")
+df3 = df.sort(["author","subreddit","Year","Month","Day","CreatedAt","link_id","parent_id"],ascending=True)
+df3 = df3.sortWithinPartitions(["author","subreddit","Year","Month","Day","CreatedAt","link_id","parent_id"],ascending=True)
+df3.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
diff --git a/datasets/job_script.sh b/datasets/job_script.sh
index 5b5a7d3..ca994d5 100755
--- a/datasets/job_script.sh
+++ b/datasets/job_script.sh
@@ -1,4 +1,6 @@
 #!/usr/bin/bash
+source ~/.bashrc
+echo $(hostname)
 start_spark_cluster.sh
-singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 comments_2_parquet_part2.py
-singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh
+spark-submit --verbose --master spark://$(hostname):43015 submissions_2_parquet_part2.py
+stop-all.sh
diff --git a/datasets/submissions_2_parquet_part1.py b/datasets/submissions_2_parquet_part1.py
index 77ae09f..d1a8a3d 100755
--- a/datasets/submissions_2_parquet_part1.py
+++ b/datasets/submissions_2_parquet_part1.py
@@ -58,7 +58,7 @@ def parse_submission(post, names = None):
 def parse_dump(partition):

     N=10000
-    stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
+    stream = open_fileset([f"/gscratch/comdata/raw_data/submissions/{partition}"])
     rows = map(parse_submission,stream)
     schema = pa.schema([
         pa.field('id', pa.string(),nullable=True),
@@ -102,7 +102,7 @@ def parse_dump(partition):

     writer.close()

-def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
+def gen_task_list(dumpdir="/gscratch/comdata/raw_data/submissions"):
     files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
     with open("submissions_task_list.sh",'w') as of:
         for fpath in files:
diff --git a/datasets/submissions_2_parquet_part2.py b/datasets/submissions_2_parquet_part2.py
index 3a58617..7dc4f74 100644
--- a/datasets/submissions_2_parquet_part2.py
+++ b/datasets/submissions_2_parquet_part2.py
@@ -29,14 +29,14 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])

 # next we gotta resort it all.
-df = df.repartition("subreddit")
-df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
+df = df.repartition(800,"subreddit","Year","Month")
+df2 = df.sort(["subreddit","Year","Month","CreatedAt","id"],ascending=True)
 df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
 df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')

 # # we also want to have parquet files sorted by author then reddit.
-df = df.repartition("author")
-df3 = df.sort(["author","CreatedAt","id"],ascending=True)
+df = df.repartition(800,"author","subreddit","Year","Month")
+df3 = df.sort(["author","Year","Month","CreatedAt","id"],ascending=True)
 df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
 df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
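
The dataset changes above move from repartition("column") to an explicit repartition(numPartitions, cols...) followed by sortWithinPartitions() before writing Parquet. The snippet below is a minimal sketch of that pattern, assuming a generic SparkSession; the input/output paths and the partition count (16) are hypothetical placeholders, not the /gscratch cluster values used in the patch.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical input: a comments table with a CreatedAt timestamp column.
    df = spark.read.parquet("comments.parquet")
    df = df.withColumn("Year", f.year(f.col("CreatedAt")))
    df = df.withColumn("Month", f.month(f.col("CreatedAt")))

    # Hash-partition into an explicit number of partitions keyed on the
    # columns downstream jobs group and filter by, then sort rows within
    # each partition so every output file is locally ordered on those keys.
    df = df.repartition(16, "author", "subreddit", "Year", "Month")
    df = df.sortWithinPartitions(
        ["author", "subreddit", "Year", "Month", "CreatedAt"], ascending=True
    )
    df.write.parquet("comments_by_author.parquet",
                     mode="overwrite", compression="snappy")

In this sketch, sorting within partitions rather than globally avoids the extra range-partitioning shuffle a full sort would trigger, while still leaving each Parquet file locally ordered on the keys readers filter and group by.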