diff --git a/datasets/README.md b/datasets/README.md index f2aeec2..66c10e3 100644 --- a/datasets/README.md +++ b/datasets/README.md @@ -7,14 +7,18 @@ consumes. The pipeline has two stages: -| Script | What it does | +| Stage | What it does | |---|---| -| `parquet_part1.py` | Reads one compressed dump and writes one parquet file. Per-file, parallelizable. Runs without Spark. | -| `parquet_part2.py` | Reads the directory of per-file parquets in Spark, sorts and repartitions by subreddit, then by author, and writes the final `reddit_*_by_*.parquet` datasets. Always re-sorts the full corpus. | +| Part 1 | Reads one compressed dump and writes one parquet file. Per-file, parallelizable. Runs without Spark. | +| Part 2 | Reads the directory of per-file parquets in Spark, sorts and repartitions by subreddit, then by author, and writes the final `reddit_*_by_*.parquet` datasets. Always re-sorts the full corpus. | -Both scripts use a single fire CLI with `comments` and `submissions` -subcommands, so the comments and submissions paths share all of their -plumbing — only the schema and the JSON parser differ. +Each stage has a thin entry-point script per dump type: + +| Script | Notes | +|---|---| +| `comments_part1.py`, `submissions_part1.py` | Per-file parse. `parse_dump ` and `gen_task_list` subcommands via fire. | +| `comments_part2.py`, `submissions_part2.py` | Spark sort. Launched via `start_spark_and_run.sh`. | +| `dumps_helper.py` | Shared module: schemas, simdjson parser, generic parse loop, parse_dump / gen_task_list / sort_and_write workers. The only per-type code is the two field-handler dicts and the configuration dicts at the top. | ## The two workflows @@ -50,11 +54,14 @@ runner and execute it standalone. For example: ```sh # parse one specific file (skipping the rest of the workflow) -python3 parquet_part1.py comments parse_dump RC_2025-03.zst +python3 comments_part1.py parse_dump RC_2025-03.zst # override default dump/output paths from the CLI -python3 parquet_part1.py comments parse_dump RC_2025-03.zst \ +python3 comments_part1.py parse_dump RC_2025-03.zst \ --dumpdir=/tmp/test --outdir=/tmp/out + +# regenerate just the task list +python3 submissions_part1.py gen_task_list ``` The Spark Part 2 step is launched via `start_spark_and_run.sh` (a diff --git a/datasets/add_new_month.sh b/datasets/add_new_month.sh index 839db4a..5e0af8d 100755 --- a/datasets/add_new_month.sh +++ b/datasets/add_new_month.sh @@ -34,15 +34,15 @@ fi # --- Part 1: parse the new month's dumps (no wipe) ------------------------- # parse the new comments file -python3 parquet_part1.py comments parse_dump "RC_${MONTH}.zst" +python3 comments_part1.py parse_dump "RC_${MONTH}.zst" # parse the new submissions file -python3 parquet_part1.py submissions parse_dump "RS_${MONTH}.zst" +python3 submissions_part1.py parse_dump "RS_${MONTH}.zst" # --- Part 2: re-sort the full corpus including the new data --------------- # sort comments and overwrite reddit_comments_by_{subreddit,author}.parquet -start_spark_and_run.sh 1 parquet_part2.py comments +start_spark_and_run.sh 1 comments_part2.py # sort submissions and overwrite reddit_submissions_by_{subreddit,author}.parquet -start_spark_and_run.sh 1 parquet_part2.py submissions +start_spark_and_run.sh 1 submissions_part2.py diff --git a/datasets/build_from_scratch.sh b/datasets/build_from_scratch.sh index a6415fe..c5be98f 100755 --- a/datasets/build_from_scratch.sh +++ b/datasets/build_from_scratch.sh @@ -12,7 +12,7 @@ # # Prerequisites: # - raw .zst dumps already staged in the dumpdir locations (see the -# parquet_part1.py defaults, or override via --dumpdir) +# defaults in dumps_helper.py, or override via --dumpdir) # - GNU parallel installed # - start_spark_and_run.sh on PATH (Hyak-provided wrapper) # @@ -31,7 +31,7 @@ TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/reddit_submissions.parquet" rm -rf "$TEMP_COMMENTS" # generate the per-file parse task list -python3 parquet_part1.py comments gen_task_list +python3 comments_part1.py gen_task_list # run all comments parse tasks in parallel parallel --joblog comments_joblog.txt --results comments_logs < parse_comments_task_list @@ -42,7 +42,7 @@ parallel --joblog comments_joblog.txt --results comments_logs < parse_comments_t rm -rf "$TEMP_SUBMISSIONS" # generate the per-file parse task list -python3 parquet_part1.py submissions gen_task_list +python3 submissions_part1.py gen_task_list # run all submissions parse tasks in parallel parallel --joblog submissions_joblog.txt --results submissions_logs < parse_submissions_task_list @@ -50,7 +50,7 @@ parallel --joblog submissions_joblog.txt --results submissions_logs < parse_subm # --- Part 2: spark sort + repartition -------------------------------------- # sort comments and write reddit_comments_by_{subreddit,author}.parquet -start_spark_and_run.sh 1 parquet_part2.py comments +start_spark_and_run.sh 1 comments_part2.py # sort submissions and write reddit_submissions_by_{subreddit,author}.parquet -start_spark_and_run.sh 1 parquet_part2.py submissions +start_spark_and_run.sh 1 submissions_part2.py diff --git a/datasets/comments_part1.py b/datasets/comments_part1.py new file mode 100755 index 0000000..6a28fb2 --- /dev/null +++ b/datasets/comments_part1.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +"""Part 1 for comments: parse one RC_*.zst dump into a parquet file. + +CLI: + comments_part1.py parse_dump RC_2018-08.zst + comments_part1.py gen_task_list + comments_part1.py parse_dump RC_2018-08.zst --dumpdir=/tmp/in --outdir=/tmp/out +""" + +import fire +from dumps_helper import COMMENTS, parse_dump, gen_task_list + + +def _parse_dump(partition, dumpdir=None, outdir=None): + parse_dump(COMMENTS, partition, dumpdir=dumpdir, outdir=outdir) + + +def _gen_task_list(dumpdir=None, tasklist=None): + gen_task_list(COMMENTS, 'comments_part1.py', dumpdir=dumpdir, tasklist=tasklist) + + +if __name__ == "__main__": + fire.Fire({'parse_dump': _parse_dump, + 'gen_task_list': _gen_task_list}) diff --git a/datasets/comments_part2.py b/datasets/comments_part2.py new file mode 100755 index 0000000..283d8f3 --- /dev/null +++ b/datasets/comments_part2.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +"""Part 2 for comments: Spark sort + repartition the per-source parquets +produced by comments_part1.py into the final by_subreddit / by_author +datasets. + +Launched via the Hyak-provided start_spark_and_run.sh wrapper: + start_spark_and_run.sh 1 comments_part2.py +""" + +from dumps_helper import COMMENTS, sort_and_write + + +if __name__ == "__main__": + sort_and_write(COMMENTS) diff --git a/datasets/dumps_helper.py b/datasets/dumps_helper.py new file mode 100644 index 0000000..2c9a575 --- /dev/null +++ b/datasets/dumps_helper.py @@ -0,0 +1,286 @@ +"""Shared logic for the comments and submissions dump-to-parquet pipeline. + +Used by comments_part1.py / submissions_part1.py (Part 1: one compressed +dump file → one parquet file) and comments_part2.py / submissions_part2.py +(Part 2: Spark sort + repartition of the per-source parquets). + +The two dump types only differ in their schemas and a handful of +field-specific extractors. The parse loop, the file I/O wrapping, the +task-list generator, and the Spark sort are all shared here. +""" + +import os +from datetime import datetime +from itertools import islice + +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq +import simdjson + +from helper import find_dumps, open_fileset + + +_json = simdjson.Parser() + + +# --- field-level extractors ------------------------------------------------ + +def _ts(name): + """Extractor for a unix-timestamp field (or None if missing).""" + def handler(record): + val = record.get(name) + if val is None: + return None + return datetime.fromtimestamp(int(val), tz=None) + return handler + + +def _edited(record): + """Returns (edited, time_edited). The dump packs both into one `edited` + field that is either a bool (never edited / unknown timestamp) or a + unix timestamp.""" + val = record.get('edited') + if isinstance(val, bool): + return (val, None) + if val is None: + return (None, None) + return (True, datetime.fromtimestamp(int(val), tz=None)) + + +def _has_media(record): + """Submissions don't have a `has_media` field directly — derive it.""" + return record.get('media') is not None + + +# --- generic parse loop ---------------------------------------------------- + +def parse_record(line, fields, handlers): + """Parse one JSON line into a tuple aligned with `fields`. + + `handlers` maps field name → callable(record) returning either a single + value (one column) or a tuple of values (multiple consecutive columns, + consuming the next len(tuple)-1 entries in `fields`). + Fields without a handler are pulled from the record by name, with + missing keys yielding None. + The last field in `fields` is reserved for an error message string + and is set to None on success. + """ + try: + record = _json.parse(line) + except (ValueError, KeyError) as e: + row = [None] * len(fields) + row[-1] = f"parse error|{e}|{line}" + return tuple(row) + + row = [] + skip_next = 0 + for name in fields: + if skip_next > 0: + skip_next -= 1 + continue + handler = handlers.get(name) + if handler is None: + try: + row.append(record[name]) + except KeyError: + row.append(None) + else: + result = handler(record) + if isinstance(result, tuple): + row.extend(result) + skip_next = len(result) - 1 + else: + row.append(result) + return tuple(row) + + +# --- comments schema ------------------------------------------------------- + +COMMENT_FIELDS = [ + 'id', 'subreddit', 'link_id', 'parent_id', 'created_utc', 'author', + 'ups', 'downs', 'score', 'edited', 'time_edited', 'subreddit_type', + 'subreddit_id', 'stickied', 'is_submitter', 'body', 'error', +] + +COMMENT_SCHEMA = pa.schema([ + pa.field('id', pa.string(), nullable=True), + pa.field('subreddit', pa.string(), nullable=True), + pa.field('link_id', pa.string(), nullable=True), + pa.field('parent_id', pa.string(), nullable=True), + pa.field('created_utc', pa.timestamp('ms'), nullable=True), + pa.field('author', pa.string(), nullable=True), + pa.field('ups', pa.int64(), nullable=True), + pa.field('downs', pa.int64(), nullable=True), + pa.field('score', pa.int64(), nullable=True), + pa.field('edited', pa.bool_(), nullable=True), + pa.field('time_edited', pa.timestamp('ms'), nullable=True), + pa.field('subreddit_type', pa.string(), nullable=True), + pa.field('subreddit_id', pa.string(), nullable=True), + pa.field('stickied', pa.bool_(), nullable=True), + pa.field('is_submitter', pa.bool_(), nullable=True), + pa.field('body', pa.string(), nullable=True), + pa.field('error', pa.string(), nullable=True), +]) + +COMMENT_HANDLERS = { + 'created_utc': _ts('created_utc'), + 'edited': _edited, +} + + +# --- submissions schema ---------------------------------------------------- + +SUBMISSION_FIELDS = [ + 'id', 'author', 'subreddit', 'title', 'created_utc', 'permalink', 'url', + 'domain', 'score', 'ups', 'downs', 'over_18', 'has_media', 'selftext', + 'retrieved_on', 'num_comments', 'gilded', 'edited', 'time_edited', + 'subreddit_type', 'subreddit_id', 'subreddit_subscribers', 'name', + 'is_self', 'stickied', 'quarantine', 'error', +] + +SUBMISSION_SCHEMA = pa.schema([ + pa.field('id', pa.string(), nullable=True), + pa.field('author', pa.string(), nullable=True), + pa.field('subreddit', pa.string(), nullable=True), + pa.field('title', pa.string(), nullable=True), + pa.field('created_utc', pa.timestamp('ms'), nullable=True), + pa.field('permalink', pa.string(), nullable=True), + pa.field('url', pa.string(), nullable=True), + pa.field('domain', pa.string(), nullable=True), + pa.field('score', pa.int64(), nullable=True), + pa.field('ups', pa.int64(), nullable=True), + pa.field('downs', pa.int64(), nullable=True), + pa.field('over_18', pa.bool_(), nullable=True), + pa.field('has_media', pa.bool_(), nullable=True), + pa.field('selftext', pa.string(), nullable=True), + pa.field('retrieved_on', pa.timestamp('ms'), nullable=True), + pa.field('num_comments', pa.int64(), nullable=True), + pa.field('gilded', pa.int64(), nullable=True), + pa.field('edited', pa.bool_(), nullable=True), + pa.field('time_edited', pa.timestamp('ms'), nullable=True), + pa.field('subreddit_type', pa.string(), nullable=True), + pa.field('subreddit_id', pa.string(), nullable=True), + pa.field('subreddit_subscribers', pa.int64(), nullable=True), + pa.field('name', pa.string(), nullable=True), + pa.field('is_self', pa.bool_(), nullable=True), + pa.field('stickied', pa.bool_(), nullable=True), + pa.field('quarantine', pa.bool_(), nullable=True), + pa.field('error', pa.string(), nullable=True), +]) + +SUBMISSION_HANDLERS = { + 'created_utc': _ts('created_utc'), + 'retrieved_on': _ts('retrieved_on'), + 'edited': _edited, + 'has_media': _has_media, +} + + +# --- per-type configuration ------------------------------------------------ + +# Defaults that the entry-point scripts pass through, exposed here so the +# field/schema/handler triplet, the canonical paths, and the dump filename +# pattern all live in one place. +COMMENTS = { + 'fields': COMMENT_FIELDS, + 'schema': COMMENT_SCHEMA, + 'handlers': COMMENT_HANDLERS, + 'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/comments", + 'outdir': "/gscratch/comdata/output/temp/reddit_comments.parquet", + 'file_pattern': 'RC_20*.*', + 'task_list': 'parse_comments_task_list', + 'output_by_subreddit': "/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", + 'output_by_author': "/gscratch/comdata/output/reddit_comments_by_author.parquet", + 'subreddit_sort_keys': ["subreddit", "CreatedAt", "link_id", "parent_id", "Year", "Month", "Day"], + 'author_sort_keys': ["author", "CreatedAt", "subreddit", "link_id", "parent_id", "Year", "Month", "Day"], + 'app_name': "Reddit comments to parquet", +} + +SUBMISSIONS = { + 'fields': SUBMISSION_FIELDS, + 'schema': SUBMISSION_SCHEMA, + 'handlers': SUBMISSION_HANDLERS, + 'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/submissions", + 'outdir': "/gscratch/comdata/output/temp/reddit_submissions.parquet", + 'file_pattern': 'RS_20*.*', + 'task_list': 'parse_submissions_task_list', + 'output_by_subreddit': "/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet", + 'output_by_author': "/gscratch/comdata/output/reddit_submissions_by_author.parquet", + 'subreddit_sort_keys': ["subreddit", "CreatedAt", "id"], + 'author_sort_keys': ["author", "CreatedAt", "id"], + 'app_name': "Reddit submissions to parquet", +} + + +# --- Part 1: parse one dump file -> one parquet ---------------------------- + +def parse_dump(config, partition, dumpdir=None, outdir=None, chunk_size=10000): + """Read one compressed dump from `dumpdir/partition` and write a parquet + file to `outdir/.parquet`. Streams chunks of `chunk_size` + rows so memory stays bounded.""" + dumpdir = dumpdir or config['dumpdir'] + outdir = outdir or config['outdir'] + schema = config['schema'] + fields = config['fields'] + handlers = config['handlers'] + + stream = open_fileset([os.path.join(dumpdir, partition)]) + rows = (parse_record(line, fields, handlers) for line in stream) + + os.makedirs(outdir, exist_ok=True) + outfile = os.path.join(outdir, os.path.splitext(partition)[0] + ".parquet") + + with pq.ParquetWriter(outfile, schema=schema, compression='snappy', flavor='spark') as writer: + while True: + chunk = list(islice(rows, chunk_size)) + if not chunk: + break + pddf = pd.DataFrame(chunk, columns=schema.names) + table = pa.Table.from_pandas(pddf, schema=schema) + writer.write_table(table) + + +def gen_task_list(config, script_name, dumpdir=None, tasklist=None): + """Write a parallel-friendly task list of `script_name parse_dump ` + lines, one per dump file found under `dumpdir`.""" + dumpdir = dumpdir or config['dumpdir'] + tasklist = tasklist or config['task_list'] + files = list(find_dumps(dumpdir, base_pattern=config['file_pattern'])) + with open(tasklist, 'w') as of: + for fpath in files: + partition = os.path.split(fpath)[1] + of.write(f'python3 {script_name} parse_dump {partition}\n') + + +# --- Part 2: spark sort + repartition -------------------------------------- + +def sort_and_write(config): + """Read the directory of per-source parquets, sort and repartition + twice (once by subreddit, once by author), and write the two final + datasets. Pyspark is imported lazily so Part 1 callers don't pay the + Spark startup cost.""" + from pyspark.sql import SparkSession, functions as f + + spark = SparkSession.builder.appName(config['app_name']).getOrCreate() + + df = spark.read.parquet(config['outdir'], compression='snappy') + + df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) + df = df.drop('subreddit') + df = df.withColumnRenamed('subreddit_2', 'subreddit') + + df = df.withColumnRenamed("created_utc", "CreatedAt") + df = df.withColumn("Month", f.month(f.col("CreatedAt"))) + df = df.withColumn("Year", f.year(f.col("CreatedAt"))) + df = df.withColumn("Day", f.dayofmonth(f.col("CreatedAt"))) + + sub_keys = config['subreddit_sort_keys'] + df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True) + df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True) + df_sub.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy') + + auth_keys = config['author_sort_keys'] + df_auth = df.repartition('author').sort(auth_keys, ascending=True) + df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True) + df_auth.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy') diff --git a/datasets/parquet_part1.py b/datasets/parquet_part1.py deleted file mode 100755 index 9089bf7..0000000 --- a/datasets/parquet_part1.py +++ /dev/null @@ -1,238 +0,0 @@ -#!/usr/bin/env python3 - -# Stage 1 of the dump-to-parquet pipeline: read a compressed Reddit dump -# (a single RC_*.zst comment file or RS_*.zst submission file) and write -# the parsed records to a per-source parquet file. -# -# Stage 2 (parquet_part2.py) re-reads the temp directory in Spark and -# produces the sorted, partitioned datasets. -# -# CLI: -# parquet_part1.py comments parse_dump RC_2018-08.zst -# parquet_part1.py comments gen_task_list -# parquet_part1.py submissions parse_dump RS_2018-08.zst -# parquet_part1.py submissions gen_task_list -# -# Override default paths with --dumpdir / --outdir when debugging. - -import json -import os -from datetime import datetime -from itertools import islice - -import fire -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq -import simdjson - -from helper import find_dumps, open_fileset - - -# --- comments ------------------------------------------------------------- - -COMMENT_SCHEMA = pa.schema([ - pa.field('id', pa.string(), nullable=True), - pa.field('subreddit', pa.string(), nullable=True), - pa.field('link_id', pa.string(), nullable=True), - pa.field('parent_id', pa.string(), nullable=True), - pa.field('created_utc', pa.timestamp('ms'), nullable=True), - pa.field('author', pa.string(), nullable=True), - pa.field('ups', pa.int64(), nullable=True), - pa.field('downs', pa.int64(), nullable=True), - pa.field('score', pa.int64(), nullable=True), - pa.field('edited', pa.bool_(), nullable=True), - pa.field('time_edited', pa.timestamp('ms'), nullable=True), - pa.field('subreddit_type', pa.string(), nullable=True), - pa.field('subreddit_id', pa.string(), nullable=True), - pa.field('stickied', pa.bool_(), nullable=True), - pa.field('is_submitter', pa.bool_(), nullable=True), - pa.field('body', pa.string(), nullable=True), - pa.field('error', pa.string(), nullable=True), -]) - -COMMENT_FIELDS = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"] - - -def parse_comment(line): - try: - comment = json.loads(line) - except json.decoder.JSONDecodeError as e: - print(e) - print(line) - row = [None for _ in COMMENT_FIELDS] - row[-1] = f"json.decoder.JSONDecodeError|{e}|{line}" - return tuple(row) - - row = [] - for name in COMMENT_FIELDS: - if name == 'created_utc': - row.append(datetime.fromtimestamp(int(comment['created_utc']), tz=None)) - elif name == 'edited': - val = comment[name] - if type(val) == bool: - row.append(val) - row.append(None) - else: - row.append(True) - row.append(datetime.fromtimestamp(int(val), tz=None)) - elif name == "time_edited": - continue - elif name not in comment: - row.append(None) - else: - row.append(comment[name]) - return tuple(row) - - -# --- submissions ---------------------------------------------------------- - -SUBMISSION_SCHEMA = pa.schema([ - pa.field('id', pa.string(), nullable=True), - pa.field('author', pa.string(), nullable=True), - pa.field('subreddit', pa.string(), nullable=True), - pa.field('title', pa.string(), nullable=True), - pa.field('created_utc', pa.timestamp('ms'), nullable=True), - pa.field('permalink', pa.string(), nullable=True), - pa.field('url', pa.string(), nullable=True), - pa.field('domain', pa.string(), nullable=True), - pa.field('score', pa.int64(), nullable=True), - pa.field('ups', pa.int64(), nullable=True), - pa.field('downs', pa.int64(), nullable=True), - pa.field('over_18', pa.bool_(), nullable=True), - pa.field('has_media', pa.bool_(), nullable=True), - pa.field('selftext', pa.string(), nullable=True), - pa.field('retrieved_on', pa.timestamp('ms'), nullable=True), - pa.field('num_comments', pa.int64(), nullable=True), - pa.field('gilded', pa.int64(), nullable=True), - pa.field('edited', pa.bool_(), nullable=True), - pa.field('time_edited', pa.timestamp('ms'), nullable=True), - pa.field('subreddit_type', pa.string(), nullable=True), - pa.field('subreddit_id', pa.string(), nullable=True), - pa.field('subreddit_subscribers', pa.int64(), nullable=True), - pa.field('name', pa.string(), nullable=True), - pa.field('is_self', pa.bool_(), nullable=True), - pa.field('stickied', pa.bool_(), nullable=True), - pa.field('quarantine', pa.bool_(), nullable=True), - pa.field('error', pa.string(), nullable=True), -]) - -SUBMISSION_FIELDS = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error'] - -_simdjson_parser = simdjson.Parser() - - -def parse_submission(line): - try: - post = _simdjson_parser.parse(line) - except ValueError as e: - row = [None for _ in SUBMISSION_FIELDS] - row[-1] = f"Error parsing json|{e}|{line}" - return tuple(row) - - row = [] - for name in SUBMISSION_FIELDS: - if name == 'created_utc' or name == 'retrieved_on': - val = post.get(name, None) - if val is not None: - row.append(datetime.fromtimestamp(int(post[name]), tz=None)) - else: - row.append(None) - elif name == 'edited': - val = post[name] - if type(val) == bool: - row.append(val) - row.append(None) - else: - row.append(True) - row.append(datetime.fromtimestamp(int(val), tz=None)) - elif name == "time_edited": - continue - elif name == 'has_media': - row.append(post.get('media', None) is not None) - elif name not in post: - row.append(None) - else: - row.append(post[name]) - return tuple(row) - - -# --- type registry -------------------------------------------------------- - -TYPES = { - 'comments': { - 'schema': COMMENT_SCHEMA, - 'parser': parse_comment, - 'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/comments", - 'outdir': "/gscratch/comdata/output/temp/reddit_comments.parquet", - 'file_pattern': 'RC_20*.*', - 'task_list': 'parse_comments_task_list', - 'fire_path': 'comments', - }, - 'submissions': { - 'schema': SUBMISSION_SCHEMA, - 'parser': parse_submission, - 'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/submissions", - 'outdir': "/gscratch/comdata/output/temp/reddit_submissions.parquet", - 'file_pattern': 'RS_20*.*', - 'task_list': 'parse_submissions_task_list', - 'fire_path': 'submissions', - }, -} - - -# --- shared workers ------------------------------------------------------- - -def _parse_dump(dump_type, partition, dumpdir=None, outdir=None): - config = TYPES[dump_type] - dumpdir = dumpdir or config['dumpdir'] - outdir = outdir or config['outdir'] - schema = config['schema'] - parser = config['parser'] - - N = 10000 - stream = open_fileset([os.path.join(dumpdir, partition)]) - rows = map(parser, stream) - - os.makedirs(outdir, exist_ok=True) - outfile = os.path.join(outdir, os.path.splitext(partition)[0] + ".parquet") - - with pq.ParquetWriter(outfile, schema=schema, compression='snappy', flavor='spark') as writer: - while True: - chunk = list(islice(rows, N)) - if len(chunk) == 0: - break - pddf = pd.DataFrame(chunk, columns=schema.names) - table = pa.Table.from_pandas(pddf, schema=schema) - writer.write_table(table) - - -def _gen_task_list(dump_type, dumpdir=None, tasklist=None): - config = TYPES[dump_type] - dumpdir = dumpdir or config['dumpdir'] - tasklist = tasklist or config['task_list'] - fire_path = config['fire_path'] - - files = list(find_dumps(dumpdir, base_pattern=config['file_pattern'])) - with open(tasklist, 'w') as of: - for fpath in files: - partition = os.path.split(fpath)[1] - of.write(f'python3 parquet_part1.py {fire_path} parse_dump {partition}\n') - - -# --- fire CLI ------------------------------------------------------------- - -class _Subcommand: - def __init__(self, dump_type): - self._dump_type = dump_type - - def parse_dump(self, partition, dumpdir=None, outdir=None): - _parse_dump(self._dump_type, partition, dumpdir=dumpdir, outdir=outdir) - - def gen_task_list(self, dumpdir=None, tasklist=None): - _gen_task_list(self._dump_type, dumpdir=dumpdir, tasklist=tasklist) - - -if __name__ == "__main__": - fire.Fire({'comments': _Subcommand('comments'), - 'submissions': _Subcommand('submissions')}) diff --git a/datasets/parquet_part2.py b/datasets/parquet_part2.py deleted file mode 100755 index 1182d80..0000000 --- a/datasets/parquet_part2.py +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env python3 - -# Stage 2 of the dump-to-parquet pipeline: read the per-source parquet -# files produced by parquet_part1.py, sort by subreddit and by author -# (two passes), and write the final repartitioned parquet datasets. -# -# CLI: -# parquet_part2.py comments -# parquet_part2.py submissions -# -# This is a Spark job; launch via start_spark_and_run.sh. - -import fire -import pyspark -from pyspark.sql import functions as f -from pyspark.sql import SparkSession - - -TYPES = { - 'comments': { - 'input_dir': "/gscratch/comdata/output/temp/reddit_comments.parquet", - 'output_by_subreddit': "/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", - 'output_by_author': "/gscratch/comdata/output/reddit_comments_by_author.parquet", - 'subreddit_sort_keys': ["subreddit", "CreatedAt", "link_id", "parent_id", "Year", "Month", "Day"], - 'author_sort_keys': ["author", "CreatedAt", "subreddit", "link_id", "parent_id", "Year", "Month", "Day"], - 'app_name': "Reddit comments to parquet", - }, - 'submissions': { - 'input_dir': "/gscratch/comdata/output/temp/reddit_submissions.parquet", - 'output_by_subreddit': "/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet", - 'output_by_author': "/gscratch/comdata/output/reddit_submissions_by_author.parquet", - 'subreddit_sort_keys': ["subreddit", "CreatedAt", "id"], - 'author_sort_keys': ["author", "CreatedAt", "id"], - 'app_name': "Reddit submissions to parquet", - }, -} - - -def sort_and_write(dump_type): - config = TYPES[dump_type] - - spark = SparkSession.builder.appName(config['app_name']).getOrCreate() - sc = spark.sparkContext - pyspark.SparkConf().set("spark.sql.shuffle.partitions", 2000) - pyspark.SparkConf().set('spark.sql.crossJoin.enabled', "true") - pyspark.SparkConf().set('spark.debug.maxToStringFields', 200) - - df = spark.read.parquet(config['input_dir'], compression='snappy') - - df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) - df = df.drop('subreddit') - df = df.withColumnRenamed('subreddit_2', 'subreddit') - - df = df.withColumnRenamed("created_utc", "CreatedAt") - df = df.withColumn("Month", f.month(f.col("CreatedAt"))) - df = df.withColumn("Year", f.year(f.col("CreatedAt"))) - df = df.withColumn("Day", f.dayofmonth(f.col("CreatedAt"))) - - # sort + repartition by subreddit - df_by_subreddit = df.repartition('subreddit') - df_by_subreddit = df_by_subreddit.sort(config['subreddit_sort_keys'], ascending=True) - df_by_subreddit = df_by_subreddit.sortWithinPartitions(config['subreddit_sort_keys'], ascending=True) - df_by_subreddit.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy') - - # sort + repartition by author - df_by_author = df.repartition('author') - df_by_author = df_by_author.sort(config['author_sort_keys'], ascending=True) - df_by_author = df_by_author.sortWithinPartitions(config['author_sort_keys'], ascending=True) - df_by_author.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy') - - -if __name__ == "__main__": - fire.Fire(sort_and_write) diff --git a/datasets/submissions_part1.py b/datasets/submissions_part1.py new file mode 100755 index 0000000..f25f925 --- /dev/null +++ b/datasets/submissions_part1.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +"""Part 1 for submissions: parse one RS_*.zst dump into a parquet file. + +CLI: + submissions_part1.py parse_dump RS_2018-08.zst + submissions_part1.py gen_task_list + submissions_part1.py parse_dump RS_2018-08.zst --dumpdir=/tmp/in --outdir=/tmp/out +""" + +import fire +from dumps_helper import SUBMISSIONS, parse_dump, gen_task_list + + +def _parse_dump(partition, dumpdir=None, outdir=None): + parse_dump(SUBMISSIONS, partition, dumpdir=dumpdir, outdir=outdir) + + +def _gen_task_list(dumpdir=None, tasklist=None): + gen_task_list(SUBMISSIONS, 'submissions_part1.py', dumpdir=dumpdir, tasklist=tasklist) + + +if __name__ == "__main__": + fire.Fire({'parse_dump': _parse_dump, + 'gen_task_list': _gen_task_list}) diff --git a/datasets/submissions_part2.py b/datasets/submissions_part2.py new file mode 100755 index 0000000..55c9be8 --- /dev/null +++ b/datasets/submissions_part2.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +"""Part 2 for submissions: Spark sort + repartition the per-source parquets +produced by submissions_part1.py into the final by_subreddit / by_author +datasets. + +Launched via the Hyak-provided start_spark_and_run.sh wrapper: + start_spark_and_run.sh 1 submissions_part2.py +""" + +from dumps_helper import SUBMISSIONS, sort_and_write + + +if __name__ == "__main__": + sort_and_write(SUBMISSIONS)