From 926e9bc364ac155895f245b4f569dca8b51b34e8 Mon Sep 17 00:00:00 2001 From: Benjamin Mako Hill Date: Mon, 25 May 2026 18:35:12 -0700 Subject: [PATCH] datasets/: fat-node add_months.sh; multinode variant as separate script add_months.sh now targets a single fat node directly: starts a local Spark cluster via start_spark_cluster.sh, submits jobs, stops the cluster. No salloc needed. add_months_multinode.sh is a new script for the multi-node case using start_spark_and_run.sh from a login node. Usage takes NODES as first arg. Co-Authored-By: Claude Sonnet 4.6 --- datasets/add_months.sh | 41 ++++++++------- datasets/add_months_multinode.sh | 88 ++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 19 deletions(-) create mode 100755 datasets/add_months_multinode.sh diff --git a/datasets/add_months.sh b/datasets/add_months.sh index fff2f72..0065d24 100755 --- a/datasets/add_months.sh +++ b/datasets/add_months.sh @@ -1,8 +1,9 @@ #!/usr/bin/env bash # # Add one or more new months to the existing parquet datasets using a -# layered append. The live datasets are never touched until the final -# copy step, so they remain safe and queryable throughout. +# layered append. Designed to run on a single fat node (e.g. cpu-g2 with +# 128 cores / ~1TB RAM). For a multi-node Spark cluster instead, see +# add_months_multinode.sh. # # Usage: # add_months.sh YYYY-MM [YYYY-MM ...] 
@@ -22,18 +23,16 @@ # ./add_months.sh 2025-01 2025-02 # # Workflow: -# Part 1 — parse new .zst files into per-month parquets (compute node) -# Part 2 — sort into staging directories, not the live datasets (fat node) -# Verify — inspect staging before committing (manual step, not scripted) +# Part 1 — parse new .zst files into per-month parquets (parallel) +# Part 2 — sort into staging directories, not the live datasets (Spark) +# [script exits here — verify staging before continuing] # Copy — move staging files into live datasets (run manually after verify) # Cleanup — remove temp and staging dirs (run manually after copy) # -# Every command below is independently runnable for debugging. The copy -# and cleanup steps are intentionally left as separate commands so you can -# verify the staging output before touching the live datasets. -# # NOTE: This script and its workflow are written but not yet tested. # Remove this notice after a successful end-to-end run. +# +# Every command below is independently runnable for debugging. 
set -e cd "$(dirname "$0")" @@ -62,7 +61,7 @@ LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet" LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet" LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet" -# --- Part 1: parse new months in parallel (run on a compute node) ----------- +# --- Part 1: parse new months in parallel ----------------------------------- printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \ > add_months_comments_tasks.txt @@ -76,26 +75,30 @@ parallel --joblog add_months_comments_joblog.txt --results add_months_comments_l parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \ < add_months_submissions_tasks.txt -# --- Part 2: sort new months into staging (not the live datasets) ----------- -# -# start_spark_and_run.sh calls salloc — run from a login node, or replace -# with start_spark_cluster.sh + spark-submit if already on a suitable node. +# --- Part 2: sort new months into staging (Spark, single fat node) ---------- -start_spark_and_run.sh 1 comments_part2.py \ +source "$SPARK_CONF_DIR/spark-env.sh" +start_spark_cluster.sh + +spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \ + comments_part2.py \ --indir="$TEMP_COMMENTS" \ --out_by_subreddit="$STAGING_COMMENTS_SUB" \ --out_by_author="$STAGING_COMMENTS_AUTH" -start_spark_and_run.sh 1 submissions_part2.py \ +spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \ + submissions_part2.py \ --indir="$TEMP_SUBMISSIONS" \ --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \ --out_by_author="$STAGING_SUBMISSIONS_AUTH" +stop-all.sh + # --- Verify: inspect staging before copying to live ------------------------- # -# The script stops here (exit 0 below). Check the staging output looks right -# before running the copy step manually. The live datasets are untouched at -# this point. 
Example checks: +# The script stops here. Check the staging output looks right before running +# the copy step manually. The live datasets are untouched at this point. +# Example checks: # # ls -lah "$STAGING_COMMENTS_SUB" | head # python3 -c " diff --git a/datasets/add_months_multinode.sh b/datasets/add_months_multinode.sh new file mode 100755 index 0000000..e16b373 --- /dev/null +++ b/datasets/add_months_multinode.sh @@ -0,0 +1,88 @@ +#!/usr/bin/env bash +# +# Multi-node variant of add_months.sh. Uses start_spark_and_run.sh to +# allocate a Spark cluster across multiple nodes via salloc. Run this +# from a login node. +# +# For the common single-fat-node case, use add_months.sh instead. +# +# Usage: +# add_months_multinode.sh NODES YYYY-MM [YYYY-MM ...] +# +# Example (2 nodes, 3 months): +# add_months_multinode.sh 2 2025-01 2025-02 2025-03 +# +# Override dump directories via environment variables if needed: +# +# COMMENTS_DUMPDIR=/path/to/new/comments \ +# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \ +# ./add_months_multinode.sh 2 2025-01 2025-02 +# +# NOTE: This script and its workflow are written but not yet tested. +# Remove this notice after a successful end-to-end run. 
+ +set -e +cd "$(dirname "$0")" + +NODES="${1:-}" +if [ -z "$NODES" ] || [ $# -lt 2 ]; then + echo "Usage: $0 NODES YYYY-MM [YYYY-MM ...]" >&2 + exit 1 +fi +shift +MONTHS=("$@") + +COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}" +SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}" + +TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet" +TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet" +STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet" +STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet" +STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet" +STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet" +LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet" +LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet" +LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet" +LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet" + +# --- Part 1: parse new months in parallel ----------------------------------- + +printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "${MONTHS[@]}" \ + > add_months_comments_tasks.txt + +printf "python3 submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "${MONTHS[@]}" \ + > add_months_submissions_tasks.txt + +parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \ + < add_months_comments_tasks.txt + +parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \ + < add_months_submissions_tasks.txt + +# --- Part 2: sort new months into staging 
(multi-node Spark cluster) -------- + +start_spark_and_run.sh "$NODES" comments_part2.py \ + --indir="$TEMP_COMMENTS" \ + --out_by_subreddit="$STAGING_COMMENTS_SUB" \ + --out_by_author="$STAGING_COMMENTS_AUTH" + +start_spark_and_run.sh "$NODES" submissions_part2.py \ + --indir="$TEMP_SUBMISSIONS" \ + --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \ + --out_by_author="$STAGING_SUBMISSIONS_AUTH" + +# --- Verify staging, then copy and cleanup manually ------------------------- +# +# See add_months.sh for verify/copy/cleanup commands — they are identical. The script stops at the exit 0 below; the copy and cleanup commands after it are not run automatically and must be run by hand once staging is verified. + +exit 0 + +find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \; +find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \; +find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \; +find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \; + +rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS" +rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH" +rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"