18
0

datasets/: stage new layer before touching live datasets in add_months

Replace mode='append'-direct-to-live approach with a safer staging
workflow: Part 2 writes the new sorted layer to temp staging directories,
the user verifies, then a separate copy step adds the files to the live
datasets. Live datasets are never touched until the copy step, and the
copy only adds files — nothing is deleted or overwritten.

- sort_and_write gains out_by_subreddit/out_by_author params (replaces
  mode param) so Part 2 can target staging paths
- comments_part2.py, submissions_part2.py: expose new params via Fire
- add_months.sh: rewritten with explicit staging dirs, verify checkpoint,
  and find-based copy step

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 18:17:38 -07:00
parent 2d1d760142
commit 6b18840604
4 changed files with 89 additions and 33 deletions

View File

@@ -1,8 +1,8 @@
#!/usr/bin/env bash
#
# Add one or more new months to the existing parquet datasets using a
# layered append. Part 1 runs on a compute node; Part 2 must be launched
# from a login node (it calls salloc via start_spark_and_run.sh).
# layered append. The live datasets are never touched until the final
# copy step, so they remain safe and queryable throughout.
#
# Usage:
# add_months.sh YYYY-MM [YYYY-MM ...]
@@ -21,15 +21,19 @@
# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
# ./add_months.sh 2025-01 2025-02
#
# How layering works: Part 2 appends a new set of sorted partition files
# alongside the existing ones. Spark and DuckDB read all layers together
# transparently. Run merge_layers.sh to collapse layers into one when n
# gets large. Run build_from_scratch.sh to rebuild everything from raw dumps.
# Workflow:
# Part 1 — parse new .zst files into per-month parquets (compute node)
# Part 2 — sort into staging directories, not the live datasets (fat node)
# Verify — inspect staging before committing (manual step, not scripted)
# Copy — move staging files into live datasets (run manually after verify)
# Cleanup — remove temp and staging dirs (run manually after copy)
#
# Every command below is independently runnable for debugging. The copy
# and cleanup steps are intentionally left as separate commands so you can
# verify the staging output before touching the live datasets.
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.
#
# Every command below is independently runnable for debugging.
set -e
cd "$(dirname "$0")"
@@ -42,37 +46,79 @@ fi
COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
# Part 1 temp dirs (per-month parquets, parsed from .zst)
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
# --- Part 1: parse new months in parallel -----------------------------------
# Staging dirs (sorted new layer; inspected before copying to live)
STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
# Live dataset dirs
LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
# --- Part 1: parse new months in parallel (run on a compute node) -----------
# build task lists for the specified months
printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \
> add_months_comments_tasks.txt
printf "python3 submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "$@" \
> add_months_submissions_tasks.txt
# parse all new comment months in parallel
parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
< add_months_comments_tasks.txt
# parse all new submission months in parallel
parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
< add_months_submissions_tasks.txt
# --- Part 2: sort and append new layer (run from a login node) --------------
# --- Part 2: sort new months into staging (not the live datasets) -----------
#
# start_spark_and_run.sh calls salloc; run these two lines from a login node,
# not from within an existing compute allocation.
# start_spark_and_run.sh calls salloc run from a login node, or replace
# with start_spark_cluster.sh + spark-submit if already on a suitable node.
# append new comment layer to reddit_comments_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 comments_part2.py --indir="$TEMP_COMMENTS" --mode=append
start_spark_and_run.sh 1 comments_part2.py \
--indir="$TEMP_COMMENTS" \
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
--out_by_author="$STAGING_COMMENTS_AUTH"
# append new submission layer to reddit_submissions_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 submissions_part2.py --indir="$TEMP_SUBMISSIONS" --mode=append
start_spark_and_run.sh 1 submissions_part2.py \
--indir="$TEMP_SUBMISSIONS" \
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
# --- cleanup: remove temporary Part 1 files ---------------------------------
# --- Verify: inspect staging before copying to live -------------------------
#
# Stop here and check that the staging output looks right before running
# the copy step. The live datasets are untouched at this point. Example:
#
# ls -lah "$STAGING_COMMENTS_SUB" | head
# python3 -c "
# import pyarrow.parquet as pq, os
# f = sorted(os.listdir('$STAGING_COMMENTS_SUB'))[0]
# t = pq.read_table('$STAGING_COMMENTS_SUB/' + f, columns=['created_utc'])
# print(t.column('created_utc')[0].as_py(), t.column('created_utc')[-1].as_py())
# "
# --- Copy: add staging files into live datasets -----------------------------
#
# Run these lines manually after verifying staging. This is the only step
# that touches the live datasets. It only adds new files — existing files
# are never deleted or overwritten.
find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \;
find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \;
find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \;
find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \;
# --- Cleanup: remove temp and staging dirs ----------------------------------
#
# Run after confirming the copy succeeded and the live datasets look right.
rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"

