diff --git a/datasets/README.md b/datasets/README.md new file mode 100644 index 0000000..f2aeec2 --- /dev/null +++ b/datasets/README.md @@ -0,0 +1,70 @@ +# Reddit dumps → sorted parquet datasets + +This directory holds the pipeline that turns compressed Reddit dump files +(`RC_YYYY-MM.zst` for comments, `RS_YYYY-MM.zst` for submissions) into the +sorted, repartitioned parquet datasets that the rest of the project +consumes. + +The pipeline has two stages: + +| Script | What it does | +|---|---| +| `parquet_part1.py` | Reads one compressed dump and writes one parquet file. Per-file, parallelizable. Runs without Spark. | +| `parquet_part2.py` | Reads the directory of per-file parquets in Spark, sorts and repartitions by subreddit, then by author, and writes the final `reddit_*_by_*.parquet` datasets. Always re-sorts the full corpus. | + +Both scripts use a single fire CLI with `comments` and `submissions` +subcommands, so the comments and submissions paths share all of their +plumbing — only the schema and the JSON parser differ. + +## The two workflows + +There are two ways to run the pipeline; pick the one that matches your +situation. + +### Build from scratch — `build_from_scratch.sh` + +Use this when there is no existing parquet output, or when the upstream +data has changed in a way that requires reparsing everything. Wipes the +per-source temp directories, processes every `RC_*` / `RS_*` dump in the +raw dumps directory through Part 1, then runs the Part 2 Spark sort. + +### Add a new month — `add_new_month.sh YYYY-MM` + +Use this when one or more months of new dump files have arrived and you +just want to bring the existing datasets up to date. Processes only the +specified month's `RC_YYYY-MM.zst` and `RS_YYYY-MM.zst` files through +Part 1 (the existing per-source parquet files are left in place), then +re-runs the Part 2 Spark sort over the full temp directory so the final +datasets pick up the new data. 
+ +The Part 2 sort is global and not incremental, so each monthly add +re-sorts the entire corpus. That's fine for a monthly cadence; it would +need a rearchitecture if the cost became a problem. + +## Running steps individually + +Both `.sh` runners are written so that every meaningful step is a separate, +self-contained command. If something fails partway through, or you want +to inspect intermediate state, you can copy any single line out of the +runner and execute it standalone. For example: + +```sh +# parse one specific file (skipping the rest of the workflow) +python3 parquet_part1.py comments parse_dump RC_2025-03.zst + +# override default dump/output paths from the CLI +python3 parquet_part1.py comments parse_dump RC_2025-03.zst \ + --dumpdir=/tmp/test --outdir=/tmp/out +``` + +The Spark Part 2 step is launched via `start_spark_and_run.sh` (a +Hyak-provided wrapper not included in this repo); see the wiki for the +launch convention. + +## See also + +The CDSC wiki page +[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit) +documents the surrounding workflow — where the raw dump files come from +(currently ArcticShift via academic torrents), how to stage them on +Hyak, and how to run Spark jobs on the cluster. diff --git a/datasets/add_new_month.sh b/datasets/add_new_month.sh new file mode 100755 index 0000000..839db4a --- /dev/null +++ b/datasets/add_new_month.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash +# +# Add a single new month of dumps to the existing parquet datasets. +# +# Processes only the RC_YYYY-MM.zst and RS_YYYY-MM.zst files (Part 1), +# leaving the existing per-source temp parquet files untouched, then +# re-runs the Part 2 Spark sort + repartition over the full temp dir so +# the final by_subreddit / by_author datasets pick up the new data. 
#
# Usage:
#   add_new_month.sh YYYY-MM
#
# Example:
#   add_new_month.sh 2025-03
#
# Every command below is independently runnable — to debug, copy a line
# out and run it directly. For a full rebuild instead, see
# build_from_scratch.sh.
#
# Note on cost: Part 2 always re-sorts the full corpus (the sort is global,
# not incremental), so this gets slightly slower each month. For the
# monthly cadence this is fine; if the sort becomes a bottleneck we'd
# need to rearchitect Part 2 to merge-append instead of re-sort.

