GNU parallel spawns fresh shells that don't inherit the active venv. Using an explicit PYTHON path ensures the right interpreter is used in parallel tasks. Defaults to python3 but can be overridden: PYTHON=/path/to/venv/bin/python3 ./add_months.sh ... Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
132 lines
5.3 KiB
Bash
Executable File
132 lines
5.3 KiB
Bash
Executable File
#!/usr/bin/env bash
#
# Add one or more new months to the existing parquet datasets using a
# layered append. Designed to run on a single fat node (e.g. cpu-g2 with
# 128 cores / ~1TB RAM). For a multi-node Spark cluster instead, see
# add_months_multinode.sh.
#
# Usage:
#   add_months.sh YYYY-MM [YYYY-MM ...]
#
# Example:
#   add_months.sh 2025-01 2025-02 2025-03
#
# The new .zst dump files must live at:
#   $COMMENTS_DUMPDIR/RC_YYYY-MM.zst
#   $SUBMISSIONS_DUMPDIR/RS_YYYY-MM.zst
#
# Override the dump directories via environment variables if the new files
# are not in the standard locations:
#
#   COMMENTS_DUMPDIR=/path/to/new/comments \
#   SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
#   ./add_months.sh 2025-01 2025-02
#
# The Part 1 tasks run under "$PYTHON" (default: python3). Set PYTHON to an
# explicit interpreter path to use a particular virtualenv:
#
#   PYTHON=/path/to/venv/bin/python3 ./add_months.sh 2025-01
#
# Workflow:
#   Part 1  — parse new .zst files into per-month parquets (parallel)
#   Part 2  — sort into staging directories, not the live datasets (Spark)
#   [script exits here — verify staging before continuing]
#   Copy    — copy staging files into live datasets (run manually after verify)
#   Cleanup — remove temp and staging dirs (run manually after copy)
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.
#
# Every command below is independently runnable for debugging.

# Abort on the first failing command; each later step depends on the one
# before it having succeeded.
set -e

# Work from the script's own directory: the helper scripts, task files and
# logs used below are all referenced by relative path.
cd "$(dirname "$0")"

if [ $# -eq 0 ]; then
  echo "Usage: $0 YYYY-MM [YYYY-MM ...]" >&2
  exit 1
fi

# Every argument is spliced into a dump file name and into a command line
# that GNU parallel hands to a shell, so reject anything that is not a plain
# YYYY-MM month before any work starts.
month_pattern='^[0-9]{4}-(0[1-9]|1[0-2])$'
for month in "$@"; do
  if [[ ! "$month" =~ $month_pattern ]]; then
    printf '%s: invalid month "%s" (expected YYYY-MM, e.g. 2025-01)\n' \
      "$0" "$month" >&2
    exit 1
  fi
done

# Input locations and interpreter. Each keeps a non-empty value inherited
# from the environment and otherwise falls back to the default shown.
: "${COMMENTS_DUMPDIR:=/gscratch/comdata/raw_data/reddit_dumps/comments}"
: "${SUBMISSIONS_DUMPDIR:=/gscratch/comdata/raw_data/reddit_dumps/submissions}"
: "${PYTHON:=python3}"

# Part 1 output: per-month parquets parsed from the .zst dumps.
TEMP_COMMENTS=/gscratch/comdata/output/temp/add_months_comments.parquet
TEMP_SUBMISSIONS=/gscratch/comdata/output/temp/add_months_submissions.parquet

# Part 2 output: the sorted new layer, kept apart from the live datasets so
# it can be inspected before anything is copied.
STAGING_COMMENTS_SUB=/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet
STAGING_COMMENTS_AUTH=/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet
STAGING_SUBMISSIONS_SUB=/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet
STAGING_SUBMISSIONS_AUTH=/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet

# Live datasets: only the manual copy step writes here.
LIVE_COMMENTS_SUB=/gscratch/comdata/output/reddit_comments_by_subreddit.parquet
LIVE_COMMENTS_AUTH=/gscratch/comdata/output/reddit_comments_by_author.parquet
LIVE_SUBMISSIONS_SUB=/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet
LIVE_SUBMISSIONS_AUTH=/gscratch/comdata/output/reddit_submissions_by_author.parquet

# --- Part 1: parse new months in parallel -----------------------------------

#######################################
# Print one GNU parallel task line per month.
# The interpreter and directory values are passed to printf as %s arguments
# rather than being expanded inside the format string, so a '%' or backslash
# in any of them is emitted literally instead of being interpreted by printf.
# Globals:   PYTHON (read) - interpreter that runs each task
# Arguments: $1 - parser script (e.g. comments_part1.py)
#            $2 - dump file prefix (RC or RS)
#            $3 - directory holding the .zst dumps
#            $4 - output directory for the per-month parquets
#            $5... - months, one task line is printed for each
# Outputs:   task lines on stdout
#######################################
emit_parse_tasks() {
  local script=$1 prefix=$2 dumpdir=$3 outdir=$4 month
  shift 4
  for month in "$@"; do
    printf '%s %s parse_dump %s_%s.zst --dumpdir="%s" --outdir="%s"\n' \
      "$PYTHON" "$script" "$prefix" "$month" "$dumpdir" "$outdir"
  done
}

emit_parse_tasks comments_part1.py RC \
  "$COMMENTS_DUMPDIR" "$TEMP_COMMENTS" "$@" \
  > add_months_comments_tasks.txt

emit_parse_tasks submissions_part1.py RS \
  "$SUBMISSIONS_DUMPDIR" "$TEMP_SUBMISSIONS" "$@" \
  > add_months_submissions_tasks.txt

# Run the generated task files through GNU parallel, comments first and then
# submissions. Each run records a job log (--joblog) and stores per-job
# output under its --results directory.
for dataset in comments submissions; do
  parallel \
    --joblog "add_months_${dataset}_joblog.txt" \
    --results "add_months_${dataset}_logs" \
    < "add_months_${dataset}_tasks.txt"
done

# --- Part 2: sort new months into staging (Spark, single fat node) ----------

# Fail with a clear message instead of trying to source "/spark-env.sh" when
# SPARK_CONF_DIR is missing.
source "${SPARK_CONF_DIR:?SPARK_CONF_DIR must point at the Spark configuration directory}/spark-env.sh"

# SPARK_MASTER_PORT is presumably exported by spark-env.sh (TODO confirm);
# without it the master URL below would be malformed, so check before the
# cluster is started.
spark_master="spark://$(hostname):${SPARK_MASTER_PORT:?SPARK_MASTER_PORT is not set (expected from spark-env.sh)}"

# Under `set -e` a failing spark-submit ends the script immediately, which
# would skip the stop-all.sh call further down and leave the cluster running.
# The EXIT trap stops it on every exit path; it is installed before the start
# so a partially started cluster is torn down as well.
trap 'stop-all.sh' EXIT
start_spark_cluster.sh

spark-submit --master "$spark_master" \
  comments_part2.py \
  --indir="$TEMP_COMMENTS" \
  --out_by_subreddit="$STAGING_COMMENTS_SUB" \
  --out_by_author="$STAGING_COMMENTS_AUTH"

spark-submit --master "$spark_master" \
  submissions_part2.py \
  --indir="$TEMP_SUBMISSIONS" \
  --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
  --out_by_author="$STAGING_SUBMISSIONS_AUTH"

# Normal shutdown: drop the trap first so the cluster is not stopped twice.
trap - EXIT
stop-all.sh

# --- Verify: inspect staging before copying to live -------------------------
#
# The script stops here. Check the staging output looks right before running
# the copy step manually. The live datasets are untouched at this point.
# Example checks:
#
#   ls -lah "$STAGING_COMMENTS_SUB" | head
#   python3 -c "
#   import pyarrow.parquet as pq, os
#   names = sorted(n for n in os.listdir('$STAGING_COMMENTS_SUB') if n.endswith('.parquet'))
#   t = pq.read_table('$STAGING_COMMENTS_SUB/' + names[0], columns=['created_utc'])
#   print(t.column('created_utc')[0].as_py(), t.column('created_utc')[-1].as_py())
#   "

# Everything after this line is meant to be run by hand, never by the script.
exit 0

# --- Copy: add staging files into live datasets -----------------------------
#
# Run these lines manually after verifying staging. This is the only step
# that touches the live datasets. It only adds new files: an existing live
# file is never deleted or overwritten.

#######################################
# Copy the regular files directly inside SRC_DIR into DST_DIR without ever
# replacing a file that is already there. (Plain `cp` would silently
# overwrite a same-named file.)
#   - destination missing            -> file is copied
#   - destination identical (cmp -s) -> skipped quietly, so re-runs are safe
#   - destination differs            -> left untouched, reported on stderr
# Arguments: $1 - staging directory to copy from
#            $2 - live dataset directory to copy into
# Returns:   0 if nothing was refused and every copy succeeded, 1 otherwise
#######################################
copy_new_files() {
  local src_dir=$1 dst_dir=$2
  local src_file dst_file status=0

  if [[ ! -d "$src_dir" || ! -d "$dst_dir" ]]; then
    printf 'copy_new_files: both arguments must be existing directories: "%s" -> "%s"\n' \
      "$src_dir" "$dst_dir" >&2
    return 1
  fi

  # NUL-delimited so any file name is handled; -maxdepth 1 -type f matches
  # only regular files sitting directly in the staging directory.
  while IFS= read -r -d '' src_file; do
    dst_file="$dst_dir/${src_file##*/}"
    if [[ -e "$dst_file" ]]; then
      if ! cmp -s -- "$src_file" "$dst_file"; then
        printf 'copy_new_files: refusing to overwrite existing file: %s\n' \
          "$dst_file" >&2
        status=1
      fi
      continue
    fi
    cp -- "$src_file" "$dst_file" || status=1
  done < <(find "$src_dir" -maxdepth 1 -type f -print0)

  return "$status"
}

copy_new_files "$STAGING_COMMENTS_SUB" "$LIVE_COMMENTS_SUB"
copy_new_files "$STAGING_COMMENTS_AUTH" "$LIVE_COMMENTS_AUTH"
copy_new_files "$STAGING_SUBMISSIONS_SUB" "$LIVE_SUBMISSIONS_SUB"
copy_new_files "$STAGING_SUBMISSIONS_AUTH" "$LIVE_SUBMISSIONS_AUTH"

# --- Cleanup: remove temp and staging dirs ----------------------------------
#
# Run by hand, and only once the copy has been confirmed and the live
# datasets look right: this deletes the Part 1 parquets and the staging layer.

# Part 1 per-month parquets.
rm -rf \
  "$TEMP_COMMENTS" \
  "$TEMP_SUBMISSIONS"

# Staged comments layer.
rm -rf \
  "$STAGING_COMMENTS_SUB" \
  "$STAGING_COMMENTS_AUTH"

# Staged submissions layer.
rm -rf \
  "$STAGING_SUBMISSIONS_SUB" \
  "$STAGING_SUBMISSIONS_AUTH"