#!/usr/bin/env bash
#
# Add one or more new months to the existing parquet datasets using a
# layered append. Designed to run on a single fat node (e.g. cpu-g2 with
# 128 cores / ~1TB RAM). For a multi-node Spark cluster instead, see
# add_months_multinode.sh.
#
# Usage:
#   add_months.sh [--clean] YYYY-MM [YYYY-MM ...]
#
# Example:
#   add_months.sh 2025-01 2025-02 2025-03
#
# If temp or staging directories from a previous run exist, the script
# will exit with an error. Pass --clean as the first argument to wipe them
# before starting:
#
#   add_months.sh --clean 2025-01 2025-02 2025-03
#
# The new .zst dump files must live at:
#   $COMMENTS_DUMPDIR/RC_YYYY-MM.zst
#   $SUBMISSIONS_DUMPDIR/RS_YYYY-MM.zst
#
# Override the dump directories via environment variables if the new files
# are not in the standard locations:
#
#   COMMENTS_DUMPDIR=/path/to/new/comments \
#   SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
#   ./add_months.sh 2025-01 2025-02
#
# Workflow:
#   Part 1  — parse new .zst files into per-month parquets (parallel)
#   Part 2  — sort into staging directories, not the live datasets (Spark)
#             [script exits here — verify staging before continuing]
#   Copy    — move staging files into live datasets (run manually after verify)
#   Cleanup — remove temp and staging dirs (run manually after copy)
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.
#
# Every command below is independently runnable for debugging.
# Abort on the first failing command; every step below depends on the
# previous one having succeeded.
set -e

# The helper scripts (comments_part1.py, ...) and the task/log files are
# addressed relative to this script's own directory.
cd -- "$(dirname -- "$0")"

# Print the usage line to stderr and exit with status 1.
usage() {
  echo "Usage: $0 [--clean] YYYY-MM [YYYY-MM ...]" >&2
  exit 1
}

# --clean is only recognised as the first argument.
CLEAN=0
if [[ "${1:-}" == "--clean" ]]; then
  CLEAN=1
  shift
fi