# -u catches typos in variable names; pipefail keeps a failing stage of any
# future pipeline from being masked by a succeeding last stage.
set -euo pipefail
cd "$(dirname "$0")"

MONTH="${1:-}"
# Reject a missing *or malformed* month before any work starts, so a typo
# such as "2025-3" or "RC_2025-03" fails here with the usage message instead
# of surfacing later as a missing-file error from the parser.
if [[ ! "$MONTH" =~ ^[0-9]{4}-(0[1-9]|1[0-2])$ ]]; then
    echo "Usage: $0 YYYY-MM" >&2
    exit 1
fi

# --- Part 1: parse the new month's dumps (no wipe) -------------------------

# parse the new comments file
python3 parquet_part1.py comments parse_dump "RC_${MONTH}.zst"

# parse the new submissions file
python3 parquet_part1.py submissions parse_dump "RS_${MONTH}.zst"

# --- Part 2: re-sort the full corpus including the new data ---------------

# sort comments and overwrite reddit_comments_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 parquet_part2.py comments

# sort submissions and overwrite reddit_submissions_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 parquet_part2.py submissions
#
# Prerequisites:
#   - raw .zst dumps already staged in the dumpdir locations (see the
#     parquet_part1.py defaults, or override via --dumpdir)
#   - GNU parallel installed
#   - start_spark_and_run.sh on PATH (Hyak-provided wrapper)
#
# To add one new month to an existing build instead of rebuilding from
# scratch, use add_new_month.sh.

set -euo pipefail
cd "$(dirname "$0")"

TEMP_COMMENTS="/gscratch/comdata/output/temp/reddit_comments.parquet"
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/reddit_submissions.parquet"

# --- Part 0: generate task lists and sanity-check them ----------------------
#
# The task lists are produced BEFORE anything is deleted: if the dump
# directory is empty or not mounted, the lists come out empty and we stop
# here rather than wiping the existing temp output and rebuilding nothing.

# generate the per-file parse task list for comments
python3 parquet_part1.py comments gen_task_list

# generate the per-file parse task list for submissions
python3 parquet_part1.py submissions gen_task_list

if [ ! -s parse_comments_task_list ]; then
    echo "No comment dumps found; refusing to wipe $TEMP_COMMENTS" >&2
    exit 1
fi
if [ ! -s parse_submissions_task_list ]; then
    echo "No submission dumps found; refusing to wipe $TEMP_SUBMISSIONS" >&2
    exit 1
fi

# --- Part 1a: comments ------------------------------------------------------

# wipe any existing comments temp output
rm -rf "$TEMP_COMMENTS"

# run all comments parse tasks in parallel
parallel --joblog comments_joblog.txt --results comments_logs < parse_comments_task_list

# --- Part 1b: submissions ---------------------------------------------------

# wipe any existing submissions temp output
rm -rf "$TEMP_SUBMISSIONS"

# run all submissions parse tasks in parallel
parallel --joblog submissions_joblog.txt --results submissions_logs < parse_submissions_task_list

# --- Part 2: spark sort + repartition --------------------------------------

# sort comments and write reddit_comments_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 parquet_part2.py comments

