diff --git a/datasets/README.md b/datasets/README.md index f5a6a4b..52229d3 100644 --- a/datasets/README.md +++ b/datasets/README.md @@ -43,34 +43,71 @@ The final datasets are in `/gscratch/comdata/output`: | Script | Role | |---|---| | `comments_part1.py`, `submissions_part1.py` | Part 1 entry points. Each parses one compressed dump into one parquet file. `parse_dump ` and `gen_task_list` subcommands via fire. | -| `comments_part2.py`, `submissions_part2.py` | Part 2 entry points. Each is a Spark job that reads the directory of per-source parquets and writes the final `*_by_subreddit.parquet` and `*_by_author.parquet` datasets. Launched via `start_spark_and_run.sh`. | -| `dumps_helper.py` | Shared module. Schemas, the simdjson parser, a generic parse loop with per-field handler dispatch, and the `parse_dump` / `gen_task_list` / `sort_and_write` workers that the entry-point scripts wrap. Adding a new dump type or a new field is a one-place edit. | +| `comments_part2.py`, `submissions_part2.py` | Part 2 entry points. Each is a Spark job that reads a directory of per-source parquets and writes the final `*_by_subreddit.parquet` and `*_by_author.parquet` datasets. Accepts `--indir` and `--mode` to support layered appends; defaults match the build-from-scratch workflow. | +| `comments_merge.py`, `submissions_merge.py` | Merge entry points. Each is a Spark job that collapses all accumulated layers in the final datasets into a single clean layer. Launched via `start_spark_and_run.sh`. | +| `dumps_helper.py` | Shared module. Schemas, the simdjson parser, a generic parse loop with per-field handler dispatch, and the `parse_dump` / `gen_task_list` / `sort_and_write` / `merge_layers` workers that the entry-point scripts wrap. Adding a new dump type or a new field is a one-place edit. | | `helper.py` | Lower-level helpers for opening compressed dump files (`.zst`, `.xz`, `.bz2`, `.gz`). | -## The two workflows - -There are two ways to run the pipeline; pick the one that matches your -situation. +## The three workflows ### Build from scratch — `build_from_scratch.sh` Use this when there is no existing parquet output, or when the upstream data has changed in a way that requires reparsing everything. Wipes the per-source temp directories, processes every `RC_*` / `RS_*` dump in the -raw dumps directory through Part 1, then runs the Part 2 Spark sort. +raw dumps directory through Part 1 (in parallel via GNU parallel), then +runs the Part 2 Spark sort. -### Add a new month — `add_new_month.sh YYYY-MM` +### Add new months — `add_months.sh YYYY-MM [YYYY-MM ...]` -Use this when one or more months of new dump files have arrived and you -just want to bring the existing datasets up to date. Processes only the -specified month's `RC_.zst` and `RS_.zst` files through -Part 1 (the existing per-source parquet files are left in place), then -re-runs the Part 2 Spark sort over the full temp directory so the final -datasets pick up the new data. +> **NOTE: written but not yet tested. Remove this notice after a +> successful end-to-end run.** -The Part 2 sort is global and not incremental, so each monthly add -re-sorts the entire corpus. That's fine for a monthly cadence; it would -need a rearchitecture if the cost became a problem. +Use this for routine incremental updates. Runs Part 1 on only the +specified months, then appends the sorted output as a new layer of +partition files alongside the existing ones. No existing data is +rewritten. + +Each run adds one layer to each final dataset directory. Spark and DuckDB +read all layers together correctly. At a yearly update cadence the number +of layers stays small; use `merge_layers.sh` to collapse them when +needed. + +The new `.zst` dump files must be accessible at `COMMENTS_DUMPDIR` and +`SUBMISSIONS_DUMPDIR`. Override the defaults (which match `dumps_helper.py`) +via environment variables if the files are not in the standard locations: + +```sh +COMMENTS_DUMPDIR=/path/to/new/comments \ +SUBMISSIONS_DUMPDIR=/path/to/new/submissions \ +./add_months.sh 2025-01 2025-02 2025-03 +``` + +Part 1 runs directly on a compute node. For Part 2 there are two options: + +- **Single fat node** (simpler, often faster for smaller sorts): `salloc` + a `cpu-g2` node (128 cores, ~1 TB RAM) and run the Part 2 script + directly with `spark-submit` or `python3`. See Step 6 of the walkthrough + below for the `salloc` invocation. +- **Multi-node Spark cluster**: use `start_spark_and_run.sh` from a login + node. It allocates nodes via `salloc` and handles cluster coordination. + Pass the number of nodes as the first argument. + +### Merge layers — `merge_layers.sh` + +> **NOTE: written but not yet tested. Remove this notice after a +> successful end-to-end run.** + +Use this to collapse accumulated layers from incremental adds into a +single clean layer. Reads the existing final datasets, re-sorts +everything, writes to `.merging` temp paths, then atomically replaces the +originals via rename. + +Run this when query performance has degraded due to many layers, or any +time you want a clean single-file-per-partition layout. The existing +datasets are safe until the rename step completes; see `merge_layers.sh` +for recovery notes if interrupted. As with `add_months.sh`, Part 2 can +run on a single fat node or via `start_spark_and_run.sh`. ## Running steps individually diff --git a/datasets/add_months.sh b/datasets/add_months.sh new file mode 100755 index 0000000..3da60a6 --- /dev/null +++ b/datasets/add_months.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +# +# Add one or more new months to the existing parquet datasets using a +# layered append. Part 1 runs on a compute node; Part 2 must be launched +# from a login node (it calls salloc via start_spark_and_run.sh). +# +# Usage: +# add_months.sh YYYY-MM [YYYY-MM ...] +# +# Example: +# add_months.sh 2025-01 2025-02 2025-03 +# +# The new .zst dump files must live at: +# $COMMENTS_DUMPDIR/RC_YYYY-MM.zst +# $SUBMISSIONS_DUMPDIR/RS_YYYY-MM.zst +# +# Override the dump directories via environment variables if the new files +# are not in the standard locations: +# +# COMMENTS_DUMPDIR=/path/to/new/comments \ +# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \ +# ./add_months.sh 2025-01 2025-02 +# +# How layering works: Part 2 appends a new set of sorted partition files +# alongside the existing ones. Spark and DuckDB read all layers together +# transparently. Run merge_layers.sh to collapse layers into one when n +# gets large. Run build_from_scratch.sh to rebuild everything from raw dumps. +# +# NOTE: This script and its workflow are written but not yet tested. +# Remove this notice after a successful end-to-end run. +# +# Every command below is independently runnable for debugging. + +set -e +cd "$(dirname "$0")" + +if [ $# -eq 0 ]; then + echo "Usage: $0 YYYY-MM [YYYY-MM ...]" >&2 + exit 1 +fi + +COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}" +SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}" + +TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet" +TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet" + +# --- Part 1: parse new months in parallel ----------------------------------- + +# build task lists for the specified months +printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \ + > add_months_comments_tasks.txt + +printf "python3 submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "$@" \ + > add_months_submissions_tasks.txt + +# parse all new comment months in parallel +parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \ + < add_months_comments_tasks.txt + +# parse all new submission months in parallel +parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \ + < add_months_submissions_tasks.txt + +# --- Part 2: sort and append new layer (run from a login node) -------------- +# +# start_spark_and_run.sh calls salloc; run these two lines from a login node, +# not from within an existing compute allocation. + +# append new comment layer to reddit_comments_by_{subreddit,author}.parquet +start_spark_and_run.sh 1 comments_part2.py --indir="$TEMP_COMMENTS" --mode=append + +# append new submission layer to reddit_submissions_by_{subreddit,author}.parquet +start_spark_and_run.sh 1 submissions_part2.py --indir="$TEMP_SUBMISSIONS" --mode=append + +# --- cleanup: remove temporary Part 1 files --------------------------------- + +rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS" diff --git a/datasets/add_new_month.sh b/datasets/add_new_month.sh deleted file mode 100755 index 5e0af8d..0000000 --- a/datasets/add_new_month.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash -# -# Add a single new month of dumps to the existing parquet datasets. -# -# Processes only the RC_.zst and RS_.zst files (Part 1), -# leaving the existing per-source temp parquet files untouched, then -# re-runs the Part 2 Spark sort + repartition over the full temp dir so -# the final by_subreddit / by_author datasets pick up the new data. -# -# Usage: -# add_new_month.sh YYYY-MM -# -# Example: -# add_new_month.sh 2025-03 -# -# Every command below is independently runnable — to debug, copy a line -# out and run it directly. For a full rebuild instead, see -# build_from_scratch.sh. -# -# Note on cost: Part 2 always re-sorts the full corpus (the sort is global, -# not incremental), so this gets slightly slower each month. For the -# monthly cadence this is fine; if the sort becomes a bottleneck we'd -# need to rearchitect Part 2 to merge-append instead of re-sort. - -set -e -cd "$(dirname "$0")" - -MONTH="${1:-}" -if [ -z "$MONTH" ]; then - echo "Usage: $0 YYYY-MM" >&2 - exit 1 -fi - -# --- Part 1: parse the new month's dumps (no wipe) ------------------------- - -# parse the new comments file -python3 comments_part1.py parse_dump "RC_${MONTH}.zst" - -# parse the new submissions file -python3 submissions_part1.py parse_dump "RS_${MONTH}.zst" - -# --- Part 2: re-sort the full corpus including the new data --------------- - -# sort comments and overwrite reddit_comments_by_{subreddit,author}.parquet -start_spark_and_run.sh 1 comments_part2.py - -# sort submissions and overwrite reddit_submissions_by_{subreddit,author}.parquet -start_spark_and_run.sh 1 submissions_part2.py diff --git a/datasets/build_from_scratch.sh b/datasets/build_from_scratch.sh index c5be98f..04cde28 100755 --- a/datasets/build_from_scratch.sh +++ b/datasets/build_from_scratch.sh @@ -16,8 +16,8 @@ # - GNU parallel installed # - start_spark_and_run.sh on PATH (Hyak-provided wrapper) # -# To add one new month to an existing build instead of rebuilding from -# scratch, use add_new_month.sh. +# To add new months to an existing build without rebuilding from scratch, +# use add_months.sh. set -e cd "$(dirname "$0")" diff --git a/datasets/comments_merge.py b/datasets/comments_merge.py new file mode 100644 index 0000000..732797c --- /dev/null +++ b/datasets/comments_merge.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +"""Collapse all layers in the comments final datasets into a single clean layer. + +Must be launched from a login node via the Hyak-provided wrapper: + start_spark_and_run.sh 1 comments_merge.py + +See merge_layers.sh and dumps_helper.merge_layers for details. +""" + +from dumps_helper import COMMENTS, merge_layers + + +if __name__ == "__main__": + merge_layers(COMMENTS) diff --git a/datasets/comments_part2.py b/datasets/comments_part2.py index 283d8f3..bb98ac0 100755 --- a/datasets/comments_part2.py +++ b/datasets/comments_part2.py @@ -1,14 +1,18 @@ #!/usr/bin/env python3 -"""Part 2 for comments: Spark sort + repartition the per-source parquets -produced by comments_part1.py into the final by_subreddit / by_author -datasets. +"""Part 2 for comments: Spark sort + repartition into the final datasets. -Launched via the Hyak-provided start_spark_and_run.sh wrapper: +Must be launched from a login node via the Hyak-provided wrapper: start_spark_and_run.sh 1 comments_part2.py + start_spark_and_run.sh 1 comments_part2.py --indir=/path/to/parquets --mode=append + +--indir defaults to the temp comments dir in dumps_helper.py. +--mode defaults to 'overwrite'; use 'append' to add a new layer without +touching existing partition files (see add_months.sh). """ +import fire from dumps_helper import COMMENTS, sort_and_write if __name__ == "__main__": - sort_and_write(COMMENTS) + fire.Fire(lambda indir=None, mode='overwrite': sort_and_write(COMMENTS, indir=indir, mode=mode)) diff --git a/datasets/dumps_helper.py b/datasets/dumps_helper.py index 2c9a575..1d85420 100644 --- a/datasets/dumps_helper.py +++ b/datasets/dumps_helper.py @@ -10,6 +10,7 @@ task-list generator, and the Spark sort are all shared here. """ import os +import shutil from datetime import datetime from itertools import islice @@ -255,16 +256,23 @@ def gen_task_list(config, script_name, dumpdir=None, tasklist=None): # --- Part 2: spark sort + repartition -------------------------------------- -def sort_and_write(config): - """Read the directory of per-source parquets, sort and repartition - twice (once by subreddit, once by author), and write the two final - datasets. Pyspark is imported lazily so Part 1 callers don't pay the - Spark startup cost.""" +def sort_and_write(config, indir=None, mode='overwrite'): + """Read a directory of per-source parquets, sort and repartition twice + (once by subreddit, once by author), and write the two final datasets. + + indir defaults to config['outdir']. mode is passed to parquet write and + may be 'overwrite' (default, used by build_from_scratch) or 'append' + (used by add_months to layer new data alongside existing files). + + Pyspark is imported lazily so Part 1 callers don't pay the Spark startup + cost. + """ from pyspark.sql import SparkSession, functions as f + indir = indir or config['outdir'] spark = SparkSession.builder.appName(config['app_name']).getOrCreate() - df = spark.read.parquet(config['outdir'], compression='snappy') + df = spark.read.parquet(indir, compression='snappy') df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) df = df.drop('subreddit') @@ -278,9 +286,54 @@ def sort_and_write(config): sub_keys = config['subreddit_sort_keys'] df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True) df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True) - df_sub.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy') + df_sub.write.parquet(config['output_by_subreddit'], mode=mode, compression='snappy') auth_keys = config['author_sort_keys'] df_auth = df.repartition('author').sort(auth_keys, ascending=True) df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True) - df_auth.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy') + df_auth.write.parquet(config['output_by_author'], mode=mode, compression='snappy') + + +def merge_layers(config): + """Collapse all accumulated layers in the final datasets into a single + clean layer. Reads the existing by_subreddit dataset (which contains all + layers), re-sorts twice, writes to temp paths, then atomically replaces + the originals by renaming. + + Safe to interrupt after the writes complete but before the renames — the + originals are untouched until the .merging directories exist. The .old + directories are left behind if the process is interrupted after renaming; + delete them manually once satisfied. + + Pyspark is imported lazily so Part 1 callers don't pay the Spark startup + cost. + """ + from pyspark.sql import SparkSession + + spark = SparkSession.builder.appName(config['app_name'] + ' merge layers').getOrCreate() + + # Both final datasets have identical rows; read from by_subreddit. + df = spark.read.parquet(config['output_by_subreddit']) + + tmp_sub = config['output_by_subreddit'] + '.merging' + tmp_auth = config['output_by_author'] + '.merging' + + sub_keys = config['subreddit_sort_keys'] + df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True) + df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True) + df_sub.write.parquet(tmp_sub, mode='overwrite', compression='snappy') + + auth_keys = config['author_sort_keys'] + df_auth = df.repartition('author').sort(auth_keys, ascending=True) + df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True) + df_auth.write.parquet(tmp_auth, mode='overwrite', compression='snappy') + + # Atomic swap: rename old → .old, then .merging → final, then delete .old. + old_sub = config['output_by_subreddit'] + '.old' + old_auth = config['output_by_author'] + '.old' + os.rename(config['output_by_subreddit'], old_sub) + os.rename(tmp_sub, config['output_by_subreddit']) + os.rename(config['output_by_author'], old_auth) + os.rename(tmp_auth, config['output_by_author']) + shutil.rmtree(old_sub) + shutil.rmtree(old_auth) diff --git a/datasets/merge_layers.sh b/datasets/merge_layers.sh new file mode 100755 index 0000000..567a6d5 --- /dev/null +++ b/datasets/merge_layers.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# +# Collapse all accumulated layers in the final parquet datasets into a +# single clean layer. Use this after several incremental adds via +# add_months.sh when you want to reduce the number of partition files. +# +# Reads the existing by_subreddit / by_author datasets, re-sorts everything, +# writes to temp paths, then atomically replaces the originals via rename. +# The old directories are removed once the new ones are in place. +# +# If the process is interrupted after writing the .merging directories but +# before the renames complete, re-run — the .merging directories will be +# overwritten and the originals are still intact. If interrupted after the +# renames, the .old directories are left behind; delete them manually once +# satisfied with the output. +# +# To add new months without merging, use add_months.sh. +# To rebuild everything from raw dumps, use build_from_scratch.sh. +# +# NOTE: This script and its workflow are written but not yet tested. +# Remove this notice after a successful end-to-end run. +# +# Every command below is independently runnable for debugging. + +set -e +cd "$(dirname "$0")" + +# merge and collapse comments layers +start_spark_and_run.sh 1 comments_merge.py + +# merge and collapse submissions layers +start_spark_and_run.sh 1 submissions_merge.py diff --git a/datasets/submissions_merge.py b/datasets/submissions_merge.py new file mode 100644 index 0000000..3536ca5 --- /dev/null +++ b/datasets/submissions_merge.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +"""Collapse all layers in the submissions final datasets into a single clean layer. + +Must be launched from a login node via the Hyak-provided wrapper: + start_spark_and_run.sh 1 submissions_merge.py + +See merge_layers.sh and dumps_helper.merge_layers for details. +""" + +from dumps_helper import SUBMISSIONS, merge_layers + + +if __name__ == "__main__": + merge_layers(SUBMISSIONS) diff --git a/datasets/submissions_part2.py b/datasets/submissions_part2.py index 55c9be8..7a45bb9 100755 --- a/datasets/submissions_part2.py +++ b/datasets/submissions_part2.py @@ -1,14 +1,18 @@ #!/usr/bin/env python3 -"""Part 2 for submissions: Spark sort + repartition the per-source parquets -produced by submissions_part1.py into the final by_subreddit / by_author -datasets. +"""Part 2 for submissions: Spark sort + repartition into the final datasets. -Launched via the Hyak-provided start_spark_and_run.sh wrapper: +Must be launched from a login node via the Hyak-provided wrapper: start_spark_and_run.sh 1 submissions_part2.py + start_spark_and_run.sh 1 submissions_part2.py --indir=/path/to/parquets --mode=append + +--indir defaults to the temp submissions dir in dumps_helper.py. +--mode defaults to 'overwrite'; use 'append' to add a new layer without +touching existing partition files (see add_months.sh). """ +import fire from dumps_helper import SUBMISSIONS, sort_and_write if __name__ == "__main__": - sort_and_write(SUBMISSIONS) + fire.Fire(lambda indir=None, mode='overwrite': sort_and_write(SUBMISSIONS, indir=indir, mode=mode))