18
0

7 Commits

Author SHA1 Message Date
0ea57b2377 datasets/add_months.sh: fail on leftover files, add --clean to wipe them
Without --clean, the script now exits with a clear error if temp or
staging directories from a previous run exist. Pass --clean to remove
them automatically before starting. README example updated to include
the flag.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 19:10:16 -07:00
6c6e05c360 datasets/README.md: document srun workflow, PYTHON var, container notes
Update the add_months and Step 6 sections with lessons learned from the
first run attempt:
- Replace salloc with srun (releases node automatically on completion)
- Document the PYTHON variable override needed for parallel/venv
- Note that .zst decompression uses the zstandard library due to
  Singularity container restrictions on the system zstd binary
- Add full srun invocation with bash -l, tee logging, and tmux guidance
- Update Step 6 walkthrough to use srun instead of salloc

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 19:05:45 -07:00
4854d4f537 datasets/add_months.sh: run comments and submissions Part 1 together
Combine task lists and run a single parallel call so all 32 files
(16 comments + 16 submissions) parse simultaneously.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:51:56 -07:00
bf6ccbc84a datasets/helper.py: use zstandard library for .zst decompression
The Python environment runs inside a Singularity container that cannot
exec the host's /usr/bin/zstd via subprocess. Replace the subprocess
call with the zstandard Python library, which was already a dependency.
Other formats (bz2, xz, gz) still use subprocess as before.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:48:04 -07:00
18925dfe5b datasets/: add PYTHON variable to add_months scripts
GNU parallel spawns fresh shells that don't inherit the active venv.
Using an explicit PYTHON path ensures the right interpreter is used in
parallel tasks. Defaults to python3 but can be overridden:

  PYTHON=/path/to/venv/bin/python3 ./add_months.sh ...

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:42:05 -07:00
926e9bc364 datasets/: fat-node add_months.sh; multinode variant as separate script
add_months.sh now targets a single fat node directly: starts a local
Spark cluster via start_spark_cluster.sh, submits jobs, stops the
cluster. No salloc needed.

add_months_multinode.sh is a new script for the multi-node case using
start_spark_and_run.sh from a login node. Usage takes NODES as first arg.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:35:12 -07:00
526dc03732 datasets/add_months.sh: stop before copy step to force manual verification
The script now exits after Part 2 so the copy and cleanup commands must
be run manually. This prevents the live datasets from being touched
without a deliberate verification step in between.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:22:03 -07:00
4 changed files with 232 additions and 63 deletions

View File