# sort submissions and write reddit_submissions_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 parquet_part2.py submissions
job_script.sh -#echo "source $(pwd)/../bin/activate" >> job_script.sh -echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh - -srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh - -start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py diff --git a/datasets/comments_2_parquet_part1.py b/datasets/comments_2_parquet_part1.py deleted file mode 100755 index d3c7b7c..0000000 --- a/datasets/comments_2_parquet_part1.py +++ /dev/null @@ -1,115 +0,0 @@ -#!/usr/bin/env python3 -import json -from datetime import datetime -from multiprocessing import Pool -from itertools import islice -from helper import find_dumps, open_fileset -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq - -def parse_comment(comment, names= None): - if names is None: - names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"] - - try: - comment = json.loads(comment) - except json.decoder.JSONDecodeError as e: - print(e) - print(comment) - row = [None for _ in names] - row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,comment) - return tuple(row) - - row = [] - for name in names: - if name == 'created_utc': - row.append(datetime.fromtimestamp(int(comment['created_utc']),tz=None)) - elif name == 'edited': - val = comment[name] - if type(val) == bool: - row.append(val) - row.append(None) - else: - row.append(True) - row.append(datetime.fromtimestamp(int(val),tz=None)) - elif name == "time_edited": - continue - elif name not in comment: - row.append(None) - - else: - row.append(comment[name]) - - return tuple(row) - - -# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')]) - -dumpdir = 
"/gscratch/comdata/raw_data/reddit_dumps/comments/" - -files = list(find_dumps(dumpdir, base_pattern="RC_20*")) - -pool = Pool(28) - -stream = open_fileset(files) - -N = int(1e4) - -rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28)) - -schema = pa.schema([ - pa.field('id', pa.string(), nullable=True), - pa.field('subreddit', pa.string(), nullable=True), - pa.field('link_id', pa.string(), nullable=True), - pa.field('parent_id', pa.string(), nullable=True), - pa.field('created_utc', pa.timestamp('ms'), nullable=True), - pa.field('author', pa.string(), nullable=True), - pa.field('ups', pa.int64(), nullable=True), - pa.field('downs', pa.int64(), nullable=True), - pa.field('score', pa.int64(), nullable=True), - pa.field('edited', pa.bool_(), nullable=True), - pa.field('time_edited', pa.timestamp('ms'), nullable=True), - pa.field('subreddit_type', pa.string(), nullable=True), - pa.field('subreddit_id', pa.string(), nullable=True), - pa.field('stickied', pa.bool_(), nullable=True), - pa.field('is_submitter', pa.bool_(), nullable=True), - pa.field('body', pa.string(), nullable=True), - pa.field('error', pa.string(), nullable=True), -]) - -from pathlib import Path -p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2") - -if not p.is_dir(): - if p.exists(): - p.unlink() - p.mkdir() - -else: - list(map(Path.unlink,p.glob('*'))) - -part_size = int(1e7) -part = 1 -n_output = 0 -writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark') - -while True: - if n_output > part_size: - if part > 1: - writer.close() - - part = part + 1 - n_output = 0 - - writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark') - - n_output += N - chunk = islice(rows,N) - pddf = pd.DataFrame(chunk, columns=schema.names) - table = pa.Table.from_pandas(pddf,schema=schema) - if 
table.shape[0] == 0: - break - writer.write_table(table) - - diff --git a/datasets/comments_2_parquet_part2.py b/datasets/comments_2_parquet_part2.py deleted file mode 100755 index 0d5cc9e..0000000 --- a/datasets/comments_2_parquet_part2.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python3 - -# spark script to make sorted, and partitioned parquet files - -from pyspark.sql import functions as f -from pyspark.sql import SparkSession - -spark = SparkSession.builder.getOrCreate() - -df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy') - -df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) -df = df.drop('subreddit') -df = df.withColumnRenamed('subreddit_2','subreddit') - -df = df.withColumnRenamed("created_utc","CreatedAt") -df = df.withColumn("Month",f.month(f.col("CreatedAt"))) -df = df.withColumn("Year",f.year(f.col("CreatedAt"))) -df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt"))) - -df = df.repartition('subreddit') -df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True) -df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True) -df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy') - -df = df.repartition('author') -df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True) -df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True) -df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy') diff --git a/datasets/job_script.sh b/datasets/job_script.sh deleted file mode 100755 index d90b618..0000000 --- a/datasets/job_script.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/bash -start_spark_cluster.sh -spark-submit --master 
spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000 -stop-all.sh diff --git a/datasets/parquet_part1.py b/datasets/parquet_part1.py new file mode 100755 index 0000000..9089bf7 --- /dev/null +++ b/datasets/parquet_part1.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 + +# Stage 1 of the dump-to-parquet pipeline: read a compressed Reddit dump +# (a single RC_*.zst comment file or RS_*.zst submission file) and write +# the parsed records to a per-source parquet file. +# +# Stage 2 (parquet_part2.py) re-reads the temp directory in Spark and +# produces the sorted, partitioned datasets. +# +# CLI: +# parquet_part1.py comments parse_dump RC_2018-08.zst +# parquet_part1.py comments gen_task_list +# parquet_part1.py submissions parse_dump RS_2018-08.zst +# parquet_part1.py submissions gen_task_list +# +# Override default paths with --dumpdir / --outdir when debugging. + +import json +import os +from datetime import datetime +from itertools import islice + +import fire +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq +import simdjson + +from helper import find_dumps, open_fileset + + +# --- comments ------------------------------------------------------------- + +COMMENT_SCHEMA = pa.schema([ + pa.field('id', pa.string(), nullable=True), + pa.field('subreddit', pa.string(), nullable=True), + pa.field('link_id', pa.string(), nullable=True), + pa.field('parent_id', pa.string(), nullable=True), + pa.field('created_utc', pa.timestamp('ms'), nullable=True), + pa.field('author', pa.string(), nullable=True), + pa.field('ups', pa.int64(), nullable=True), + pa.field('downs', pa.int64(), nullable=True), + pa.field('score', pa.int64(), nullable=True), + pa.field('edited', pa.bool_(), nullable=True), + pa.field('time_edited', pa.timestamp('ms'), nullable=True), + pa.field('subreddit_type', pa.string(), nullable=True), + pa.field('subreddit_id', pa.string(), 
# One name per output column, in the same order as the comment schema.
# 'time_edited' has no JSON key of its own: its value is emitted together
# with 'edited'. It is listed here so that error rows (built from this list's
# length) are exactly as wide as regular rows — without it the error row was
# one value short and the message did not line up with the 'error' column.
COMMENT_FIELDS = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","time_edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]


