git-annex in
This commit is contained in:
@@ -1,26 +0,0 @@
|
||||
#!/bin/bash
|
||||
## parallel_sql_job.sh
|
||||
#SBATCH --job-name=tf_subreddit_comments
|
||||
## Allocation Definition
|
||||
#SBATCH --account=comdata-ckpt
|
||||
#SBATCH --partition=ckpt
|
||||
## Resources
|
||||
## Nodes. This should always be 1 for parallel-sql.
|
||||
#SBATCH --nodes=1
|
||||
## Walltime (12 hours)
|
||||
#SBATCH --time=12:00:00
|
||||
## Memory per node
|
||||
#SBATCH --mem=32G
|
||||
#SBATCH --cpus-per-task=4
|
||||
#SBATCH --ntasks=1
|
||||
#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
|
||||
source ./bin/activate
|
||||
module load parallel_sql
|
||||
echo $(which perl)
|
||||
conda list pyarrow
|
||||
which python3
|
||||
#Put here commands to load other modules (e.g. matlab etc.)
|
||||
#Below command means that parallel_sql will get tasks from the database
|
||||
#and run them on the node (in parallel). So a 16 core node will have
|
||||
#16 tasks running at one time.
|
||||
parallel-sql --sql -a parallel --exit-on-term --jobs 4
|
||||
@@ -1,10 +1,10 @@
|
||||
#!/usr/bin/env bash
|
||||
## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
|
||||
|
||||
#!/usr/bin/env bash
|
||||
echo "#!/usr/bin/bash" > job_script.sh
|
||||
#echo "source $(pwd)/../bin/activate" >> job_script.sh
|
||||
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
|
||||
|
||||
srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
|
||||
srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 --pty job_script.sh
|
||||
|
||||
start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
from multiprocessing import Pool
|
||||
from itertools import islice
|
||||
from helper import find_dumps, open_fileset
|
||||
from helper import open_input_file, find_dumps
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
from pathlib import Path
|
||||
import fire
|
||||
|
||||
def parse_comment(comment, names= None):
|
||||
if names is None:
|
||||
@@ -46,70 +49,63 @@ def parse_comment(comment, names= None):
|
||||
|
||||
# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
|
||||
|
||||
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
|
||||
def parse_dump(partition):
|
||||
|
||||
files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
|
||||
dumpdir = f"/gscratch/comdata/raw_data/reddit_dumps/comments/{partition}"
|
||||
|
||||
pool = Pool(28)
|
||||
stream = open_input_file(dumpdir)
|
||||
rows = map(parse_comment, stream)
|
||||
|
||||
stream = open_fileset(files)
|
||||
schema = pa.schema([
|
||||
pa.field('id', pa.string(), nullable=True),
|
||||
pa.field('subreddit', pa.string(), nullable=True),
|
||||
pa.field('link_id', pa.string(), nullable=True),
|
||||
pa.field('parent_id', pa.string(), nullable=True),
|
||||
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('author', pa.string(), nullable=True),
|
||||
pa.field('ups', pa.int64(), nullable=True),
|
||||
pa.field('downs', pa.int64(), nullable=True),
|
||||
pa.field('score', pa.int64(), nullable=True),
|
||||
pa.field('edited', pa.bool_(), nullable=True),
|
||||
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('subreddit_type', pa.string(), nullable=True),
|
||||
pa.field('subreddit_id', pa.string(), nullable=True),
|
||||
pa.field('stickied', pa.bool_(), nullable=True),
|
||||
pa.field('is_submitter', pa.bool_(), nullable=True),
|
||||
pa.field('body', pa.string(), nullable=True),
|
||||
pa.field('error', pa.string(), nullable=True),
|
||||
])
|
||||
|
||||
N = int(1e4)
|
||||
p = Path("/gscratch/comdata/output/temp/reddit_comments.parquet")
|
||||
p.mkdir(exist_ok=True,parents=True)
|
||||
|
||||
rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
|
||||
N=10000
|
||||
with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet",
|
||||
schema=schema,
|
||||
compression='snappy',
|
||||
flavor='spark') as writer:
|
||||
|
||||
schema = pa.schema([
|
||||
pa.field('id', pa.string(), nullable=True),
|
||||
pa.field('subreddit', pa.string(), nullable=True),
|
||||
pa.field('link_id', pa.string(), nullable=True),
|
||||
pa.field('parent_id', pa.string(), nullable=True),
|
||||
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('author', pa.string(), nullable=True),
|
||||
pa.field('ups', pa.int64(), nullable=True),
|
||||
pa.field('downs', pa.int64(), nullable=True),
|
||||
pa.field('score', pa.int64(), nullable=True),
|
||||
pa.field('edited', pa.bool_(), nullable=True),
|
||||
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('subreddit_type', pa.string(), nullable=True),
|
||||
pa.field('subreddit_id', pa.string(), nullable=True),
|
||||
pa.field('stickied', pa.bool_(), nullable=True),
|
||||
pa.field('is_submitter', pa.bool_(), nullable=True),
|
||||
pa.field('body', pa.string(), nullable=True),
|
||||
pa.field('error', pa.string(), nullable=True),
|
||||
])
|
||||
while True:
|
||||
chunk = islice(rows,N)
|
||||
pddf = pd.DataFrame(chunk, columns=schema.names)
|
||||
table = pa.Table.from_pandas(pddf,schema=schema)
|
||||
if table.shape[0] == 0:
|
||||
break
|
||||
writer.write_table(table)
|
||||
|
||||
from pathlib import Path
|
||||
p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
|
||||
|
||||
if not p.is_dir():
|
||||
if p.exists():
|
||||
p.unlink()
|
||||
p.mkdir()
|
||||
|
||||
else:
|
||||
list(map(Path.unlink,p.glob('*')))
|
||||
|
||||
part_size = int(1e7)
|
||||
part = 1
|
||||
n_output = 0
|
||||
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
|
||||
|
||||
while True:
|
||||
if n_output > part_size:
|
||||
if part > 1:
|
||||
writer.close()
|
||||
|
||||
part = part + 1
|
||||
n_output = 0
|
||||
|
||||
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
|
||||
|
||||
n_output += N
|
||||
chunk = islice(rows,N)
|
||||
pddf = pd.DataFrame(chunk, columns=schema.names)
|
||||
table = pa.Table.from_pandas(pddf,schema=schema)
|
||||
if table.shape[0] == 0:
|
||||
break
|
||||
writer.write_table(table)
|
||||
writer.close()
|
||||
|
||||
|
||||
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/comments", overwrite=True):
|
||||
files = list(find_dumps(dumpdir,base_pattern="RC_20*.*"))
|
||||
with open("comments_task_list.sh",'w') as of:
|
||||
for fpath in files:
|
||||
partition = os.path.split(fpath)[1]
|
||||
if (not Path(f"/gscratch/comdata/output/temp/reddit_comments.parquet/{partition}.parquet").exists()) or (overwrite is True):
|
||||
of.write(f'python3 comments_2_parquet_part1.py parse_dump {partition}\n')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
fire.Fire({'parse_dump':parse_dump,
|
||||
'gen_task_list':gen_task_list})
|
||||
|
||||
|
||||
@@ -2,12 +2,19 @@
|
||||
|
||||
# spark script to make sorted, and partitioned parquet files
|
||||
|
||||
import pyspark
|
||||
from pyspark.sql import functions as f
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
|
||||
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
|
||||
conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
|
||||
conf = conf.set("spark.sql.shuffle.partitions",2000)
|
||||
conf = conf.set('spark.sql.crossJoin.enabled',"true")
|
||||
conf = conf.set('spark.debug.maxToStringFields',200)
|
||||
sc = spark.sparkContext
|
||||
|
||||
df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_comments.parquet",compression='snappy')
|
||||
|
||||
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
|
||||
df = df.drop('subreddit')
|
||||
@@ -21,9 +28,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
|
||||
df = df.repartition('subreddit')
|
||||
df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
|
||||
df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
|
||||
df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
|
||||
df2.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
|
||||
|
||||
df = df.repartition('author')
|
||||
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
|
||||
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
|
||||
df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
|
||||
df3.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
|
||||
|
||||
@@ -24,8 +24,7 @@ def open_fileset(files):
|
||||
for fh in files:
|
||||
print(fh)
|
||||
lines = open_input_file(fh)
|
||||
for line in lines:
|
||||
yield line
|
||||
yield from lines
|
||||
|
||||
def open_input_file(input_filename):
|
||||
if re.match(r'.*\.7z$', input_filename):
|
||||
@@ -39,7 +38,7 @@ def open_input_file(input_filename):
|
||||
elif re.match(r'.*\.xz', input_filename):
|
||||
cmd = ["xzcat",'-dk', '-T 20',input_filename]
|
||||
elif re.match(r'.*\.zst',input_filename):
|
||||
cmd = ['zstd','-dck', input_filename]
|
||||
cmd = ['/kloneusr/bin/zstd','-dck', input_filename, '--memory=2048MB --stdout']
|
||||
elif re.match(r'.*\.gz',input_filename):
|
||||
cmd = ['gzip','-dc', input_filename]
|
||||
try:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/bash
|
||||
start_spark_cluster.sh
|
||||
spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000
|
||||
stop-all.sh
|
||||
singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 comments_2_parquet_part2.py
|
||||
singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh
|
||||
|
||||
4
datasets/submissions_2_parquet.sh
Normal file → Executable file
4
datasets/submissions_2_parquet.sh
Normal file → Executable file
@@ -1,8 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
|
||||
|
||||
#!/usr/bin/env bash
|
||||
|
||||
./parse_submissions.sh
|
||||
srun -p compute-bigmem -A comdata --nodes=1 --mem-per-cpu=9g -c 40 --time=120:00:00 python3 $(pwd)/submissions_2_parquet_part1.py gen_task_list
|
||||
|
||||
start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py
|
||||
|
||||
|
||||
@@ -3,26 +3,23 @@
|
||||
# two stages:
|
||||
# 1. from gz to arrow parquet (this script)
|
||||
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
|
||||
|
||||
from datetime import datetime
|
||||
from multiprocessing import Pool
|
||||
from pathlib import Path
|
||||
from itertools import islice
|
||||
from helper import find_dumps, open_fileset
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import simdjson
|
||||
import fire
|
||||
import os
|
||||
|
||||
parser = simdjson.Parser()
|
||||
import json
|
||||
|
||||
def parse_submission(post, names = None):
|
||||
if names is None:
|
||||
names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
|
||||
|
||||
try:
|
||||
post = parser.parse(post)
|
||||
post = json.loads(post)
|
||||
except (ValueError) as e:
|
||||
# print(e)
|
||||
# print(post)
|
||||
@@ -92,8 +89,7 @@ def parse_dump(partition):
|
||||
pa.field('quarantine',pa.bool_(),nullable=True),
|
||||
pa.field('error',pa.string(),nullable=True)])
|
||||
|
||||
if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
|
||||
os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
|
||||
Path("/gscratch/comdata/output/temp/reddit_submissions.parquet/").mkdir(exist_ok=True,parents=True)
|
||||
|
||||
with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
|
||||
while True:
|
||||
@@ -108,7 +104,7 @@ def parse_dump(partition):
|
||||
|
||||
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
|
||||
files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
|
||||
with open("parse_submissions_task_list",'w') as of:
|
||||
with open("submissions_task_list.sh",'w') as of:
|
||||
for fpath in files:
|
||||
partition = os.path.split(fpath)[1]
|
||||
of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
|
||||
|
||||
Reference in New Issue
Block a user