@@ -73,6 +73,22 @@ read all layers together correctly. At a yearly update cadence the number
of layers stays small; use `merge_layers.sh` to collapse them when
needed.
#### Environment setup
The Python environment runs inside a Singularity container. Set `PYTHON`
to the full path of the venv interpreter so that `parallel` jobs use the
right Python (fresh shells spawned by `parallel` don't inherit the active
venv):
```sh
PYTHON=/gscratch/comdata/users/makohill/cdsc_reddit/venv/bin/python3
```
The `.zst` decompression uses the `zstandard` Python library rather than
the system `zstd` binary, which is inaccessible from inside the container.
#### Dump directory
The new `.zst` dump files must be accessible at `COMMENTS_DUMPDIR` and
`SUBMISSIONS_DUMPDIR`. Override the defaults (which match `dumps_helper.py`)
via environment variables if the files are not in the standard locations:
@@ -80,18 +96,37 @@ via environment variables if the files are not in the standard locations:
```sh
COMMENTS_DUMPDIR=/path/to/new/comments \
SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
./add_months.sh 2025-01 2025-02 2025-03
```
Part 1 runs directly on a compute node. For Part 2 there are two options:
#### Running as a Slurm job
- **Single fat node** (simpler, often faster for smaller sorts): `salloc`
a `cpu-g2` node (128 cores, ~1 TB RAM) and run the Part 2 script
directly with `spark-submit` or `python3`. See Step 6 of the walkthrough
below for the `salloc` invocation.
- **Multi-node Spark cluster**: use `start_spark_and_run.sh` from a login
node. It allocates nodes via `salloc` and handles cluster coordination.
Pass the number of nodes as the first argument.
The recommended way to run `add_months.sh` is via `srun` on a fat
`cpu-g2` node. Using `srun` (rather than `salloc`) means the node is
released automatically as soon as the script finishes, regardless of the
walltime. Run from a login node inside a `tmux` session so the terminal
survives disconnections:
```sh
tmux new -s add_months
srun -p cpu-g2 -A comdata --nodes=1 --time=72:00:00 -c 112 --mem=400G \
bash -l -c "
cd /mmfs1/gscratch/comdata/users/makohill/cdsc_reddit && \
PYTHON=/gscratch/comdata/users/makohill/cdsc_reddit/venv/bin/python3 \
COMMENTS_DUMPDIR=/path/to/new/comments \
SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
./datasets/add_months.sh --clean 2025-01 2025-02 ... YYYY-MM
" 2>&1 | tee /gscratch/comdata/users/makohill/add_months_run.log
```
The `bash -l` flag sources `.bashrc` on the compute node so the Spark
environment is available. The `tee` command writes output to both the
terminal and a log file so you can review it later.
Detach from tmux with `Ctrl-b d` and reattach with `tmux attach -t add_months`.
For a multi-node Spark cluster instead, use `add_months_multinode.sh`
from a login node — it takes the number of nodes as its first argument.
### Merge layers — `merge_layers.sh`
@@ -296,25 +331,25 @@ continuous with the most recent data we already have.
### Step 6: Part 2 — sorting the `.parquet` files by author and subreddit via Spark
If the `.parquet` files reasonably appear to be complete, we can now
sort them by author and subreddit. The most efficient way to do so is by
using one node on `cpu-g2` with 128 CPUs and 994G memory. This one node
splits into up to six slices (four in our current case) so the tasks
will still be parallelized (`hyakalloc` or [this Hyak blog][hyak-blog]
are good resources for further information). Run `tmux` on a login
node, then grab the whole node for up to a week with:
sort them by author and subreddit. The most efficient way to do so is via
`srun` on a `cpu-g2` node (128 CPUs, ~1 TB RAM). Using `srun` releases
the node automatically when the job finishes. Run from a login node
inside `tmux`:
```sh
salloc -p cpu-g2 -A comdata --nodes=1 --time=168:00:00 -c 128 --mem=994G
srun -p cpu-g2 -A comdata --nodes=1 --time=72:00:00 -c 112 --mem=400G \
bash -l -c "
cd /path/to/cdsc_reddit/datasets && \
source \$SPARK_CONF_DIR/spark-env.sh && \
start_spark_cluster.sh && \
spark-submit --master spark://\$(hostname):\$SPARK_MASTER_PORT submissions_part2.py && \
spark-submit --master spark://\$(hostname):\$SPARK_MASTER_PORT comments_part2.py && \
stop-all.sh
"
```
[hyak-blog]: https://hyak.uw.edu/blog/g1-vs-g2/
Once Slurm drops you onto the compute node, run
```sh
./start_spark_and_run.sh submissions_part2.py
```
Monitor via `htop` (as described in Step 4); the CPUs may not always
show high usage but you should see that memory is being used. Repeat
for the comments. Successful jobs will result in

View File