def parse_comment(line):
    """Parse one JSON line of a comment dump into a row tuple.

    Args:
        line: a single JSON document describing one comment.

    Returns:
        A tuple with one value per entry of COMMENT_FIELDS. Keys missing
        from the JSON become None. If the line is not valid JSON, the
        error and the line are printed and the returned row is all None
        except the last ('error') value, which holds the failure message
        and the offending line.
    """
    try:
        comment = json.loads(line)
    except json.decoder.JSONDecodeError as e:
        print(e)
        print(line)
        row = [None] * len(COMMENT_FIELDS)
        row[-1] = f"json.decoder.JSONDecodeError|{e}|{line}"
        return tuple(row)

    row = []
    for name in COMMENT_FIELDS:
        if name == 'created_utc':
            # NOTE(review): fromtimestamp(..., tz=None) yields a naive
            # datetime in the host's local timezone — confirm that is the
            # intended meaning of this column.
            val = comment.get(name)
            row.append(None if val is None else datetime.fromtimestamp(int(val), tz=None))
        elif name == 'edited':
            # 'edited' is either a boolean or the edit timestamp; it fans
            # out into two values: the flag and 'time_edited'.
            val = comment.get(name)
            if val is None:
                # absent key: unknown rather than a crash of the whole dump
                row.extend((None, None))
            elif isinstance(val, bool):
                row.extend((val, None))
            else:
                row.extend((True, datetime.fromtimestamp(int(val), tz=None)))
        elif name == "time_edited":
            # already appended by the 'edited' branch
            continue
        else:
            row.append(comment.get(name))
    return tuple(row)
def parse_submission(line):
    """Parse one JSON line of a submission dump into a row tuple.

    Args:
        line: a single JSON document describing one submission.

    Returns:
        A tuple with one value per entry of SUBMISSION_FIELDS. Keys missing
        from the JSON become None. If parsing raises ValueError, the row is
        all None except the last ('error') value, which holds the failure
        message and the offending line.
    """
    try:
        post = _simdjson_parser.parse(line)
    except ValueError as e:
        row = [None] * len(SUBMISSION_FIELDS)
        row[-1] = f"Error parsing json|{e}|{line}"
        return tuple(row)

    row = []
    for name in SUBMISSION_FIELDS:
        if name in ('created_utc', 'retrieved_on'):
            # NOTE(review): fromtimestamp(..., tz=None) yields a naive
            # datetime in the host's local timezone — confirm intended.
            val = post.get(name, None)
            row.append(None if val is None else datetime.fromtimestamp(int(val), tz=None))
        elif name == 'edited':
            # 'edited' is either a boolean or the edit timestamp; it fans
            # out into two values: the flag and 'time_edited'.
            val = post.get(name, None)
            if val is None:
                # absent/null key: unknown rather than a crash of the dump
                row.extend((None, None))
            elif isinstance(val, bool):
                row.extend((val, None))
            else:
                row.extend((True, datetime.fromtimestamp(int(val), tz=None)))
        elif name == "time_edited":
            # already appended by the 'edited' branch
            continue
        elif name == 'has_media':
            # derived column: true when the post carries a non-null 'media'
            row.append(post.get('media', None) is not None)
        else:
            row.append(post.get(name, None))
    return tuple(row)
