"""Shared logic for the comments and submissions dump-to-parquet pipeline. Used by comments_part1.py / submissions_part1.py (Part 1: one compressed dump file → one parquet file) and comments_part2.py / submissions_part2.py (Part 2: Spark sort + repartition of the per-source parquets). The two dump types only differ in their schemas and a handful of field-specific extractors. The parse loop, the file I/O wrapping, the task-list generator, and the Spark sort are all shared here. """ import os from datetime import datetime from itertools import islice import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import simdjson from helper import find_dumps, open_fileset _json = simdjson.Parser() # --- field-level extractors ------------------------------------------------ def _ts(name): """Extractor for a unix-timestamp field (or None if missing).""" def handler(record): val = record.get(name) if val is None: return None return datetime.fromtimestamp(int(val), tz=None) return handler def _edited(record): """Returns (edited, time_edited). The dump packs both into one `edited` field that is either a bool (never edited / unknown timestamp) or a unix timestamp.""" val = record.get('edited') if isinstance(val, bool): return (val, None) if val is None: return (None, None) return (True, datetime.fromtimestamp(int(val), tz=None)) def _has_media(record): """Submissions don't have a `has_media` field directly — derive it.""" return record.get('media') is not None # --- generic parse loop ---------------------------------------------------- def parse_record(line, fields, handlers): """Parse one JSON line into a tuple aligned with `fields`. `handlers` maps field name → callable(record) returning either a single value (one column) or a tuple of values (multiple consecutive columns, consuming the next len(tuple)-1 entries in `fields`). Fields without a handler are pulled from the record by name, with missing keys yielding None. The last field in `fields` is reserved for an error message string and is set to None on success. """ try: record = _json.parse(line) except (ValueError, KeyError) as e: row = [None] * len(fields) row[-1] = f"parse error|{e}|{line}" return tuple(row) row = [] skip_next = 0 for name in fields: if skip_next > 0: skip_next -= 1 continue handler = handlers.get(name) if handler is None: try: row.append(record[name]) except KeyError: row.append(None) else: result = handler(record) if isinstance(result, tuple): row.extend(result) skip_next = len(result) - 1 else: row.append(result) return tuple(row) # --- comments schema ------------------------------------------------------- COMMENT_FIELDS = [ 'id', 'subreddit', 'link_id', 'parent_id', 'created_utc', 'author', 'ups', 'downs', 'score', 'edited', 'time_edited', 'subreddit_type', 'subreddit_id', 'stickied', 'is_submitter', 'body', 'error', ] COMMENT_SCHEMA = pa.schema([ pa.field('id', pa.string(), nullable=True), pa.field('subreddit', pa.string(), nullable=True), pa.field('link_id', pa.string(), nullable=True), pa.field('parent_id', pa.string(), nullable=True), pa.field('created_utc', pa.timestamp('ms'), nullable=True), pa.field('author', pa.string(), nullable=True), pa.field('ups', pa.int64(), nullable=True), pa.field('downs', pa.int64(), nullable=True), pa.field('score', pa.int64(), nullable=True), pa.field('edited', pa.bool_(), nullable=True), pa.field('time_edited', pa.timestamp('ms'), nullable=True), pa.field('subreddit_type', pa.string(), nullable=True), pa.field('subreddit_id', pa.string(), nullable=True), pa.field('stickied', pa.bool_(), nullable=True), pa.field('is_submitter', pa.bool_(), nullable=True), pa.field('body', pa.string(), nullable=True), pa.field('error', pa.string(), nullable=True), ]) COMMENT_HANDLERS = { 'created_utc': _ts('created_utc'), 'edited': _edited, } # --- submissions schema ---------------------------------------------------- SUBMISSION_FIELDS = [ 'id', 'author', 'subreddit', 'title', 'created_utc', 'permalink', 'url', 'domain', 'score', 'ups', 'downs', 'over_18', 'has_media', 'selftext', 'retrieved_on', 'num_comments', 'gilded', 'edited', 'time_edited', 'subreddit_type', 'subreddit_id', 'subreddit_subscribers', 'name', 'is_self', 'stickied', 'quarantine', 'error', ] SUBMISSION_SCHEMA = pa.schema([ pa.field('id', pa.string(), nullable=True), pa.field('author', pa.string(), nullable=True), pa.field('subreddit', pa.string(), nullable=True), pa.field('title', pa.string(), nullable=True), pa.field('created_utc', pa.timestamp('ms'), nullable=True), pa.field('permalink', pa.string(), nullable=True), pa.field('url', pa.string(), nullable=True), pa.field('domain', pa.string(), nullable=True), pa.field('score', pa.int64(), nullable=True), pa.field('ups', pa.int64(), nullable=True), pa.field('downs', pa.int64(), nullable=True), pa.field('over_18', pa.bool_(), nullable=True), pa.field('has_media', pa.bool_(), nullable=True), pa.field('selftext', pa.string(), nullable=True), pa.field('retrieved_on', pa.timestamp('ms'), nullable=True), pa.field('num_comments', pa.int64(), nullable=True), pa.field('gilded', pa.int64(), nullable=True), pa.field('edited', pa.bool_(), nullable=True), pa.field('time_edited', pa.timestamp('ms'), nullable=True), pa.field('subreddit_type', pa.string(), nullable=True), pa.field('subreddit_id', pa.string(), nullable=True), pa.field('subreddit_subscribers', pa.int64(), nullable=True), pa.field('name', pa.string(), nullable=True), pa.field('is_self', pa.bool_(), nullable=True), pa.field('stickied', pa.bool_(), nullable=True), pa.field('quarantine', pa.bool_(), nullable=True), pa.field('error', pa.string(), nullable=True), ]) SUBMISSION_HANDLERS = { 'created_utc': _ts('created_utc'), 'retrieved_on': _ts('retrieved_on'), 'edited': _edited, 'has_media': _has_media, } # --- per-type configuration ------------------------------------------------ # Defaults that the entry-point scripts pass through, exposed here so the # field/schema/handler triplet, the canonical paths, and the dump filename # pattern all live in one place. COMMENTS = { 'fields': COMMENT_FIELDS, 'schema': COMMENT_SCHEMA, 'handlers': COMMENT_HANDLERS, 'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/comments", 'outdir': "/gscratch/comdata/output/temp/reddit_comments.parquet", 'file_pattern': 'RC_20*.*', 'task_list': 'parse_comments_task_list', 'output_by_subreddit': "/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", 'output_by_author': "/gscratch/comdata/output/reddit_comments_by_author.parquet", 'subreddit_sort_keys': ["subreddit", "CreatedAt", "link_id", "parent_id", "Year", "Month", "Day"], 'author_sort_keys': ["author", "CreatedAt", "subreddit", "link_id", "parent_id", "Year", "Month", "Day"], 'app_name': "Reddit comments to parquet", } SUBMISSIONS = { 'fields': SUBMISSION_FIELDS, 'schema': SUBMISSION_SCHEMA, 'handlers': SUBMISSION_HANDLERS, 'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/submissions", 'outdir': "/gscratch/comdata/output/temp/reddit_submissions.parquet", 'file_pattern': 'RS_20*.*', 'task_list': 'parse_submissions_task_list', 'output_by_subreddit': "/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet", 'output_by_author': "/gscratch/comdata/output/reddit_submissions_by_author.parquet", 'subreddit_sort_keys': ["subreddit", "CreatedAt", "id"], 'author_sort_keys': ["author", "CreatedAt", "id"], 'app_name': "Reddit submissions to parquet", } # --- Part 1: parse one dump file -> one parquet ---------------------------- def parse_dump(config, partition, dumpdir=None, outdir=None, chunk_size=10000): """Read one compressed dump from `dumpdir/partition` and write a parquet file to `outdir/.parquet`. Streams chunks of `chunk_size` rows so memory stays bounded.""" dumpdir = dumpdir or config['dumpdir'] outdir = outdir or config['outdir'] schema = config['schema'] fields = config['fields'] handlers = config['handlers'] stream = open_fileset([os.path.join(dumpdir, partition)]) rows = (parse_record(line, fields, handlers) for line in stream) os.makedirs(outdir, exist_ok=True) outfile = os.path.join(outdir, os.path.splitext(partition)[0] + ".parquet") with pq.ParquetWriter(outfile, schema=schema, compression='snappy', flavor='spark') as writer: while True: chunk = list(islice(rows, chunk_size)) if not chunk: break pddf = pd.DataFrame(chunk, columns=schema.names) table = pa.Table.from_pandas(pddf, schema=schema) writer.write_table(table) def gen_task_list(config, script_name, dumpdir=None, tasklist=None): """Write a parallel-friendly task list of `script_name parse_dump ` lines, one per dump file found under `dumpdir`.""" dumpdir = dumpdir or config['dumpdir'] tasklist = tasklist or config['task_list'] files = list(find_dumps(dumpdir, base_pattern=config['file_pattern'])) with open(tasklist, 'w') as of: for fpath in files: partition = os.path.split(fpath)[1] of.write(f'python3 {script_name} parse_dump {partition}\n') # --- Part 2: spark sort + repartition -------------------------------------- def sort_and_write(config): """Read the directory of per-source parquets, sort and repartition twice (once by subreddit, once by author), and write the two final datasets. Pyspark is imported lazily so Part 1 callers don't pay the Spark startup cost.""" from pyspark.sql import SparkSession, functions as f spark = SparkSession.builder.appName(config['app_name']).getOrCreate() df = spark.read.parquet(config['outdir'], compression='snappy') df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) df = df.drop('subreddit') df = df.withColumnRenamed('subreddit_2', 'subreddit') df = df.withColumnRenamed("created_utc", "CreatedAt") df = df.withColumn("Month", f.month(f.col("CreatedAt"))) df = df.withColumn("Year", f.year(f.col("CreatedAt"))) df = df.withColumn("Day", f.dayofmonth(f.col("CreatedAt"))) sub_keys = config['subreddit_sort_keys'] df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True) df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True) df_sub.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy') auth_keys = config['author_sort_keys'] df_auth = df.repartition('author').sort(auth_keys, ascending=True) df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True) df_auth.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy')