18
0

datasets/: split parquet scripts; share logic in dumps_helper.py

Follows the helper-module pattern used in similarities/. Replaces
parquet_part1.py and parquet_part2.py (the merged single-file versions
from the previous commit) with:

- dumps_helper.py — schemas, simdjson parser, a generic parse_record
  loop with per-field handler dispatch, and parse_dump / gen_task_list
  / sort_and_write workers. The only per-type code is the field-handler
  dicts and the type-config dicts (COMMENTS, SUBMISSIONS) at the top.
- comments_part1.py, submissions_part1.py — thin Part 1 entry points
  with fire CLIs (parse_dump, gen_task_list).
- comments_part2.py, submissions_part2.py — thin Part 2 entry points
  for the Spark sort. pyspark is imported lazily inside sort_and_write
  so Part 1 callers don't pay the import cost.

Unifies on simdjson for both types (drops the json import), which is
faster on the comments dumps. Field-handler dicts make adding a new
type or field a one-place edit.

Also fixes a latent bug in the original: the FIELDS lists didn't
include time_edited (only the schema did), so error-path rows were
short by one element vs. the schema and would have failed pandas /
pyarrow alignment for any row that hit a JSON parse error. The new
FIELDS lists match the schemas exactly, and the _edited handler
returns a (edited, time_edited) tuple that the generic parse loop
expands.

Runners and README updated for the new CLIs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 16:51:41 -07:00
parent 8965a251b6
commit 33150243cd
10 changed files with 386 additions and 328 deletions

286
datasets/dumps_helper.py Normal file
View File