def _parse_dump(dump_type, partition, dumpdir=None, outdir=None):
    """Parse one compressed dump file into one parquet file.

    Args:
        dump_type: key into TYPES ('comments' or 'submissions').
        partition: file name of the dump inside *dumpdir*.
        dumpdir: overrides the configured dump directory when given.
        outdir: overrides the configured output directory when given.

    The output is named after *partition* with its last extension replaced
    by ".parquet". It is written under a temporary name and renamed into
    place only after the writer has closed successfully, so an interrupted
    or failed parse does not leave a partial file under the final name in
    the directory that stage 2 reads.
    """
    config = TYPES[dump_type]
    dumpdir = dumpdir or config['dumpdir']
    outdir = outdir or config['outdir']
    schema = config['schema']
    parser = config['parser']

    N = 10000  # rows handed to the writer per write_table call
    stream = open_fileset([os.path.join(dumpdir, partition)])
    rows = map(parser, stream)

    os.makedirs(outdir, exist_ok=True)
    outfile = os.path.join(outdir, os.path.splitext(partition)[0] + ".parquet")
    out_parent, out_name = os.path.split(outfile)
    # Dot-prefixed so directory readers that skip hidden files ignore it
    # (presumably including Spark's parquet reader — confirm).
    tmpfile = os.path.join(out_parent, f".{out_name}.tmp")

    try:
        with pq.ParquetWriter(tmpfile, schema=schema, compression='snappy', flavor='spark') as writer:
            while True:
                chunk = list(islice(rows, N))
                if not chunk:
                    break
                pddf = pd.DataFrame(chunk, columns=schema.names)
                table = pa.Table.from_pandas(pddf, schema=schema)
                writer.write_table(table)
        os.replace(tmpfile, outfile)
    except BaseException:
        # best-effort cleanup; the original error is what matters
        try:
            os.remove(tmpfile)
        except OSError:
            pass
        raise


def _gen_task_list(dump_type, dumpdir=None, tasklist=None):
    """Write one parse_dump shell command per dump file to a task list.

    Args:
        dump_type: key into TYPES ('comments' or 'submissions').
        dumpdir: overrides the configured dump directory when given. The
            override is forwarded to every generated command so the tasks
            read from the same directory that was listed here.
        tasklist: overrides the configured task-list file name when given.
    """
    import shlex

    config = TYPES[dump_type]
    # Only forward --dumpdir when the caller overrode it, so the default
    # task list stays unchanged.
    dumpdir_flag = f" --dumpdir={shlex.quote(str(dumpdir))}" if dumpdir else ""
    dumpdir = dumpdir or config['dumpdir']
    tasklist = tasklist or config['task_list']
    fire_path = config['fire_path']

    files = list(find_dumps(dumpdir, base_pattern=config['file_pattern']))
    with open(tasklist, 'w') as of:
        for fpath in files:
            partition = os.path.split(fpath)[1]
            # each line is run by a shell, so quote the file name
            of.write(f'python3 parquet_part1.py {fire_path} parse_dump {shlex.quote(partition)}{dumpdir_flag}\n')