View File

@@ -6,8 +6,8 @@ Must be launched from a login node via the Hyak-provided wrapper:
start_spark_and_run.sh 1 comments_part2.py --indir=/path/to/parquets --mode=append
--indir defaults to the temp comments dir in dumps_helper.py.
--mode defaults to 'overwrite'; use 'append' to add a new layer without
touching existing partition files (see add_months.sh).
--out_by_subreddit and --out_by_author default to the live dataset paths;
override them to write to staging directories first (see add_months.sh).
"""
import fire
@@ -15,4 +15,7 @@ from dumps_helper import COMMENTS, sort_and_write
if __name__ == "__main__":
fire.Fire(lambda indir=None, mode='overwrite': sort_and_write(COMMENTS, indir=indir, mode=mode))
fire.Fire(lambda indir=None, out_by_subreddit=None, out_by_author=None:
sort_and_write(COMMENTS, indir=indir,
out_by_subreddit=out_by_subreddit,
out_by_author=out_by_author))

View File

@@ -256,13 +256,14 @@ def gen_task_list(config, script_name, dumpdir=None, tasklist=None):
# --- Part 2: spark sort + repartition --------------------------------------
def sort_and_write(config, indir=None, mode='overwrite'):
def sort_and_write(config, indir=None, out_by_subreddit=None, out_by_author=None):
"""Read a directory of per-source parquets, sort and repartition twice
(once by subreddit, once by author), and write the two final datasets.
(once by subreddit, once by author), and write the two output datasets.
indir defaults to config['outdir']. mode is passed to parquet write and
may be 'overwrite' (default, used by build_from_scratch) or 'append'
(used by add_months to layer new data alongside existing files).
indir defaults to config['outdir'].
out_by_subreddit and out_by_author default to config['output_by_subreddit']
and config['output_by_author']. Override them to write to staging directories
instead of the live datasets (see add_months.sh).
Pyspark is imported lazily so Part 1 callers don't pay the Spark startup
cost.
@@ -270,6 +271,9 @@ def sort_and_write(config, indir=None, mode='overwrite'):
from pyspark.sql import SparkSession, functions as f
indir = indir or config['outdir']
out_by_subreddit = out_by_subreddit or config['output_by_subreddit']
out_by_author = out_by_author or config['output_by_author']
spark = SparkSession.builder.appName(config['app_name']).getOrCreate()
df = spark.read.parquet(indir, compression='snappy')
@@ -286,12 +290,12 @@ def sort_and_write(config, indir=None, mode='overwrite'):
sub_keys = config['subreddit_sort_keys']
df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
df_sub.write.parquet(config['output_by_subreddit'], mode=mode, compression='snappy')
df_sub.write.parquet(out_by_subreddit, mode='overwrite', compression='snappy')
auth_keys = config['author_sort_keys']
df_auth = df.repartition('author').sort(auth_keys, ascending=True)
df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
df_auth.write.parquet(config['output_by_author'], mode=mode, compression='snappy')
df_auth.write.parquet(out_by_author, mode='overwrite', compression='snappy')
def merge_layers(config):

View File

@@ -6,8 +6,8 @@ Must be launched from a login node via the Hyak-provided wrapper:
start_spark_and_run.sh 1 submissions_part2.py --indir=/path/to/parquets --mode=append
--indir defaults to the temp submissions dir in dumps_helper.py.
--mode defaults to 'overwrite'; use 'append' to add a new layer without
touching existing partition files (see add_months.sh).
--out_by_subreddit and --out_by_author default to the live dataset paths;
override them to write to staging directories first (see add_months.sh).
"""
import fire
@@ -15,4 +15,7 @@ from dumps_helper import SUBMISSIONS, sort_and_write
if __name__ == "__main__":
fire.Fire(lambda indir=None, mode='overwrite': sort_and_write(SUBMISSIONS, indir=indir, mode=mode))
fire.Fire(lambda indir=None, out_by_subreddit=None, out_by_author=None:
sort_and_write(SUBMISSIONS, indir=indir,
out_by_subreddit=out_by_subreddit,
out_by_author=out_by_author))