18
0

11 Commits

Author SHA1 Message Date
2390d2d10c datasets/README: fix stale add_new_month references
After the rename to add_months.sh and addition of merge_layers.sh /
*_merge.py, the Hyak walkthrough section still pointed at the old script
names. Update the Step 2 inventory and the "for incremental updates"
aside to match.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 19:24:38 -07:00
0ea57b2377 datasets/add_months.sh: fail on leftover files, add --clean to wipe them
Without --clean, the script now exits with a clear error if temp or
staging directories from a previous run exist. Pass --clean to remove
them automatically before starting. README example updated to include
the flag.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 19:10:16 -07:00
6c6e05c360 datasets/README.md: document srun workflow, PYTHON var, container notes
Update the add_months and Step 6 sections with lessons learned from the
first run attempt:
- Replace salloc with srun (releases node automatically on completion)
- Document the PYTHON variable override needed for parallel/venv
- Note that .zst decompression uses the zstandard library due to
  Singularity container restrictions on the system zstd binary
- Add full srun invocation with bash -l, tee logging, and tmux guidance
- Update Step 6 walkthrough to use srun instead of salloc

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 19:05:45 -07:00
4854d4f537 datasets/add_months.sh: run comments and submissions Part 1 together
Combine task lists and run a single parallel call so all 32 files
(16 comments + 16 submissions) parse simultaneously.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:51:56 -07:00
bf6ccbc84a datasets/helper.py: use zstandard library for .zst decompression
The Python environment runs inside a Singularity container that cannot
exec the host's /usr/bin/zstd via subprocess. Replace the subprocess
call with the zstandard Python library, which was already a dependency.
Other formats (bz2, xz, gz) still use subprocess as before.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:48:04 -07:00
18925dfe5b datasets/: add PYTHON variable to add_months scripts
GNU parallel spawns fresh shells that don't inherit the active venv.
Using an explicit PYTHON path ensures the right interpreter is used in
parallel tasks. Defaults to python3 but can be overridden:

  PYTHON=/path/to/venv/bin/python3 ./add_months.sh ...

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:42:05 -07:00
926e9bc364 datasets/: fat-node add_months.sh; multinode variant as separate script
add_months.sh now targets a single fat node directly: starts a local
Spark cluster via start_spark_cluster.sh, submits jobs, stops the
cluster. No salloc needed.

add_months_multinode.sh is a new script for the multi-node case using
start_spark_and_run.sh from a login node. Usage takes NODES as first arg.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:35:12 -07:00
526dc03732 datasets/add_months.sh: stop before copy step to force manual verification
The script now exits after Part 2 so the copy and cleanup commands must
be run manually. This prevents the live datasets from being touched
without a deliberate verification step in between.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:22:03 -07:00
6b18840604 datasets/: stage new layer before touching live datasets in add_months
Replace mode='append'-direct-to-live approach with a safer staging
workflow: Part 2 writes the new sorted layer to temp staging directories,
the user verifies, then a separate copy step adds the files to the live
datasets. Live datasets are never touched until the copy step, and the
copy only adds files — nothing is deleted or overwritten.

- sort_and_write gains out_by_subreddit/out_by_author params (replaces
  mode param) so Part 2 can target staging paths
- comments_part2.py, submissions_part2.py: expose new params via Fire
- add_months.sh: rewritten with explicit staging dirs, verify checkpoint,
  and find-based copy step

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 18:17:38 -07:00
2d1d760142 datasets/: replace add_new_month with layered append workflow
Add add_months.sh and merge_layers.sh implementing a layered append
strategy for incremental dataset updates. Each incremental run appends
new sorted partition files alongside existing ones rather than re-sorting
the full corpus, which is prohibitively slow at this dataset scale.

- dumps_helper.py: sort_and_write gains indir/mode params; new
  merge_layers function collapses accumulated layers via atomic rename
- comments_part2.py, submissions_part2.py: expose --indir/--mode via Fire
- add_months.sh: new layered append script (not yet tested)
- merge_layers.sh: new layer collapse script (not yet tested)
- comments_merge.py, submissions_merge.py: Spark entry points for merge
- add_new_month.sh: deleted (full re-sort each add is redundant with
  build_from_scratch at corpus scale)
- README.md: document three workflows; flag untested sections

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 17:59:36 -07:00
1851132a06 move dataset + similarity docs from wiki into repo READMEs
The wiki page CommunityData:CDSC Reddit had a detailed Hyak walkthrough
(Steps 1-7) for refreshing the parquet datasets and a long TF-IDF methods
section, both of which duplicated or risked drifting from the actual code.
Move both into the repo so they stay in sync with the scripts they
describe:

- datasets/README.md: expand with the wiki's "Building Parquet Datasets"
  prose and the Step 1-7 Hyak walkthrough (ported verbatim where possible,
  adapted to the new script names and dropping obsolete notes about
  pull_pushshift_*.sh / check_*_shas.py).
- similarities/README.md (new): port the wiki's Subreddit Similarity
  section — TF-IDF math, PMI phrase detection, cosine similarity — with
  MediaWiki math converted to markdown LaTeX and script references
  updated to current paths.

The wiki page has been trimmed to a landing page that points at these
README files in gitea.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 17:20:21 -07:00
13 changed files with 935 additions and 115 deletions

View File

