cdsc_reddit/datasets/README.md
Benjamin Mako Hill 2390d2d10c datasets/README: fix stale add_new_month references
After the rename to add_months.sh and addition of merge_layers.sh /
*_merge.py, the Hyak walkthrough section still pointed at the old script
names. Update the Step 2 inventory and the "for incremental updates"
aside to match.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 19:24:38 -07:00


# Reddit dumps → sorted parquet datasets
This directory holds the pipeline that turns compressed Reddit dump files
(`RC_YYYY-MM.zst` for comments, `RS_YYYY-MM.zst` for submissions) into the
sorted, repartitioned parquet datasets that the rest of the project
consumes.
## Pipeline overview
The raw dumps are huge compressed JSON files with a lot of metadata that
we may not need. They aren't indexed, so it's expensive to pull data from
just a handful of subreddits. It also turns out that it's a pain to read
these compressed files straight into Spark. Extracting useful variables
from the dumps and building parquet datasets makes them easier to work
with. This happens in two steps:

1. Extracting JSON into (temporary, unpartitioned) parquet files using
   pyarrow.
2. Repartitioning and sorting the data using pyspark.

Breaking this down into two steps is useful because it allows us to
decompress and parse the dumps in the backfill queue and then sort them
in Spark. Partitioning the data makes it possible to efficiently read
data for specific subreddits or authors. Sorting it means that you can
efficiently compute aggregations at the subreddit or user level. More
documentation on using these files is available on the [CDSC wiki][hyak-datasets].
The final datasets are in `/gscratch/comdata/output`:
- `reddit_comments_by_author.parquet` has comments partitioned and sorted
by username (lowercase).
- `reddit_comments_by_subreddit.parquet` has comments partitioned and
sorted by subreddit name (lowercase).
- `reddit_submissions_by_author.parquet` has submissions partitioned and
sorted by username (lowercase).
- `reddit_submissions_by_subreddit.parquet` has submissions partitioned
and sorted by subreddit name (lowercase).

[hyak-datasets]: https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets
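
The two steps described above can be illustrated on toy data with nothing but the standard library. This is only a sketch of the idea (extract a few fields, then order rows by a lowercased key); the field subset `KEEP_FIELDS` and both function names are made up for illustration, and the real pipeline uses pyarrow and pyspark with the schemas defined in `dumps_helper.py`.

```python
import json

# Hypothetical field subset for illustration; the real schemas live in
# dumps_helper.py and are not reproduced here.
KEEP_FIELDS = ("id", "author", "subreddit", "created_utc")


def extract(lines, keep=KEEP_FIELDS):
    """Step 1 (toy): parse newline-delimited JSON, keeping selected fields."""
    for line in lines:
        record = json.loads(line)
        yield {field: record.get(field) for field in keep}


def sort_rows(rows, key):
    """Step 2 (toy): order rows by a lowercased key, then by timestamp."""
    return sorted(rows, key=lambda row: (row[key].lower(), row["created_utc"]))


raw_lines = [
    '{"id": "c1", "author": "Bob", "subreddit": "Python", "created_utc": 30, "score": 5}',
    '{"id": "c2", "author": "alice", "subreddit": "askscience", "created_utc": 10, "score": 1}',
    '{"id": "c3", "author": "carol", "subreddit": "python", "created_utc": 20, "score": 2}',
]

rows = list(extract(raw_lines))
by_subreddit = sort_rows(rows, "subreddit")
by_author = sort_rows(rows, "author")
```

Once rows for the same subreddit (or author) sit next to each other, a per-subreddit aggregation only needs a single pass over a contiguous run of rows, which is the property the sorted datasets provide at scale.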
## Scripts
| Script | Role |
|---|---|
| `comments_part1.py`, `submissions_part1.py` | Part 1 entry points. Each parses one compressed dump into one parquet file. `parse_dump <file>` and `gen_task_list` subcommands via fire. |
| `comments_part2.py`, `submissions_part2.py` | Part 2 entry points. Each is a Spark job that reads a directory of per-source parquets and writes the final `*_by_subreddit.parquet` and `*_by_author.parquet` datasets. Accepts `--indir` and `--mode` to support layered appends; defaults match the build-from-scratch workflow. |
| `comments_merge.py`, `submissions_merge.py` | Merge entry points. Each is a Spark job that collapses all accumulated layers in the final datasets into a single clean layer. Launched via `start_spark_and_run.sh`. |
| `dumps_helper.py` | Shared module. Schemas, the simdjson parser, a generic parse loop with per-field handler dispatch, and the `parse_dump` / `gen_task_list` / `sort_and_write` / `merge_layers` workers that the entry-point scripts wrap. Adding a new dump type or a new field is a one-place edit. |
| `helper.py` | Lower-level helpers for opening compressed dump files (`.zst`, `.xz`, `.bz2`, `.gz`). |
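
To give a feel for what "per-field handler dispatch" can look like, here is a minimal standard-library sketch. It is not the code in `dumps_helper.py`: the `HANDLERS` table, the handler functions, and `parse_line` are hypothetical names, and the real module uses simdjson and pyarrow schemas rather than `json` and plain dicts.

```python
import json
from datetime import datetime, timezone


def as_is(value):
    return value


def to_utc_datetime(value):
    # Accepts epoch seconds given either as an int or as a numeric string.
    return datetime.fromtimestamp(int(value), tz=timezone.utc)


def to_bool(value):
    return bool(value)


# Hypothetical field -> handler table. Supporting a new field means adding
# one entry here; the parse loop below does not change.
HANDLERS = {
    "id": as_is,
    "subreddit": as_is,
    "created_utc": to_utc_datetime,
    "stickied": to_bool,
}


def parse_line(line, handlers=HANDLERS):
    """Parse one JSON line, passing each known field through its handler."""
    record = json.loads(line)
    row = {}
    for field, handler in handlers.items():
        value = record.get(field)
        row[field] = None if value is None else handler(value)
    return row


row = parse_line(
    '{"id": "abc", "subreddit": "python", "created_utc": "86400", "extra": 1}'
)
```

Fields that are absent from a record come back as `None`, and fields without a handler (such as `extra` here) are dropped.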
## The three workflows
### Build from scratch — `build_from_scratch.sh`
Use this when there is no existing parquet output, or when the upstream
data has changed in a way that requires reparsing everything. Wipes the
per-source temp directories, processes every `RC_*` / `RS_*` dump in the
raw dumps directory through Part 1 (in parallel via GNU parallel), then
runs the Part 2 Spark sort.
### Add new months — `add_months.sh YYYY-MM [YYYY-MM ...]`
> **NOTE: written but not yet tested. Remove this notice after a
> successful end-to-end run.**

Use this for routine incremental updates. Runs Part 1 on only the
specified months, then appends the sorted output as a new layer of
partition files alongside the existing ones. No existing data is
rewritten.
Each run adds one layer to each final dataset directory. Spark and DuckDB
read all layers together correctly. At a yearly update cadence the number
of layers stays small; use `merge_layers.sh` to collapse them when
needed.
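
Before launching a long job it can help to check the month arguments and see which dump files they imply. The sketch below assumes only the `RC_YYYY-MM.zst` / `RS_YYYY-MM.zst` naming described at the top of this README; `dump_names` is a hypothetical helper, and `add_months.sh` may validate its arguments differently or not at all.

```python
import re

# Four-digit year, a dash, and a month from 01 to 12.
MONTH_RE = re.compile(r"\d{4}-(0[1-9]|1[0-2])")


def dump_names(months):
    """Map YYYY-MM strings to (comments, submissions) dump file names."""
    names = []
    for month in months:
        if not MONTH_RE.fullmatch(month):
            raise ValueError(f"expected YYYY-MM, got {month!r}")
        names.append((f"RC_{month}.zst", f"RS_{month}.zst"))
    return names
```

For example, `dump_names(["2025-01", "2025-02"])` returns one `(RC_..., RS_...)` pair per month, and a malformed argument such as `2025-13` raises `ValueError` before any work starts.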
#### Environment setup
The Python environment runs inside a Singularity container. Set `PYTHON`
to the full path of the venv interpreter so that `parallel` jobs use the
right Python (fresh shells spawned by `parallel` don't inherit the active
venv):
```sh
PYTHON=/gscratch/comdata/users/makohill/cdsc_reddit/venv/bin/python3
```
The `.zst` decompression uses the `zstandard` Python library rather than
the system `zstd` binary, which is inaccessible from inside the container.
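
As a rough illustration of opening dumps by extension from Python, the sketch below dispatches on the file suffix. It is not the code in `helper.py`: `open_dump` is a hypothetical name, and the `max_window_size=2**31` setting is an assumption based on Reddit dumps commonly being compressed with a long window. `zstandard` is imported lazily so the other formats work without it installed.

```python
import bz2
import gzip
import io
import lzma
from pathlib import Path


def open_dump(path):
    """Open a compressed dump as a UTF-8 text stream, chosen by extension."""
    suffix = Path(path).suffix
    if suffix == ".gz":
        return gzip.open(path, "rt", encoding="utf-8")
    if suffix == ".bz2":
        return bz2.open(path, "rt", encoding="utf-8")
    if suffix == ".xz":
        return lzma.open(path, "rt", encoding="utf-8")
    if suffix == ".zst":
        # Third-party library, imported only when a .zst file is opened.
        import zstandard

        raw = open(path, "rb")
        # Assumption: allow a large decompression window for these dumps.
        decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)
        return io.TextIOWrapper(decompressor.stream_reader(raw), encoding="utf-8")
    raise ValueError(f"unsupported dump extension: {suffix!r}")
```

Each branch returns a file-like object that yields one JSON record per line, so the caller can iterate with `for line in open_dump(path):` regardless of the compression format.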
#### Dump directory
The new `.zst` dump files must be accessible at `COMMENTS_DUMPDIR` and
`SUBMISSIONS_DUMPDIR`. Override the defaults (which match `dumps_helper.py`)
via environment variables if the files are not in the standard locations:
```sh
COMMENTS_DUMPDIR=/path/to/new/comments \
SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
./datasets/add_months.sh YYYY-MM [YYYY-MM ...]
```
#### Running as a Slurm job
The recommended way to run `add_months.sh` is via `srun` on a fat
`cpu-g2` node. Using `srun` (rather than `salloc`) means the node is
released automatically as soon as the script finishes, regardless of the
walltime. Run from a login node inside a `tmux` session so the terminal
survives disconnections:
```sh
tmux new -s add_months
srun -p cpu-g2 -A comdata --nodes=1 --time=72:00:00 -c 112 --mem=400G \
bash -l -c "
cd /mmfs1/gscratch/comdata/users/makohill/cdsc_reddit && \
PYTHON=/gscratch/comdata/users/makohill/cdsc_reddit/venv/bin/python3 \
COMMENTS_DUMPDIR=/path/to/new/comments \
SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
./datasets/add_months.sh --clean 2025-01 2025-02 ... YYYY-MM
" 2>&1 | tee /gscratch/comdata/users/makohill/add_months_run.log
```
The `-l` flag makes `bash` start as a login shell on the compute node, so
your login profile (and, on a typical setup, `.bashrc`) is sourced and the
Spark environment is available. The `tee` command writes output to both the
terminal and a log file so you can review it later.

Detach from tmux with `Ctrl-b d` and reattach with `tmux attach -t add_months`.
For a multi-node Spark cluster instead, use `add_months_multinode.sh`
from a login node — it takes the number of nodes as its first argument.
### Merge layers — `merge_layers.sh`
> **NOTE: written but not yet tested. Remove this notice after a
> successful end-to-end run.**

Use this to collapse accumulated layers from incremental adds into a
single clean layer. Reads the existing final datasets, re-sorts
everything, writes to `.merging` temp paths, then atomically replaces the
originals via rename.
Run this when query performance has degraded due to many layers, or any
time you want a clean single-file-per-partition layout. The existing
datasets are safe until the rename step completes; see `merge_layers.sh`
for recovery notes if interrupted. As with `add_months.sh`, Part 2 can
run on a single fat node or via `start_spark_and_run.sh`.
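
The "write to a temp path, then swap via rename" pattern can be sketched in Python as follows. This is not what `merge_layers.sh` does line for line: `swap_in` and the `.old` backup suffix are hypothetical, and moving the old directory aside first is an assumption made because a rename cannot overwrite a non-empty directory. Consult `merge_layers.sh` for the actual steps and recovery notes.

```python
import os
import shutil


def swap_in(final_path, staged_path, backup_suffix=".old"):
    """Replace final_path with staged_path, deleting old data only at the end."""
    backup_path = final_path + backup_suffix
    if os.path.exists(final_path):
        # Old data is moved aside, not deleted, so it can be restored.
        os.rename(final_path, backup_path)
    # The fully written staged directory becomes the live dataset.
    os.rename(staged_path, final_path)
    if os.path.exists(backup_path):
        # Only after the new data is in place is the old copy removed.
        shutil.rmtree(backup_path)
```

Because the expensive work (re-sorting and writing) happens entirely under the staged path, an interruption before the renames leaves the original dataset untouched.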
## Running steps individually
The `.sh` runners are written so that every meaningful step is a
separate, self-contained command. If something fails partway through, or
you want to inspect intermediate state, you can copy any single line out
of the runner and execute it standalone. For example:
```sh
# parse one specific file (skipping the rest of the workflow)
python3 comments_part1.py parse_dump RC_2025-03.zst
# override default dump/output paths from the CLI
python3 comments_part1.py parse_dump RC_2025-03.zst \
--dumpdir=/tmp/test --outdir=/tmp/out
# regenerate just the task list
python3 submissions_part1.py gen_task_list
```
The Spark Part 2 step is launched via `start_spark_and_run.sh` (a
Hyak-provided wrapper not included in this repo); see the wiki for the
launch convention.
## Detailed walkthrough: refreshing the data on Hyak
This walkthrough describes the process we went through updating Reddit
data from the PushShift cutoff up to the end of 2024. Adapting it for
newer data should just involve using different academic torrent files
that start from 2025 onwards. For incremental updates, the
`add_months.sh` workflow above is much shorter; this walkthrough is
for the bulk-refresh case.
### Prerequisites
- [Set up Hyak with CDSC lab][hyak-setup] (make sure to update config
and `.bashrc`)
- [Go through the Hyak Getting Started tutorial][hyak-syllabus]

Reddit dumps info (handled by `u/Watchful1` and `u/RaiderBDev`):
- [Watchful1's reddit explanation][watchful1-explainer] (separated by
subreddit), the [dataset not divided by subreddits][watchful1-bulk],
and the [GitHub repo with scripts for analyzing data][watchful1-repo]
- [RaiderBDev monthly dumps][raiderbdev-monthly] and
[RaiderBDev's ArcticShift API][arctic-shift]
- The [2005-06 to 2024-12 academic torrent][academic-torrent] used for
  the 2005-2024 refresh

CDSC and Hyak docs:
- [Hyak docs — how to work with modules][hyak-modules]
- [CDSC — how to download Python or R packages][cdsc-pkgs]
- [CDSC — Hyak datasets information][hyak-datasets]
- [CDSC — Hyak Spark information][hyak-spark]

[hyak-setup]: https://wiki.communitydata.science/CommunityData:Hyak#General_Introduction_to_Hyak
[hyak-syllabus]: https://hyak.uw.edu/docs/hyak101/basics/syllabus/
[watchful1-explainer]: https://www.reddit.com/r/pushshift/comments/1itme1k/separate_dump_files_for_the_top_40k_subreddits/
[watchful1-bulk]: https://www.reddit.com/r/pushshift/comments/1i4mlqu/dump_files_from_200506_to_202412/
[watchful1-repo]: https://github.com/Watchful1/PushshiftDumps/tree/master
[raiderbdev-monthly]: https://www.reddit.com/r/pushshift/comments/1ithjd3/subreddits_metadata_rules_and_wikis_202501/
[arctic-shift]: https://github.com/ArthurHeitmann/arctic_shift
[academic-torrent]: https://academictorrents.com/details/1614740ac8c94505e4ecb9d88be8bed7b6afddd4
[hyak-modules]: https://hyak.uw.edu/docs/tools/modules
[cdsc-pkgs]: https://wiki.communitydata.science/CommunityData:Hyak_software_installation#Python_packages
[hyak-spark]: https://wiki.communitydata.science/CommunityData:Hyak_Spark
### Step 1: data download on Nada and Hyak
We downloaded the [2005-2024 academic torrent][academic-torrent] and put
it on Nada (~2 days of downloading). We copied the raw data over to
Hyak's scrubbed directory in a new directory,
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/reddit`, with raw
data sorted into `/comments` or `/submissions`. The `/submissions`
directory shows `RS_20*.zst` files and the `/comments` shows `RC_20*.zst`
files. (There are no files in older compression formats, such as `.bz2` or
`.xz`, to deal with.)
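
After copying, a quick completeness check is to look for gaps in the month sequence of the dump files. The sketch below assumes only the `RC_YYYY-MM.zst` / `RS_YYYY-MM.zst` naming convention; `missing_months` is a hypothetical helper, not part of this repo.

```python
import re

DUMP_RE = re.compile(r"R[CS]_(\d{4})-(\d{2})\.zst")


def missing_months(filenames):
    """Return YYYY-MM strings absent between the first and last month seen."""
    seen = set()
    for name in filenames:
        match = DUMP_RE.fullmatch(name)
        if match:
            # Encode each month as a single integer so gaps are easy to find.
            seen.add(int(match.group(1)) * 12 + int(match.group(2)) - 1)
    if not seen:
        return []
    return [
        f"{index // 12:04d}-{index % 12 + 1:02d}"
        for index in range(min(seen), max(seen) + 1)
        if index not in seen
    ]
```

Calling it on a directory listing, for example `missing_months(os.listdir("submissions"))`, returns an empty list when every month between the earliest and latest dump is present.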
### Step 2: clone the repo on Hyak
On Hyak, clone this repo (or `scp` the contents of `datasets/`) into the
working directory next to the raw data, e.g.
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/`. The relevant
code lives entirely in `datasets/`:
- `dumps_helper.py` — shared parsing and Spark logic
- `helper.py` — file-open helpers
- `comments_part1.py`, `submissions_part1.py` — Part 1 entry points
- `comments_part2.py`, `submissions_part2.py` — Part 2 entry points
- `comments_merge.py`, `submissions_merge.py` — merge entry points
- `build_from_scratch.sh`, `add_months.sh`, `merge_layers.sh` — the runner scripts

The Spark wrapper scripts (`start_spark_and_run.sh`,
`start_spark_cluster.sh`, `start_spark_worker.sh`) are not in this repo;
they are part of the CDSC Hyak environment and should already be on
PATH.
### Step 3: smoke-test Part 1 on a single file
Check out a compute node with `any_machine`. We'll test submissions Part 1
with just one file:
```sh
python3 submissions_part1.py parse_dump RS_2005-06.zst
```
To verify, go to your output directory and examine the start of the
file:
```sh
python3 -c "import pandas as pd; df = pd.read_parquet('reddit_submissions.parquet'); print(df.head())"
```
You should see columns like `id`, `author`, `subreddit`, and `title`
printed out. Repeat the process with `comments_part1.py`; you should see
columns like `id`, `subreddit`, `link_id`, and `parent_id` printed out.
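**Note**: you may have to install the relevant libraries before the script
will run (the `simdjson` module is published on PyPI as `pysimdjson`, and the
verification one-liner above also needs `pandas`):
```sh
pip install --user pyarrow pysimdjson zstandard fire pandas
```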
### Step 4: Part 1 — converting `.zst` to `.parquet` files
Now we'll convert all of our `.zst` compressed Reddit data to `.parquet`
files. First, to generate our task list, we'll run
```sh
python3 submissions_part1.py gen_task_list
```
This should create a file, `parse_submissions_task_list`, in the working
directory. Check it (`less parse_submissions_task_list`); it
should have many lines that look like our earlier test command,
`python3 submissions_part1.py parse_dump RS_2005-06.zst`, but for all of
our `.zst` files. Do the same process with comments to generate
`parse_comments_task_list`.
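
For intuition, a task list of this shape can be produced with a few lines of Python. This is a sketch, not the `gen_task_list` implementation: `task_lines` is a hypothetical function, and the line format simply mirrors the example command above (the real generator may add paths or other arguments).

```python
def task_lines(dump_files, script="submissions_part1.py"):
    """Build one shell command per dump file, in sorted (chronological) order."""
    return [f"python3 {script} parse_dump {name}" for name in sorted(dump_files)]


lines = task_lines(["RS_2005-07.zst", "RS_2005-06.zst"])
```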
From a login node, run `tmux` to keep our job running and then
`any_machine` to check out a node to do computational work. We'll run
our tasks (from the task list) in parallel to optimize. Start with
submissions:
```sh
parallel --joblog submissions_joblog.txt --results submissions/logs < parse_submissions_task_list
```
The `--joblog` flag creates a text file where you can see which tasks
completed successfully, and the `--results` flag creates a directory
where each task has its own stderr output to see the specific error
(this is best practice for debugging).
Now we'll monitor the job. Create a new window in tmux (`Ctrl-b c`).
We'll ssh into our compute node (`ssh n1234`; you can get the
node name by running `ourjobs`) and run `htop`
([more details on htop][htop-explainer]). You should see that the
machine's CPUs are getting close to 100% usage. If all looks good,
create a new window and repeat the process for comments.

[htop-explainer]: https://codeahoy.com/2017/01/20/hhtop-explained-visually/

Once the job has successfully completed, you'll see that your CPUs are
closer to 0% usage in `htop` and your `submissions_joblog.txt` file
should show an `Exitval` of 0 for all commands. Release your node by
running `scancel 12345678` (the job ID can be found from `ourjobs`).
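
With hundreds of tasks, scanning the joblog by eye is error-prone. The sketch below lists the commands that did not exit cleanly, relying on the tab-separated layout that GNU parallel writes with `--joblog` (a header row including `Exitval`, `Signal`, and `Command` columns). `failed_commands` is a hypothetical helper, and the sample text is made up.

```python
import csv
import io


def failed_commands(joblog_text):
    """Return commands whose exit value or signal is non-zero."""
    reader = csv.DictReader(
        io.StringIO(joblog_text), delimiter="\t", quoting=csv.QUOTE_NONE
    )
    return [
        row["Command"]
        for row in reader
        if row["Exitval"] != "0" or row["Signal"] != "0"
    ]


# Made-up joblog contents for illustration.
sample = (
    "Seq\tHost\tStarttime\tJobRuntime\tSend\tReceive\tExitval\tSignal\tCommand\n"
    "1\t:\t1700000000.000\t12.500\t0\t0\t0\t0\t"
    "python3 submissions_part1.py parse_dump RS_2005-06.zst\n"
    "2\t:\t1700000001.000\t3.250\t0\t0\t1\t0\t"
    "python3 submissions_part1.py parse_dump RS_2005-07.zst\n"
)
failed = failed_commands(sample)
```

The returned commands can be written back to a file and fed to `parallel` again to retry only the failed tasks.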
### Step 5: verify the per-source parquet files
We'll want to verify our `.parquet` files at this point. We compared the
new files' number of columns and rows to the old data: from the
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/output/temp/reddit_comments.parquet`
directory, run
```sh
diff <(../../../report_parquet_filesizes.py *.parquet) <(../../../report_parquet_filesizes.py /gscratch/comdata/output/temp/reddit_comments.parquet/*.parquet)
```
and confirm there are no differences (same process with submissions).
This may or may not be relevant if we continue using the same academic
torrent to update data and have nothing to compare to, but you can still
check that the new data's number of columns and rows are fairly
continuous with the most recent data we already have.
### Step 6: Part 2 — sorting the `.parquet` files by author and subreddit via Spark
If the `.parquet` files reasonably appear to be complete, we can now
sort them by author and subreddit. The most efficient way to do so is via
`srun` on a `cpu-g2` node (128 CPUs, ~1 TB RAM). Using `srun` releases
the node automatically when the job finishes. Run from a login node
inside `tmux`:
```sh
srun -p cpu-g2 -A comdata --nodes=1 --time=72:00:00 -c 112 --mem=400G \
bash -l -c "
cd /path/to/cdsc_reddit/datasets && \
source \$SPARK_CONF_DIR/spark-env.sh && \
start_spark_cluster.sh && \
spark-submit --master spark://\$(hostname):\$SPARK_MASTER_PORT submissions_part2.py && \
spark-submit --master spark://\$(hostname):\$SPARK_MASTER_PORT comments_part2.py && \
stop-all.sh
"
```
[hyak-blog]: https://hyak.uw.edu/blog/g1-vs-g2/
Monitor via `htop` (as described in Step 4); the CPUs may not always
show high usage but you should see that memory is being used. The command
above runs the submissions job and then the comments job. Successful jobs will result in
`/gscratch/comdata/output` having four new directories:
`reddit_submissions_by_author.parquet`,
`reddit_submissions_by_subreddit.parquet`,
`reddit_comments_by_author.parquet`, and
`reddit_comments_by_subreddit.parquet`. Each should contain many
`snappy.parquet` files (e.g.
`part-00799-c8ec5f61-5158-43c7-ae2a-189169e9a86b-c000.snappy.parquet`)
and `_SUCCESS`.
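
A small script can confirm that all four outputs look complete before moving on. This sketch checks only what the paragraph above describes (the directory exists, contains `_SUCCESS`, and has at least one `*.snappy.parquet` file); `check_outputs` is a hypothetical helper and does not validate the data itself.

```python
from pathlib import Path

EXPECTED_DATASETS = (
    "reddit_submissions_by_author.parquet",
    "reddit_submissions_by_subreddit.parquet",
    "reddit_comments_by_author.parquet",
    "reddit_comments_by_subreddit.parquet",
)


def check_outputs(output_dir):
    """Return {dataset name: problem}; an empty dict means all checks passed."""
    problems = {}
    for name in EXPECTED_DATASETS:
        dataset = Path(output_dir) / name
        if not dataset.is_dir():
            problems[name] = "missing directory"
        elif not (dataset / "_SUCCESS").exists():
            problems[name] = "no _SUCCESS marker"
        elif not any(dataset.glob("*.snappy.parquet")):
            problems[name] = "no parquet part files"
    return problems
```

For example, `check_outputs("/gscratch/comdata/output")` returns `{}` when every dataset directory has its marker and part files.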
### Step 7: data verification
Make sure the new data is reasonably complete before deleting any of the
old data. Compute a simple time series of posts per day and check that the
counts don't drop off unexpectedly. It is also useful to have lab members
rerun whatever they're working on against the new parquet files.
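
The posts-per-day check can be sketched with the standard library, assuming you have already pulled a list of `created_utc` epoch timestamps out of the parquet files (for example with Spark or pandas). The function names and the 50% `drop_ratio` threshold are illustrative choices, not project conventions.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def posts_per_day(timestamps):
    """Count posts per UTC calendar day from epoch-second timestamps."""
    return Counter(
        datetime.fromtimestamp(ts, tz=timezone.utc).date() for ts in timestamps
    )


def suspicious_days(counts, drop_ratio=0.5):
    """Days whose count falls below drop_ratio times the previous day's count.

    Days with no posts at all are treated as a count of zero.
    """
    if not counts:
        return []
    flagged = []
    day, last_day = min(counts), max(counts)
    previous = counts[day]
    while day < last_day:
        day += timedelta(days=1)
        current = counts.get(day, 0)
        if current < previous * drop_ratio:
            flagged.append(day)
        previous = current
    return flagged
```

A sudden run of flagged days near a month boundary is a hint that a dump file was skipped or only partially parsed.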
## See also
The CDSC wiki page
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit)
is the landing page for this project on the wiki and provides
cross-links to related CDSC and Hyak documentation. The walkthrough
above used to live there; it now lives here so that doc and code stay
in sync.