@@ -0,0 +1,286 @@
"""Shared logic for the comments and submissions dump-to-parquet pipeline.
Used by comments_part1.py / submissions_part1.py (Part 1: one compressed
dump file → one parquet file) and comments_part2.py / submissions_part2.py
(Part 2: Spark sort + repartition of the per-source parquets).
The two dump types only differ in their schemas and a handful of
field-specific extractors. The parse loop, the file I/O wrapping, the
task-list generator, and the Spark sort are all shared here.
"""
import os
from datetime import datetime
from itertools import islice
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import simdjson
from helper import find_dumps, open_fileset
_json = simdjson.Parser()
# --- field-level extractors ------------------------------------------------
def _ts(name):
"""Extractor for a unix-timestamp field (or None if missing)."""
def handler(record):
val = record.get(name)
if val is None:
return None
return datetime.fromtimestamp(int(val), tz=None)
return handler
def _edited(record):
"""Returns (edited, time_edited). The dump packs both into one `edited`
field that is either a bool (never edited / unknown timestamp) or a
unix timestamp."""
val = record.get('edited')
if isinstance(val, bool):
return (val, None)
if val is None:
return (None, None)
return (True, datetime.fromtimestamp(int(val), tz=None))
def _has_media(record):
"""Submissions don't have a `has_media` field directly — derive it."""
return record.get('media') is not None
# --- generic parse loop ----------------------------------------------------
def parse_record(line, fields, handlers):
"""Parse one JSON line into a tuple aligned with `fields`.
`handlers` maps field name → callable(record) returning either a single
value (one column) or a tuple of values (multiple consecutive columns,
consuming the next len(tuple)-1 entries in `fields`).
Fields without a handler are pulled from the record by name, with
missing keys yielding None.
The last field in `fields` is reserved for an error message string
and is set to None on success.
"""
try:
record = _json.parse(line)
except (ValueError, KeyError) as e:
row = [None] * len(fields)
row[-1] = f"parse error|{e}|{line}"
return tuple(row)
row = []
skip_next = 0
for name in fields:
if skip_next > 0:
skip_next -= 1
continue
handler = handlers.get(name)
if handler is None:
try:
row.append(record[name])
except KeyError:
row.append(None)
else:
result = handler(record)
if isinstance(result, tuple):
row.extend(result)
skip_next = len(result) - 1
else:
row.append(result)
return tuple(row)
# --- comments schema -------------------------------------------------------
COMMENT_FIELDS = [
'id', 'subreddit', 'link_id', 'parent_id', 'created_utc', 'author',
'ups', 'downs', 'score', 'edited', 'time_edited', 'subreddit_type',
'subreddit_id', 'stickied', 'is_submitter', 'body', 'error',
]
COMMENT_SCHEMA = pa.schema([
pa.field('id', pa.string(), nullable=True),
pa.field('subreddit', pa.string(), nullable=True),
pa.field('link_id', pa.string(), nullable=True),
pa.field('parent_id', pa.string(), nullable=True),
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
pa.field('author', pa.string(), nullable=True),
pa.field('ups', pa.int64(), nullable=True),
pa.field('downs', pa.int64(), nullable=True),
pa.field('score', pa.int64(), nullable=True),
pa.field('edited', pa.bool_(), nullable=True),
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
pa.field('subreddit_type', pa.string(), nullable=True),
pa.field('subreddit_id', pa.string(), nullable=True),
pa.field('stickied', pa.bool_(), nullable=True),
pa.field('is_submitter', pa.bool_(), nullable=True),
pa.field('body', pa.string(), nullable=True),
pa.field('error', pa.string(), nullable=True),
])
COMMENT_HANDLERS = {
'created_utc': _ts('created_utc'),
'edited': _edited,
}
# --- submissions schema ----------------------------------------------------
SUBMISSION_FIELDS = [
'id', 'author', 'subreddit', 'title', 'created_utc', 'permalink', 'url',
'domain', 'score', 'ups', 'downs', 'over_18', 'has_media', 'selftext',
'retrieved_on', 'num_comments', 'gilded', 'edited', 'time_edited',
'subreddit_type', 'subreddit_id', 'subreddit_subscribers', 'name',
'is_self', 'stickied', 'quarantine', 'error',
]
SUBMISSION_SCHEMA = pa.schema([
pa.field('id', pa.string(), nullable=True),
pa.field('author', pa.string(), nullable=True),
pa.field('subreddit', pa.string(), nullable=True),
pa.field('title', pa.string(), nullable=True),
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
pa.field('permalink', pa.string(), nullable=True),
pa.field('url', pa.string(), nullable=True),
pa.field('domain', pa.string(), nullable=True),
pa.field('score', pa.int64(), nullable=True),
pa.field('ups', pa.int64(), nullable=True),
pa.field('downs', pa.int64(), nullable=True),
pa.field('over_18', pa.bool_(), nullable=True),
pa.field('has_media', pa.bool_(), nullable=True),
pa.field('selftext', pa.string(), nullable=True),
pa.field('retrieved_on', pa.timestamp('ms'), nullable=True),
pa.field('num_comments', pa.int64(), nullable=True),
pa.field('gilded', pa.int64(), nullable=True),
pa.field('edited', pa.bool_(), nullable=True),
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
pa.field('subreddit_type', pa.string(), nullable=True),
pa.field('subreddit_id', pa.string(), nullable=True),
pa.field('subreddit_subscribers', pa.int64(), nullable=True),
pa.field('name', pa.string(), nullable=True),
pa.field('is_self', pa.bool_(), nullable=True),
pa.field('stickied', pa.bool_(), nullable=True),
pa.field('quarantine', pa.bool_(), nullable=True),
pa.field('error', pa.string(), nullable=True),
])
SUBMISSION_HANDLERS = {
'created_utc': _ts('created_utc'),
'retrieved_on': _ts('retrieved_on'),
'edited': _edited,
'has_media': _has_media,
}
# --- per-type configuration ------------------------------------------------
# Defaults that the entry-point scripts pass through, exposed here so the
# field/schema/handler triplet, the canonical paths, and the dump filename
# pattern all live in one place.
COMMENTS = {
'fields': COMMENT_FIELDS,
'schema': COMMENT_SCHEMA,
'handlers': COMMENT_HANDLERS,
'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/comments",
'outdir': "/gscratch/comdata/output/temp/reddit_comments.parquet",
'file_pattern': 'RC_20*.*',
'task_list': 'parse_comments_task_list',
'output_by_subreddit': "/gscratch/comdata/output/reddit_comments_by_subreddit.parquet",
'output_by_author': "/gscratch/comdata/output/reddit_comments_by_author.parquet",
'subreddit_sort_keys': ["subreddit", "CreatedAt", "link_id", "parent_id", "Year", "Month", "Day"],
'author_sort_keys': ["author", "CreatedAt", "subreddit", "link_id", "parent_id", "Year", "Month", "Day"],
'app_name': "Reddit comments to parquet",
}
SUBMISSIONS = {
'fields': SUBMISSION_FIELDS,
'schema': SUBMISSION_SCHEMA,
'handlers': SUBMISSION_HANDLERS,
'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/submissions",
'outdir': "/gscratch/comdata/output/temp/reddit_submissions.parquet",
'file_pattern': 'RS_20*.*',
'task_list': 'parse_submissions_task_list',
'output_by_subreddit': "/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet",
'output_by_author': "/gscratch/comdata/output/reddit_submissions_by_author.parquet",
'subreddit_sort_keys': ["subreddit", "CreatedAt", "id"],
'author_sort_keys': ["author", "CreatedAt", "id"],
'app_name': "Reddit submissions to parquet",
}
# --- Part 1: parse one dump file -> one parquet ----------------------------
def parse_dump(config, partition, dumpdir=None, outdir=None, chunk_size=10000):
"""Read one compressed dump from `dumpdir/partition` and write a parquet
file to `outdir/<basename>.parquet`. Streams chunks of `chunk_size`
rows so memory stays bounded."""
dumpdir = dumpdir or config['dumpdir']
outdir = outdir or config['outdir']
schema = config['schema']
fields = config['fields']
handlers = config['handlers']
stream = open_fileset([os.path.join(dumpdir, partition)])
rows = (parse_record(line, fields, handlers) for line in stream)
os.makedirs(outdir, exist_ok=True)
outfile = os.path.join(outdir, os.path.splitext(partition)[0] + ".parquet")
with pq.ParquetWriter(outfile, schema=schema, compression='snappy', flavor='spark') as writer:
while True:
chunk = list(islice(rows, chunk_size))
if not chunk:
break
pddf = pd.DataFrame(chunk, columns=schema.names)
table = pa.Table.from_pandas(pddf, schema=schema)
writer.write_table(table)
def gen_task_list(config, script_name, dumpdir=None, tasklist=None):
"""Write a parallel-friendly task list of `script_name parse_dump <file>`
lines, one per dump file found under `dumpdir`."""
dumpdir = dumpdir or config['dumpdir']
tasklist = tasklist or config['task_list']
files = list(find_dumps(dumpdir, base_pattern=config['file_pattern']))
with open(tasklist, 'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
of.write(f'python3 {script_name} parse_dump {partition}\n')
# --- Part 2: spark sort + repartition --------------------------------------
def sort_and_write(config):
"""Read the directory of per-source parquets, sort and repartition
twice (once by subreddit, once by author), and write the two final
datasets. Pyspark is imported lazily so Part 1 callers don't pay the
Spark startup cost."""
from pyspark.sql import SparkSession, functions as f
spark = SparkSession.builder.appName(config['app_name']).getOrCreate()
df = spark.read.parquet(config['outdir'], compression='snappy')
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
df = df.withColumnRenamed('subreddit_2', 'subreddit')
df = df.withColumnRenamed("created_utc", "CreatedAt")
df = df.withColumn("Month", f.month(f.col("CreatedAt")))
df = df.withColumn("Year", f.year(f.col("CreatedAt")))
df = df.withColumn("Day", f.dayofmonth(f.col("CreatedAt")))
sub_keys = config['subreddit_sort_keys']
df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
df_sub.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy')
auth_keys = config['author_sort_keys']
df_auth = df.repartition('author').sort(auth_keys, ascending=True)
df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
df_auth.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy')