datasets/: fat-node add_months.sh; multinode variant as separate script
add_months.sh now targets a single fat node directly: starts a local Spark cluster via start_spark_cluster.sh, submits jobs, stops the cluster. No salloc needed. add_months_multinode.sh is a new script for the multi-node case using start_spark_and_run.sh from a login node. Usage takes NODES as first arg. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Add one or more new months to the existing parquet datasets using a
|
||||
# layered append. The live datasets are never touched until the final
|
||||
# copy step, so they remain safe and queryable throughout.
|
||||
# layered append. Designed to run on a single fat node (e.g. cpu-g2 with
|
||||
# 128 cores / ~1TB RAM). For a multi-node Spark cluster instead, see
|
||||
# add_months_multinode.sh.
|
||||
#
|
||||
# Usage:
|
||||
# add_months.sh YYYY-MM [YYYY-MM ...]
|
||||
@@ -22,18 +23,16 @@
|
||||
# ./add_months.sh 2025-01 2025-02
|
||||
#
|
||||
# Workflow:
|
||||
# Part 1 — parse new .zst files into per-month parquets (compute node)
|
||||
# Part 2 — sort into staging directories, not the live datasets (fat node)
|
||||
# Verify — inspect staging before committing (manual step, not scripted)
|
||||
# Part 1 — parse new .zst files into per-month parquets (parallel)
|
||||
# Part 2 — sort into staging directories, not the live datasets (Spark)
|
||||
# [script exits here — verify staging before continuing]
|
||||
# Copy — move staging files into live datasets (run manually after verify)
|
||||
# Cleanup — remove temp and staging dirs (run manually after copy)
|
||||
#
|
||||
# Every command below is independently runnable for debugging. The copy
|
||||
# and cleanup steps are intentionally left as separate commands so you can
|
||||
# verify the staging output before touching the live datasets.
|
||||
#
|
||||
# NOTE: This script and its workflow are written but not yet tested.
|
||||
# Remove this notice after a successful end-to-end run.
|
||||
#
|
||||
# Every command below is independently runnable for debugging.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
@@ -62,7 +61,7 @@ LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
|
||||
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
|
||||
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
|
||||
|
||||
# --- Part 1: parse new months in parallel (run on a compute node) -----------
|
||||
# --- Part 1: parse new months in parallel -----------------------------------
|
||||
|
||||
printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \
|
||||
> add_months_comments_tasks.txt
|
||||
@@ -76,26 +75,30 @@ parallel --joblog add_months_comments_joblog.txt --results add_months_comments_l
|
||||
parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
|
||||
< add_months_submissions_tasks.txt
|
||||
|
||||
# --- Part 2: sort new months into staging (not the live datasets) -----------
|
||||
#
|
||||
# start_spark_and_run.sh calls salloc — run from a login node, or replace
|
||||
# with start_spark_cluster.sh + spark-submit if already on a suitable node.
|
||||
# --- Part 2: sort new months into staging (Spark, single fat node) ----------
|
||||
|
||||
start_spark_and_run.sh 1 comments_part2.py \
|
||||
source "$SPARK_CONF_DIR/spark-env.sh"
|
||||
start_spark_cluster.sh
|
||||
|
||||
spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
|
||||
comments_part2.py \
|
||||
--indir="$TEMP_COMMENTS" \
|
||||
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
|
||||
--out_by_author="$STAGING_COMMENTS_AUTH"
|
||||
|
||||
start_spark_and_run.sh 1 submissions_part2.py \
|
||||
spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
|
||||
submissions_part2.py \
|
||||
--indir="$TEMP_SUBMISSIONS" \
|
||||
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
|
||||
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
|
||||
|
||||
stop-all.sh
|
||||
|
||||
# --- Verify: inspect staging before copying to live -------------------------
|
||||
#
|
||||
# The script stops here (exit 0 below). Check the staging output looks right
|
||||
# before running the copy step manually. The live datasets are untouched at
|
||||
# this point. Example checks:
|
||||
# The script stops here. Check the staging output looks right before running
|
||||
# the copy step manually. The live datasets are untouched at this point.
|
||||
# Example checks:
|
||||
#
|
||||
# ls -lah "$STAGING_COMMENTS_SUB" | head
|
||||
# python3 -c "
|
||||
|
||||
88
datasets/add_months_multinode.sh
Executable file
88
datasets/add_months_multinode.sh
Executable file
@@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Multi-node variant of add_months.sh. Uses start_spark_and_run.sh to
|
||||
# allocate a Spark cluster across multiple nodes via salloc. Run this
|
||||
# from a login node.
|
||||
#
|
||||
# For the common single-fat-node case, use add_months.sh instead.
|
||||
#
|
||||
# Usage:
|
||||
# add_months_multinode.sh NODES YYYY-MM [YYYY-MM ...]
|
||||
#
|
||||
# Example (2 nodes, 3 months):
|
||||
# add_months_multinode.sh 2 2025-01 2025-02 2025-03
|
||||
#
|
||||
# Override dump directories via environment variables if needed:
|
||||
#
|
||||
# COMMENTS_DUMPDIR=/path/to/new/comments \
|
||||
# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
|
||||
# ./add_months_multinode.sh 2 2025-01 2025-02
|
||||
#
|
||||
# NOTE: This script and its workflow are written but not yet tested.
|
||||
# Remove this notice after a successful end-to-end run.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
NODES="${1:-}"
|
||||
if [ -z "$NODES" ] || [ $# -lt 2 ]; then
|
||||
echo "Usage: $0 NODES YYYY-MM [YYYY-MM ...]" >&2
|
||||
exit 1
|
||||
fi
|
||||
shift
|
||||
MONTHS=("$@")
|
||||
|
||||
COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
|
||||
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
|
||||
|
||||
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
|
||||
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
|
||||
STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
|
||||
STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
|
||||
STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
|
||||
STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
|
||||
LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
|
||||
LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
|
||||
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
|
||||
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
|
||||
|
||||
# --- Part 1: parse new months in parallel -----------------------------------
|
||||
|
||||
printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "${MONTHS[@]}" \
|
||||
> add_months_comments_tasks.txt
|
||||
|
||||
printf "python3 submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "${MONTHS[@]}" \
|
||||
> add_months_submissions_tasks.txt
|
||||
|
||||
parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
|
||||
< add_months_comments_tasks.txt
|
||||
|
||||
parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
|
||||
< add_months_submissions_tasks.txt
|
||||
|
||||
# --- Part 2: sort new months into staging (multi-node Spark cluster) --------
|
||||
|
||||
start_spark_and_run.sh "$NODES" comments_part2.py \
|
||||
--indir="$TEMP_COMMENTS" \
|
||||
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
|
||||
--out_by_author="$STAGING_COMMENTS_AUTH"
|
||||
|
||||
start_spark_and_run.sh "$NODES" submissions_part2.py \
|
||||
--indir="$TEMP_SUBMISSIONS" \
|
||||
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
|
||||
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
|
||||
|
||||
# --- Verify staging, then copy and cleanup manually -------------------------
|
||||
#
|
||||
# See add_months.sh for verify/copy/cleanup commands — they are identical.
|
||||
|
||||
exit 0
|
||||
|
||||
find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \;
|
||||
find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \;
|
||||
find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \;
|
||||
find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \;
|
||||
|
||||
rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
|
||||
rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
|
||||
rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"
|
||||
Reference in New Issue
Block a user