@@ -1,15 +1,19 @@
#!/usr/bin/env bash
#
# Add one or more new months to the existing parquet datasets using a
# layered append. The live datasets are never touched until the final
# copy step, so they remain safe and queryable throughout.
# layered append. Designed to run on a single fat node (e.g. cpu-g2 with
# 128 cores / ~1TB RAM). For a multi-node Spark cluster instead, see
# add_months_multinode.sh.
#
# Usage:
# add_months.sh YYYY-MM [YYYY-MM ...]
# add_months.sh [--clean] YYYY-MM [YYYY-MM ...]
#
# Example:
# add_months.sh 2025-01 2025-02 2025-03
#
# If temp or staging directories from a previous run exist, the script
# will exit with an error. Pass --clean to wipe them before starting:
#
# The new .zst dump files must live at:
# $COMMENTS_DUMPDIR/RC_YYYY-MM.zst
# $SUBMISSIONS_DUMPDIR/RS_YYYY-MM.zst
@@ -22,29 +26,34 @@
# ./add_months.sh 2025-01 2025-02
#
# Workflow:
# Part 1 — parse new .zst files into per-month parquets (compute node)
# Part 2 — sort into staging directories, not the live datasets (fat node)
# Verify — inspect staging before committing (manual step, not scripted)
# Part 1 — parse new .zst files into per-month parquets (parallel)
# Part 2 — sort into staging directories, not the live datasets (Spark)
# [script exits here — verify staging before continuing]
# Copy — move staging files into live datasets (run manually after verify)
# Cleanup — remove temp and staging dirs (run manually after copy)
#
# Every command below is independently runnable for debugging. The copy
# and cleanup steps are intentionally left as separate commands so you can
# verify the staging output before touching the live datasets.
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.
#
# Every command below is independently runnable for debugging.
set -e
cd "$(dirname "$0")"
CLEAN=0
if [ "${1:-}" = "--clean" ]; then
CLEAN=1
shift
fi
if [ $# -eq 0 ]; then
echo "Usage: $0 YYYY-MM [YYYY-MM ...]" >&2
echo "Usage: $0 [--clean] YYYY-MM [YYYY-MM ...]" >&2
exit 1
fi
COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
PYTHON="${PYTHON:-python3}"
# Part 1 temp dirs (per-month parquets, parsed from .zst)
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
@@ -62,39 +71,64 @@ LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
# --- Part 1: parse new months in parallel (run on a compute node) -----------
# --- Check for leftover output from a previous run --------------------------
printf "python3 comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \
> add_months_comments_tasks.txt
EXISTING=()
for d in "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS" \
"$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH" \
"$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"; do
[ -e "$d" ] && EXISTING+=("$d")
done
printf "python3 submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "$@" \
> add_months_submissions_tasks.txt
if [ ${#EXISTING[@]} -gt 0 ]; then
if [ $CLEAN -eq 1 ]; then
echo "Removing leftover files from previous run..."
rm -rf "${EXISTING[@]}"
rm -f add_months_tasks.txt add_months_joblog.txt
rm -rf add_months_logs/
else
echo "Error: leftover files from a previous run exist:" >&2
printf ' %s\n' "${EXISTING[@]}" >&2
echo "Re-run with --clean to remove them before starting." >&2
exit 1
fi
fi
parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
< add_months_comments_tasks.txt
# --- Part 1: parse new months in parallel (comments and submissions together) -
parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
< add_months_submissions_tasks.txt
printf "$PYTHON comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \
> add_months_tasks.txt
# --- Part 2: sort new months into staging (not the live datasets) -----------
#
# start_spark_and_run.sh calls salloc — run from a login node, or replace
# with start_spark_cluster.sh + spark-submit if already on a suitable node.
printf "$PYTHON submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "$@" \
>> add_months_tasks.txt
start_spark_and_run.sh 1 comments_part2.py \
parallel --joblog add_months_joblog.txt --results add_months_logs \
< add_months_tasks.txt
# --- Part 2: sort new months into staging (Spark, single fat node) ----------
source "$SPARK_CONF_DIR/spark-env.sh"
start_spark_cluster.sh
spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
comments_part2.py \
--indir="$TEMP_COMMENTS" \
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
--out_by_author="$STAGING_COMMENTS_AUTH"
start_spark_and_run.sh 1 submissions_part2.py \
spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
submissions_part2.py \
--indir="$TEMP_SUBMISSIONS" \
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
stop-all.sh
# --- Verify: inspect staging before copying to live -------------------------
#
# Stop here and check that the staging output looks right before running
# the copy step. The live datasets are untouched at this point. Example:
# The script stops here. Check the staging output looks right before running
# the copy step manually. The live datasets are untouched at this point.
# Example checks:
#
# ls -lah "$STAGING_COMMENTS_SUB" | head
# python3 -c "
@@ -104,6 +138,8 @@ start_spark_and_run.sh 1 submissions_part2.py \
# print(t.column('created_utc')[0].as_py(), t.column('created_utc')[-1].as_py())
# "
exit 0
# --- Copy: add staging files into live datasets -----------------------------
#
# Run these lines manually after verifying staging. This is the only step
@@ -119,6 +155,8 @@ find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSI
#
# Run after confirming the copy succeeded and the live datasets look right.
rm -f add_months_tasks.txt add_months_joblog.txt
rm -rf add_months_logs/
rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"

View File

@@ -0,0 +1,89 @@
#!/usr/bin/env bash
#
# Multi-node variant of add_months.sh. Uses start_spark_and_run.sh to
# allocate a Spark cluster across multiple nodes via salloc. Run this
# from a login node.
#
# For the common single-fat-node case, use add_months.sh instead.
#
# Usage:
# add_months_multinode.sh NODES YYYY-MM [YYYY-MM ...]
#
# Example (2 nodes, 3 months):
# add_months_multinode.sh 2 2025-01 2025-02 2025-03
#
# Override dump directories via environment variables if needed:
#
# COMMENTS_DUMPDIR=/path/to/new/comments \
# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
# ./add_months_multinode.sh 2 2025-01 2025-02
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.
set -e
cd "$(dirname "$0")"
NODES="${1:-}"
if [ -z "$NODES" ] || [ $# -lt 2 ]; then
echo "Usage: $0 NODES YYYY-MM [YYYY-MM ...]" >&2
exit 1
fi
shift
MONTHS=("$@")
COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
PYTHON="${PYTHON:-python3}"
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
# --- Part 1: parse new months in parallel -----------------------------------
printf "$PYTHON comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "${MONTHS[@]}" \
> add_months_comments_tasks.txt
printf "$PYTHON submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "${MONTHS[@]}" \
> add_months_submissions_tasks.txt
parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
< add_months_comments_tasks.txt
parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
< add_months_submissions_tasks.txt
# --- Part 2: sort new months into staging (multi-node Spark cluster) --------
start_spark_and_run.sh "$NODES" comments_part2.py \
--indir="$TEMP_COMMENTS" \
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
--out_by_author="$STAGING_COMMENTS_AUTH"
start_spark_and_run.sh "$NODES" submissions_part2.py \
--indir="$TEMP_SUBMISSIONS" \
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
# --- Verify staging, then copy and cleanup manually -------------------------
#
# See add_months.sh for verify/copy/cleanup commands — they are identical.
exit 0
find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \;
find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \;
find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \;
find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \;
rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"

View File

@@ -3,6 +3,9 @@ import re
from collections import defaultdict
from os import path
import glob
import io
import zstandard
def find_dumps(dumpdir, base_pattern):
@@ -28,24 +31,28 @@ def open_fileset(files):
yield line
def open_input_file(input_filename):
# .zst handled via the zstandard library to avoid subprocess/container issues
if re.match(r'.*\.zst$', input_filename):
fh = open(input_filename, 'rb')
dctx = zstandard.ZstdDecompressor()
return io.TextIOWrapper(dctx.stream_reader(fh), encoding='utf-8')
if re.match(r'.*\.7z$', input_filename):
cmd = ["7za", "x", "-so", input_filename, '*']
elif re.match(r'.*\.gz$', input_filename):
cmd = ["zcat", input_filename]
cmd = ["7za", "x", "-so", input_filename, '*']
elif re.match(r'.*\.bz2$', input_filename):
cmd = ["bzcat", "-dk", input_filename]
cmd = ["bzcat", "-dk", input_filename]
elif re.match(r'.*\.bz', input_filename):
cmd = ["bzcat", "-dk", input_filename]
cmd = ["bzcat", "-dk", input_filename]
elif re.match(r'.*\.xz', input_filename):
cmd = ["xzcat",'-dk', '-T 20',input_filename]
elif re.match(r'.*\.zst',input_filename):
cmd = ['zstd','-dck', input_filename]
elif re.match(r'.*\.gz',input_filename):
cmd = ['gzip','-dc', input_filename]
cmd = ["xzcat", '-dk', '-T 20', input_filename]
elif re.match(r'.*\.gz', input_filename):
cmd = ["zcat", input_filename]
else:
return open(input_filename, 'r')
try:
input_file = Popen(cmd, stdout=PIPE).stdout
return Popen(cmd, stdout=PIPE).stdout
except NameError as e:
print(e)
input_file = open(input_filename, 'r')
return input_file
return open(input_filename, 'r')