class _Subcommand:
    """Command group exposed through fire for a single dump type.

    Each instance remembers its dump type and forwards the CLI arguments
    of its two commands to the shared module-level workers.
    """

    def __init__(self, dump_type):
        # key into TYPES, e.g. 'comments' or 'submissions'
        self._dump_type = dump_type

    def parse_dump(self, partition, dumpdir=None, outdir=None):
        """Parse the dump file *partition* into a parquet file."""
        overrides = {'dumpdir': dumpdir, 'outdir': outdir}
        _parse_dump(self._dump_type, partition, **overrides)

    def gen_task_list(self, dumpdir=None, tasklist=None):
        """Write the list of per-file parse commands for this dump type."""
        overrides = {'dumpdir': dumpdir, 'tasklist': tasklist}
        _gen_task_list(self._dump_type, **overrides)
def _write_sorted(df, partition_col, sort_keys, out_path):
    """Repartition *df* by *partition_col*, sort by *sort_keys*, overwrite *out_path*."""
    # NOTE(review): the global sort presumably re-shuffles the data, making
    # the preceding repartition and the trailing sortWithinPartitions
    # redundant — kept as-is to preserve the existing output; confirm
    # before simplifying.
    out = df.repartition(partition_col)
    out = out.sort(sort_keys, ascending=True)
    out = out.sortWithinPartitions(sort_keys, ascending=True)
    out.write.parquet(out_path, mode='overwrite', compression='snappy')


def sort_and_write(dump_type):
    """Sort the stage-1 parquet files and write the two final datasets.

    Args:
        dump_type: key into TYPES ('comments' or 'submissions'); selects
            the input directory, output paths, sort keys and app name.

    Reads the configured input directory, lower-cases `subreddit`, renames
    `created_utc` to `CreatedAt`, adds `Month`/`Year`/`Day` columns, then
    writes one dataset sorted by the subreddit keys and one sorted by the
    author keys, overwriting any existing output.
    """
    config = TYPES[dump_type]

    # Settings go on the builder so they actually reach the session.
    # Calling pyspark.SparkConf().set(...) after the session exists only
    # mutates a throwaway conf object and changes nothing.
    spark = (
        SparkSession.builder
        .appName(config['app_name'])
        .config("spark.sql.shuffle.partitions", "2000")
        .config("spark.sql.crossJoin.enabled", "true")
        .config("spark.debug.maxToStringFields", "200")
        .getOrCreate()
    )

    # compression is a write-side option; it is not passed to the reader
    df = spark.read.parquet(config['input_dir'])

    # Add/drop/rename (rather than overwriting in place) keeps the column
    # order of the existing output: the lower-cased subreddit ends up last
    # among the original columns.
    df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
    df = df.drop('subreddit')
    df = df.withColumnRenamed('subreddit_2', 'subreddit')

    df = df.withColumnRenamed("created_utc", "CreatedAt")
    df = df.withColumn("Month", f.month(f.col("CreatedAt")))
    df = df.withColumn("Year", f.year(f.col("CreatedAt")))
    df = df.withColumn("Day", f.dayofmonth(f.col("CreatedAt")))

    # sort + repartition by subreddit
    _write_sorted(df, 'subreddit', config['subreddit_sort_keys'], config['output_by_subreddit'])

    # sort + repartition by author
    _write_sorted(df, 'author', config['author_sort_keys'], config['output_by_author'])
