diff --git a/datasets/add_months.sh b/datasets/add_months.sh
index fff2f72..0065d24 100755
--- a/datasets/add_months.sh
+++ b/datasets/add_months.sh
@@ -1,8 +1,9 @@
 #!/usr/bin/env bash
 #
 # Add one or more new months to the existing parquet datasets using a
-# layered append. The live datasets are never touched until the final
-# copy step, so they remain safe and queryable throughout.
+# layered append. Designed to run on a single fat node (e.g. cpu-g2 with
+# 128 cores / ~1TB RAM). For a multi-node Spark cluster instead, see
+# add_months_multinode.sh.
 #
 # Usage:
 # add_months.sh YYYY-MM [YYYY-MM ...]
@@ -22,18 +23,16 @@
 # ./add_months.sh 2025-01 2025-02
 #
 # Workflow:
-# Part 1 — parse new .zst files into per-month parquets (compute node)
-# Part 2 — sort into staging directories, not the live datasets (fat node)
-# Verify — inspect staging before committing (manual step, not scripted)
+# Part 1 — parse new .zst files into per-month parquets (parallel)
+# Part 2 — sort into staging directories, not the live datasets (Spark)
+# [script exits here — verify staging before continuing]
 # Copy — move staging files into live datasets (run manually after verify)
 # Cleanup — remove temp and staging dirs (run manually after copy)
 #
-# Every command below is independently runnable for debugging. The copy
-# and cleanup steps are intentionally left as separate commands so you can
-# verify the staging output before touching the live datasets.
-#
 # NOTE: This script and its workflow are written but not yet tested.
 # Remove this notice after a successful end-to-end run.
+#
+# Every command below is independently runnable for debugging.
 
 set -e
 cd "$(dirname "$0")"
@@ -62,7 +61,7 @@ LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
 LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
 LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
 
-# --- Part 1: parse new months in parallel (run on a compute node) -----------
+# --- Part 1: parse new months in parallel -----------------------------------
 
 printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \
     > add_months_comments_tasks.txt
@@ -76,26 +75,43 @@ parallel --joblog add_months_comments_joblog.txt --results add_months_comments_l
 parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
     < add_months_submissions_tasks.txt
 
-# --- Part 2: sort new months into staging (not the live datasets) -----------
+# --- Part 2: sort new months into staging (Spark, single fat node) ----------
 #
-# start_spark_and_run.sh calls salloc — run from a login node, or replace
-# with start_spark_cluster.sh + spark-submit if already on a suitable node.
+# SPARK_CONF_DIR must be set by the time this point is reached, and
+# SPARK_MASTER_PORT must be set once spark-env.sh has been sourced. Both are
+# checked before the cluster is started so a missing value fails fast.
 
-start_spark_and_run.sh 1 comments_part2.py \
+: "${SPARK_CONF_DIR:?SPARK_CONF_DIR must be set (directory containing spark-env.sh)}"
+source "$SPARK_CONF_DIR/spark-env.sh"
+: "${SPARK_MASTER_PORT:?SPARK_MASTER_PORT is not set after sourcing spark-env.sh}"
+
+# With set -e, a failing spark-submit aborts the script before the stop-all.sh
+# at the end of this section, which would leave the cluster running. The EXIT
+# trap stops it on every early exit.
+trap 'stop-all.sh' EXIT
+start_spark_cluster.sh
+
+spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
+    comments_part2.py \
     --indir="$TEMP_COMMENTS" \
     --out_by_subreddit="$STAGING_COMMENTS_SUB" \
     --out_by_author="$STAGING_COMMENTS_AUTH"
 
-start_spark_and_run.sh 1 submissions_part2.py \
+spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
+    submissions_part2.py \
     --indir="$TEMP_SUBMISSIONS" \
     --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
     --out_by_author="$STAGING_SUBMISSIONS_AUTH"
 
+# Normal path: disarm the trap first so the cluster is stopped exactly once.
+trap - EXIT
+stop-all.sh
+
 # --- Verify: inspect staging before copying to live -------------------------
 #
