
Build comments dataset similarly to submissions and improve partitioning scheme

This commit is contained in:
Nate E TeBlunthuis 2020-07-07 11:45:43 -07:00
parent fc6575a287
commit 40d4563770
8 changed files with 208 additions and 220 deletions
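
The "improved partitioning scheme" in the commit message shows up below as a repartition on each output's main sort key followed by sortWithinPartitions before the write; the submissions outputs additionally partition the files on disk by Year and Month. A minimal PySpark sketch of that pattern, for orientation only (the input path is a placeholder, not a path from this repository):

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

# placeholder input with a CreatedAt timestamp and a subreddit column
df = spark.read.parquet("/tmp/example_comments.parquet")
df = df.withColumn("Year", f.year(f.col("CreatedAt")))
df = df.withColumn("Month", f.month(f.col("CreatedAt")))

# cluster all rows of a subreddit into the same partition, then sort inside it
df = df.repartition("subreddit")
df = df.sortWithinPartitions(["subreddit", "CreatedAt"], ascending=True)

# optional hive-style directory partitioning, as the submissions scripts do
df.write.parquet("/tmp/example_by_subreddit.parquet",
                 partitionBy=["Year", "Month"], mode="overwrite")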


@@ -1,139 +0,0 @@
#!/usr/bin/env python3

import pyspark
from pyspark.sql import functions as f
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf().setAppName("Reddit comments to parquet")
conf = conf.set('spark.sql.crossJoin.enabled',"true")

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

globstr = "/gscratch/comdata/raw_data/reddit_dumps/comments/RC_20*.bz2"

import re
import glob
import json
from subprocess import Popen, PIPE
from datetime import datetime
import pandas as pd
from multiprocessing import Pool

def open_fileset(globstr):
    files = glob.glob(globstr)
    for fh in files:
        print(fh)
        lines = open_input_file(fh)
        for line in lines:
            yield json.loads(line)

def open_input_file(input_filename):
    if re.match(r'.*\.7z$', input_filename):
        cmd = ["7za", "x", "-so", input_filename, '*']
    elif re.match(r'.*\.gz$', input_filename):
        cmd = ["zcat", input_filename]
    elif re.match(r'.*\.bz2$', input_filename):
        cmd = ["bzcat", "-dk", input_filename]
    elif re.match(r'.*\.bz', input_filename):
        cmd = ["bzcat", "-dk", input_filename]
    elif re.match(r'.*\.xz', input_filename):
        cmd = ["xzcat",'-dk',input_filename]
    try:
        input_file = Popen(cmd, stdout=PIPE).stdout
    except NameError:
        input_file = open(input_filename, 'r')
    return input_file

def include_row(comment, subreddits_to_track = []):
    subreddit = comment['subreddit'].lower()
    return subreddit in subreddits_to_track

def parse_comment(comment, names= None):
    if names is None:
        names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]

    try:
        comment = json.loads(comment)
    except json.decoder.JSONDecodeError as e:
        print(e)
        print(comment)
        row = [None for _ in names]
        row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,comment)
        return tuple(row)

    row = []
    for name in names:
        if name == 'created_utc':
            row.append(datetime.fromtimestamp(int(comment['created_utc']),tz=None))
        elif name == 'edited':
            val = comment[name]
            if type(val) == bool:
                row.append(val)
                row.append(None)
            else:
                row.append(True)
                row.append(datetime.fromtimestamp(int(val),tz=None))
        elif name == "time_edited":
            continue
        elif name not in comment:
            row.append(None)
        else:
            row.append(comment[name])

    return tuple(row)

# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])

sqlContext = pyspark.SQLContext(sc)

comments = sc.textFile(globstr)

schema = StructType().add("id", StringType(), True)
schema = schema.add("subreddit", StringType(), True)
schema = schema.add("link_id", StringType(), True)
schema = schema.add("parent_id", StringType(), True)
schema = schema.add("created_utc", TimestampType(), True)
schema = schema.add("author", StringType(), True)
schema = schema.add("ups", LongType(), True)
schema = schema.add("downs", LongType(), True)
schema = schema.add("score", LongType(), True)
schema = schema.add("edited", BooleanType(), True)
schema = schema.add("time_edited", TimestampType(), True)
schema = schema.add("subreddit_type", StringType(), True)
schema = schema.add("subreddit_id", StringType(), True)
schema = schema.add("stickied", BooleanType(), True)
schema = schema.add("is_submitter", BooleanType(), True)
schema = schema.add("body", StringType(), True)
schema = schema.add("error", StringType(), True)

rows = comments.map(lambda c: parse_comment(c, schema.fieldNames()))

#!/usr/bin/env python3

df = sqlContext.createDataFrame(rows, schema)

df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
df = df.withColumnRenamed('subreddit_2','subreddit')

df = df.withColumnRenamed("created_utc","CreatedAt")
df = df.withColumn("Month",f.month(f.col("CreatedAt")))
df = df.withColumn("Year",f.year(f.col("CreatedAt")))
df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])

# cache so we don't have to extract everythin twice
df = df.cache()
df2 = df.sort(["subreddit","author","link_id","parent_id","Year","Month","Day"],ascending=True)
df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", partitionBy=["Year",'Month'],mode='overwrite')
df3 = df.sort(["author","CreatetdAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", partitionBy=["Year",'Month'],mode='overwrite')

comments_2_parquet.sh Executable file

@@ -0,0 +1,9 @@
#!/usr/bin/env bash

echo "!#/usr/bin/bash" > job_script.sh
echo "source $(pwd)/../bin/activate" >> job_script.sh
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh

srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 job_script.sh

start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py

comments_2_parquet_part1.py Executable file

@@ -0,0 +1,92 @@
#!/usr/bin/env python3

import json
from datetime import datetime
from multiprocessing import Pool
from itertools import islice
from helper import find_dumps, open_fileset
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

globstr_base = "/gscratch/comdata/reddit_dumps/comments/RC_20*"

def parse_comment(comment, names= None):
    if names is None:
        names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]

    try:
        comment = json.loads(comment)
    except json.decoder.JSONDecodeError as e:
        print(e)
        print(comment)
        row = [None for _ in names]
        row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,comment)
        return tuple(row)

    row = []
    for name in names:
        if name == 'created_utc':
            row.append(datetime.fromtimestamp(int(comment['created_utc']),tz=None))
        elif name == 'edited':
            val = comment[name]
            if type(val) == bool:
                row.append(val)
                row.append(None)
            else:
                row.append(True)
                row.append(datetime.fromtimestamp(int(val),tz=None))
        elif name == "time_edited":
            continue
        elif name not in comment:
            row.append(None)
        else:
            row.append(comment[name])

    return tuple(row)

# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])

dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"

files = list(find_dumps(dumpdir, base_pattern="RC_20*.*"))

pool = Pool(28)

stream = open_fileset(files)

N = 100000

rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))

schema = pa.schema([
    pa.field('id', pa.string(), nullable=True),
    pa.field('subreddit', pa.string(), nullable=True),
    pa.field('link_id', pa.string(), nullable=True),
    pa.field('parent_id', pa.string(), nullable=True),
    pa.field('created_utc', pa.timestamp('ms'), nullable=True),
    pa.field('author', pa.string(), nullable=True),
    pa.field('ups', pa.int64(), nullable=True),
    pa.field('downs', pa.int64(), nullable=True),
    pa.field('score', pa.int64(), nullable=True),
    pa.field('edited', pa.bool_(), nullable=True),
    pa.field('time_edited', pa.timestamp('ms'), nullable=True),
    pa.field('subreddit_type', pa.string(), nullable=True),
    pa.field('subreddit_id', pa.string(), nullable=True),
    pa.field('stickied', pa.bool_(), nullable=True),
    pa.field('is_submitter', pa.bool_(), nullable=True),
    pa.field('body', pa.string(), nullable=True),
    pa.field('error', pa.string(), nullable=True),
])

with pq.ParquetWriter("/gscratch/comdata/output/reddit_comments.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
    while True:
        chunk = islice(rows,N)
        pddf = pd.DataFrame(chunk, columns=schema.names)
        table = pa.Table.from_pandas(pddf,schema=schema)
        if table.shape[0] == 0:
            break
        writer.write_table(table)

    writer.close()
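
The write loop in part1 drains the pool's unordered result iterator in batches of N rows; islice simply yields an empty slice once the stream is exhausted, which is what the table.shape[0] == 0 check catches. A self-contained sketch of the same batching pattern on a toy iterator (illustrative only; the path, schema, and rows here are made up):

from itertools import islice

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

toy_schema = pa.schema([pa.field('id', pa.string(), nullable=True),
                        pa.field('score', pa.int64(), nullable=True)])

# stand-in for pool.imap_unordered(parse_comment, stream, ...)
rows = iter([("a", 1), ("b", 2), ("c", 3)])

with pq.ParquetWriter("/tmp/toy.parquet", schema=toy_schema, compression='snappy') as writer:
    while True:
        chunk = islice(rows, 2)                 # take up to N rows per batch
        pddf = pd.DataFrame(chunk, columns=toy_schema.names)
        table = pa.Table.from_pandas(pddf, schema=toy_schema)
        if table.shape[0] == 0:                 # empty batch means the stream is exhausted
            break
        writer.write_table(table)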

comments_2_parquet_part2.py Executable file

@@ -0,0 +1,29 @@
#!/usr/bin/env python3

# spark script to make sorted, and partitioned parquet files

from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2")

df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
df = df.withColumnRenamed('subreddit_2','subreddit')
df = df.withColumnRenamed("created_utc","CreatedAt")

df = df.withColumn("Month",f.month(f.col("CreatedAt")))
df = df.withColumn("Year",f.year(f.col("CreatedAt")))
df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))

df = df.repartition('subreddit')
df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')

df = df.repartition('author')
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite')
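
Because each output above clusters rows by its repartition key and sorts within partitions, a downstream reader that filters on subreddit (or author) should be able to skip most of the data via Parquet row-group statistics. An illustrative read of the by-subreddit output (the subreddit chosen here is just an example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

comments = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")

# subreddit names are lowercased upstream, so match against a lowercase value
askreddit = comments.filter(comments.subreddit == "askreddit")
monthly_counts = askreddit.groupBy("Year", "Month").count()
monthly_counts.show()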

helper.py Normal file

@@ -0,0 +1,57 @@
from subprocess import Popen, PIPE
import re
from collections import defaultdict
from os import path
import glob

def find_dumps(dumpdir, base_pattern):

    files = glob.glob(path.join(dumpdir,base_pattern))

    # build a dictionary of possible extensions for each dump
    dumpext = defaultdict(list)
    for fpath in files:
        fname, ext = path.splitext(fpath)
        dumpext[fname].append(ext)

    ext_priority = ['.zst','.xz','.bz2']

    for base, exts in dumpext.items():
        found = False
        if len(exts) == 1:
            yield base + exts[0]
            found = True
        else:
            for ext in ext_priority:
                if ext in exts:
                    yield base + ext
                    found = True
        assert(found == True)

def open_fileset(files):
    for fh in files:
        print(fh)
        lines = open_input_file(fh)
        for line in lines:
            yield line

def open_input_file(input_filename):
    if re.match(r'.*\.7z$', input_filename):
        cmd = ["7za", "x", "-so", input_filename, '*']
    elif re.match(r'.*\.gz$', input_filename):
        cmd = ["zcat", input_filename]
    elif re.match(r'.*\.bz2$', input_filename):
        cmd = ["bzcat", "-dk", input_filename]
    elif re.match(r'.*\.bz', input_filename):
        cmd = ["bzcat", "-dk", input_filename]
    elif re.match(r'.*\.xz', input_filename):
        cmd = ["xzcat",'-dk', '-T 20',input_filename]
    elif re.match(r'.*\.zst',input_filename):
        cmd = ['zstd','-dck', input_filename]
    try:
        input_file = Popen(cmd, stdout=PIPE).stdout
    except NameError as e:
        print(e)
        input_file = open(input_filename, 'r')
    return input_file
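
A small usage sketch for helper.py: find_dumps yields one path per monthly dump, preferring .zst over .xz over .bz2 when the same dump exists in several compressions, and open_fileset streams raw JSON lines out of whichever decompressor applies. The directory below is hypothetical:

from itertools import islice

from helper import find_dumps, open_fileset

# hypothetical directory holding e.g. RC_2019-01.zst and RC_2019-01.bz2
files = list(find_dumps("/tmp/example_dumps", base_pattern="RC_20*.*"))
# only the .zst variant of RC_2019-01 would be yielded

for line in islice(open_fileset(files), 5):
    print(line[:80])   # each line is one raw JSON record from the dump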

submissions_2_parquet.sh

@@ -1,8 +1,10 @@
 #!/usr/bin/env bash
-# part2 should be run on one ore more spark nodes
+echo "!#/usr/bin/bash" > job_script.sh
+echo "source $(pwd)/../bin/activate" >> job_script.sh
+echo "python3 $(pwd)/submissions_2_parquet_part1.py" >> job_script.sh

-./submissions_2_parquet_part1.py
+srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 job_script.sh
+
 start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py

submissions_2_parquet_part1.py

@@ -4,75 +4,14 @@
 # 1. from gz to arrow parquet (this script)
 # 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)

-from collections import defaultdict
-from os import path
-import glob
 import json
-import re
 from datetime import datetime
-from subprocess import Popen, PIPE
-from multiprocessing import Pool, SimpleQueue
+from multiprocessing import Pool
+from itertools import islice
+from helper import find_dumps, open_fileset
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq

-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
-
-def find_json_files(dumpdir):
-    base_pattern = "RS_20*.*"
-    files = glob.glob(path.join(dumpdir,base_pattern))
-
-    # build a dictionary of possible extensions for each dump
-    dumpext = defaultdict(list)
-    for fpath in files:
-        fname, ext = path.splitext(fpath)
-        dumpext[fname].append(ext)
-
-    ext_priority = ['.zst','.xz','.bz2']
-
-    for base, exts in dumpext.items():
-        found = False
-        if len(exts) == 1:
-            yield base + exts[0]
-            found = True
-        else:
-            for ext in ext_priority:
-                if ext in exts:
-                    yield base + ext
-                    found = True
-        assert(found == True)
-
-files = list(find_json_files(dumpdir))
-
-def read_file(fh):
-    lines = open_input_file(fh)
-    for line in lines:
-        yield line
-
-def open_fileset(files):
-    for fh in files:
-        print(fh)
-        lines = open_input_file(fh)
-        for line in lines:
-            yield line
-
-def open_input_file(input_filename):
-    if re.match(r'.*\.7z$', input_filename):
-        cmd = ["7za", "x", "-so", input_filename, '*']
-    elif re.match(r'.*\.gz$', input_filename):
-        cmd = ["zcat", input_filename]
-    elif re.match(r'.*\.bz2$', input_filename):
-        cmd = ["bzcat", "-dk", input_filename]
-    elif re.match(r'.*\.bz', input_filename):
-        cmd = ["bzcat", "-dk", input_filename]
-    elif re.match(r'.*\.xz', input_filename):
-        cmd = ["xzcat",'-dk', '-T 20',input_filename]
-    elif re.match(r'.*\.zst',input_filename):
-        cmd = ['zstd','-dck', input_filename]
-    try:
-        input_file = Popen(cmd, stdout=PIPE).stdout
-    except NameError as e:
-        print(e)
-        input_file = open(input_filename, 'r')
-    return input_file
-
 def parse_submission(post, names = None):

@@ -116,6 +55,10 @@ def parse_submission(post, names = None):
             row.append(post[name])
     return tuple(row)

+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
+
+files = list(find_dumps(dumpdir))
+
 pool = Pool(28)

 stream = open_fileset(files)

@@ -124,11 +67,6 @@ N = 100000

 rows = pool.imap_unordered(parse_submission, stream, chunksize=int(N/28))

-from itertools import islice
-import pandas as pd
-import pyarrow as pa
-import pyarrow.parquet as pq
-
 schema = pa.schema([
     pa.field('id', pa.string(),nullable=True),
     pa.field('author', pa.string(),nullable=True),

submissions_2_parquet_part2.py

@@ -2,12 +2,8 @@
 # spark script to make sorted, and partitioned parquet files

-import pyspark
 from pyspark.sql import functions as f
-from pyspark.sql.types import *
-from pyspark import SparkConf, SparkContext
-from pyspark.sql import SparkSession, SQLContext
-import os
+from pyspark.sql import SparkSession

 spark = SparkSession.builder.getOrCreate()

@@ -31,12 +27,16 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])

 # next we gotta resort it all.
-df2 = df.sort(["subreddit","author","id","Year","Month","Day"],ascending=True)
+df = df.repartition("subreddit")
+df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
+df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
 df2.write.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet", partitionBy=["Year",'Month'], mode='overwrite')

 # # we also want to have parquet files sorted by author then reddit.
-df3 = df.sort(["author","CreatedAt","subreddit","id","Year","Month","Day"],ascending=True)
+df = df.repartition("author")
+df3 = df.sort(["author","CreatedAt","id"],ascending=True)
+df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
 df3.write.parquet("/gscratch/comdata/output/reddit_submissions_by_author.parquet", partitionBy=["Year",'Month'], mode='overwrite')

 os.remove("/gscratch/comdata/output/reddit_submissions.parquet_temp")