The script now exits after Part 2 so the copy and cleanup commands must be run manually. This prevents the live datasets from being touched without a deliberate verification step in between. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
128 lines
5.4 KiB
Bash
Executable File
128 lines
5.4 KiB
Bash
Executable File
#!/usr/bin/env bash
#
# Add one or more new months to the existing parquet datasets using a
# layered append. The live datasets are never touched until the final
# copy step, so they remain safe and queryable throughout.
#
# Usage:
#   add_months.sh YYYY-MM [YYYY-MM ...]
#
# Example:
#   add_months.sh 2025-01 2025-02 2025-03
#
# The new .zst dump files must live at:
#   $COMMENTS_DUMPDIR/RC_YYYY-MM.zst
#   $SUBMISSIONS_DUMPDIR/RS_YYYY-MM.zst
#
# Override the dump directories via environment variables if the new files
# are not in the standard locations:
#
#   COMMENTS_DUMPDIR=/path/to/new/comments \
#   SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
#   ./add_months.sh 2025-01 2025-02
#
# Workflow:
#   Part 1  — parse new .zst files into per-month parquets (compute node)
#   Part 2  — sort into staging directories, not the live datasets (fat node)
#   Verify  — inspect staging before committing (manual step, not scripted)
#   Copy    — copy staging files into live datasets (run manually after verify)
#   Cleanup — remove temp and staging dirs (run manually after copy)
#
# Every command below is independently runnable for debugging. The copy
# and cleanup steps are intentionally left as separate commands so you can
# verify the staging output before touching the live datasets.
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.

# Abort on the first failing command, on use of an unset variable, and on a
# failure in any stage of a pipeline.
set -euo pipefail

# Work from the directory that contains this script: the task lists, job logs
# and the *_part1.py / *_part2.py programs are all referenced by relative path.
cd -- "$(dirname -- "$0")"

