#!/usr/bin/env python3
# Stage 1 of the dump-to-parquet pipeline: read a compressed Reddit dump
# (a single RC_*.zst comment file or RS_*.zst submission file) and write
# the parsed records to a per-source parquet file.
#
# Stage 2 (parquet_part2.py) re-reads the temp directory in Spark and
# produces the sorted, partitioned datasets.
#
# CLI:
#   parquet_part1.py comments parse_dump RC_2018-08.zst
#   parquet_part1.py comments gen_task_list
#   parquet_part1.py submissions parse_dump RS_2018-08.zst
#   parquet_part1.py submissions gen_task_list
#
# Override default paths with --dumpdir / --outdir when debugging.

import json
import os
from datetime import datetime
from itertools import islice

import fire
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import simdjson

from helper import find_dumps, open_fileset


# --- shared row helpers ---------------------------------------------------

# Fields holding epoch seconds that are stored as timestamps.
_TIMESTAMP_FIELDS = frozenset({'created_utc', 'retrieved_on'})


def _nullable_schema(columns):
    """Build a pyarrow schema of nullable fields from (name, type) pairs."""
    return pa.schema(
        [pa.field(name, dtype, nullable=True) for name, dtype in columns]
    )


def _to_datetime(value):
    """Convert epoch seconds to a naive datetime; ``None`` stays ``None``.

    NOTE(review): ``datetime.fromtimestamp(..., tz=None)`` converts to the
    host's local time zone, so the stored wall-clock value is only UTC when
    the job runs with TZ=UTC. Behavior is kept as-is so that newly written
    files agree with previously generated ones -- confirm what stage 2
    expects before changing it.
    """
    if value is None:
        return None
    return datetime.fromtimestamp(int(value), tz=None)


def _split_edited(value):
    """Split the raw ``edited`` value into ``(edited, time_edited)``.

    The dumps store ``edited`` either as a boolean or as the epoch time of
    the edit. A boolean is passed through with no edit time; a timestamp
    becomes ``(True, <datetime>)``; a missing/null value becomes two nulls.
    """
    if value is None:
        return None, None
    if isinstance(value, bool):
        return value, None
    return True, _to_datetime(value)


def _error_row(schema, message):
    """Return an all-null row as wide as *schema* with *message* in 'error'.

    The row is sized from the schema rather than from a field-name list so
    that the message always lands in the ``error`` column, even when one
    input field expands into several output columns.
    """
    row = [None] * len(schema.names)
    row[schema.names.index('error')] = message
    return tuple(row)


def _build_row(record, fields):
    """Project a parsed JSON object onto *fields*, in order, as a tuple.

    *record* only needs mapping-style ``get``. Missing keys become ``None``.
    ``edited`` emits two values (``edited`` and ``time_edited``), so an
    explicit ``time_edited`` entry in *fields* is skipped to avoid emitting
    that column twice.
    """
    row = []
    for name in fields:
        if name in _TIMESTAMP_FIELDS:
            row.append(_to_datetime(record.get(name, None)))
        elif name == 'edited':
            row.extend(_split_edited(record.get(name, None)))
        elif name == 'time_edited':
            continue
        elif name == 'has_media':
            # Derived column: true when the post carries a non-null 'media'.
            row.append(record.get('media', None) is not None)
        else:
            row.append(record.get(name, None))
    return tuple(row)


# --- comments -------------------------------------------------------------

COMMENT_SCHEMA = _nullable_schema([
    ('id', pa.string()),
    ('subreddit', pa.string()),
    ('link_id', pa.string()),
    ('parent_id', pa.string()),
    ('created_utc', pa.timestamp('ms')),
    ('author', pa.string()),
    ('ups', pa.int64()),
    ('downs', pa.int64()),
    ('score', pa.int64()),
    ('edited', pa.bool_()),
    ('time_edited', pa.timestamp('ms')),
    ('subreddit_type', pa.string()),
    ('subreddit_id', pa.string()),
    ('stickied', pa.bool_()),
    ('is_submitter', pa.bool_()),
    ('body', pa.string()),
    ('error', pa.string()),
])

# Input fields read from each comment. 'time_edited' is intentionally absent:
# it is produced together with 'edited', so this list is one entry shorter
# than COMMENT_SCHEMA.
COMMENT_FIELDS = [
    "id", "subreddit", "link_id", "parent_id", "created_utc", "author",
    "ups", "downs", "score", "edited", "subreddit_type", "subreddit_id",
    "stickied", "is_submitter", "body", "error",
]


def parse_comment(line):
    """Parse one JSON comment line into a tuple matching COMMENT_SCHEMA.

    On malformed JSON the error and the offending line are printed and an
    all-null row is returned whose ``error`` column describes the failure.
    """
    try:
        comment = json.loads(line)
    except json.decoder.JSONDecodeError as e:
        print(e)
        print(line)
        return _error_row(
            COMMENT_SCHEMA, f"json.decoder.JSONDecodeError|{e}|{line}"
        )
    return _build_row(comment, COMMENT_FIELDS)


# --- submissions ----------------------------------------------------------

SUBMISSION_SCHEMA = _nullable_schema([
    ('id', pa.string()),
    ('author', pa.string()),
    ('subreddit', pa.string()),
    ('title', pa.string()),
    ('created_utc', pa.timestamp('ms')),
    ('permalink', pa.string()),
    ('url', pa.string()),
    ('domain', pa.string()),
    ('score', pa.int64()),
    ('ups', pa.int64()),
    ('downs', pa.int64()),
    ('over_18', pa.bool_()),
    ('has_media', pa.bool_()),
    ('selftext', pa.string()),
    ('retrieved_on', pa.timestamp('ms')),
    ('num_comments', pa.int64()),
    ('gilded', pa.int64()),
    ('edited', pa.bool_()),
    ('time_edited', pa.timestamp('ms')),
    ('subreddit_type', pa.string()),
    ('subreddit_id', pa.string()),
    ('subreddit_subscribers', pa.int64()),
    ('name', pa.string()),
    ('is_self', pa.bool_()),
    ('stickied', pa.bool_()),
    ('quarantine', pa.bool_()),
    ('error', pa.string()),
])

SUBMISSION_FIELDS = [
    'id', 'author', 'subreddit', 'title', 'created_utc', 'permalink', 'url',
    'domain', 'score', 'ups', 'downs', 'over_18', 'has_media', 'selftext',
    'retrieved_on', 'num_comments', 'gilded', 'edited', 'time_edited',
    'subreddit_type', 'subreddit_id', 'subreddit_subscribers', 'name',
    'is_self', 'stickied', 'quarantine', 'error',
]

# One parser instance is reused across all submission lines.
_simdjson_parser = simdjson.Parser()


def parse_submission(line):
    """Parse one JSON submission line into a tuple matching SUBMISSION_SCHEMA.

    On malformed JSON an all-null row is returned whose ``error`` column
    describes the failure.
    """
    try:
        post = _simdjson_parser.parse(line)
    except ValueError as e:
        return _error_row(SUBMISSION_SCHEMA, f"Error parsing json|{e}|{line}")
    return _build_row(post, SUBMISSION_FIELDS)


# --- type registry --------------------------------------------------------

# Per-dump-type configuration: schema, line parser, default paths, the glob
# used to discover dump files, the task-list file name, and the CLI path.
TYPES = {
    'comments': {
        'schema': COMMENT_SCHEMA,
        'parser': parse_comment,
        'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/comments",
        'outdir': "/gscratch/comdata/output/temp/reddit_comments.parquet",
        'file_pattern': 'RC_20*.*',
        'task_list': 'parse_comments_task_list',
        'fire_path': 'comments',
    },
    'submissions': {
        'schema': SUBMISSION_SCHEMA,
        'parser': parse_submission,
        'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/submissions",
        'outdir': "/gscratch/comdata/output/temp/reddit_submissions.parquet",
        'file_pattern': 'RS_20*.*',
        'task_list': 'parse_submissions_task_list',
        'fire_path': 'submissions',
    },
}


# --- shared workers -------------------------------------------------------

# Number of parsed rows buffered before each parquet write.
_CHUNK_ROWS = 10000


def _parse_dump(dump_type, partition, dumpdir=None, outdir=None):
    """Parse one dump file and write it as ``<outdir>/<stem>.parquet``.

    Args:
        dump_type: Key into TYPES ('comments' or 'submissions').
        partition: File name of the dump inside *dumpdir*.
        dumpdir: Directory holding the dump; defaults to the type's config.
        outdir: Output directory (created if missing); defaults to the
            type's config.

    Raises:
        KeyError: If *dump_type* is not a key of TYPES.
    """
    config = TYPES[dump_type]
    dumpdir = dumpdir or config['dumpdir']
    outdir = outdir or config['outdir']
    schema = config['schema']

    stream = open_fileset([os.path.join(dumpdir, partition)])
    # Lazy: lines are parsed only as chunks are pulled below.
    rows = map(config['parser'], stream)

    os.makedirs(outdir, exist_ok=True)
    outfile = os.path.join(
        outdir, os.path.splitext(partition)[0] + ".parquet"
    )

    with pq.ParquetWriter(outfile, schema=schema, compression='snappy',
                          flavor='spark') as writer:
        while True:
            chunk = list(islice(rows, _CHUNK_ROWS))
            if not chunk:
                break
            pddf = pd.DataFrame(chunk, columns=schema.names)
            writer.write_table(pa.Table.from_pandas(pddf, schema=schema))


def _gen_task_list(dump_type, dumpdir=None, tasklist=None):
    """Write one ``parse_dump`` command line per dump file found.

    Args:
        dump_type: Key into TYPES ('comments' or 'submissions').
        dumpdir: Directory to scan; defaults to the type's config.
        tasklist: Output file path; defaults to the type's config.

    Raises:
        KeyError: If *dump_type* is not a key of TYPES.
    """
    config = TYPES[dump_type]
    dumpdir = dumpdir or config['dumpdir']
    tasklist = tasklist or config['task_list']
    fire_path = config['fire_path']

    # Enumerate before opening the output so a failed scan does not leave a
    # truncated task list behind.
    files = list(find_dumps(dumpdir, base_pattern=config['file_pattern']))

    with open(tasklist, 'w') as of:
        for fpath in files:
            partition = os.path.basename(fpath)
            of.write(
                f'python3 parquet_part1.py {fire_path} parse_dump {partition}\n'
            )


# --- fire CLI -------------------------------------------------------------

class _Subcommand:
    """CLI command group bound to a single dump type."""

    def __init__(self, dump_type):
        self._dump_type = dump_type

    def parse_dump(self, partition, dumpdir=None, outdir=None):
        """Parse one dump file into a parquet file."""
        _parse_dump(self._dump_type, partition, dumpdir=dumpdir, outdir=outdir)

    def gen_task_list(self, dumpdir=None, tasklist=None):
        """Write the list of parse_dump commands for every dump file."""
        _gen_task_list(self._dump_type, dumpdir=dumpdir, tasklist=tasklist)


if __name__ == "__main__":
    fire.Fire({
        'comments': _Subcommand('comments'),
        'submissions': _Subcommand('submissions'),
    })