if (( $# == 0 )); then
  usage
fi

# Each remaining argument is spliced into a dump file name and into a
# command line executed by `parallel`, so reject anything that is not a
# plain YYYY-MM month before doing any work.
month_re='^[0-9]{4}-(0[1-9]|1[0-2])$'
for month in "$@"; do
  if [[ ! "$month" =~ $month_re ]]; then
    echo "Error: invalid month '$month' (expected YYYY-MM)" >&2
    usage
  fi
done

COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
PYTHON="${PYTHON:-python3}"

# Part 1 temp dirs (per-month parquets, parsed from .zst)
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"

# Staging dirs (sorted new layer; inspected before copying to live)
STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"

# Live dataset dirs
LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"

# --- Pre-flight checks ------------------------------------------------------
#
# Everything checked here would otherwise only fail after Part 1 has already
# run (or part-way through it), so fail fast instead.

if ! command -v parallel >/dev/null; then
  echo "Error: GNU parallel not found in PATH." >&2
  exit 1
fi

# Part 2 sources $SPARK_CONF_DIR/spark-env.sh; make sure it is there now.
: "${SPARK_CONF_DIR:?SPARK_CONF_DIR must be set (directory containing spark-env.sh)}"
if [[ ! -r "$SPARK_CONF_DIR/spark-env.sh" ]]; then
  echo "Error: cannot read $SPARK_CONF_DIR/spark-env.sh" >&2
  exit 1
fi

# Report every missing dump file at once rather than one per re-run.
MISSING=()
for month in "$@"; do
  for dump in "$COMMENTS_DUMPDIR/RC_${month}.zst" \
              "$SUBMISSIONS_DUMPDIR/RS_${month}.zst"; do
    if [[ ! -f "$dump" ]]; then
      MISSING+=("$dump")
    fi
  done
done
if (( ${#MISSING[@]} > 0 )); then
  echo "Error: dump files not found:" >&2
  printf ' %s\n' "${MISSING[@]}" >&2
  exit 1
fi

# --- Check for leftover output from a previous run --------------------------

EXISTING=()
for d in "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS" \
         "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH" \
         "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"; do
  if [[ -e "$d" ]]; then
    EXISTING+=("$d")
  fi
done

if (( ${#EXISTING[@]} > 0 )); then
  if (( CLEAN == 1 )); then
    echo "Removing leftover files from previous run..."
    rm -rf -- "${EXISTING[@]}"
    rm -f add_months_tasks.txt add_months_joblog.txt
    rm -rf add_months_logs/
  else
    echo "Error: leftover files from a previous run exist:" >&2
    printf ' %s\n' "${EXISTING[@]}" >&2
    echo "Re-run with --clean to remove them before starting." >&2
    exit 1
  fi
fi

# --- Part 1: parse new months in parallel (comments and submissions together) -
#
# One command line per month and dataset is written to add_months_tasks.txt
# (all comment tasks first, then all submission tasks) and fed to `parallel`.
#
# The paths are passed as printf *arguments* and shell-quoted with %q, never
# interpolated into the format string, so a '%', '\', '"' or '$' in a path
# cannot corrupt the generated command. For ordinary paths %q emits the path
# unchanged. $PYTHON is inserted verbatim so it may still carry arguments.
{
  for month in "$@"; do
    printf '%s comments_part1.py parse_dump RC_%s.zst --dumpdir=%q --outdir=%q\n' \
      "$PYTHON" "$month" "$COMMENTS_DUMPDIR" "$TEMP_COMMENTS"
  done
  for month in "$@"; do
    printf '%s submissions_part1.py parse_dump RS_%s.zst --dumpdir=%q --outdir=%q\n' \
      "$PYTHON" "$month" "$SUBMISSIONS_DUMPDIR" "$TEMP_SUBMISSIONS"
  done
} > add_months_tasks.txt

parallel --joblog add_months_joblog.txt --results add_months_logs \
  < add_months_tasks.txt

# --- Part 2: sort new months into staging (Spark, single fat node) ----------

# shellcheck disable=SC1091
source "$SPARK_CONF_DIR/spark-env.sh"

# Without a port the master URL below would silently become "spark://host:".
: "${SPARK_MASTER_PORT:?SPARK_MASTER_PORT is not set after sourcing spark-env.sh}"

# If start-up or a spark-submit fails, `set -e` ends the script immediately;
# the EXIT trap makes sure the cluster is still shut down in that case.
trap 'stop-all.sh || true' EXIT
start_spark_cluster.sh

SPARK_MASTER_URL="spark://$(hostname):$SPARK_MASTER_PORT"

spark-submit --master "$SPARK_MASTER_URL" \
  comments_part2.py \
  --indir="$TEMP_COMMENTS" \
  --out_by_subreddit="$STAGING_COMMENTS_SUB" \
  --out_by_author="$STAGING_COMMENTS_AUTH"

spark-submit --master "$SPARK_MASTER_URL" \
  submissions_part2.py \
  --indir="$TEMP_SUBMISSIONS" \
  --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
  --out_by_author="$STAGING_SUBMISSIONS_AUTH"

# Normal path: drop the safety net and stop the cluster explicitly so that a
# failing stop-all.sh is still reported through the exit status.
trap - EXIT
stop-all.sh

# --- Verify: inspect staging before copying to live -------------------------
#
# The script stops here. Check the staging output looks right before running
# the copy step manually. The live datasets are untouched at this point.
# Example checks:
#
#   ls -lah "$STAGING_COMMENTS_SUB" | head
#   python3 -c "
#   import pyarrow.parquet as pq, os
#   f = sorted(os.listdir('$STAGING_COMMENTS_SUB'))[0]
#   t = pq.read_table('$STAGING_COMMENTS_SUB/' + f, columns=['created_utc'])
#   print(t.column('created_utc')[0].as_py(), t.column('created_utc')[-1].as_py())
#   "

# Nothing below this line is executed by the script; the remaining commands
# are meant to be run by hand.
exit 0

# --- Copy: add staging files into live datasets -----------------------------
#
# Run these lines manually after verifying staging. This is the only step
# that touches the live datasets. It only adds new files — existing files
# are never deleted or overwritten: `cp -n` skips any file whose name is
# already present in the live directory.
#
# The ${VAR:?} form aborts the command when a variable is unset or empty,
# which matters when these lines are pasted into a shell that has not
# defined the STAGING_* / LIVE_* variables.

find "${STAGING_COMMENTS_SUB:?}" -maxdepth 1 -type f \
  -exec cp -n -- {} "${LIVE_COMMENTS_SUB:?}"/ \;
find "${STAGING_COMMENTS_AUTH:?}" -maxdepth 1 -type f \
  -exec cp -n -- {} "${LIVE_COMMENTS_AUTH:?}"/ \;
find "${STAGING_SUBMISSIONS_SUB:?}" -maxdepth 1 -type f \
  -exec cp -n -- {} "${LIVE_SUBMISSIONS_SUB:?}"/ \;
find "${STAGING_SUBMISSIONS_AUTH:?}" -maxdepth 1 -type f \
  -exec cp -n -- {} "${LIVE_SUBMISSIONS_AUTH:?}"/ \;

# --- Cleanup: remove temp and staging dirs ----------------------------------
#
# Run after confirming the copy succeeded and the live datasets look right.

rm -f add_months_tasks.txt add_months_joblog.txt
rm -rf add_months_logs/
rm -rf -- "${TEMP_COMMENTS:?}" "${TEMP_SUBMISSIONS:?}"
rm -rf -- "${STAGING_COMMENTS_SUB:?}" "${STAGING_COMMENTS_AUTH:?}"
rm -rf -- "${STAGING_SUBMISSIONS_SUB:?}" "${STAGING_SUBMISSIONS_AUTH:?}"