#!/usr/bin/env bash
#
# Multi-node variant of add_months.sh. Uses start_spark_and_run.sh to
# allocate a Spark cluster across multiple nodes via salloc. Run this
# from a login node.
#
# For the common single-fat-node case, use add_months.sh instead.
#
# Usage:
#   add_months_multinode.sh NODES YYYY-MM [YYYY-MM ...]
#
# Example (2 nodes, 3 months):
#   add_months_multinode.sh 2 2025-01 2025-02 2025-03
#
# Optional environment overrides:
#   COMMENTS_DUMPDIR     directory passed as --dumpdir for RC_<month>.zst
#   SUBMISSIONS_DUMPDIR  directory passed as --dumpdir for RS_<month>.zst
#   PYTHON               interpreter command for the part 1 tasks
#                        (default: python3)
#
#   COMMENTS_DUMPDIR=/path/to/new/comments \
#   SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
#   ./add_months_multinode.sh 2 2025-01 2025-02
#
# Files written into the script's own directory:
#   add_months_{comments,submissions}_tasks.txt    generated task lists
#   add_months_{comments,submissions}_joblog.txt   parallel job logs
#   add_months_{comments,submissions}_logs         parallel --results output
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.

set -euo pipefail

# All relative paths below (part 1/2 scripts, task files, logs) are resolved
# against the directory that contains this script.
cd -- "$(dirname -- "$0")"

# Print a message on stderr and abort with status 1.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# Print the usage line on stderr and abort with status 1.
usage() {
  echo "Usage: $0 NODES YYYY-MM [YYYY-MM ...]" >&2
  exit 1
}

# --- Arguments ---------------------------------------------------------------

(( $# >= 2 )) || usage
NODES="$1"
[[ -n "$NODES" ]] || usage
shift
MONTHS=("$@")

# Fail fast on malformed input: NODES is handed to the cluster launcher and
# every month is spliced into a shell command line executed by parallel.
readonly NODES_RE='^[1-9][0-9]*$'
readonly MONTH_RE='^[0-9]{4}-(0[1-9]|1[0-2])$'

[[ "$NODES" =~ $NODES_RE ]] \
  || die "NODES must be a positive integer, got: '$NODES'"
for month in "${MONTHS[@]}"; do
  [[ "$month" =~ $MONTH_RE ]] \
    || die "Month must look like YYYY-MM, got: '$month'"
done

# --- Configuration -----------------------------------------------------------

COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
PYTHON="${PYTHON:-python3}"

# Output of part 1 (parse_dump) and input of part 2.
readonly TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
readonly TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"

# Output of part 2; to be verified before anything is copied to the live
# datasets.
readonly STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
readonly STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
readonly STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
readonly STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"

# Live datasets; only referenced by the manual copy commands at the bottom.
readonly LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
readonly LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
readonly LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
readonly LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"

#######################################
# Write one part 1 `parse_dump` command line per month to stdout.
# Globals:   PYTHON (read)
# Arguments: $1 - part 1 script, e.g. comments_part1.py
#            $2 - dump file prefix (RC or RS)
#            $3 - dump directory (--dumpdir)
#            $4 - output directory (--outdir)
#            $5... - months (YYYY-MM)
# Outputs:   one shell command line per month
#######################################
write_tasks() {
  local script=$1 prefix=$2 dumpdir=$3 outdir=$4
  shift 4
  local m
  for m in "$@"; do
    # Constant format string: values never act as printf directives.
    # $PYTHON is emitted verbatim (%s) so it may carry interpreter flags;
    # the paths are shell-quoted (%q) because each line is later run as a
    # shell command.
    printf '%s %s parse_dump %s_%s.zst --dumpdir=%q --outdir=%q\n' \
      "$PYTHON" "$script" "$prefix" "$m" "$dumpdir" "$outdir"
  done
}

# --- Part 1: parse new months in parallel -----------------------------------

write_tasks comments_part1.py RC \
  "$COMMENTS_DUMPDIR" "$TEMP_COMMENTS" "${MONTHS[@]}" \
  > add_months_comments_tasks.txt
write_tasks submissions_part1.py RS \
  "$SUBMISSIONS_DUMPDIR" "$TEMP_SUBMISSIONS" "${MONTHS[@]}" \
  > add_months_submissions_tasks.txt

# A non-zero exit status from parallel aborts the script via `set -e`.
parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
  < add_months_comments_tasks.txt
parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
  < add_months_submissions_tasks.txt

# --- Part 2: sort new months into staging (multi-node Spark cluster) --------

start_spark_and_run.sh "$NODES" comments_part2.py \
  --indir="$TEMP_COMMENTS" \
  --out_by_subreddit="$STAGING_COMMENTS_SUB" \
  --out_by_author="$STAGING_COMMENTS_AUTH"

start_spark_and_run.sh "$NODES" submissions_part2.py \
  --indir="$TEMP_SUBMISSIONS" \
  --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
  --out_by_author="$STAGING_SUBMISSIONS_AUTH"

# --- Verify staging, then copy and cleanup manually -------------------------
#
# See add_months.sh for verify/copy/cleanup commands — they are identical.
#
# The script stops here, so nothing below this `exit 0` is ever executed by
# this script. The commands are presumably kept as a reference for the manual
# copy/cleanup step; run them by hand only after the staging output has been
# verified.
exit 0

# Copy the regular files at the top level of each staging dataset into the
# matching live dataset directory.
find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp -- {} "$LIVE_COMMENTS_SUB"/ \;
find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp -- {} "$LIVE_COMMENTS_AUTH"/ \;
find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp -- {} "$LIVE_SUBMISSIONS_SUB"/ \;
find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp -- {} "$LIVE_SUBMISSIONS_AUTH"/ \;

# Remove intermediate and staging data. `${var:?}` makes rm refuse to run if
# a path variable is ever empty or unset.
rm -rf -- "${TEMP_COMMENTS:?}" "${TEMP_SUBMISSIONS:?}"
rm -rf -- "${STAGING_COMMENTS_SUB:?}" "${STAGING_COMMENTS_AUTH:?}"
rm -rf -- "${STAGING_SUBMISSIONS_SUB:?}" "${STAGING_SUBMISSIONS_AUTH:?}"