Compare commits
11 Commits
33150243cd
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 2390d2d10c | |||
| 0ea57b2377 | |||
| 6c6e05c360 | |||
| 4854d4f537 | |||
| bf6ccbc84a | |||
| 18925dfe5b | |||
| 926e9bc364 | |||
| 526dc03732 | |||
| 6b18840604 | |||
| 2d1d760142 | |||
| 1851132a06 |
@@ -5,52 +5,151 @@ This directory holds the pipeline that turns compressed Reddit dump files
|
||||
sorted, repartitioned parquet datasets that the rest of the project
|
||||
consumes.
|
||||
|
||||
The pipeline has two stages:
|
||||
## Pipeline overview
|
||||
|
||||
| Stage | What it does |
|
||||
The raw dumps are huge compressed json files with a lot of metadata that
|
||||
we may not need. They aren't indexed so it's expensive to pull data from
|
||||
just a handful of subreddits. It also turns out that it's a pain to read
|
||||
these compressed files straight into spark. Extracting useful variables
|
||||
from the dumps and building parquet datasets makes them easier to work
|
||||
with. This happens in two steps:
|
||||
|
||||
1. Extracting json into (temporary, unpartitioned) parquet files using
|
||||
pyarrow.
|
||||
2. Repartitioning and sorting the data using pyspark.
|
||||
|
||||
Breaking this down into two steps is useful because it allows us to
|
||||
decompress and parse the dumps in the backfill queue and then sort them
|
||||
in spark. Partitioning the data makes it possible to efficiently read
|
||||
data for specific subreddits or authors. Sorting it means that you can
|
||||
efficiently compute aggregations at the subreddit or user level. More
|
||||
documentation on using these files is available on the [CDSC wiki][hyak-datasets].
|
||||
|
||||
The final datasets are in `/gscratch/comdata/output`:
|
||||
|
||||
- `reddit_comments_by_author.parquet` has comments partitioned and sorted
|
||||
by username (lowercase).
|
||||
- `reddit_comments_by_subreddit.parquet` has comments partitioned and
|
||||
sorted by subreddit name (lowercase).
|
||||
- `reddit_submissions_by_author.parquet` has submissions partitioned and
|
||||
sorted by username (lowercase).
|
||||
- `reddit_submissions_by_subreddit.parquet` has submissions partitioned
|
||||
and sorted by subreddit name (lowercase).
|
||||
|
||||
[hyak-datasets]: https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets
|
||||
|
||||
## Scripts
|
||||
|
||||
| Script | Role |
|
||||
|---|---|
|
||||
| Part 1 | Reads one compressed dump and writes one parquet file. Per-file, parallelizable. Runs without Spark. |
|
||||
| Part 2 | Reads the directory of per-file parquets in Spark, sorts and repartitions by subreddit, then by author, and writes the final `reddit_*_by_*.parquet` datasets. Always re-sorts the full corpus. |
|
||||
| `comments_part1.py`, `submissions_part1.py` | Part 1 entry points. Each parses one compressed dump into one parquet file. `parse_dump <file>` and `gen_task_list` subcommands via fire. |
|
||||
| `comments_part2.py`, `submissions_part2.py` | Part 2 entry points. Each is a Spark job that reads a directory of per-source parquets and writes the final `*_by_subreddit.parquet` and `*_by_author.parquet` datasets. Accepts `--indir` and `--mode` to support layered appends; defaults match the build-from-scratch workflow. |
|
||||
| `comments_merge.py`, `submissions_merge.py` | Merge entry points. Each is a Spark job that collapses all accumulated layers in the final datasets into a single clean layer. Launched via `start_spark_and_run.sh`. |
|
||||
| `dumps_helper.py` | Shared module. Schemas, the simdjson parser, a generic parse loop with per-field handler dispatch, and the `parse_dump` / `gen_task_list` / `sort_and_write` / `merge_layers` workers that the entry-point scripts wrap. Adding a new dump type or a new field is a one-place edit. |
|
||||
| `helper.py` | Lower-level helpers for opening compressed dump files (`.zst`, `.xz`, `.bz2`, `.gz`). |
|
||||
|
||||
Each stage has a thin entry-point script per dump type:
|
||||
|
||||
| Script | Notes |
|
||||
|---|---|
|
||||
| `comments_part1.py`, `submissions_part1.py` | Per-file parse. `parse_dump <file>` and `gen_task_list` subcommands via fire. |
|
||||
| `comments_part2.py`, `submissions_part2.py` | Spark sort. Launched via `start_spark_and_run.sh`. |
|
||||
| `dumps_helper.py` | Shared module: schemas, simdjson parser, generic parse loop, parse_dump / gen_task_list / sort_and_write workers. The only per-type code is the two field-handler dicts and the configuration dicts at the top. |
|
||||
|
||||
## The two workflows
|
||||
|
||||
There are two ways to run the pipeline; pick the one that matches your
|
||||
situation.
|
||||
## The three workflows
|
||||
|
||||
### Build from scratch — `build_from_scratch.sh`
|
||||
|
||||
Use this when there is no existing parquet output, or when the upstream
|
||||
data has changed in a way that requires reparsing everything. Wipes the
|
||||
per-source temp directories, processes every `RC_*` / `RS_*` dump in the
|
||||
raw dumps directory through Part 1, then runs the Part 2 Spark sort.
|
||||
raw dumps directory through Part 1 (in parallel via GNU parallel), then
|
||||
runs the Part 2 Spark sort.
|
||||
|
||||
### Add a new month — `add_new_month.sh YYYY-MM`
|
||||
### Add new months — `add_months.sh YYYY-MM [YYYY-MM ...]`
|
||||
|
||||
Use this when one or more months of new dump files have arrived and you
|
||||
just want to bring the existing datasets up to date. Processes only the
|
||||
specified month's `RC_<MONTH>.zst` and `RS_<MONTH>.zst` files through
|
||||
Part 1 (the existing per-source parquet files are left in place), then
|
||||
re-runs the Part 2 Spark sort over the full temp directory so the final
|
||||
datasets pick up the new data.
|
||||
> **NOTE: written but not yet tested. Remove this notice after a
|
||||
> successful end-to-end run.**
|
||||
|
||||
The Part 2 sort is global and not incremental, so each monthly add
|
||||
re-sorts the entire corpus. That's fine for a monthly cadence; it would
|
||||
need a rearchitecture if the cost became a problem.
|
||||
Use this for routine incremental updates. Runs Part 1 on only the
|
||||
specified months, then appends the sorted output as a new layer of
|
||||
partition files alongside the existing ones. No existing data is
|
||||
rewritten.
|
||||
|
||||
Each run adds one layer to each final dataset directory. Spark and DuckDB
|
||||
read all layers together correctly. At a yearly update cadence the number
|
||||
of layers stays small; use `merge_layers.sh` to collapse them when
|
||||
needed.
|
||||
|
||||
#### Environment setup
|
||||
|
||||
The Python environment runs inside a Singularity container. Set `PYTHON`
|
||||
to the full path of the venv interpreter so that `parallel` jobs use the
|
||||
right Python (fresh shells spawned by `parallel` don't inherit the active
|
||||
venv):
|
||||
|
||||
```sh
|
||||
PYTHON=/gscratch/comdata/users/makohill/cdsc_reddit/venv/bin/python3
|
||||
```
|
||||
|
||||
The `.zst` decompression uses the `zstandard` Python library rather than
|
||||
the system `zstd` binary, which is inaccessible from inside the container.
|
||||
|
||||
#### Dump directory
|
||||
|
||||
The new `.zst` dump files must be accessible at `COMMENTS_DUMPDIR` and
|
||||
`SUBMISSIONS_DUMPDIR`. Override the defaults (which match `dumps_helper.py`)
|
||||
via environment variables if the files are not in the standard locations:
|
||||
|
||||
```sh
|
||||
COMMENTS_DUMPDIR=/path/to/new/comments \
|
||||
SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
|
||||
```
|
||||
|
||||
#### Running as a Slurm job
|
||||
|
||||
The recommended way to run `add_months.sh` is via `srun` on a fat
|
||||
`cpu-g2` node. Using `srun` (rather than `salloc`) means the node is
|
||||
released automatically as soon as the script finishes, regardless of the
|
||||
walltime. Run from a login node inside a `tmux` session so the terminal
|
||||
survives disconnections:
|
||||
|
||||
```sh
|
||||
tmux new -s add_months
|
||||
|
||||
srun -p cpu-g2 -A comdata --nodes=1 --time=72:00:00 -c 112 --mem=400G \
|
||||
bash -l -c "
|
||||
cd /mmfs1/gscratch/comdata/users/makohill/cdsc_reddit && \
|
||||
PYTHON=/gscratch/comdata/users/makohill/cdsc_reddit/venv/bin/python3 \
|
||||
COMMENTS_DUMPDIR=/path/to/new/comments \
|
||||
SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
|
||||
./datasets/add_months.sh --clean 2025-01 2025-02 ... YYYY-MM
|
||||
" 2>&1 | tee /gscratch/comdata/users/makohill/add_months_run.log
|
||||
```
|
||||
|
||||
The `bash -l` flag sources `.bashrc` on the compute node so the Spark
|
||||
environment is available. The `tee` command writes output to both the
|
||||
terminal and a log file so you can review it later.
|
||||
|
||||
Detach from tmux with `Ctrl-b d` and reattach with `tmux attach -t add_months`.
|
||||
|
||||
For a multi-node Spark cluster instead, use `add_months_multinode.sh`
|
||||
from a login node — it takes the number of nodes as its first argument.
|
||||
|
||||
### Merge layers — `merge_layers.sh`
|
||||
|
||||
> **NOTE: written but not yet tested. Remove this notice after a
|
||||
> successful end-to-end run.**
|
||||
|
||||
Use this to collapse accumulated layers from incremental adds into a
|
||||
single clean layer. Reads the existing final datasets, re-sorts
|
||||
everything, writes to `.merging` temp paths, then atomically replaces the
|
||||
originals via rename.
|
||||
|
||||
Run this when query performance has degraded due to many layers, or any
|
||||
time you want a clean single-file-per-partition layout. The existing
|
||||
datasets are safe until the rename step completes; see `merge_layers.sh`
|
||||
for recovery notes if interrupted. As with `add_months.sh`, Part 2 can
|
||||
run on a single fat node or via `start_spark_and_run.sh`.
|
||||
|
||||
## Running steps individually
|
||||
|
||||
Both `.sh` runners are written so that every meaningful step is a separate,
|
||||
self-contained command. If something fails partway through, or you want
|
||||
to inspect intermediate state, you can copy any single line out of the
|
||||
runner and execute it standalone. For example:
|
||||
Both `.sh` runners are written so that every meaningful step is a
|
||||
separate, self-contained command. If something fails partway through, or
|
||||
you want to inspect intermediate state, you can copy any single line out
|
||||
of the runner and execute it standalone. For example:
|
||||
|
||||
```sh
|
||||
# parse one specific file (skipping the rest of the workflow)
|
||||
@@ -68,10 +167,215 @@ The Spark Part 2 step is launched via `start_spark_and_run.sh` (a
|
||||
Hyak-provided wrapper not included in this repo); see the wiki for the
|
||||
launch convention.
|
||||
|
||||
## Detailed walkthrough: refreshing the data on Hyak
|
||||
|
||||
This walkthrough describes the process we went through updating Reddit
|
||||
data from the PushShift cutoff up to the end of 2024. Adapting it for
|
||||
newer data should just involve using different academic torrent files
|
||||
that start from 2025 onwards. For incremental updates, the
|
||||
`add_months.sh` workflow above is much shorter; this walkthrough is
|
||||
for the bulk-refresh case.
|
||||
|
||||
### Prerequisites
|
||||
|
||||
- [Set up Hyak with CDSC lab][hyak-setup] (make sure to update config
|
||||
and `.bashrc`)
|
||||
- [Go through the Hyak Getting Started tutorial][hyak-syllabus]
|
||||
|
||||
Reddit dumps info (handled by `u/Watchful1` and `u/RaiderBDev`):
|
||||
|
||||
- [Watchful1's reddit explanation][watchful1-explainer] (separated by
|
||||
subreddit), the [dataset not divided by subreddits][watchful1-bulk],
|
||||
and the [GitHub repo with scripts for analyzing data][watchful1-repo]
|
||||
- [RaiderBDev monthly dumps][raiderbdev-monthly] and
|
||||
[RaiderBDev's ArcticShift API][arctic-shift]
|
||||
- The [2005-06 to 2024-12 academic torrent][academic-torrent] used for
|
||||
the 2005-2024 refresh
|
||||
|
||||
CDSC and Hyak docs:
|
||||
|
||||
- [Hyak docs — how to work with modules][hyak-modules]
|
||||
- [CDSC — how to download Python or R packages][cdsc-pkgs]
|
||||
- [CDSC — Hyak datasets information][hyak-datasets]
|
||||
- [CDSC — Hyak Spark information][hyak-spark]
|
||||
|
||||
[hyak-setup]: https://wiki.communitydata.science/CommunityData:Hyak#General_Introduction_to_Hyak
|
||||
[hyak-syllabus]: https://hyak.uw.edu/docs/hyak101/basics/syllabus/
|
||||
[watchful1-explainer]: https://www.reddit.com/r/pushshift/comments/1itme1k/separate_dump_files_for_the_top_40k_subreddits/
|
||||
[watchful1-bulk]: https://www.reddit.com/r/pushshift/comments/1i4mlqu/dump_files_from_200506_to_202412/
|
||||
[watchful1-repo]: https://github.com/Watchful1/PushshiftDumps/tree/master
|
||||
[raiderbdev-monthly]: https://www.reddit.com/r/pushshift/comments/1ithjd3/subreddits_metadata_rules_and_wikis_202501/
|
||||
[arctic-shift]: https://github.com/ArthurHeitmann/arctic_shift
|
||||
[academic-torrent]: https://academictorrents.com/details/1614740ac8c94505e4ecb9d88be8bed7b6afddd4
|
||||
[hyak-modules]: https://hyak.uw.edu/docs/tools/modules
|
||||
[cdsc-pkgs]: https://wiki.communitydata.science/CommunityData:Hyak_software_installation#Python_packages
|
||||
[hyak-spark]: https://wiki.communitydata.science/CommunityData:Hyak_Spark
|
||||
|
||||
### Step 1: data download on Nada and Hyak
|
||||
|
||||
We downloaded the [2005-2024 academic torrent][academic-torrent] and put
|
||||
it on Nada (~2 days of downloading). We copied the raw data over to
|
||||
Hyak's scrubbed directory in a new directory,
|
||||
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/reddit`, with raw
|
||||
data sorted into `/comments` or `/submissions`. The `/submissions`
|
||||
directory shows `RS_20*.zst` files and the `/comments` shows `RC_20*.zst`
|
||||
files. (There are no earlier zip files, such as `.bz2` or `.xz`, to deal
|
||||
with.)
|
||||
|
||||
### Step 2: clone the repo on Hyak
|
||||
|
||||
On Hyak, clone this repo (or `scp` the contents of `datasets/`) into the
|
||||
working directory next to the raw data, e.g.
|
||||
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/`. The relevant
|
||||
code lives entirely in `datasets/`:
|
||||
|
||||
- `dumps_helper.py` — shared parsing and Spark logic
|
||||
- `helper.py` — file-open helpers
|
||||
- `comments_part1.py`, `submissions_part1.py` — Part 1 entry points
|
||||
- `comments_part2.py`, `submissions_part2.py` — Part 2 entry points
|
||||
- `comments_merge.py`, `submissions_merge.py` — merge entry points
|
||||
- `build_from_scratch.sh`, `add_months.sh`, `merge_layers.sh` — the runner scripts
|
||||
|
||||
The Spark wrapper scripts (`start_spark_and_run.sh`,
|
||||
`start_spark_cluster.sh`, `start_spark_worker.sh`) are not in this repo;
|
||||
they are part of the CDSC Hyak environment and should already be on
|
||||
PATH.
|
||||
|
||||
### Step 3: smoke-test Part 1 on a single file
|
||||
|
||||
Check out `any_machine`. We'll test submissions Part 1 with just one
|
||||
file:
|
||||
|
||||
```sh
|
||||
python3 submissions_part1.py parse_dump RS_2005-06.zst
|
||||
```
|
||||
|
||||
To verify, go to your output directory and examine the start of the
|
||||
file:
|
||||
|
||||
```sh
|
||||
python3 -c "import pandas as pd; df = pd.read_parquet('reddit_submissions.parquet'); print(df.head())"
|
||||
```
|
||||
|
||||
You should see columns like `id`, `author`, `subreddit`, and `title`
|
||||
printed out. Repeat the process with `comments_part1.py`; you should see
|
||||
columns like `id`, `subreddit`, `link_id`, and `parent_id` printed out.
|
||||
|
||||
**Note**: you may have to install relevant libraries before successfully
|
||||
running the file:
|
||||
|
||||
```sh
|
||||
pip install --user pyarrow simdjson zstandard fire
|
||||
```
|
||||
|
||||
### Step 4: Part 1 — converting `.zst` to `.parquet` files
|
||||
|
||||
Now we'll convert all of our `.zst` compressed Reddit data to `.parquet`
|
||||
files. First, to generate our task list, we'll run
|
||||
|
||||
```sh
|
||||
python3 submissions_part1.py gen_task_list
|
||||
```
|
||||
|
||||
There should be a script, `parse_submissions_task_list`, in the working
|
||||
directory. Check the script (`less parse_submissions_task_list`); it
|
||||
should have many lines that look like our earlier test command,
|
||||
`python3 submissions_part1.py parse_dump RS_2005-06.zst`, but for all of
|
||||
our `.zst` files. Do the same process with comments to generate
|
||||
`parse_comments_task_list`.
|
||||
|
||||
From a login node, run `tmux` to keep our job running and then
|
||||
`any_machine` to check out a node to do computational work. We'll run
|
||||
our tasks (from the task list) in parallel to optimize. Start with
|
||||
submissions:
|
||||
|
||||
```sh
|
||||
parallel --joblog submissions_joblog.txt --results submissions/logs < parse_submissions_task_list
|
||||
```
|
||||
|
||||
The `--joblog` flag creates a text file where you can see which tasks
|
||||
completed successfully, and the `--results` flag creates a directory
|
||||
where each task has its own stderr output to see the specific error
|
||||
(this is best practice for debugging).
|
||||
|
||||
Now we'll monitor the job. Create a new window in tmux (`CTRL+b c`).
|
||||
We'll ssh into our computational node (`ssh n1234` — you can get the
|
||||
node name by running `ourjobs`) and run `htop`
|
||||
([more details on htop][htop-explainer]). You should see that the
|
||||
machine's CPUs are getting close to 100% usage. If all looks good,
|
||||
create a new window and repeat the process for comments.
|
||||
|
||||
[htop-explainer]: https://codeahoy.com/2017/01/20/hhtop-explained-visually/
|
||||
|
||||
Once the job has successfully completed, you'll see that your CPUs are
|
||||
closer to 0% usage in `htop` and your `submissions_joblog.txt` file
|
||||
should show an `exitval` of 0 for all commands. Kill your node by
|
||||
running `scancel 12345678` (the job ID can be found from `ourjobs`).
|
||||
|
||||
### Step 5: verify the per-source parquet files
|
||||
|
||||
We'll want to verify our `.parquet` files at this point. We compared the
|
||||
new files' number of columns and rows to the old data: from the
|
||||
`/gscratch/scrubbed/comdata/reddit_download_2005-2024/output/temp/reddit_comments.parquet`
|
||||
directory, run
|
||||
|
||||
```sh
|
||||
diff <(../../../report_parquet_filesizes.py *.parquet) <(../../../report_parquet_filesizes.py /gscratch/comdata/output/temp/reddit_comments.parquet/*.parquet)
|
||||
```
|
||||
|
||||
and confirm there are no differences (same process with submissions).
|
||||
This may or may not be relevant if we continue using the same academic
|
||||
torrent to update data and have nothing to compare to, but you can still
|
||||
check that the new data's number of columns and rows are fairly
|
||||
continuous with the most recent data we already have.
|
||||
|
||||
### Step 6: Part 2 — sorting the `.parquet` files by author and subreddit via Spark
|
||||
|
||||
If the `.parquet` files reasonably appear to be complete, we can now
|
||||
sort them by author and subreddit. The most efficient way to do so is via
|
||||
`srun` on a `cpu-g2` node (128 CPUs, ~1 TB RAM). Using `srun` releases
|
||||
the node automatically when the job finishes. Run from a login node
|
||||
inside `tmux`:
|
||||
|
||||
```sh
|
||||
srun -p cpu-g2 -A comdata --nodes=1 --time=72:00:00 -c 112 --mem=400G \
|
||||
bash -l -c "
|
||||
cd /path/to/cdsc_reddit/datasets && \
|
||||
source \$SPARK_CONF_DIR/spark-env.sh && \
|
||||
start_spark_cluster.sh && \
|
||||
spark-submit --master spark://\$(hostname):\$SPARK_MASTER_PORT submissions_part2.py && \
|
||||
spark-submit --master spark://\$(hostname):\$SPARK_MASTER_PORT comments_part2.py && \
|
||||
stop-all.sh
|
||||
"
|
||||
```
|
||||
|
||||
[hyak-blog]: https://hyak.uw.edu/blog/g1-vs-g2/
|
||||
|
||||
Monitor via `htop` (as described in Step 4); the CPUs may not always
|
||||
show high usage but you should see that memory is being used. Repeat
|
||||
for the comments. Successful jobs will result in
|
||||
`/gscratch/comdata/output` having four new directories:
|
||||
`reddit_submissions_by_author.parquet`,
|
||||
`reddit_submissions_by_subreddit.parquet`,
|
||||
`reddit_comments_by_author.parquet`, and
|
||||
`reddit_comments_by_subreddit.parquet`. Each should contain many
|
||||
`snappy.parquet` files (e.g.
|
||||
`part-00799-c8ec5f61-5158-43c7-ae2a-189169e9a86b-c000.snappy.parquet`)
|
||||
and `_SUCCESS`.
|
||||
|
||||
### Step 7: data verification
|
||||
|
||||
Verify and make sure the new data is reasonably complete before deleting
|
||||
any of the old data. Do a simple time series to see how many posts there
|
||||
are per day and make sure things don't fall off. It is also useful to
|
||||
have lab members test out anything they're working on again with the
|
||||
new parquet files.
|
||||
|
||||
## See also
|
||||
|
||||
The CDSC wiki page
|
||||
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit)
|
||||
documents the surrounding workflow — where the raw dump files come from
|
||||
(currently ArcticShift via academic torrents), how to stage them on
|
||||
Hyak, and how to run Spark jobs on the cluster.
|
||||
is the landing page for this project on the wiki and provides
|
||||
cross-links to related CDSC and Hyak documentation. The walkthrough
|
||||
above used to live there; it now lives here so that doc and code stay
|
||||
in sync.
|
||||
|
||||
162
datasets/add_months.sh
Executable file
162
datasets/add_months.sh
Executable file
@@ -0,0 +1,162 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Add one or more new months to the existing parquet datasets using a
|
||||
# layered append. Designed to run on a single fat node (e.g. cpu-g2 with
|
||||
# 128 cores / ~1TB RAM). For a multi-node Spark cluster instead, see
|
||||
# add_months_multinode.sh.
|
||||
#
|
||||
# Usage:
|
||||
# add_months.sh [--clean] YYYY-MM [YYYY-MM ...]
|
||||
#
|
||||
# Example:
|
||||
# add_months.sh 2025-01 2025-02 2025-03
|
||||
#
|
||||
# If temp or staging directories from a previous run exist, the script
|
||||
# will exit with an error. Pass --clean to wipe them before starting:
|
||||
#
|
||||
# The new .zst dump files must live at:
|
||||
# $COMMENTS_DUMPDIR/RC_YYYY-MM.zst
|
||||
# $SUBMISSIONS_DUMPDIR/RS_YYYY-MM.zst
|
||||
#
|
||||
# Override the dump directories via environment variables if the new files
|
||||
# are not in the standard locations:
|
||||
#
|
||||
# COMMENTS_DUMPDIR=/path/to/new/comments \
|
||||
# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
|
||||
# ./add_months.sh 2025-01 2025-02
|
||||
#
|
||||
# Workflow:
|
||||
# Part 1 — parse new .zst files into per-month parquets (parallel)
|
||||
# Part 2 — sort into staging directories, not the live datasets (Spark)
|
||||
# [script exits here — verify staging before continuing]
|
||||
# Copy — move staging files into live datasets (run manually after verify)
|
||||
# Cleanup — remove temp and staging dirs (run manually after copy)
|
||||
#
|
||||
# NOTE: This script and its workflow are written but not yet tested.
|
||||
# Remove this notice after a successful end-to-end run.
|
||||
#
|
||||
# Every command below is independently runnable for debugging.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
CLEAN=0
|
||||
if [ "${1:-}" = "--clean" ]; then
|
||||
CLEAN=1
|
||||
shift
|
||||
fi
|
||||
|
||||
if [ $# -eq 0 ]; then
|
||||
echo "Usage: $0 [--clean] YYYY-MM [YYYY-MM ...]" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
|
||||
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
|
||||
PYTHON="${PYTHON:-python3}"
|
||||
|
||||
# Part 1 temp dirs (per-month parquets, parsed from .zst)
|
||||
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
|
||||
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
|
||||
|
||||
# Staging dirs (sorted new layer; inspected before copying to live)
|
||||
STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
|
||||
STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
|
||||
STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
|
||||
STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
|
||||
|
||||
# Live dataset dirs
|
||||
LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
|
||||
LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
|
||||
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
|
||||
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
|
||||
|
||||
# --- Check for leftover output from a previous run --------------------------
|
||||
|
||||
EXISTING=()
|
||||
for d in "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS" \
|
||||
"$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH" \
|
||||
"$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"; do
|
||||
[ -e "$d" ] && EXISTING+=("$d")
|
||||
done
|
||||
|
||||
if [ ${#EXISTING[@]} -gt 0 ]; then
|
||||
if [ $CLEAN -eq 1 ]; then
|
||||
echo "Removing leftover files from previous run..."
|
||||
rm -rf "${EXISTING[@]}"
|
||||
rm -f add_months_tasks.txt add_months_joblog.txt
|
||||
rm -rf add_months_logs/
|
||||
else
|
||||
echo "Error: leftover files from a previous run exist:" >&2
|
||||
printf ' %s\n' "${EXISTING[@]}" >&2
|
||||
echo "Re-run with --clean to remove them before starting." >&2
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
# --- Part 1: parse new months in parallel (comments and submissions together) -
|
||||
|
||||
printf "$PYTHON comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "$@" \
|
||||
> add_months_tasks.txt
|
||||
|
||||
printf "$PYTHON submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "$@" \
|
||||
>> add_months_tasks.txt
|
||||
|
||||
parallel --joblog add_months_joblog.txt --results add_months_logs \
|
||||
< add_months_tasks.txt
|
||||
|
||||
# --- Part 2: sort new months into staging (Spark, single fat node) ----------
|
||||
|
||||
source "$SPARK_CONF_DIR/spark-env.sh"
|
||||
start_spark_cluster.sh
|
||||
|
||||
spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
|
||||
comments_part2.py \
|
||||
--indir="$TEMP_COMMENTS" \
|
||||
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
|
||||
--out_by_author="$STAGING_COMMENTS_AUTH"
|
||||
|
||||
spark-submit --master "spark://$(hostname):$SPARK_MASTER_PORT" \
|
||||
submissions_part2.py \
|
||||
--indir="$TEMP_SUBMISSIONS" \
|
||||
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
|
||||
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
|
||||
|
||||
stop-all.sh
|
||||
|
||||
# --- Verify: inspect staging before copying to live -------------------------
|
||||
#
|
||||
# The script stops here. Check the staging output looks right before running
|
||||
# the copy step manually. The live datasets are untouched at this point.
|
||||
# Example checks:
|
||||
#
|
||||
# ls -lah "$STAGING_COMMENTS_SUB" | head
|
||||
# python3 -c "
|
||||
# import pyarrow.parquet as pq, os
|
||||
# f = sorted(os.listdir('$STAGING_COMMENTS_SUB'))[0]
|
||||
# t = pq.read_table('$STAGING_COMMENTS_SUB/' + f, columns=['created_utc'])
|
||||
# print(t.column('created_utc')[0].as_py(), t.column('created_utc')[-1].as_py())
|
||||
# "
|
||||
|
||||
exit 0
|
||||
|
||||
# --- Copy: add staging files into live datasets -----------------------------
|
||||
#
|
||||
# Run these lines manually after verifying staging. This is the only step
|
||||
# that touches the live datasets. It only adds new files — existing files
|
||||
# are never deleted or overwritten.
|
||||
|
||||
find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \;
|
||||
find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \;
|
||||
find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \;
|
||||
find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \;
|
||||
|
||||
# --- Cleanup: remove temp and staging dirs ----------------------------------
|
||||
#
|
||||
# Run after confirming the copy succeeded and the live datasets look right.
|
||||
|
||||
rm -f add_months_tasks.txt add_months_joblog.txt
|
||||
rm -rf add_months_logs/
|
||||
rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
|
||||
rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
|
||||
rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"
|
||||
89
datasets/add_months_multinode.sh
Executable file
89
datasets/add_months_multinode.sh
Executable file
@@ -0,0 +1,89 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Multi-node variant of add_months.sh. Uses start_spark_and_run.sh to
|
||||
# allocate a Spark cluster across multiple nodes via salloc. Run this
|
||||
# from a login node.
|
||||
#
|
||||
# For the common single-fat-node case, use add_months.sh instead.
|
||||
#
|
||||
# Usage:
|
||||
# add_months_multinode.sh NODES YYYY-MM [YYYY-MM ...]
|
||||
#
|
||||
# Example (2 nodes, 3 months):
|
||||
# add_months_multinode.sh 2 2025-01 2025-02 2025-03
|
||||
#
|
||||
# Override dump directories via environment variables if needed:
|
||||
#
|
||||
# COMMENTS_DUMPDIR=/path/to/new/comments \
|
||||
# SUBMISSIONS_DUMPDIR=/path/to/new/submissions \
|
||||
# ./add_months_multinode.sh 2 2025-01 2025-02
|
||||
#
|
||||
# NOTE: This script and its workflow are written but not yet tested.
|
||||
# Remove this notice after a successful end-to-end run.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
NODES="${1:-}"
|
||||
if [ -z "$NODES" ] || [ $# -lt 2 ]; then
|
||||
echo "Usage: $0 NODES YYYY-MM [YYYY-MM ...]" >&2
|
||||
exit 1
|
||||
fi
|
||||
shift
|
||||
MONTHS=("$@")
|
||||
|
||||
COMMENTS_DUMPDIR="${COMMENTS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/comments}"
|
||||
SUBMISSIONS_DUMPDIR="${SUBMISSIONS_DUMPDIR:-/gscratch/comdata/raw_data/reddit_dumps/submissions}"
|
||||
PYTHON="${PYTHON:-python3}"
|
||||
|
||||
TEMP_COMMENTS="/gscratch/comdata/output/temp/add_months_comments.parquet"
|
||||
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/add_months_submissions.parquet"
|
||||
STAGING_COMMENTS_SUB="/gscratch/comdata/output/temp/new_layer_comments_by_subreddit.parquet"
|
||||
STAGING_COMMENTS_AUTH="/gscratch/comdata/output/temp/new_layer_comments_by_author.parquet"
|
||||
STAGING_SUBMISSIONS_SUB="/gscratch/comdata/output/temp/new_layer_submissions_by_subreddit.parquet"
|
||||
STAGING_SUBMISSIONS_AUTH="/gscratch/comdata/output/temp/new_layer_submissions_by_author.parquet"
|
||||
LIVE_COMMENTS_SUB="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet"
|
||||
LIVE_COMMENTS_AUTH="/gscratch/comdata/output/reddit_comments_by_author.parquet"
|
||||
LIVE_SUBMISSIONS_SUB="/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet"
|
||||
LIVE_SUBMISSIONS_AUTH="/gscratch/comdata/output/reddit_submissions_by_author.parquet"
|
||||
|
||||
# --- Part 1: parse new months in parallel -----------------------------------
|
||||
|
||||
printf "$PYTHON comments_part1.py parse_dump RC_%s.zst --dumpdir=\"$COMMENTS_DUMPDIR\" --outdir=\"$TEMP_COMMENTS\"\n" "${MONTHS[@]}" \
|
||||
> add_months_comments_tasks.txt
|
||||
|
||||
printf "$PYTHON submissions_part1.py parse_dump RS_%s.zst --dumpdir=\"$SUBMISSIONS_DUMPDIR\" --outdir=\"$TEMP_SUBMISSIONS\"\n" "${MONTHS[@]}" \
|
||||
> add_months_submissions_tasks.txt
|
||||
|
||||
parallel --joblog add_months_comments_joblog.txt --results add_months_comments_logs \
|
||||
< add_months_comments_tasks.txt
|
||||
|
||||
parallel --joblog add_months_submissions_joblog.txt --results add_months_submissions_logs \
|
||||
< add_months_submissions_tasks.txt
|
||||
|
||||
# --- Part 2: sort new months into staging (multi-node Spark cluster) --------
|
||||
|
||||
start_spark_and_run.sh "$NODES" comments_part2.py \
|
||||
--indir="$TEMP_COMMENTS" \
|
||||
--out_by_subreddit="$STAGING_COMMENTS_SUB" \
|
||||
--out_by_author="$STAGING_COMMENTS_AUTH"
|
||||
|
||||
start_spark_and_run.sh "$NODES" submissions_part2.py \
|
||||
--indir="$TEMP_SUBMISSIONS" \
|
||||
--out_by_subreddit="$STAGING_SUBMISSIONS_SUB" \
|
||||
--out_by_author="$STAGING_SUBMISSIONS_AUTH"
|
||||
|
||||
# --- Verify staging, then copy and cleanup manually -------------------------
|
||||
#
|
||||
# See add_months.sh for verify/copy/cleanup commands — they are identical.
|
||||
|
||||
exit 0
|
||||
|
||||
find "$STAGING_COMMENTS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_SUB"/ \;
|
||||
find "$STAGING_COMMENTS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_COMMENTS_AUTH"/ \;
|
||||
find "$STAGING_SUBMISSIONS_SUB" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_SUB"/ \;
|
||||
find "$STAGING_SUBMISSIONS_AUTH" -maxdepth 1 -type f -exec cp {} "$LIVE_SUBMISSIONS_AUTH"/ \;
|
||||
|
||||
rm -rf "$TEMP_COMMENTS" "$TEMP_SUBMISSIONS"
|
||||
rm -rf "$STAGING_COMMENTS_SUB" "$STAGING_COMMENTS_AUTH"
|
||||
rm -rf "$STAGING_SUBMISSIONS_SUB" "$STAGING_SUBMISSIONS_AUTH"
|
||||
@@ -1,48 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Add a single new month of dumps to the existing parquet datasets.
|
||||
#
|
||||
# Processes only the RC_<month>.zst and RS_<month>.zst files (Part 1),
|
||||
# leaving the existing per-source temp parquet files untouched, then
|
||||
# re-runs the Part 2 Spark sort + repartition over the full temp dir so
|
||||
# the final by_subreddit / by_author datasets pick up the new data.
|
||||
#
|
||||
# Usage:
|
||||
# add_new_month.sh YYYY-MM
|
||||
#
|
||||
# Example:
|
||||
# add_new_month.sh 2025-03
|
||||
#
|
||||
# Every command below is independently runnable — to debug, copy a line
|
||||
# out and run it directly. For a full rebuild instead, see
|
||||
# build_from_scratch.sh.
|
||||
#
|
||||
# Note on cost: Part 2 always re-sorts the full corpus (the sort is global,
|
||||
# not incremental), so this gets slightly slower each month. For the
|
||||
# monthly cadence this is fine; if the sort becomes a bottleneck we'd
|
||||
# need to rearchitect Part 2 to merge-append instead of re-sort.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
MONTH="${1:-}"
|
||||
if [ -z "$MONTH" ]; then
|
||||
echo "Usage: $0 YYYY-MM" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# --- Part 1: parse the new month's dumps (no wipe) -------------------------
|
||||
|
||||
# parse the new comments file
|
||||
python3 comments_part1.py parse_dump "RC_${MONTH}.zst"
|
||||
|
||||
# parse the new submissions file
|
||||
python3 submissions_part1.py parse_dump "RS_${MONTH}.zst"
|
||||
|
||||
# --- Part 2: re-sort the full corpus including the new data ---------------
|
||||
|
||||
# sort comments and overwrite reddit_comments_by_{subreddit,author}.parquet
|
||||
start_spark_and_run.sh 1 comments_part2.py
|
||||
|
||||
# sort submissions and overwrite reddit_submissions_by_{subreddit,author}.parquet
|
||||
start_spark_and_run.sh 1 submissions_part2.py
|
||||
@@ -16,8 +16,8 @@
|
||||
# - GNU parallel installed
|
||||
# - start_spark_and_run.sh on PATH (Hyak-provided wrapper)
|
||||
#
|
||||
# To add one new month to an existing build instead of rebuilding from
|
||||
# scratch, use add_new_month.sh.
|
||||
# To add new months to an existing build without rebuilding from scratch,
|
||||
# use add_months.sh.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
14
datasets/comments_merge.py
Normal file
14
datasets/comments_merge.py
Normal file
@@ -0,0 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Collapse all layers in the comments final datasets into a single clean layer.
|
||||
|
||||
Must be launched from a login node via the Hyak-provided wrapper:
|
||||
start_spark_and_run.sh 1 comments_merge.py
|
||||
|
||||
See merge_layers.sh and dumps_helper.merge_layers for details.
|
||||
"""
|
||||
|
||||
from dumps_helper import COMMENTS, merge_layers
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
merge_layers(COMMENTS)
|
||||
@@ -1,14 +1,21 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Part 2 for comments: Spark sort + repartition the per-source parquets
|
||||
produced by comments_part1.py into the final by_subreddit / by_author
|
||||
datasets.
|
||||
"""Part 2 for comments: Spark sort + repartition into the final datasets.
|
||||
|
||||
Launched via the Hyak-provided start_spark_and_run.sh wrapper:
|
||||
Must be launched from a login node via the Hyak-provided wrapper:
|
||||
start_spark_and_run.sh 1 comments_part2.py
|
||||
start_spark_and_run.sh 1 comments_part2.py --indir=/path/to/parquets --mode=append
|
||||
|
||||
--indir defaults to the temp comments dir in dumps_helper.py.
|
||||
--out_by_subreddit and --out_by_author default to the live dataset paths;
|
||||
override them to write to staging directories first (see add_months.sh).
|
||||
"""
|
||||
|
||||
import fire
|
||||
from dumps_helper import COMMENTS, sort_and_write
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sort_and_write(COMMENTS)
|
||||
fire.Fire(lambda indir=None, out_by_subreddit=None, out_by_author=None:
|
||||
sort_and_write(COMMENTS, indir=indir,
|
||||
out_by_subreddit=out_by_subreddit,
|
||||
out_by_author=out_by_author))
|
||||
|
||||
@@ -10,6 +10,7 @@ task-list generator, and the Spark sort are all shared here.
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from itertools import islice
|
||||
|
||||
@@ -255,16 +256,27 @@ def gen_task_list(config, script_name, dumpdir=None, tasklist=None):
|
||||
|
||||
# --- Part 2: spark sort + repartition --------------------------------------
|
||||
|
||||
def sort_and_write(config):
|
||||
"""Read the directory of per-source parquets, sort and repartition
|
||||
twice (once by subreddit, once by author), and write the two final
|
||||
datasets. Pyspark is imported lazily so Part 1 callers don't pay the
|
||||
Spark startup cost."""
|
||||
def sort_and_write(config, indir=None, out_by_subreddit=None, out_by_author=None):
|
||||
"""Read a directory of per-source parquets, sort and repartition twice
|
||||
(once by subreddit, once by author), and write the two output datasets.
|
||||
|
||||
indir defaults to config['outdir'].
|
||||
out_by_subreddit and out_by_author default to config['output_by_subreddit']
|
||||
and config['output_by_author']. Override them to write to staging directories
|
||||
instead of the live datasets (see add_months.sh).
|
||||
|
||||
Pyspark is imported lazily so Part 1 callers don't pay the Spark startup
|
||||
cost.
|
||||
"""
|
||||
from pyspark.sql import SparkSession, functions as f
|
||||
|
||||
indir = indir or config['outdir']
|
||||
out_by_subreddit = out_by_subreddit or config['output_by_subreddit']
|
||||
out_by_author = out_by_author or config['output_by_author']
|
||||
|
||||
spark = SparkSession.builder.appName(config['app_name']).getOrCreate()
|
||||
|
||||
df = spark.read.parquet(config['outdir'], compression='snappy')
|
||||
df = spark.read.parquet(indir, compression='snappy')
|
||||
|
||||
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
|
||||
df = df.drop('subreddit')
|
||||
@@ -278,9 +290,54 @@ def sort_and_write(config):
|
||||
sub_keys = config['subreddit_sort_keys']
|
||||
df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
|
||||
df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
|
||||
df_sub.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy')
|
||||
df_sub.write.parquet(out_by_subreddit, mode='overwrite', compression='snappy')
|
||||
|
||||
auth_keys = config['author_sort_keys']
|
||||
df_auth = df.repartition('author').sort(auth_keys, ascending=True)
|
||||
df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
|
||||
df_auth.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy')
|
||||
df_auth.write.parquet(out_by_author, mode='overwrite', compression='snappy')
|
||||
|
||||
|
||||
def merge_layers(config):
|
||||
"""Collapse all accumulated layers in the final datasets into a single
|
||||
clean layer. Reads the existing by_subreddit dataset (which contains all
|
||||
layers), re-sorts twice, writes to temp paths, then atomically replaces
|
||||
the originals by renaming.
|
||||
|
||||
Safe to interrupt after the writes complete but before the renames — the
|
||||
originals are untouched until the .merging directories exist. The .old
|
||||
directories are left behind if the process is interrupted after renaming;
|
||||
delete them manually once satisfied.
|
||||
|
||||
Pyspark is imported lazily so Part 1 callers don't pay the Spark startup
|
||||
cost.
|
||||
"""
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
spark = SparkSession.builder.appName(config['app_name'] + ' merge layers').getOrCreate()
|
||||
|
||||
# Both final datasets have identical rows; read from by_subreddit.
|
||||
df = spark.read.parquet(config['output_by_subreddit'])
|
||||
|
||||
tmp_sub = config['output_by_subreddit'] + '.merging'
|
||||
tmp_auth = config['output_by_author'] + '.merging'
|
||||
|
||||
sub_keys = config['subreddit_sort_keys']
|
||||
df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
|
||||
df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
|
||||
df_sub.write.parquet(tmp_sub, mode='overwrite', compression='snappy')
|
||||
|
||||
auth_keys = config['author_sort_keys']
|
||||
df_auth = df.repartition('author').sort(auth_keys, ascending=True)
|
||||
df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
|
||||
df_auth.write.parquet(tmp_auth, mode='overwrite', compression='snappy')
|
||||
|
||||
# Atomic swap: rename old → .old, then .merging → final, then delete .old.
|
||||
old_sub = config['output_by_subreddit'] + '.old'
|
||||
old_auth = config['output_by_author'] + '.old'
|
||||
os.rename(config['output_by_subreddit'], old_sub)
|
||||
os.rename(tmp_sub, config['output_by_subreddit'])
|
||||
os.rename(config['output_by_author'], old_auth)
|
||||
os.rename(tmp_auth, config['output_by_author'])
|
||||
shutil.rmtree(old_sub)
|
||||
shutil.rmtree(old_auth)
|
||||
|
||||
@@ -3,6 +3,9 @@ import re
|
||||
from collections import defaultdict
|
||||
from os import path
|
||||
import glob
|
||||
import io
|
||||
import zstandard
|
||||
|
||||
|
||||
def find_dumps(dumpdir, base_pattern):
|
||||
|
||||
@@ -28,24 +31,28 @@ def open_fileset(files):
|
||||
yield line
|
||||
|
||||
def open_input_file(input_filename):
|
||||
# .zst handled via the zstandard library to avoid subprocess/container issues
|
||||
if re.match(r'.*\.zst$', input_filename):
|
||||
fh = open(input_filename, 'rb')
|
||||
dctx = zstandard.ZstdDecompressor()
|
||||
return io.TextIOWrapper(dctx.stream_reader(fh), encoding='utf-8')
|
||||
|
||||
if re.match(r'.*\.7z$', input_filename):
|
||||
cmd = ["7za", "x", "-so", input_filename, '*']
|
||||
elif re.match(r'.*\.gz$', input_filename):
|
||||
cmd = ["zcat", input_filename]
|
||||
cmd = ["7za", "x", "-so", input_filename, '*']
|
||||
elif re.match(r'.*\.bz2$', input_filename):
|
||||
cmd = ["bzcat", "-dk", input_filename]
|
||||
cmd = ["bzcat", "-dk", input_filename]
|
||||
elif re.match(r'.*\.bz', input_filename):
|
||||
cmd = ["bzcat", "-dk", input_filename]
|
||||
cmd = ["bzcat", "-dk", input_filename]
|
||||
elif re.match(r'.*\.xz', input_filename):
|
||||
cmd = ["xzcat",'-dk', '-T 20',input_filename]
|
||||
elif re.match(r'.*\.zst',input_filename):
|
||||
cmd = ['zstd','-dck', input_filename]
|
||||
elif re.match(r'.*\.gz',input_filename):
|
||||
cmd = ['gzip','-dc', input_filename]
|
||||
cmd = ["xzcat", '-dk', '-T 20', input_filename]
|
||||
elif re.match(r'.*\.gz', input_filename):
|
||||
cmd = ["zcat", input_filename]
|
||||
else:
|
||||
return open(input_filename, 'r')
|
||||
|
||||
try:
|
||||
input_file = Popen(cmd, stdout=PIPE).stdout
|
||||
return Popen(cmd, stdout=PIPE).stdout
|
||||
except NameError as e:
|
||||
print(e)
|
||||
input_file = open(input_filename, 'r')
|
||||
return input_file
|
||||
return open(input_filename, 'r')
|
||||
|
||||
|
||||
32
datasets/merge_layers.sh
Executable file
32
datasets/merge_layers.sh
Executable file
@@ -0,0 +1,32 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Collapse all accumulated layers in the final parquet datasets into a
|
||||
# single clean layer. Use this after several incremental adds via
|
||||
# add_months.sh when you want to reduce the number of partition files.
|
||||
#
|
||||
# Reads the existing by_subreddit / by_author datasets, re-sorts everything,
|
||||
# writes to temp paths, then atomically replaces the originals via rename.
|
||||
# The old directories are removed once the new ones are in place.
|
||||
#
|
||||
# If the process is interrupted after writing the .merging directories but
|
||||
# before the renames complete, re-run — the .merging directories will be
|
||||
# overwritten and the originals are still intact. If interrupted after the
|
||||
# renames, the .old directories are left behind; delete them manually once
|
||||
# satisfied with the output.
|
||||
#
|
||||
# To add new months without merging, use add_months.sh.
|
||||
# To rebuild everything from raw dumps, use build_from_scratch.sh.
|
||||
#
|
||||
# NOTE: This script and its workflow are written but not yet tested.
|
||||
# Remove this notice after a successful end-to-end run.
|
||||
#
|
||||
# Every command below is independently runnable for debugging.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
# merge and collapse comments layers
|
||||
start_spark_and_run.sh 1 comments_merge.py
|
||||
|
||||
# merge and collapse submissions layers
|
||||
start_spark_and_run.sh 1 submissions_merge.py
|
||||
14
datasets/submissions_merge.py
Normal file
14
datasets/submissions_merge.py
Normal file
@@ -0,0 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Collapse all layers in the submissions final datasets into a single clean layer.
|
||||
|
||||
Must be launched from a login node via the Hyak-provided wrapper:
|
||||
start_spark_and_run.sh 1 submissions_merge.py
|
||||
|
||||
See merge_layers.sh and dumps_helper.merge_layers for details.
|
||||
"""
|
||||
|
||||
from dumps_helper import SUBMISSIONS, merge_layers
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
merge_layers(SUBMISSIONS)
|
||||
@@ -1,14 +1,21 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Part 2 for submissions: Spark sort + repartition the per-source parquets
|
||||
produced by submissions_part1.py into the final by_subreddit / by_author
|
||||
datasets.
|
||||
"""Part 2 for submissions: Spark sort + repartition into the final datasets.
|
||||
|
||||
Launched via the Hyak-provided start_spark_and_run.sh wrapper:
|
||||
Must be launched from a login node via the Hyak-provided wrapper:
|
||||
start_spark_and_run.sh 1 submissions_part2.py
|
||||
start_spark_and_run.sh 1 submissions_part2.py --indir=/path/to/parquets --mode=append
|
||||
|
||||
--indir defaults to the temp submissions dir in dumps_helper.py.
|
||||
--out_by_subreddit and --out_by_author default to the live dataset paths;
|
||||
override them to write to staging directories first (see add_months.sh).
|
||||
"""
|
||||
|
||||
import fire
|
||||
from dumps_helper import SUBMISSIONS, sort_and_write
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sort_and_write(SUBMISSIONS)
|
||||
fire.Fire(lambda indir=None, out_by_subreddit=None, out_by_author=None:
|
||||
sort_and_write(SUBMISSIONS, indir=indir,
|
||||
out_by_subreddit=out_by_subreddit,
|
||||
out_by_author=out_by_author))
|
||||
|
||||
175
similarities/README.md
Normal file
175
similarities/README.md
Normal file
@@ -0,0 +1,175 @@
|
||||
# Subreddit similarity
|
||||
|
||||
This directory holds the code that computes pairwise similarities between
|
||||
subreddits — both term-based (from TF-IDF over comment text) and
|
||||
author-based (from overlapping commenter sets). Similarity matrices
|
||||
produced here feed downstream clustering (`../clustering/`) and density
|
||||
analysis (`../density/`).
|
||||
|
||||
## Datasets
|
||||
|
||||
Subreddit similarity datasets based on comment terms and comment authors
|
||||
are available on Hyak in `/gscratch/comdata/output/reddit_similarity`.
|
||||
The overall approach to subreddit similarity seems to work reasonably
|
||||
well and the code is stabilizing. If you want help using these
|
||||
similarities in a project, just reach out to
|
||||
[Nate](https://wiki.communitydata.science/People#Nathan_TeBlunthuis_.28University_of_Texas_at_Austin.29).
|
||||
|
||||
By default, the scripts here take a `TopN` parameter which selects the
|
||||
subreddits to include in the similarity dataset according to how many
|
||||
total comments they have. You can alternatively pass a value to the
|
||||
`included_subreddits` parameter for a file with the names of the
|
||||
subreddits you would like to include on each line.
|
||||
|
||||
## Scripts
|
||||
|
||||
| Script | What it does |
|
||||
|---|---|
|
||||
| `tfidf.py` | Builds TF-IDF vectors for subreddits. Fire CLI subcommands for `authors`, `terms`, `authors_weekly`, `terms_weekly`. |
|
||||
| `cosine_similarities.py` | Computes cosine similarities between subreddit TF-IDF vectors. Fire CLI subcommands `author`, `term`, `author-tf`. |
|
||||
| `weekly_cosine_similarities.py` | Same idea but operating on the weekly TF-IDF vectors. |
|
||||
| `wang_similarity.py` | A variant similarity computation based on user overlaps in the style of Wang et al. |
|
||||
| `top_subreddits_by_comments.py` | Produces the `subreddits_by_num_comments.csv` ranking used to pick the top-N subreddits for the similarity matrices. |
|
||||
| `similarities_helper.py` | Shared helpers for building TF-IDF datasets, reindexing, and selecting the top-N subreddits. |
|
||||
| `Makefile` | Wires everything together with the canonical Hyak output paths. |
|
||||
|
||||
## Methods
|
||||
|
||||
[TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) is a common and
|
||||
simple information-retrieval technique that we can use to quantify the
|
||||
topic of a subreddit. The goal of TF-IDF is to build a vector for each
|
||||
subreddit that scores every term (or phrase) according to how
|
||||
characteristic it is of the overall lexicon used in that subreddit. For
|
||||
example, the most characteristic terms in the subreddit `/r/christianity`
|
||||
in the current version of the TF-IDF model are:
|
||||
|
||||
| Term | tf_idf |
|
||||
|:------------:|:------:|
|
||||
| christians | 0.581 |
|
||||
| christianity | 0.569 |
|
||||
| kjv | 0.568 |
|
||||
| bible | 0.557 |
|
||||
| scripture | 0.55 |
|
||||
|
||||
TF-IDF stands for "term frequency — inverse document frequency" because
|
||||
it is the product of two terms "term frequency" and "inverse document
|
||||
frequency." Term frequency quantifies the amount that a term appears in
|
||||
a subreddit (document). Inverse document frequency quantifies how much
|
||||
that term appears in other subreddits (documents). As you can see on
|
||||
the Wikipedia page, there are many possible ways of constructing and
|
||||
combining these terms.
|
||||
|
||||
I chose to normalize term frequency by the maximum (raw) term frequency
|
||||
for each subreddit:
|
||||
|
||||
$$\mathrm{tf}_{t,d} = \frac{f_{t,d}}{\max_{t' \in d}{f_{t',d}}}$$
|
||||
|
||||
I use the log inverse document frequency:
|
||||
|
||||
$$\mathrm{idf}_{t} = \log\frac{N}{|\{d \in D : t \in d\}|}$$
|
||||
|
||||
I then combine them using some smoothing to get:
|
||||
|
||||
$$\mathrm{tfidf}_{t,d} = (0.5 + 0.5 \cdot \mathrm{tf}_{t,d}) \cdot \mathrm{idf}_{t}$$
|
||||
|
||||
(Other normalization strategies are worth trying — see the note in
|
||||
`TODO`.)
|
||||
|
||||
### Building TF-IDF vectors
|
||||
|
||||
The process for building TF-IDF vectors has four steps:
|
||||
|
||||
1. Extracting terms using `../ngrams/tf_comments.py`
|
||||
2. Detecting common phrases using `../ngrams/top_comment_phrases.py`
|
||||
3. Extracting terms and common phrases using
|
||||
`../ngrams/tf_comments.py --mwe-pass='second'`
|
||||
4. Building IDF and TF-IDF scores in `tfidf.py`
|
||||
|
||||
#### Running `tf_comments.py` on the backfill queue
|
||||
|
||||
The main reason that I did it in four steps instead of one is to take
|
||||
advantage of the backfill queue for running `tf_comments.py`. This step
|
||||
requires reading all of the text in every comment and converting it to
|
||||
a bag of words at the subreddit level. This is a lot of computation
|
||||
that is easily parallelizable. The script `../ngrams/run_tf_jobs.sh`
|
||||
partially automates running steps 1 (or 3) on the backfill queue.
|
||||
|
||||
#### Phrase detection using pointwise mutual information
|
||||
|
||||
TF-IDF is simple, but only uses single words (unigrams). Sequences of
|
||||
multiple words can be important to account for how words have different
|
||||
meanings in different contexts or how sequences of words refer to
|
||||
distinct things like names. Dealing with context or longer sequences of
|
||||
words is a common challenge in natural language processing since the
|
||||
number of possible n-grams grows like crazy as n gets bigger. Phrase
|
||||
detection helps this problem by limiting the set of n-grams to those
|
||||
most informative.
|
||||
|
||||
But how do we detect phrases? I implemented [pointwise mutual
|
||||
information](https://en.wikipedia.org/wiki/Pointwise_mutual_information),
|
||||
which is a pretty simple way but seems to work pretty well.
|
||||
|
||||
PMI is a quantity derived from information theory. The intuition is
|
||||
that if two words occur together quite frequently compared to how often
|
||||
they appear separately then the cooccurrance is likely to be
|
||||
informative.
|
||||
|
||||
$$\operatorname{pmi}(x;y) \equiv \log\frac{p(x,y)}{p(x)\,p(y)} = \log\frac{p(x|y)}{p(x)} = \log\frac{p(y|x)}{p(y)}$$
|
||||
|
||||
In `../ngrams/tf_comments.py` if `--mwe-pass=first` then a 10% sample
|
||||
of 1-4-grams (sequences of terms up to length 4) will be written to a
|
||||
file to be consumed by `../ngrams/top_comment_phrases.py`.
|
||||
`top_comment_phrases.py` computes the PMI for these possible phrases
|
||||
and writes those that occur at least 3500 times in the sample of
|
||||
n-grams and have a PMI of at least 3 (about 65000 expressions).
|
||||
|
||||
`tf_comments.py --mwe-pass=second` then uses the detected phrases and
|
||||
adds them to the term frequency data.
|
||||
|
||||
## Cosine similarity
|
||||
|
||||
Once the TF-IDF vectors are built, making a similarity score between
|
||||
two subreddits is straightforward using cosine similarity.
|
||||
|
||||
$$\text{similarity} = \cos(\theta) = \frac{\mathbf{A} \cdot \mathbf{B}}{\|\mathbf{A}\|\,\|\mathbf{B}\|} = \frac{\sum_{i=1}^{n}{A_i\,B_i}}{\sqrt{\sum_{i=1}^{n}{A_i^2}}\,\sqrt{\sum_{i=1}^{n}{B_i^2}}}$$
|
||||
|
||||
Intuitively, we represent two subreddits as lines in a high-dimensional
|
||||
space (TF-IDF vectors). In linear algebra, the dot product ($\cdot$)
|
||||
between two vectors takes their weighted sum (e.g. linear regression is
|
||||
a dot product of a vector of covariates and a vector of weights). The
|
||||
vectors might have different lengths — if one subreddit has more words
|
||||
in comments than the other — so in cosine similarity the dot product
|
||||
is normalized by the magnitude (length) of the vectors. It turns out
|
||||
that this is equivalent to taking the cosine of the two vectors. So
|
||||
cosine similarity in essence quantifies the angle between the two lines
|
||||
in high-dimensional space. If the cosine similarity between two
|
||||
subreddits is greater then their TF-IDF vectors are more correlated.
|
||||
|
||||
Cosine similarity with TF-IDF is popular (indeed it has been applied to
|
||||
Reddit in research several times before) because it quantifies the
|
||||
correlation between the most characteristic terms for two communities.
|
||||
|
||||
Compared to other approaches to similarity like those using word
|
||||
embeddings or topic models it may struggle to handle polysemy, synonymy,
|
||||
or correlations between different terms. Using phrase detection helps
|
||||
with this a little bit. The advantages of this approach are simplicity
|
||||
and scalability. I'm thinking about using [latent semantic
|
||||
analysis](https://en.wikipedia.org/wiki/Latent_semantic_analysis) as an
|
||||
intermediate step to improve upon similarities based on raw TF-IDFs.
|
||||
|
||||
Even still, computing similarities between a large number of subreddits
|
||||
is computationally expensive and requires $n(n-1)/2$ dot-product
|
||||
evaluations. This can be sped up by passing
|
||||
`similarity-threshold=X` where $X>0$ into `cosine_similarities.py`. I
|
||||
used a cosine similarity function that's built into the spark matrix
|
||||
library which supports the `DIMSUM` algorithm for approximating
|
||||
matrix-matrix products. This algorithm is commonly used in industry
|
||||
(i.e. at Twitter, Google) for large-scale similarity scoring.
|
||||
|
||||
## See also
|
||||
|
||||
The CDSC wiki page
|
||||
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit)
|
||||
is the landing page for this project on the wiki. The methods writeup
|
||||
above used to live there; it now lives here so that doc and code stay
|
||||
in sync.
|
||||
Reference in New Issue
Block a user