diff --git a/datasets/add_months.sh b/datasets/add_months.sh
index 3da60a6..c046235 100755
--- a/datasets/add_months.sh
+++ b/datasets/add_months.sh
@@ -1,8 +1,8 @@
 #!/usr/bin/env bash
 #
 # Add one or more new months to the existing parquet datasets using a
-# layered append. Part 1 runs on a compute node; Part 2 must be launched
-# from a login node (it calls salloc via start_spark_and_run.sh).
+# layered append. The live datasets are never touched until the final
+# copy step, so they remain safe and queryable throughout.
 #
 # Usage:
 #   add_months.sh YYYY-MM [YYYY-MM ...]
@@ -21,15 +21,19 @@
 #   SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
 #   ./add_months.sh 2025-01 2025-02
 #
-# How layering works: Part 2 appends a new set of sorted partition files
-# alongside the existing ones. Spark and DuckDB read all layers together
-# transparently. Run merge_layers.sh to collapse layers into one when n
-# gets large. Run build_from_scratch.sh to rebuild everything from raw dumps.
+# Workflow:
+#   Part 1  — parse new .zst files into per-month parquets (compute node)
+#   Part 2  — sort into staging directories, not the live datasets (fat node)
+#   Verify  — inspect staging before committing (script stops here by default)
+#   Copy    — copy staging files into live datasets (run manually after verify)
+#   Cleanup — remove temp and staging dirs (run manually after copy)
+#
+# Every command below is independently runnable for debugging. The copy
+# and cleanup steps are intentionally left as separate commands so you can
+# verify the staging output before touching the live datasets.
 #
 # NOTE: This script and its workflow are written but not yet tested.
 # Remove this notice after a successful end-to-end run.
-#
-# Every command below is independently runnable for debugging.
 
 set -e
 cd "$(dirname "$0")"
@@ -42,37 +46,88 @@ fi
 COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
 SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
 
+# Part 1 temp dirs (per-month parquets, parsed from .zst)
 TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
 TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
 
-# --- Part 1: parse new months in parallel -----------------------------------
+# Staging dirs (sorted new layer; inspected before copying to live)
+STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
+STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
+STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
+STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
+
+# Live dataset dirs
+LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
+LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
+LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
+LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
+
+# --- Part 1: parse new months in parallel (run on a compute node) -----------
 
-# build task lists for the specified months
 printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \
     > add_months_comments_tasks.txt
 
 printf "python3 submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "$@" \
     > add_months_submissions_tasks.txt
 
-# parse all new comment months in parallel
 parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
     < add_months_comments_tasks.txt
 
-# parse all new submission months in parallel
 parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
     < add_months_submissions_tasks.txt
 
-# --- Part 2: sort and append new layer (run from a login node) --------------
+# --- Part 2: sort new months into staging (not the live datasets) -----------
 #
-# start_spark_and_run.sh calls salloc; run these two lines from a login node,
-# not from within an existing compute allocation.
+# start_spark_and_run.sh calls salloc — run from a login node, or replace
+# with start_spark_cluster.sh + spark-submit if already on a suitable node.
 
-# append new comment layer to reddit_comments_by_{subreddit,author}.parquet
-start_spark_and_run.sh 1 comments_part2.py --indir="$TEMP_COMMENTS" --mode=append
+start_spark_and_run.sh 1 comments_part2.py \
+    --indir="$TEMP_COMMENTS" \
+    --out_by_subreddit="$STAGING_COMMENTS_SUB" \
+    --out_by_author="$STAGING_COMMENTS_AUTH"
 
-# append new submission layer to reddit_submissions_by_{subreddit,author}.parquet
-start_spark_and_run.sh 1 submissions_part2.py --indir="$TEMP_SUBMISSIONS" --mode=append
+start_spark_and_run.sh 1 submissions_part2.py \
+    --indir="$TEMP_SUBMISSIONS" \
+    --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
+    --out_by_author="$STAGING_SUBMISSIONS_AUTH"
 
-# --- cleanup: remove temporary Part 1 files ---------------------------------
+# --- Verify: inspect staging before copying to live -------------------------
+#
+# Stop here and check that the staging output looks right before running
+# the copy step. The live datasets are untouched at this point. Example:
+#
+#   ls -lah "$STAGING_COMMENTS_SUB" | head
+#   python3 -c "
+#   import pyarrow.parquet as pq, os
+#   f = sorted(os.listdir('$STAGING_COMMENTS_SUB'))[0]
+#   t = pq.read_table('$STAGING_COMMENTS_SUB/' + f, columns=['created_utc'])
+#   print(t.column('created_utc')[0].as_py(), t.column('created_utc')[-1].as_py())
+#   "
+#
+# When the script is run top to bottom it exits here by default, so the copy
+# and cleanup commands below cannot run before staging has been inspected.
+# Set ADD_MONTHS_SKIP_VERIFY=1 to run straight through without stopping.
+
+if [[ "${ADD_MONTHS_SKIP_VERIFY:-0}" != "1" ]]; then
+    echo "Staging written; live datasets untouched. Verify staging, then run the copy and cleanup commands by hand." >&2
+    exit 0
+fi
+
+# --- Copy: add staging files into live datasets -----------------------------
+#
+# Run these lines manually after verifying staging. This is the only step
+# that touches the live datasets. It only adds new files — cp -n guarantees
+# that existing files are never deleted or overwritten.
+
+find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp -n {} "$LIVE_COMMENTS_SUB"/ \;
+find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp -n {} "$LIVE_COMMENTS_AUTH"/ \;
+find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp -n {} "$LIVE_SUBMISSIONS_SUB"/ \;
+find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp -n {} "$LIVE_SUBMISSIONS_AUTH"/ \;
+
+# --- Cleanup: remove temp and staging dirs ----------------------------------
+#
+# Run after confirming the copy succeeded and the live datasets look right.
 
 rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
+rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
+rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"
diff --git a/datasets/comments_part2.py b/datasets/comments_part2.py
index bb98ac0..f767b5f 100755
--- a/datasets/comments_part2.py
+++ b/datasets/comments_part2.py
@@ -6,8 +6,8 @@ Must be launched from a login node via the Hyak-provided wrapper:
-    start_spark_and_run.sh 1 comments_part2.py --indir=/path/to/parquets --mode=append
+    start_spark_and_run.sh 1 comments_part2.py --indir=/path/to/parquets
 
 --indir defaults to the temp comments dir in dumps_helper.py.
---mode defaults to 'overwrite'; use 'append' to add a new layer without
-touching existing partition files (see add_months.sh).
+--out_by_subreddit and --out_by_author default to the live dataset paths;
+override them to write to staging directories first (see add_months.sh).
 """
 
 import fire
@@ -15,4 +15,7 @@ from dumps_helper import COMMENTS, sort_and_write
 
 
 if __name__ == "__main__":
-    fire.Fire(lambda indir=None, mode='overwrite': sort_and_write(COMMENTS, indir=indir, mode=mode))
+    fire.Fire(lambda indir=None, out_by_subreddit=None, out_by_author=None:
+              sort_and_write(COMMENTS, indir=indir,
+                             out_by_subreddit=out_by_subreddit,
+                             out_by_author=out_by_author))
diff --git a/datasets/dumps_helper.py b/datasets/dumps_helper.py
index 1d85420..bfd6ae6 100644
--- a/datasets/dumps_helper.py
+++ b/datasets/dumps_helper.py
@@ -256,13 +256,14 @@ def gen_task_list(config, script_name, dumpdir=None, tasklist=None):
 
 # --- Part 2: spark sort + repartition --------------------------------------
 
-def sort_and_write(config, indir=None, mode='overwrite'):
+def sort_and_write(config, indir=None, out_by_subreddit=None, out_by_author=None):
     """Read a directory of per-source parquets, sort and repartition twice
-    (once by subreddit, once by author), and write the two final datasets.
+    (once by subreddit, once by author), and write the two output datasets.
 
-    indir defaults to config['outdir']. mode is passed to parquet write and
-    may be 'overwrite' (default, used by build_from_scratch) or 'append'
-    (used by add_months to layer new data alongside existing files).
+    indir defaults to config['outdir'].
+    out_by_subreddit and out_by_author default to config['output_by_subreddit']
+    and config['output_by_author']. Override them to write to staging directories
+    instead of the live datasets (see add_months.sh).
 
     Pyspark is imported lazily so Part 1 callers don't pay the Spark startup
     cost.
@@ -270,6 +271,9 @@ def sort_and_write(config, indir=None, mode='overwrite'):
     from pyspark.sql import SparkSession, functions as f
 
     indir = indir or config['outdir']
+    out_by_subreddit = out_by_subreddit or config['output_by_subreddit']
+    out_by_author = out_by_author or config['output_by_author']
+
     spark = SparkSession.builder.appName(config['app_name']).getOrCreate()
     df = spark.read.parquet(indir, compression='snappy')
 
@@ -286,12 +290,12 @@ def sort_and_write(config, indir=None, mode='overwrite'):
     sub_keys = config['subreddit_sort_keys']
     df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
     df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
-    df_sub.write.parquet(config['output_by_subreddit'], mode=mode, compression='snappy')
+    df_sub.write.parquet(out_by_subreddit, mode='overwrite', compression='snappy')
 
     auth_keys = config['author_sort_keys']
     df_auth = df.repartition('author').sort(auth_keys, ascending=True)
     df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
-    df_auth.write.parquet(config['output_by_author'], mode=mode, compression='snappy')
+    df_auth.write.parquet(out_by_author, mode='overwrite', compression='snappy')
 
 
 def merge_layers(config):
diff --git a/datasets/submissions_part2.py b/datasets/submissions_part2.py
index 7a45bb9..d585b8d 100755
--- a/datasets/submissions_part2.py
+++ b/datasets/submissions_part2.py
@@ -6,8 +6,8 @@ Must be launched from a login node via the Hyak-provided wrapper:
-    start_spark_and_run.sh 1 submissions_part2.py --indir=/path/to/parquets --mode=append
+    start_spark_and_run.sh 1 submissions_part2.py --indir=/path/to/parquets
 
 --indir defaults to the temp submissions dir in dumps_helper.py.
---mode defaults to 'overwrite'; use 'append' to add a new layer without
-touching existing partition files (see add_months.sh).
+--out_by_subreddit and --out_by_author default to the live dataset paths;
+override them to write to staging directories first (see add_months.sh).
 """
 
 import fire
@@ -15,4 +15,7 @@ from dumps_helper import SUBMISSIONS, sort_and_write
 
 
 if __name__ == "__main__":
-    fire.Fire(lambda indir=None, mode='overwrite': sort_and_write(SUBMISSIONS, indir=indir, mode=mode))
+    fire.Fire(lambda indir=None, out_by_subreddit=None, out_by_author=None:
+              sort_and_write(SUBMISSIONS, indir=indir,
+                             out_by_subreddit=out_by_subreddit,
+                             out_by_author=out_by_author))