-# The script stops here (exit 0 below). Check the staging output looks right
-# before running the copy step manually. The live datasets are untouched at
-# this point. Example checks:
+# The script stops here. Check the staging output looks right before running
+# the copy step manually. The live datasets are untouched at this point.
+# Example checks:
 #
 # ls -lah "$STAGING_COMMENTS_SUB" | head
 # python3 -c "
diff --git a/datasets/add_months_multinode.sh b/datasets/add_months_multinode.sh
new file mode 100755
index 0000000..e16b373
--- /dev/null
+++ b/datasets/add_months_multinode.sh
@@ -0,0 +1,107 @@
+#!/usr/bin/env bash
+#
+# Multi-node variant of add_months.sh. Uses start_spark_and_run.sh to
+# allocate a Spark cluster across multiple nodes via salloc. Run this
+# from a login node.
+#
+# For the common single-fat-node case, use add_months.sh instead.
+#
+# Usage:
+#   add_months_multinode.sh NODES YYYY-MM [YYYY-MM ...]
+#
+# NODES must be a positive integer; otherwise the usage message is printed
+# and the script exits with status 1 before Part 1 starts.
+#
+# Example (2 nodes, 3 months):
+#   add_months_multinode.sh 2 2025-01 2025-02 2025-03
+#
+# Override dump directories via environment variables if needed:
+#
+#   COMMENTS_DUMPDIR=/path/to/new/comments \
+#   SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
+#   ./add_months_multinode.sh 2 2025-01 2025-02
+#
+# NOTE: This script and its workflow are written but not yet tested.
+# Remove this notice after a successful end-to-end run.
+
+set -e
+cd "$(dirname "$0")"
+
+usage() {
+    echo "Usage: $0 NODES YYYY-MM [YYYY-MM ...]" >&2
+    exit 1
+}
+
+# Require NODES plus at least one month.
+[ $# -ge 2 ] || usage
+
+# NODES must be a positive integer. Without this check, forgetting NODES
+# (e.g. "add_months_multinode.sh 2025-01 2025-02") would pass the argument
+# count test, use "2025-01" as the node count and skip that month.
+NODES="$1"
+case "$NODES" in
+    '' | *[!0-9]*) usage ;;
+esac
+[ "$NODES" -ge 1 ] || usage
+shift
+MONTHS=("$@")
+
+COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
+SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
+
+TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
+TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
+STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
+STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
+STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
+STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
+LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
+LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
+LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
+LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
+
+# --- Part 1: parse new months in parallel -----------------------------------
+
+printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "${MONTHS[@]}" \
+    > add_months_comments_tasks.txt
+
+printf "python3 submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "${MONTHS[@]}" \
+    > add_months_submissions_tasks.txt
+
+parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
+    < add_months_comments_tasks.txt
+
+parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
+    < add_months_submissions_tasks.txt
+
+# --- Part 2: sort new months into staging (multi-node Spark cluster) --------
+
+start_spark_and_run.sh "$NODES" comments_part2.py \
+    --indir="$TEMP_COMMENTS" \
+    --out_by_subreddit="$STAGING_COMMENTS_SUB" \
+    --out_by_author="$STAGING_COMMENTS_AUTH"
+
+start_spark_and_run.sh "$NODES" submissions_part2.py \
+    --indir="$TEMP_SUBMISSIONS" \
+    --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
+    --out_by_author="$STAGING_SUBMISSIONS_AUTH"
+
+# --- Verify staging, then copy and cleanup manually -------------------------
+#
+# See add_months.sh for verify/copy/cleanup commands — they are identical.
+# The script always stops at the "exit 0" below; the copy and cleanup
+# commands after it are never run by this script and are kept only so they
+# can be copied out and run by hand once staging has been verified.
+
+exit 0
+
+# Copy (manual): add the new staging files to the live datasets.
+find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \;
+find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \;
+find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \;
+find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \;
+
+# Cleanup (manual): remove the temp and staging directories.
+rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
+rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
+rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"