@@ -5,52 +5,151 @@ This directory holds the pipeline that turns compressed Reddit dump files
sorted, repartitioned parquet datasets that the rest of the project sorted, repartitioned parquet datasets that the rest of the project
consumes. consumes.
The pipeline has two stages: ## Pipeline overview
| Stage | What it does | The raw dumps are huge compressed json files with a lot of metadata that
we may not need. They aren't indexed so it's expensive to pull data from
just a handful of subreddits. It also turns out that it's a pain to read
these compressed files straight into spark. Extracting useful variables
from the dumps and building parquet datasets makes them easier to work
with. This happens in two steps:
1. Extracting json into (temporary, unpartitioned) parquet files using
pyarrow.
2. Repartitioning and sorting the data using pyspark.
Breaking this down into two steps is useful because it allows us to
decompress and parse the dumps in the backfill queue and then sort them
in spark. Partitioning the data makes it possible to efficiently read
data for specific subreddits or authors. Sorting it means that you can
efficiently compute aggregations at the subreddit or user level. More
documentation on using these files is available on the [CDSC wiki][hyak-datasets].
The final datasets are in `/gscratch/comdata/output`:
- `reddit_comments_by_author.parquet` has comments partitioned and sorted
by username (lowercase).
- `reddit_comments_by_subreddit.parquet` has comments partitioned and
sorted by subreddit name (lowercase).
- `reddit_submissions_by_author.parquet` has submissions partitioned and
sorted by username (lowercase).
- `reddit_submissions_by_subreddit.parquet` has submissions partitioned
and sorted by subreddit name (lowercase).
[hyak-datasets]: https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets
## Scripts
| Script | Role |
|---|---| |---|---|
| Part 1 | Reads one compressed dump and writes one parquet file. Per-file, parallelizable. Runs without Spark. | | `comments_part1.py`, `submissions_part1.py` | Part 1 entry points. Each parses one compressed dump into one parquet file. `parse_dump <file>` and `gen_task_list` subcommands via fire. |
| Part 2 | Reads the directory of per-file parquets in Spark, sorts and repartitions by subreddit, then by author, and writes the final `reddit_*_by_*.parquet` datasets. Always re-sorts the full corpus. | | `comments_part2.py`, `submissions_part2.py` | Part 2 entry points. Each is a Spark job that reads a directory of per-source parquets and writes the final `*_by_subreddit.parquet` and `*_by_author.parquet` datasets. Accepts `--indir` and `--mode` to support layered appends; defaults match the build-from-scratch workflow. |
| `comments_merge.py`, `submissions_merge.py` | Merge entry points. Each is a Spark job that collapses all accumulated layers in the final datasets into a single clean layer. Launched via `start_spark_and_run.sh`. |
| `dumps_helper.py` | Shared module. Schemas, the simdjson parser, a generic parse loop with per-field handler dispatch, and the `parse_dump` / `gen_task_list` / `sort_and_write` / `merge_layers` workers that the entry-point scripts wrap. Adding a new dump type or a new field is a one-place edit. |
| `helper.py` | Lower-level helpers for opening compressed dump files (`.zst`, `.xz`, `.bz2`, `.gz`). |
Each stage has a thin entry-point script per dump type: ## The three workflows
| Script | Notes |
|---|---|
| `comments_part1.py`, `submissions_part1.py` | Per-file parse. `parse_dump <file>` and `gen_task_list` subcommands via fire. |
| `comments_part2.py`, `submissions_part2.py` | Spark sort. Launched via `start_spark_and_run.sh`. |
| `dumps_helper.py` | Shared module: schemas, simdjson parser, generic parse loop, parse_dump / gen_task_list / sort_and_write workers. The only per-type code is the two field-handler dicts and the configuration dicts at the top. |
## The two workflows
There are two ways to run the pipeline; pick the one that matches your
situation.
### Build from scratch — `build_from_scratch.sh` ### Build from scratch — `build_from_scratch.sh`
Use this when there is no existing parquet output, or when the upstream Use this when there is no existing parquet output, or when the upstream
data has changed in a way that requires reparsing everything. Wipes the data has changed in a way that requires reparsing everything. Wipes the
per-source temp directories, processes every `RC_*` / `RS_*` dump in the per-source temp directories, processes every `RC_*` / `RS_*` dump in the
raw dumps directory through Part 1, then runs the Part 2 Spark sort. raw dumps directory through Part 1 (in parallel via GNU parallel), then
runs the Part 2 Spark sort.
### Add a new month — `add_new_month.sh YYYY-MM` ### Add new months — `add_months.sh YYYY-MM [YYYY-MM ...]`
Use this when one or more months of new dump files have arrived and you > **NOTE: written but not yet tested. Remove this notice after a
just want to bring the existing datasets up to date. Processes only the > successful end-to-end run.**
specified month's `RC_<MONTH>.zst` and `RS_<MONTH>.zst` files through
Part 1 (the existing per-source parquet files are left in place), then
re-runs the Part 2 Spark sort over the full temp directory so the final
datasets pick up the new data.
The Part 2 sort is global and not incremental, so each monthly add Use this for routine incremental updates. Runs Part 1 on only the
re-sorts the entire corpus. That's fine for a monthly cadence; it would specified months, then appends the sorted output as a new layer of
need a rearchitecture if the cost became a problem. partition files alongside the existing ones. No existing data is
rewritten.
Each run adds one layer to each final dataset directory. Spark and DuckDB
read all layers together correctly. At a yearly update cadence the number
of layers stays small; use `merge_layers.sh` to collapse them when
needed.
#### Environment setup
The Python environment runs inside a Singularity container. Set `PYTHON`
to the full path of the venv interpreter so that `parallel` jobs use the
right Python (fresh shells spawned by `parallel` don't inherit the active
venv):
```sh
PYTHON=/gscratch/comdata/users/makohill/cdsc_reddit/venv/bin/python3
```
The `.zst` decompression uses the `zstandard` Python library rather than
the system `zstd` binary, which is inaccessible from inside the container.
#### Dump directory
The new `.zst` dump files must be accessible at `COMMENTS_DUMPDIR` and
`SUBMISSIONS_DUMPDIR`. Override the defaults (which match `dumps_helper.py`)
via environment variables if the files are not in the standard locations:
```sh
COMMENTS_DUMPDIR=/path/to/new/comments \
SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
```
#### Running as a Slurm job
The recommended way to run `add_months.sh` is via `srun` on a fat
`cpu-g2` node. Using `srun` (rather than `salloc`) means the node is
released automatically as soon as the script finishes, regardless of the
walltime. Run from a login node inside a `tmux` session so the terminal
survives disconnections:
```sh
tmux new -s add_months
srun -p cpu-g2 -A comdata --nodes=1 --time=72:00:00 -c 112 --mem=400G \
bash -l -c "
cd /mmfs1/gscratch/comdata/users/makohill/cdsc_reddit && \
PYTHON=/gscratch/comdata/users/makohill/cdsc_reddit/venv/bin/python3 \
COMMENTS_DUMPDIR=/path/to/new/comments \
SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
./datasets/add_months.sh --clean 2025-01 2025-02 ... YYYY-MM
" 2>&1 | tee /gscratch/comdata/users/makohill/add_months_run.log
```
The `bash -l` flag sources `.bashrc` on the compute node so the Spark
environment is available. The `tee` command writes output to both the
terminal and a log file so you can review it later.
Detach from tmux with `Ctrl-b d` and reattach with `tmux attach -t add_months`.
For a multi-node Spark cluster instead, use `add_months_multinode.sh`
from a login node — it takes the number of nodes as its first argument.
### Merge layers — `merge_layers.sh`
> **NOTE: written but not yet tested. Remove this notice after a
> successful end-to-end run.**
Use this to collapse accumulated layers from incremental adds into a
single clean layer. Reads the existing final datasets, re-sorts
everything, writes to `.merging` temp paths, then atomically replaces the
originals via rename.
Run this when query performance has degraded due to many layers, or any
time you want a clean single-file-per-partition layout. The existing
datasets are safe until the rename step completes; see `merge_layers.sh`
for recovery notes if interrupted. As with `add_months.sh`, Part 2 can
run on a single fat node or via `start_spark_and_run.sh`.
## Running steps individually ## Running steps individually
Both `.sh` runners are written so that every meaningful step is a separate, Both `.sh` runners are written so that every meaningful step is a
self-contained command. If something fails partway through, or you want separate, self-contained command. If something fails partway through, or
to inspect intermediate state, you can copy any single line out of the you want to inspect intermediate state, you can copy any single line out
runner and execute it standalone. For example: of the runner and execute it standalone. For example:
```sh ```sh
# parse one specific file (skipping the rest of the workflow) # parse one specific file (skipping the rest of the workflow)
@@ -68,10 +167,215 @@ The Spark Part 2 step is launched via `start_spark_and_run.sh` (a
Hyak-provided wrapper not included in this repo); see the wiki for the Hyak-provided wrapper not included in this repo); see the wiki for the
launch convention. launch convention.
## Detailed walkthrough: refreshing the data on Hyak
This walkthrough describes the process we went through updating Reddit
data from the PushShift cutoff up to the end of 2024. Adapting it for
newer data should just involve using different academic torrent files
that start from 2025 onwards. For incremental updates, the
`add_months.sh` workflow above is much shorter; this walkthrough is
for the bulk-refresh case.
### Prerequisites
- [Set up Hyak with CDSC lab][hyak-setup] (make sure to update config
and `.bashrc`)
- [Go through the Hyak Getting Started tutorial][hyak-syllabus]
Reddit dumps info (handled by `u/Watchful1` and `u/RaiderBDev`):
- [Watchful1's reddit explanation][watchful1-explainer] (separated by
subreddit), the [dataset not divided by subreddits][watchful1-bulk],
and the [GitHub repo with scripts for analyzing data][watchful1-repo]
- [RaiderBDev monthly dumps][raiderbdev-monthly] and
[RaiderBDev's ArcticShift API][arctic-shift]
- The [2005-06 to 2024-12 academic torrent][academic-torrent] used for
the 2005-2024 refresh
CDSC and Hyak docs:
- [Hyak docs — how to work with modules][hyak-modules]
- [CDSC — how to download Python or R packages][cdsc-pkgs]
- [CDSC — Hyak datasets information][hyak-datasets]
- [CDSC — Hyak Spark information][hyak-spark]
[hyak-setup]: https://wiki.communitydata.science/CommunityData:Hyak#General_Introduction_to_Hyak
[hyak-syllabus]: https://hyak.uw.edu/docs/hyak101/basics/syllabus/
[watchful1-explainer]: https://www.reddit.com/r/pushshift/comments/1itme1k/separate_dump_files_for_the_top_40k_subreddits/
[watchful1-bulk]: https://www.reddit.com/r/pushshift/comments/1i4mlqu/dump_files_from_200506_to_202412/
[watchful1-repo]: https://github.com/Watchful1/PushshiftDumps/tree/master
[raiderbdev-monthly]: https://www.reddit.com/r/pushshift/comments/1ithjd3/subreddits_metadata_rules_and_wikis_202501/
[arctic-shift]: https://github.com/ArthurHeitmann/arctic_shift
[academic-torrent]: https://academictorrents.com/details/1614740ac8c94505e4ecb9d88be8bed7b6afddd4
[hyak-modules]: https://hyak.uw.edu/docs/tools/modules
[cdsc-pkgs]: https://wiki.communitydata.science/CommunityData:Hyak_software_installation#Python_packages
[hyak-spark]: https://wiki.communitydata.science/CommunityData:Hyak_Spark
### Step 1: data download on Nada and Hyak
We downloaded the [2005-2024 academic torrent][academic-torrent] and put
it on Nada (~2 days of downloading). We copied the raw data over to
Hyak's scrubbed directory in a new directory,
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/reddit`, with raw
data sorted into `/comments` or `/submissions`. The `/submissions`
directory shows `RS_20*.zst` files and the `/comments` shows `RC_20*.zst`
files. (There are no earlier zip files, such as `.bz2` or `.xz`, to deal
with.)
### Step 2: clone the repo on Hyak
On Hyak, clone this repo (or `scp` the contents of `datasets/`) into the
working directory next to the raw data, e.g.
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/`. The relevant
code lives entirely in `datasets/`:
- `dumps_helper.py` — shared parsing and Spark logic
- `helper.py` — file-open helpers
- `comments_part1.py`, `submissions_part1.py` — Part 1 entry points
- `comments_part2.py`, `submissions_part2.py` — Part 2 entry points
- `comments_merge.py`, `submissions_merge.py` — merge entry points
- `build_from_scratch.sh`, `add_months.sh`, `merge_layers.sh` — the runner scripts
The Spark wrapper scripts (`start_spark_and_run.sh`,
`start_spark_cluster.sh`, `start_spark_worker.sh`) are not in this repo;
they are part of the CDSC Hyak environment and should already be on
PATH.
### Step 3: smoke-test Part 1 on a single file
Check out `any_machine`. We'll test submissions Part 1 with just one
file:
```sh
python3 submissions_part1.py parse_dump RS_2005-06.zst
```
To verify, go to your output directory and examine the start of the
file:
```sh
python3 -c "import pandas as pd; df = pd.read_parquet('reddit_submissions.parquet'); print(df.head())"
```
You should see columns like `id`, `author`, `subreddit`, and `title`
printed out. Repeat the process with `comments_part1.py`; you should see
columns like `id`, `subreddit`, `link_id`, and `parent_id` printed out.
**Note**: you may have to install relevant libraries before successfully
running the file:
```sh
pip install --user pyarrow simdjson zstandard fire
```
### Step 4: Part 1 — converting `.zst` to `.parquet` files
Now we'll convert all of our `.zst` compressed Reddit data to `.parquet`
files. First, to generate our task list, we'll run
```sh
python3 submissions_part1.py gen_task_list
```
There should be a script, `parse_submissions_task_list`, in the working
directory. Check the script (`less parse_submissions_task_list`); it
should have many lines that look like our earlier test command,
`python3 submissions_part1.py parse_dump RS_2005-06.zst`, but for all of
our `.zst` files. Do the same process with comments to generate
`parse_comments_task_list`.
From a login node, run `tmux` to keep our job running and then
`any_machine` to check out a node to do computational work. We'll run
our tasks (from the task list) in parallel to optimize. Start with
submissions:
```sh
parallel --joblog submissions_joblog.txt --results submissions/logs < parse_submissions_task_list
```
The `--joblog` flag creates a text file where you can see which tasks
completed successfully, and the `--results` flag creates a directory
where each task has its own stderr output to see the specific error
(this is best practice for debugging).
Now we'll monitor the job. Create a new window in tmux (`CTRL+b c`).
We'll ssh into our computational node (`ssh n1234` — you can get the
node name by running `ourjobs`) and run `htop`
([more details on htop][htop-explainer]). You should see that the
machine's CPUs are getting close to 100% usage. If all looks good,
create a new window and repeat the process for comments.
[htop-explainer]: https://codeahoy.com/2017/01/20/hhtop-explained-visually/
Once the job has successfully completed, you'll see that your CPUs are
closer to 0% usage in `htop` and your `submissions_joblog.txt` file
should show an `exitval` of 0 for all commands. Kill your node by
running `scancel 12345678` (the job ID can be found from `ourjobs`).
### Step 5: verify the per-source parquet files
We'll want to verify our `.parquet` files at this point. We compared the
new files' number of columns and rows to the old data: from the
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/output/temp/reddit_comments.parquet`
directory, run
```sh
diff <(../../../report_parquet_filesizes.py *.parquet) <(../../../report_parquet_filesizes.py /gscratch/comdata/output/temp/reddit_comments.parquet/*.parquet)
```
and confirm there are no differences (same process with submissions).
This may or may not be relevant if we continue using the same academic
torrent to update data and have nothing to compare to, but you can still
check that the new data's number of columns and rows are fairly
continuous with the most recent data we already have.
### Step 6: Part 2 — sorting the `.parquet` files by author and subreddit via Spark
If the `.parquet` files reasonably appear to be complete, we can now
sort them by author and subreddit. The most efficient way to do so is via
`srun` on a `cpu-g2` node (128 CPUs, ~1 TB RAM). Using `srun` releases
the node automatically when the job finishes. Run from a login node
inside `tmux`:
```sh
srun -p cpu-g2 -A comdata --nodes=1 --time=72:00:00 -c 112 --mem=400G \
bash -l -c "
cd /path/to/cdsc_reddit/datasets && \
source \$SPARK_CONF_DIR/spark-env.sh && \
start_spark_cluster.sh && \
spark-submit --master spark://\$(hostname):\$SPARK_MASTER_PORT submissions_part2.py && \
spark-submit --master spark://\$(hostname):\$SPARK_MASTER_PORT comments_part2.py && \
stop-all.sh
"
```
[hyak-blog]: https://hyak.uw.edu/blog/g1-vs-g2/
Monitor via `htop` (as described in Step 4); the CPUs may not always
show high usage but you should see that memory is being used. Repeat
for the comments. Successful jobs will result in
`/gscratch/comdata/output` having four new directories:
`reddit_submissions_by_author.parquet`,
`reddit_submissions_by_subreddit.parquet`,
`reddit_comments_by_author.parquet`, and
`reddit_comments_by_subreddit.parquet`. Each should contain many
`snappy.parquet` files (e.g.
`part-00799-c8ec5f61-5158-43c7-ae2a-189169e9a86b-c000.snappy.parquet`)
and `_SUCCESS`.
### Step 7: data verification
Verify and make sure the new data is reasonably complete before deleting
any of the old data. Do a simple time series to see how many posts there
are per day and make sure things don't fall off. It is also useful to
have lab members test out anything they're working on again with the
new parquet files.
## See also ## See also
The CDSC wiki page The CDSC wiki page
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit) [CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit)
documents the surrounding workflow — where the raw dump files come from is the landing page for this project on the wiki and provides
(currently ArcticShift via academic torrents), how to stage them on cross-links to related CDSC and Hyak documentation. The walkthrough
Hyak, and how to run Spark jobs on the cluster. above used to live there; it now lives here so that doc and code stay
in sync.

162
datasets/add_months.sh Executable file
View File

@@ -0,0 +1,162 @@
#!/usr/bin/env bash
#
# Add one or more new months to the existing parquet datasets using a
# layered append. Designed to run on a single fat node (e.g. cpu-g2 with
# 128 cores / ~1TB RAM). For a multi-node Spark cluster instead, see
# add_months_multinode.sh.
#
# Usage:
# add_months.sh [--clean] YYYY-MM [YYYY-MM ...]
#
# Example:
# add_months.sh 2025-01 2025-02 2025-03
#
# If temp or staging directories from a previous run exist, the script
# will exit with an error. Pass --clean to wipe them before starting:
#
# The new .zst dump files must live at:
# $COMMENTS_DUMPDIR/RC_YYYY-MM.zst
# $SUBMISSIONS_DUMPDIR/RS_YYYY-MM.zst
#
# Override the dump directories via environment variables if the new files
# are not in the standard locations:
#
# COMMENTS_DUMPDIR=/path/to/new/comments \
# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
# ./add_months.sh 2025-01 2025-02
#
# Workflow:
# Part 1 — parse new .zst files into per-month parquets (parallel)
# Part 2 — sort into staging directories, not the live datasets (Spark)
# [script exits here — verify staging before continuing]
# Copy — move staging files into live datasets (run manually after verify)
# Cleanup — remove temp and staging dirs (run manually after copy)
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.
#
# Every command below is independently runnable for debugging.
set -e
cd "$(dirname "$0")"
CLEAN=0
if [ "${1:-}" = "--clean" ]; then
CLEAN=1
shift
fi
if [ $# -eq 0 ]; then
echo "Usage: $0 [--clean] YYYY-MM [YYYY-MM ...]" >&2
exit 1
fi
COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
PYTHON="${PYTHON:-python3}"
# Part 1 temp dirs (per-month parquets, parsed from .zst)
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
# Staging dirs (sorted new layer; inspected before copying to live)
STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
# Live dataset dirs
LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
# --- Check for leftover output from a previous run --------------------------
EXISTING=()
for d in "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS" \
"$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH" \
"$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"; do
[ -e "$d" ] && EXISTING+=("$d")
done
if [ ${#EXISTING[@]} -gt 0 ]; then
if [ $CLEAN -eq 1 ]; then
echo "Removing leftover files from previous run..."
rm -rf "${EXISTING[@]}"
rm -f add_months_tasks.txt add_months_joblog.txt
rm -rf add_months_logs/
else
echo "Error: leftover files from a previous run exist:" >&2
printf ' %s\n' "${EXISTING[@]}" >&2
echo "Re-run with --clean to remove them before starting." >&2
exit 1
fi
fi
# --- Part 1: parse new months in parallel (comments and submissions together) -
printf "$PYTHON comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \
> add_months_tasks.txt
printf "$PYTHON submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "$@" \
>> add_months_tasks.txt
parallel --joblog add_months_joblog.txt --results add_months_logs \
< add_months_tasks.txt
# --- Part 2: sort new months into staging (Spark, single fat node) ----------
source "$SPARK_CONF_DIR/spark-env.sh"
start_spark_cluster.sh
spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
comments_part2.py \
--indir="$TEMP_COMMENTS" \
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
--out_by_author="$STAGING_COMMENTS_AUTH"
spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
submissions_part2.py \
--indir="$TEMP_SUBMISSIONS" \
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
stop-all.sh
# --- Verify: inspect staging before copying to live -------------------------
#
# The script stops here. Check the staging output looks right before running
# the copy step manually. The live datasets are untouched at this point.
# Example checks:
#
# ls -lah "$STAGING_COMMENTS_SUB" | head
# python3 -c "
# import pyarrow.parquet as pq, os
# f = sorted(os.listdir('$STAGING_COMMENTS_SUB'))[0]
# t = pq.read_table('$STAGING_COMMENTS_SUB/' + f, columns=['created_utc'])
# print(t.column('created_utc')[0].as_py(), t.column('created_utc')[-1].as_py())
# "
exit 0
# --- Copy: add staging files into live datasets -----------------------------
#
# Run these lines manually after verifying staging. This is the only step
# that touches the live datasets. It only adds new files — existing files
# are never deleted or overwritten.
find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \;
find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \;
find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \;
find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \;
# --- Cleanup: remove temp and staging dirs ----------------------------------
#
# Run after confirming the copy succeeded and the live datasets look right.
rm -f add_months_tasks.txt add_months_joblog.txt
rm -rf add_months_logs/
rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"

View File

@@ -0,0 +1,89 @@
#!/usr/bin/env bash
#
# Multi-node variant of add_months.sh. Uses start_spark_and_run.sh to
# allocate a Spark cluster across multiple nodes via salloc. Run this
# from a login node.
#
# For the common single-fat-node case, use add_months.sh instead.
#
# Usage:
# add_months_multinode.sh NODES YYYY-MM [YYYY-MM ...]
#
# Example (2 nodes, 3 months):
# add_months_multinode.sh 2 2025-01 2025-02 2025-03
#
# Override dump directories via environment variables if needed:
#
# COMMENTS_DUMPDIR=/path/to/new/comments \
# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
# ./add_months_multinode.sh 2 2025-01 2025-02
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.
set -e
cd "$(dirname "$0")"
NODES="${1:-}"
if [ -z "$NODES" ] || [ $# -lt 2 ]; then
echo "Usage: $0 NODES YYYY-MM [YYYY-MM ...]" >&2
exit 1
fi
shift
MONTHS=("$@")
COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
PYTHON="${PYTHON:-python3}"
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
# --- Part 1: parse new months in parallel -----------------------------------
printf "$PYTHON comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "${MONTHS[@]}" \
> add_months_comments_tasks.txt
printf "$PYTHON submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "${MONTHS[@]}" \
> add_months_submissions_tasks.txt
parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
< add_months_comments_tasks.txt
parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
< add_months_submissions_tasks.txt
# --- Part 2: sort new months into staging (multi-node Spark cluster) --------
start_spark_and_run.sh "$NODES" comments_part2.py \
--indir="$TEMP_COMMENTS" \
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
--out_by_author="$STAGING_COMMENTS_AUTH"
start_spark_and_run.sh "$NODES" submissions_part2.py \
--indir="$TEMP_SUBMISSIONS" \
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
# --- Verify staging, then copy and cleanup manually -------------------------
#
# See add_months.sh for verify/copy/cleanup commands — they are identical.
exit 0
find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \;
find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \;
find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \;
find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \;
rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"

View File

@@ -1,48 +0,0 @@
#!/usr/bin/env bash
#
# Add a single new month of dumps to the existing parquet datasets.
#
# Processes only the RC_<month>.zst and RS_<month>.zst files (Part 1),
# leaving the existing per-source temp parquet files untouched, then
# re-runs the Part 2 Spark sort + repartition over the full temp dir so
# the final by_subreddit / by_author datasets pick up the new data.
#
# Usage:
# add_new_month.sh YYYY-MM
#
# Example:
# add_new_month.sh 2025-03
#
# Every command below is independently runnable — to debug, copy a line
# out and run it directly. For a full rebuild instead, see
# build_from_scratch.sh.
#
# Note on cost: Part 2 always re-sorts the full corpus (the sort is global,
# not incremental), so this gets slightly slower each month. For the
# monthly cadence this is fine; if the sort becomes a bottleneck we'd
# need to rearchitect Part 2 to merge-append instead of re-sort.
set -e
cd "$(dirname "$0")"
MONTH="${1:-}"
if [ -z "$MONTH" ]; then
echo "Usage: $0 YYYY-MM" >&2
exit 1
fi
# --- Part 1: parse the new month's dumps (no wipe) -------------------------
# parse the new comments file
python3 comments_part1.py parse_dump "RC_${MONTH}.zst"
# parse the new submissions file
python3 submissions_part1.py parse_dump "RS_${MONTH}.zst"
# --- Part 2: re-sort the full corpus including the new data ---------------
# sort comments and overwrite reddit_comments_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 comments_part2.py
# sort submissions and overwrite reddit_submissions_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 submissions_part2.py

View File

@@ -16,8 +16,8 @@
# - GNU parallel installed # - GNU parallel installed
# - start_spark_and_run.sh on PATH (Hyak-provided wrapper) # - start_spark_and_run.sh on PATH (Hyak-provided wrapper)
# #
# To add one new month to an existing build instead of rebuilding from # To add new months to an existing build without rebuilding from scratch,
# scratch, use add_new_month.sh. # use add_months.sh.
set -e set -e
cd "$(dirname "$0")" cd "$(dirname "$0")"

View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""Collapse all layers in the comments final datasets into a single clean layer.
Must be launched from a login node via the Hyak-provided wrapper:
start_spark_and_run.sh 1 comments_merge.py
See merge_layers.sh and dumps_helper.merge_layers for details.
"""
from dumps_helper import COMMENTS, merge_layers
if __name__ == "__main__":
merge_layers(COMMENTS)

View File

@@ -1,14 +1,21 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Part 2 for comments: Spark sort + repartition the per-source parquets """Part 2 for comments: Spark sort + repartition into the final datasets.
produced by comments_part1.py into the final by_subreddit / by_author
datasets.
Launched via the Hyak-provided start_spark_and_run.sh wrapper: Must be launched from a login node via the Hyak-provided wrapper:
start_spark_and_run.sh 1 comments_part2.py start_spark_and_run.sh 1 comments_part2.py
start_spark_and_run.sh 1 comments_part2.py --indir=/path/to/parquets --mode=append
--indir defaults to the temp comments dir in dumps_helper.py.
--out_by_subreddit and --out_by_author default to the live dataset paths;
override them to write to staging directories first (see add_months.sh).
""" """
import fire
from dumps_helper import COMMENTS, sort_and_write from dumps_helper import COMMENTS, sort_and_write
if __name__ == "__main__": if __name__ == "__main__":
sort_and_write(COMMENTS) fire.Fire(lambda indir=None, out_by_subreddit=None, out_by_author=None:
sort_and_write(COMMENTS, indir=indir,
out_by_subreddit=out_by_subreddit,
out_by_author=out_by_author))

View File

@@ -10,6 +10,7 @@ task-list generator, and the Spark sort are all shared here.
""" """
import os import os
import shutil
from datetime import datetime from datetime import datetime
from itertools import islice from itertools import islice
@@ -255,16 +256,27 @@ def gen_task_list(config, script_name, dumpdir=None, tasklist=None):
# --- Part 2: spark sort + repartition -------------------------------------- # --- Part 2: spark sort + repartition --------------------------------------
def sort_and_write(config): def sort_and_write(config, indir=None, out_by_subreddit=None, out_by_author=None):
"""Read the directory of per-source parquets, sort and repartition """Read a directory of per-source parquets, sort and repartition twice
twice (once by subreddit, once by author), and write the two final (once by subreddit, once by author), and write the two output datasets.
datasets. Pyspark is imported lazily so Part 1 callers don't pay the
Spark startup cost.""" indir defaults to config['outdir'].
out_by_subreddit and out_by_author default to config['output_by_subreddit']
and config['output_by_author']. Override them to write to staging directories
instead of the live datasets (see add_months.sh).
Pyspark is imported lazily so Part 1 callers don't pay the Spark startup
cost.
"""
from pyspark.sql import SparkSession, functions as f from pyspark.sql import SparkSession, functions as f
indir = indir or config['outdir']
out_by_subreddit = out_by_subreddit or config['output_by_subreddit']
out_by_author = out_by_author or config['output_by_author']
spark = SparkSession.builder.appName(config['app_name']).getOrCreate() spark = SparkSession.builder.appName(config['app_name']).getOrCreate()
df = spark.read.parquet(config['outdir'], compression='snappy') df = spark.read.parquet(indir, compression='snappy')
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit') df = df.drop('subreddit')
@@ -278,9 +290,54 @@ def sort_and_write(config):
sub_keys = config['subreddit_sort_keys'] sub_keys = config['subreddit_sort_keys']
df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True) df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True) df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
df_sub.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy') df_sub.write.parquet(out_by_subreddit, mode='overwrite', compression='snappy')
auth_keys = config['author_sort_keys'] auth_keys = config['author_sort_keys']
df_auth = df.repartition('author').sort(auth_keys, ascending=True) df_auth = df.repartition('author').sort(auth_keys, ascending=True)
df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True) df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
df_auth.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy') df_auth.write.parquet(out_by_author, mode='overwrite', compression='snappy')
def merge_layers(config):
"""Collapse all accumulated layers in the final datasets into a single
clean layer. Reads the existing by_subreddit dataset (which contains all
layers), re-sorts twice, writes to temp paths, then atomically replaces
the originals by renaming.
Safe to interrupt after the writes complete but before the renames — the
originals are untouched until the .merging directories exist. The .old
directories are left behind if the process is interrupted after renaming;
delete them manually once satisfied.
Pyspark is imported lazily so Part 1 callers don't pay the Spark startup
cost.
"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(config['app_name'] + ' merge layers').getOrCreate()
# Both final datasets have identical rows; read from by_subreddit.
df = spark.read.parquet(config['output_by_subreddit'])
tmp_sub = config['output_by_subreddit'] + '.merging'
tmp_auth = config['output_by_author'] + '.merging'
sub_keys = config['subreddit_sort_keys']
df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
df_sub.write.parquet(tmp_sub, mode='overwrite', compression='snappy')
auth_keys = config['author_sort_keys']
df_auth = df.repartition('author').sort(auth_keys, ascending=True)
df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
df_auth.write.parquet(tmp_auth, mode='overwrite', compression='snappy')
# Atomic swap: rename old → .old, then .merging → final, then delete .old.
old_sub = config['output_by_subreddit'] + '.old'
old_auth = config['output_by_author'] + '.old'
os.rename(config['output_by_subreddit'], old_sub)
os.rename(tmp_sub, config['output_by_subreddit'])
os.rename(config['output_by_author'], old_auth)
os.rename(tmp_auth, config['output_by_author'])
shutil.rmtree(old_sub)
shutil.rmtree(old_auth)

View File

@@ -3,6 +3,9 @@ import re
from collections import defaultdict from collections import defaultdict
from os import path from os import path
import glob import glob
import io
import zstandard
def find_dumps(dumpdir, base_pattern): def find_dumps(dumpdir, base_pattern):
@@ -28,24 +31,28 @@ def open_fileset(files):
yield line yield line
def open_input_file(input_filename): def open_input_file(input_filename):
# .zst handled via the zstandard library to avoid subprocess/container issues
if re.match(r'.*\.zst$', input_filename):
fh = open(input_filename, 'rb')
dctx = zstandard.ZstdDecompressor()
return io.TextIOWrapper(dctx.stream_reader(fh), encoding='utf-8')
if re.match(r'.*\.7z$', input_filename): if re.match(r'.*\.7z$', input_filename):
cmd = ["7za", "x", "-so", input_filename, '*'] cmd = ["7za", "x", "-so", input_filename, '*']
elif re.match(r'.*\.gz$', input_filename):
cmd = ["zcat", input_filename]
elif re.match(r'.*\.bz2$', input_filename): elif re.match(r'.*\.bz2$', input_filename):
cmd = ["bzcat", "-dk", input_filename] cmd = ["bzcat", "-dk", input_filename]
elif re.match(r'.*\.bz', input_filename): elif re.match(r'.*\.bz', input_filename):
cmd = ["bzcat", "-dk", input_filename] cmd = ["bzcat", "-dk", input_filename]
elif re.match(r'.*\.xz', input_filename): elif re.match(r'.*\.xz', input_filename):
cmd = ["xzcat",'-dk', '-T 20',input_filename] cmd = ["xzcat", '-dk', '-T 20', input_filename]
elif re.match(r'.*\.zst',input_filename): elif re.match(r'.*\.gz', input_filename):
cmd = ['zstd','-dck', input_filename] cmd = ["zcat", input_filename]
elif re.match(r'.*\.gz',input_filename): else:
cmd = ['gzip','-dc', input_filename] return open(input_filename, 'r')
try: try:
input_file = Popen(cmd, stdout=PIPE).stdout return Popen(cmd, stdout=PIPE).stdout
except NameError as e: except NameError as e:
print(e) print(e)
input_file = open(input_filename, 'r') return open(input_filename, 'r')
return input_file

32
datasets/merge_layers.sh Executable file
View File

@@ -0,0 +1,32 @@
#!/usr/bin/env bash
#
# Collapse all accumulated layers in the final parquet datasets into a
# single clean layer. Use this after several incremental adds via
# add_months.sh when you want to reduce the number of partition files.
#
# Reads the existing by_subreddit / by_author datasets, re-sorts everything,
# writes to temp paths, then atomically replaces the originals via rename.
# The old directories are removed once the new ones are in place.
#
# If the process is interrupted after writing the .merging directories but
# before the renames complete, re-run — the .merging directories will be
# overwritten and the originals are still intact. If interrupted after the
# renames, the .old directories are left behind; delete them manually once
# satisfied with the output.
#
# To add new months without merging, use add_months.sh.
# To rebuild everything from raw dumps, use build_from_scratch.sh.
#
# NOTE: This script and its workflow are written but not yet tested.
# Remove this notice after a successful end-to-end run.
#
# Every command below is independently runnable for debugging.
set -e
cd "$(dirname "$0")"
# merge and collapse comments layers
start_spark_and_run.sh 1 comments_merge.py
# merge and collapse submissions layers
start_spark_and_run.sh 1 submissions_merge.py

View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""Collapse all layers in the submissions final datasets into a single clean layer.
Must be launched from a login node via the Hyak-provided wrapper:
start_spark_and_run.sh 1 submissions_merge.py
See merge_layers.sh and dumps_helper.merge_layers for details.
"""
from dumps_helper import SUBMISSIONS, merge_layers
if __name__ == "__main__":
merge_layers(SUBMISSIONS)

View File

@@ -1,14 +1,21 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Part 2 for submissions: Spark sort + repartition the per-source parquets """Part 2 for submissions: Spark sort + repartition into the final datasets.
produced by submissions_part1.py into the final by_subreddit / by_author
datasets.
Launched via the Hyak-provided start_spark_and_run.sh wrapper: Must be launched from a login node via the Hyak-provided wrapper:
start_spark_and_run.sh 1 submissions_part2.py start_spark_and_run.sh 1 submissions_part2.py
start_spark_and_run.sh 1 submissions_part2.py --indir=/path/to/parquets --mode=append
--indir defaults to the temp submissions dir in dumps_helper.py.
--out_by_subreddit and --out_by_author default to the live dataset paths;
override them to write to staging directories first (see add_months.sh).
""" """
import fire
from dumps_helper import SUBMISSIONS, sort_and_write from dumps_helper import SUBMISSIONS, sort_and_write
if __name__ == "__main__": if __name__ == "__main__":
sort_and_write(SUBMISSIONS) fire.Fire(lambda indir=None, out_by_subreddit=None, out_by_author=None:
sort_and_write(SUBMISSIONS, indir=indir,
out_by_subreddit=out_by_subreddit,
out_by_author=out_by_author))

175
similarities/README.md Normal file
View File

@@ -0,0 +1,175 @@
# Subreddit similarity
This directory holds the code that computes pairwise similarities between
subreddits — both term-based (from TF-IDF over comment text) and
author-based (from overlapping commenter sets). Similarity matrices
produced here feed downstream clustering (`../clustering/`) and density
analysis (`../density/`).
## Datasets
Subreddit similarity datasets based on comment terms and comment authors
are available on Hyak in `/gscratch/comdata/output/reddit_similarity`.
The overall approach to subreddit similarity seems to work reasonably
well and the code is stabilizing. If you want help using these
similarities in a project, just reach out to
[Nate](https://wiki.communitydata.science/People#Nathan_TeBlunthuis_.28University_of_Texas_at_Austin.29).
By default, the scripts here take a `TopN` parameter which selects the
subreddits to include in the similarity dataset according to how many
total comments they have. You can alternatively pass a value to the
`included_subreddits` parameter for a file with the names of the
subreddits you would like to include on each line.
## Scripts
| Script | What it does |
|---|---|
| `tfidf.py` | Builds TF-IDF vectors for subreddits. Fire CLI subcommands for `authors`, `terms`, `authors_weekly`, `terms_weekly`. |
| `cosine_similarities.py` | Computes cosine similarities between subreddit TF-IDF vectors. Fire CLI subcommands `author`, `term`, `author-tf`. |
| `weekly_cosine_similarities.py` | Same idea but operating on the weekly TF-IDF vectors. |
| `wang_similarity.py` | A variant similarity computation based on user overlaps in the style of Wang et al. |
| `top_subreddits_by_comments.py` | Produces the `subreddits_by_num_comments.csv` ranking used to pick the top-N subreddits for the similarity matrices. |
| `similarities_helper.py` | Shared helpers for building TF-IDF datasets, reindexing, and selecting the top-N subreddits. |
| `Makefile` | Wires everything together with the canonical Hyak output paths. |
## Methods
[TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) is a common and
simple information-retrieval technique that we can use to quantify the
topic of a subreddit. The goal of TF-IDF is to build a vector for each
subreddit that scores every term (or phrase) according to how
characteristic it is of the overall lexicon used in that subreddit. For
example, the most characteristic terms in the subreddit `/r/christianity`
in the current version of the TF-IDF model are:
| Term | tf_idf |
|:------------:|:------:|
| christians | 0.581 |
| christianity | 0.569 |
| kjv | 0.568 |
| bible | 0.557 |
| scripture | 0.55 |
TF-IDF stands for "term frequency — inverse document frequency" because
it is the product of two terms "term frequency" and "inverse document
frequency." Term frequency quantifies the amount that a term appears in
a subreddit (document). Inverse document frequency quantifies how much
that term appears in other subreddits (documents). As you can see on
the Wikipedia page, there are many possible ways of constructing and
combining these terms.
I chose to normalize term frequency by the maximum (raw) term frequency
for each subreddit:
$$\mathrm{tf}_{t,d} = \frac{f_{t,d}}{\max_{t' \in d}{f_{t',d}}}$$
I use the log inverse document frequency:
$$\mathrm{idf}_{t} = \log\frac{N}{|\{d \in D : t \in d\}|}$$
I then combine them using some smoothing to get:
$$\mathrm{tfidf}_{t,d} = (0.5 + 0.5 \cdot \mathrm{tf}_{t,d}) \cdot \mathrm{idf}_{t}$$
(Other normalization strategies are worth trying — see the note in
`TODO`.)
### Building TF-IDF vectors
The process for building TF-IDF vectors has four steps:
1. Extracting terms using `../ngrams/tf_comments.py`
2. Detecting common phrases using `../ngrams/top_comment_phrases.py`
3. Extracting terms and common phrases using
`../ngrams/tf_comments.py --mwe-pass='second'`
4. Building IDF and TF-IDF scores in `tfidf.py`
#### Running `tf_comments.py` on the backfill queue
The main reason that I did it in four steps instead of one is to take
advantage of the backfill queue for running `tf_comments.py`. This step
requires reading all of the text in every comment and converting it to
a bag of words at the subreddit level. This is a lot of computation
that is easily parallelizable. The script `../ngrams/run_tf_jobs.sh`
partially automates running steps 1 (or 3) on the backfill queue.
#### Phrase detection using pointwise mutual information
TF-IDF is simple, but only uses single words (unigrams). Sequences of
multiple words can be important to account for how words have different
meanings in different contexts or how sequences of words refer to
distinct things like names. Dealing with context or longer sequences of
words is a common challenge in natural language processing since the
number of possible n-grams grows like crazy as n gets bigger. Phrase
detection helps this problem by limiting the set of n-grams to those
most informative.
But how do we detect phrases? I implemented [pointwise mutual
information](https://en.wikipedia.org/wiki/Pointwise_mutual_information),
which is a pretty simple way but seems to work pretty well.
PMI is a quantity derived from information theory. The intuition is
that if two words occur together quite frequently compared to how often
they appear separately then the cooccurrance is likely to be
informative.
$$\operatorname{pmi}(x;y) \equiv \log\frac{p(x,y)}{p(x)\,p(y)} = \log\frac{p(x|y)}{p(x)} = \log\frac{p(y|x)}{p(y)}$$
In `../ngrams/tf_comments.py` if `--mwe-pass=first` then a 10% sample
of 1-4-grams (sequences of terms up to length 4) will be written to a
file to be consumed by `../ngrams/top_comment_phrases.py`.
`top_comment_phrases.py` computes the PMI for these possible phrases
and writes those that occur at least 3500 times in the sample of
n-grams and have a PMI of at least 3 (about 65000 expressions).
`tf_comments.py --mwe-pass=second` then uses the detected phrases and
adds them to the term frequency data.
## Cosine similarity
Once the TF-IDF vectors are built, making a similarity score between
two subreddits is straightforward using cosine similarity.
$$\text{similarity} = \cos(\theta) = \frac{\mathbf{A} \cdot \mathbf{B}}{\|\mathbf{A}\|\,\|\mathbf{B}\|} = \frac{\sum_{i=1}^{n}{A_i\,B_i}}{\sqrt{\sum_{i=1}^{n}{A_i^2}}\,\sqrt{\sum_{i=1}^{n}{B_i^2}}}$$
Intuitively, we represent two subreddits as lines in a high-dimensional
space (TF-IDF vectors). In linear algebra, the dot product ($\cdot$)
between two vectors takes their weighted sum (e.g. linear regression is
a dot product of a vector of covariates and a vector of weights). The
vectors might have different lengths — if one subreddit has more words
in comments than the other — so in cosine similarity the dot product
is normalized by the magnitude (length) of the vectors. It turns out
that this is equivalent to taking the cosine of the two vectors. So
cosine similarity in essence quantifies the angle between the two lines
in high-dimensional space. If the cosine similarity between two
subreddits is greater then their TF-IDF vectors are more correlated.
Cosine similarity with TF-IDF is popular (indeed it has been applied to
Reddit in research several times before) because it quantifies the
correlation between the most characteristic terms for two communities.
Compared to other approaches to similarity like those using word
embeddings or topic models it may struggle to handle polysemy, synonymy,
or correlations between different terms. Using phrase detection helps
with this a little bit. The advantages of this approach are simplicity
and scalability. I'm thinking about using [latent semantic
analysis](https://en.wikipedia.org/wiki/Latent_semantic_analysis) as an
intermediate step to improve upon similarities based on raw TF-IDFs.
Even still, computing similarities between a large number of subreddits
is computationally expensive and requires $n(n-1)/2$ dot-product
evaluations. This can be sped up by passing
`similarity-threshold=X` where $X>0$ into `cosine_similarities.py`. I
used a cosine similarity function that's built into the spark matrix
library which supports the `DIMSUM` algorithm for approximating
matrix-matrix products. This algorithm is commonly used in industry
(i.e. at Twitter, Google) for large-scale similarity scoring.
## See also
The CDSC wiki page
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit)
is the landing page for this project on the wiki. The methods writeup
above used to live there; it now lives here so that doc and code stay
in sync.