"""Shared logic for the comments and submissions dump-to-parquet pipeline. Used by comments_part1.py / submissions_part1.py (Part 1: one compressed dump file → one parquet file) and comments_part2.py / submissions_part2.py (Part 2: Spark sort + repartition of the per-source parquets). The two dump types only differ in their schemas and a handful of field-specific extractors. The parse loop, the file I/O wrapping, the task-list generator, and the Spark sort are all shared here. """ import os import shutil from datetime import datetime from itertools import islice import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import simdjson from helper import find_dumps, open_fileset _json = simdjson.Parser() # --- field-level extractors ------------------------------------------------ def _ts(name): """Extractor for a unix-timestamp field (or None if missing).""" def handler(record): val = record.get(name) if val is None: return None return datetime.fromtimestamp(int(val), tz=None) return handler def _edited(record): """Returns (edited, time_edited). The dump packs both into one `edited` field that is either a bool (never edited / unknown timestamp) or a unix timestamp.""" val = record.get('edited') if isinstance(val, bool): return (val, None) if val is None: return (None, None) return (True, datetime.fromtimestamp(int(val), tz=None)) def _has_media(record): """Submissions don't have a `has_media` field directly — derive it.""" return record.get('media') is not None # --- generic parse loop ---------------------------------------------------- def parse_record(line, fields, handlers): """Parse one JSON line into a tuple aligned with `fields`. `handlers` maps field name → callable(record) returning either a single value (one column) or a tuple of values (multiple consecutive columns, consuming the next len(tuple)-1 entries in `fields`). Fields without a handler are pulled from the record by name, with missing keys yielding None. The last field in `fields` is reserved for an error message string and is set to None on success. """ try: record = _json.parse(line) except (ValueError, KeyError) as e: row = [None] * len(fields) row[-1] = f"parse error|{e}|{line}" return tuple(row) row = [] skip_next = 0 for name in fields: if skip_next > 0: skip_next -= 1 continue handler = handlers.get(name) if handler is None: try: row.append(record[name]) except KeyError: row.append(None) else: result = handler(record) if isinstance(result, tuple): row.extend(result) skip_next = len(result) - 1 else: row.append(result) return tuple(row) # --- comments schema ------------------------------------------------------- COMMENT_FIELDS = [ 'id', 'subreddit', 'link_id', 'parent_id', 'created_utc', 'author', 'ups', 'downs', 'score', 'edited', 'time_edited', 'subreddit_type', 'subreddit_id', 'stickied', 'is_submitter', 'body', 'error', ] COMMENT_SCHEMA = pa.schema([ pa.field('id', pa.string(), nullable=True), pa.field('subreddit', pa.string(), nullable=True), pa.field('link_id', pa.string(), nullable=True), pa.field('parent_id', pa.string(), nullable=True), pa.field('created_utc', pa.timestamp('ms'), nullable=True), pa.field('author', pa.string(), nullable=True), pa.field('ups', pa.int64(), nullable=True), pa.field('downs', pa.int64(), nullable=True), pa.field('score', pa.int64(), nullable=True), pa.field('edited', pa.bool_(), nullable=True), pa.field('time_edited', pa.timestamp('ms'), nullable=True), pa.field('subreddit_type', pa.string(), nullable=True), pa.field('subreddit_id', pa.string(), nullable=True), pa.field('stickied', pa.bool_(), nullable=True), pa.field('is_submitter', pa.bool_(), nullable=True), pa.field('body', pa.string(), nullable=True), pa.field('error', pa.string(), nullable=True), ]) COMMENT_HANDLERS = { 'created_utc': _ts('created_utc'), 'edited': _edited, } # --- submissions schema ---------------------------------------------------- SUBMISSION_FIELDS = [ 'id', 'author', 'subreddit', 'title', 'created_utc', 'permalink', 'url', 'domain', 'score', 'ups', 'downs', 'over_18', 'has_media', 'selftext', 'retrieved_on', 'num_comments', 'gilded', 'edited', 'time_edited', 'subreddit_type', 'subreddit_id', 'subreddit_subscribers', 'name', 'is_self', 'stickied', 'quarantine', 'error', ] SUBMISSION_SCHEMA = pa.schema([ pa.field('id', pa.string(), nullable=True), pa.field('author', pa.string(), nullable=True), pa.field('subreddit', pa.string(), nullable=True), pa.field('title', pa.string(), nullable=True), pa.field('created_utc', pa.timestamp('ms'), nullable=True), pa.field('permalink', pa.string(), nullable=True), pa.field('url', pa.string(), nullable=True), pa.field('domain', pa.string(), nullable=True), pa.field('score', pa.int64(), nullable=True), pa.field('ups', pa.int64(), nullable=True), pa.field('downs', pa.int64(), nullable=True), pa.field('over_18', pa.bool_(), nullable=True), pa.field('has_media', pa.bool_(), nullable=True), pa.field('selftext', pa.string(), nullable=True), pa.field('retrieved_on', pa.timestamp('ms'), nullable=True), pa.field('num_comments', pa.int64(), nullable=True), pa.field('gilded', pa.int64(), nullable=True), pa.field('edited', pa.bool_(), nullable=True), pa.field('time_edited', pa.timestamp('ms'), nullable=True), pa.field('subreddit_type', pa.string(), nullable=True), pa.field('subreddit_id', pa.string(), nullable=True), pa.field('subreddit_subscribers', pa.int64(), nullable=True), pa.field('name', pa.string(), nullable=True), pa.field('is_self', pa.bool_(), nullable=True), pa.field('stickied', pa.bool_(), nullable=True), pa.field('quarantine', pa.bool_(), nullable=True), pa.field('error', pa.string(), nullable=True), ]) SUBMISSION_HANDLERS = { 'created_utc': _ts('created_utc'), 'retrieved_on': _ts('retrieved_on'), 'edited': _edited, 'has_media': _has_media, } # --- per-type configuration ------------------------------------------------ # Defaults that the entry-point scripts pass through, exposed here so the # field/schema/handler triplet, the canonical paths, and the dump filename # pattern all live in one place. COMMENTS = { 'fields': COMMENT_FIELDS, 'schema': COMMENT_SCHEMA, 'handlers': COMMENT_HANDLERS, 'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/comments", 'outdir': "/gscratch/comdata/output/temp/reddit_comments.parquet", 'file_pattern': 'RC_20*.*', 'task_list': 'parse_comments_task_list', 'output_by_subreddit': "/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", 'output_by_author': "/gscratch/comdata/output/reddit_comments_by_author.parquet", 'subreddit_sort_keys': ["subreddit", "CreatedAt", "link_id", "parent_id", "Year", "Month", "Day"], 'author_sort_keys': ["author", "CreatedAt", "subreddit", "link_id", "parent_id", "Year", "Month", "Day"], 'app_name': "Reddit comments to parquet", } SUBMISSIONS = { 'fields': SUBMISSION_FIELDS, 'schema': SUBMISSION_SCHEMA, 'handlers': SUBMISSION_HANDLERS, 'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/submissions", 'outdir': "/gscratch/comdata/output/temp/reddit_submissions.parquet", 'file_pattern': 'RS_20*.*', 'task_list': 'parse_submissions_task_list', 'output_by_subreddit': "/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet", 'output_by_author': "/gscratch/comdata/output/reddit_submissions_by_author.parquet", 'subreddit_sort_keys': ["subreddit", "CreatedAt", "id"], 'author_sort_keys': ["author", "CreatedAt", "id"], 'app_name': "Reddit submissions to parquet", } # --- Part 1: parse one dump file -> one parquet ---------------------------- def parse_dump(config, partition, dumpdir=None, outdir=None, chunk_size=10000): """Read one compressed dump from `dumpdir/partition` and write a parquet file to `outdir/.parquet`. Streams chunks of `chunk_size` rows so memory stays bounded.""" dumpdir = dumpdir or config['dumpdir'] outdir = outdir or config['outdir'] schema = config['schema'] fields = config['fields'] handlers = config['handlers'] stream = open_fileset([os.path.join(dumpdir, partition)]) rows = (parse_record(line, fields, handlers) for line in stream) os.makedirs(outdir, exist_ok=True) outfile = os.path.join(outdir, os.path.splitext(partition)[0] + ".parquet") with pq.ParquetWriter(outfile, schema=schema, compression='snappy', flavor='spark') as writer: while True: chunk = list(islice(rows, chunk_size)) if not chunk: break pddf = pd.DataFrame(chunk, columns=schema.names) table = pa.Table.from_pandas(pddf, schema=schema) writer.write_table(table) def gen_task_list(config, script_name, dumpdir=None, tasklist=None): """Write a parallel-friendly task list of `script_name parse_dump ` lines, one per dump file found under `dumpdir`.""" dumpdir = dumpdir or config['dumpdir'] tasklist = tasklist or config['task_list'] files = list(find_dumps(dumpdir, base_pattern=config['file_pattern'])) with open(tasklist, 'w') as of: for fpath in files: partition = os.path.split(fpath)[1] of.write(f'python3 {script_name} parse_dump {partition}\n') # --- Part 2: spark sort + repartition -------------------------------------- def sort_and_write(config, indir=None, mode='overwrite'): """Read a directory of per-source parquets, sort and repartition twice (once by subreddit, once by author), and write the two final datasets. indir defaults to config['outdir']. mode is passed to parquet write and may be 'overwrite' (default, used by build_from_scratch) or 'append' (used by add_months to layer new data alongside existing files). Pyspark is imported lazily so Part 1 callers don't pay the Spark startup cost. """ from pyspark.sql import SparkSession, functions as f indir = indir or config['outdir'] spark = SparkSession.builder.appName(config['app_name']).getOrCreate() df = spark.read.parquet(indir, compression='snappy') df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) df = df.drop('subreddit') df = df.withColumnRenamed('subreddit_2', 'subreddit') df = df.withColumnRenamed("created_utc", "CreatedAt") df = df.withColumn("Month", f.month(f.col("CreatedAt"))) df = df.withColumn("Year", f.year(f.col("CreatedAt"))) df = df.withColumn("Day", f.dayofmonth(f.col("CreatedAt"))) sub_keys = config['subreddit_sort_keys'] df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True) df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True) df_sub.write.parquet(config['output_by_subreddit'], mode=mode, compression='snappy') auth_keys = config['author_sort_keys'] df_auth = df.repartition('author').sort(auth_keys, ascending=True) df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True) df_auth.write.parquet(config['output_by_author'], mode=mode, compression='snappy') def merge_layers(config): """Collapse all accumulated layers in the final datasets into a single clean layer. Reads the existing by_subreddit dataset (which contains all layers), re-sorts twice, writes to temp paths, then atomically replaces the originals by renaming. Safe to interrupt after the writes complete but before the renames — the originals are untouched until the .merging directories exist. The .old directories are left behind if the process is interrupted after renaming; delete them manually once satisfied. Pyspark is imported lazily so Part 1 callers don't pay the Spark startup cost. """ from pyspark.sql import SparkSession spark = SparkSession.builder.appName(config['app_name'] + ' merge layers').getOrCreate() # Both final datasets have identical rows; read from by_subreddit. df = spark.read.parquet(config['output_by_subreddit']) tmp_sub = config['output_by_subreddit'] + '.merging' tmp_auth = config['output_by_author'] + '.merging' sub_keys = config['subreddit_sort_keys'] df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True) df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True) df_sub.write.parquet(tmp_sub, mode='overwrite', compression='snappy') auth_keys = config['author_sort_keys'] df_auth = df.repartition('author').sort(auth_keys, ascending=True) df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True) df_auth.write.parquet(tmp_auth, mode='overwrite', compression='snappy') # Atomic swap: rename old → .old, then .merging → final, then delete .old. old_sub = config['output_by_subreddit'] + '.old' old_auth = config['output_by_author'] + '.old' os.rename(config['output_by_subreddit'], old_sub) os.rename(tmp_sub, config['output_by_subreddit']) os.rename(config['output_by_author'], old_auth) os.rename(tmp_auth, config['output_by_author']) shutil.rmtree(old_sub) shutil.rmtree(old_auth)