if (( $# == 0 )); then
  echo "Usage: $0 YYYY-MM [YYYY-MM ...]" >&2
  exit 1
fi

# Reject malformed months up front. Each argument is later embedded in a
# dump file name and in a shell command line, so a typo such as "2025-1"
# should fail here rather than halfway through the parse step.
month_re='^[0-9]{4}-(0[1-9]|1[0-2])$'
for month in "$@"; do
  if [[ ! "$month" =~ $month_re ]]; then
    printf 'Invalid month "%s" (expected YYYY-MM)\n' "$month" >&2
    echo "Usage: $0 YYYY-MM [YYYY-MM ...]" >&2
    exit 1
  fi
done

# Directories holding the raw .zst dumps. Each one may be overridden from the
# environment; the defaults below are used when the variable is unset or empty.
readonly COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
readonly SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"

# Shared roots for everything this script writes.
readonly OUTPUT_ROOT="/gscratch/comdata/output"
readonly TEMP_ROOT="${OUTPUT_ROOT}/temp"

# Part 1 temp dirs (per-month parquets, parsed from .zst)
readonly TEMP_COMMENTS="${TEMP_ROOT}/add_months_comments.parquet"
readonly TEMP_SUBMISSIONS="${TEMP_ROOT}/add_months_submissions.parquet"

# Staging dirs (sorted new layer; inspected before copying to live)
readonly STAGING_COMMENTS_SUB="${TEMP_ROOT}/new_layer_comments_by_subreddit.parquet"
readonly STAGING_COMMENTS_AUTH="${TEMP_ROOT}/new_layer_comments_by_author.parquet"
readonly STAGING_SUBMISSIONS_SUB="${TEMP_ROOT}/new_layer_submissions_by_subreddit.parquet"
readonly STAGING_SUBMISSIONS_AUTH="${TEMP_ROOT}/new_layer_submissions_by_author.parquet"

# Live dataset dirs
readonly LIVE_COMMENTS_SUB="${OUTPUT_ROOT}/reddit_comments_by_subreddit.parquet"
readonly LIVE_COMMENTS_AUTH="${OUTPUT_ROOT}/reddit_comments_by_author.parquet"
readonly LIVE_SUBMISSIONS_SUB="${OUTPUT_ROOT}/reddit_submissions_by_subreddit.parquet"
readonly LIVE_SUBMISSIONS_AUTH="${OUTPUT_ROOT}/reddit_submissions_by_author.parquet"

# --- Part 1: parse new months in parallel (run on a compute node) -----------

#######################################
# Print one "parse_dump" command line per month, suitable for feeding to
# GNU parallel on stdin.
#
# The format string is constant and every value is emitted through %q, so a
# path containing %, backslashes, quotes, spaces or $ survives being re-read
# by the shell that executes the task line.
#
# Arguments:
#   $1   - parser script to run with python3
#   $2   - dump file prefix; the dump name is <prefix>_<month>.zst
#   $3   - directory containing the dump files (passed as --dumpdir)
#   $4   - output directory for the per-month parquets (passed as --outdir)
#   $5.. - months (YYYY-MM)
# Outputs:
#   One command line per month on stdout; nothing when no months are given.
#######################################
write_parse_tasks() {
  local script=$1 prefix=$2 dumpdir=$3 outdir=$4
  shift 4

  local month
  for month in "$@"; do
    printf 'python3 %q parse_dump %q --dumpdir=%q --outdir=%q\n' \
      "$script" "${prefix}_${month}.zst" "$dumpdir" "$outdir"
  done
}

write_parse_tasks comments_part1.py RC \
  "$COMMENTS_DUMPDIR" "$TEMP_COMMENTS" "$@" \
  > add_months_comments_tasks.txt

write_parse_tasks submissions_part1.py RS \
  "$SUBMISSIONS_DUMPDIR" "$TEMP_SUBMISSIONS" "$@" \
  > add_months_submissions_tasks.txt

# Execute the generated task lists with GNU parallel. No command is given on
# the parallel command line, so each line read from stdin is run as a command.
# --joblog records per-job status; --results keeps each job's output under
# the named directory. Comments are processed first, then submissions.
parallel \
  --joblog add_months_comments_joblog.txt \
  --results add_months_comments_logs \
  < add_months_comments_tasks.txt

parallel \
  --joblog add_months_submissions_joblog.txt \
  --results add_months_submissions_logs \
  < add_months_submissions_tasks.txt

# --- Part 2: sort new months into staging (not the live datasets) -----------
#
# start_spark_and_run.sh calls salloc — run from a login node, or replace
# with start_spark_cluster.sh + spark-submit if already on a suitable node.
#
# Each job reads the Part 1 temp directory and writes two outputs (one per
# --out_by_* flag) into the staging directories only; no live dataset path is
# passed here.
# NOTE(review): the leading "1" is presumably the number of nodes requested by
# start_spark_and_run.sh — confirm against that script.

start_spark_and_run.sh 1 comments_part2.py \
  --indir="$TEMP_COMMENTS" \
  --out_by_subreddit="$STAGING_COMMENTS_SUB" \
  --out_by_author="$STAGING_COMMENTS_AUTH"

start_spark_and_run.sh 1 submissions_part2.py \
  --indir="$TEMP_SUBMISSIONS" \
  --out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
  --out_by_author="$STAGING_SUBMISSIONS_AUTH"

# --- Verify: inspect staging before copying to live -------------------------
#
# The script stops here (exit 0 below). Check the staging output looks right
# before running the copy step manually. The live datasets are untouched at
# this point. Example checks:
#
#   ls -lah "$STAGING_COMMENTS_SUB" | head
#   python3 -c "
#   import pyarrow.parquet as pq, os
#   f = sorted(os.listdir('$STAGING_COMMENTS_SUB'))[0]
#   t = pq.read_table('$STAGING_COMMENTS_SUB/' + f, columns=['created_utc'])
#   print(t.column('created_utc')[0].as_py(), t.column('created_utc')[-1].as_py())
#   "

# Tell the operator why the script stopped and what is expected next, then
# exit successfully so nothing below this line runs automatically.
printf 'Parts 1 and 2 finished; the live datasets have not been modified.\n'
printf 'Inspect the staging directories, then run the copy and cleanup commands at the end of %s by hand.\n' "$0"

exit 0

# --- Copy: add staging files into live datasets -----------------------------
#
# Run these lines manually after verifying staging. This is the only step
# that touches the live datasets. It only adds new files — existing files
# are never deleted or overwritten.

#######################################
# Copy the regular files directly inside a staging directory (no recursion)
# into a live dataset directory without ever replacing an existing file.
#
# Every staging file is checked before anything is copied:
#   - name not present in the live directory      -> will be copied
#   - name present with byte-identical content    -> skipped (safe re-run)
#   - name present with different content         -> abort, copy nothing
#
# Arguments:
#   $1 - staging directory to copy from
#   $2 - live dataset directory to copy into
# Returns:
#   0 on success; 1 if either directory is missing, a conflicting file
#   exists in the live directory, or a copy fails.
#######################################
copy_new_layer() {
  local src=$1 dst=$2
  local path name
  local -a pending=()

  if [[ ! -d "$src" ]]; then
    printf 'copy_new_layer: staging directory not found: "%s"\n' "$src" >&2
    return 1
  fi
  if [[ ! -d "$dst" ]]; then
    printf 'copy_new_layer: live directory not found: "%s"\n' "$dst" >&2
    return 1
  fi

  # First pass: decide what to copy and detect conflicts before touching dst.
  while IFS= read -r -d '' path; do
    name=${path##*/}
    if [[ ! -e "$dst/$name" ]]; then
      pending+=("$path")
    elif ! cmp -s -- "$path" "$dst/$name"; then
      printf 'copy_new_layer: refusing to overwrite existing file: %s\n' \
        "$dst/$name" >&2
      return 1
    fi
  done < <(find "$src" -maxdepth 1 -type f -print0)

  if (( ${#pending[@]} == 0 )); then
    return 0
  fi

  # Second pass: every remaining name is new to dst, so cp cannot clobber.
  for path in "${pending[@]}"; do
    cp -- "$path" "$dst"/ || return 1
  done
}

copy_new_layer "$STAGING_COMMENTS_SUB" "$LIVE_COMMENTS_SUB"
copy_new_layer "$STAGING_COMMENTS_AUTH" "$LIVE_COMMENTS_AUTH"
copy_new_layer "$STAGING_SUBMISSIONS_SUB" "$LIVE_SUBMISSIONS_SUB"
copy_new_layer "$STAGING_SUBMISSIONS_AUTH" "$LIVE_SUBMISSIONS_AUTH"

# --- Cleanup: remove temp and staging dirs ----------------------------------
#
# Run after confirming the copy succeeded and the live datasets look right.

#######################################
# Recursively delete the given directories.
#
# All arguments are checked before anything is deleted: if any of them is an
# empty string (for example because these lines were pasted into a shell where
# the path variables were never set) nothing is removed and an error is
# returned, instead of silently "succeeding" without cleaning anything.
#
# Arguments:
#   $@ - paths to remove
# Returns:
#   0 on success; 1 if any argument is empty; otherwise rm's status.
#######################################
remove_dirs() {
  local dir

  for dir in "$@"; do
    if [[ -z "$dir" ]]; then
      printf 'remove_dirs: refusing to run with an empty path (is the variable set?)\n' >&2
      return 1
    fi
  done

  for dir in "$@"; do
    rm -rf -- "$dir" || return
  done
}

remove_dirs "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
remove_dirs "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
remove_dirs "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"