from arrow parquet to spark parquet (submissions_2_parquet_part2.py) - -from datetime import datetime -from multiprocessing import Pool -from itertools import islice -from helper import find_dumps, open_fileset -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq -import simdjson -import fire -import os - -parser = simdjson.Parser() - -def parse_submission(post, names = None): - if names is None: - names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error'] - - try: - post = parser.parse(post) - except (ValueError) as e: - # print(e) - # print(post) - row = [None for _ in names] - row[-1] = "Error parsing json|{0}|{1}".format(e,post) - return tuple(row) - - row = [] - - for name in names: - if name == 'created_utc' or name == 'retrieved_on': - val = post.get(name,None) - if val is not None: - row.append(datetime.fromtimestamp(int(post[name]),tz=None)) - else: - row.append(None) - elif name == 'edited': - val = post[name] - if type(val) == bool: - row.append(val) - row.append(None) - else: - row.append(True) - row.append(datetime.fromtimestamp(int(val),tz=None)) - elif name == "time_edited": - continue - elif name == 'has_media': - row.append(post.get('media',None) is not None) - - elif name not in post: - row.append(None) - else: - row.append(post[name]) - return tuple(row) - -def parse_dump(partition): - - N=10000 - stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"]) - rows = map(parse_submission,stream) - schema = pa.schema([ - pa.field('id', pa.string(),nullable=True), - pa.field('author', pa.string(),nullable=True), - pa.field('subreddit', pa.string(),nullable=True), - pa.field('title', pa.string(),nullable=True), - pa.field('created_utc', 
pa.timestamp('ms'),nullable=True), - pa.field('permalink', pa.string(),nullable=True), - pa.field('url', pa.string(),nullable=True), - pa.field('domain', pa.string(),nullable=True), - pa.field('score', pa.int64(),nullable=True), - pa.field('ups', pa.int64(),nullable=True), - pa.field('downs', pa.int64(),nullable=True), - pa.field('over_18', pa.bool_(),nullable=True), - pa.field('has_media',pa.bool_(),nullable=True), - pa.field('selftext',pa.string(),nullable=True), - pa.field('retrieved_on', pa.timestamp('ms'),nullable=True), - pa.field('num_comments', pa.int64(),nullable=True), - pa.field('gilded',pa.int64(),nullable=True), - pa.field('edited',pa.bool_(),nullable=True), - pa.field('time_edited',pa.timestamp('ms'),nullable=True), - pa.field('subreddit_type',pa.string(),nullable=True), - pa.field('subreddit_id',pa.string(),nullable=True), - pa.field('subreddit_subscribers',pa.int64(),nullable=True), - pa.field('name',pa.string(),nullable=True), - pa.field('is_self',pa.bool_(),nullable=True), - pa.field('stickied',pa.bool_(),nullable=True), - pa.field('quarantine',pa.bool_(),nullable=True), - pa.field('error',pa.string(),nullable=True)]) - - if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"): - os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/") - - with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer: - while True: - chunk = islice(rows,N) - pddf = pd.DataFrame(chunk, columns=schema.names) - table = pa.Table.from_pandas(pddf,schema=schema) - if table.shape[0] == 0: - break - writer.write_table(table) - - writer.close() - -def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"): - files = list(find_dumps(dumpdir,base_pattern="RS_20*.*")) - with open("parse_submissions_task_list",'w') as of: - for fpath in files: - partition = os.path.split(fpath)[1] - of.write(f'python3 
submissions_2_parquet_part1.py parse_dump {partition}\n') - -if __name__ == "__main__": - fire.Fire({'parse_dump':parse_dump, - 'gen_task_list':gen_task_list}) diff --git a/datasets/submissions_2_parquet_part2.py b/datasets/submissions_2_parquet_part2.py deleted file mode 100644 index 3a58617..0000000 --- a/datasets/submissions_2_parquet_part2.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python3 - -# spark script to make sorted, and partitioned parquet files - -import pyspark -from pyspark.sql import functions as f -from pyspark.sql import SparkSession -import os - -spark = SparkSession.builder.getOrCreate() - -sc = spark.sparkContext - -conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet") -conf = conf.set("spark.sql.shuffle.partitions",2000) -conf = conf.set('spark.sql.crossJoin.enabled',"true") -conf = conf.set('spark.debug.maxToStringFields',200) -sqlContext = pyspark.SQLContext(sc) - -df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_submissions.parquet/") - -df = df.withColumn("subreddit_2", f.lower(f.col('subreddit'))) -df = df.drop('subreddit') -df = df.withColumnRenamed('subreddit_2','subreddit') -df = df.withColumnRenamed("created_utc","CreatedAt") -df = df.withColumn("Month",f.month(f.col("CreatedAt"))) -df = df.withColumn("Year",f.year(f.col("CreatedAt"))) -df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt"))) -df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3]) - -# next we gotta resort it all. -df = df.repartition("subreddit") -df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True) -df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True) -df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy') - - -# # we also want to have parquet files sorted by author then reddit. 
-df = df.repartition("author") -df3 = df.sort(["author","CreatedAt","id"],ascending=True) -df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True) -df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')