commit 6baa08889b
parent 4447c60265
Author: Nate E TeBlunthuis
Date:   2020-11-11 16:39:44 -08:00

    git-annex in nathante@mox2.hyak.local:/gscratch/comdata/users/nathante/cdsc-reddit

10 changed files with 76 additions and 42 deletions

View File

@@ -13,7 +13,7 @@ spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
 # outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
+def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
     '''
     Compute similarities between subreddits based on tfi-idf vectors of author comments
@@ -32,9 +32,8 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     '''
     print(outfile)
-    print(exclude_phrases)
-    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet_test1/part-00000-107cee94-92d8-4265-b804-40f1e7f1aaf2-c000.snappy.parquet')
+    tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
     if included_subreddits is None:
         included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
@@ -55,12 +54,14 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     sim_dist = sim_dist.repartition(1)
     sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-    spark.stop()
     #instead of toLocalMatrix() why not read as entries and put strait into numpy
     sim_entries = pd.read_parquet(output_parquet)
     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+    spark.stop()
     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
     df = df.set_index('subreddit_id_new')
@@ -75,4 +76,4 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     return similarities
 if __name__ == '__main__':
-    fire.Fire(term_cosine_similarities)
+    fire.Fire(author_cosine_similarities)
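
For reference, a minimal sketch (toy data, illustrative names, not the repo's code) of the DIMSUM-based similarity step these scripts build on: RowMatrix.columnSimilarities(threshold) estimates cosine similarities between columns, and the similarity_threshold argument above corresponds to the DIMSUM sampling threshold, with 0 meaning exact computation.

from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

spark = SparkSession.builder.getOrCreate()
# Each row is one "document" (e.g. one author); each column is one subreddit.
rows = spark.sparkContext.parallelize([Vectors.dense([1.0, 0.0, 3.0]),
                                       Vectors.dense([0.0, 2.0, 1.0])])
sim = RowMatrix(rows).columnSimilarities(threshold=0.1)
sim.entries.toDF().show()   # upper-triangle entries (i, j, similarity)
spark.stop()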

View File

@@ -13,8 +13,12 @@
 #SBATCH --mem=32G
 #SBATCH --cpus-per-task=4
 #SBATCH --ntasks=1
+#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
+source ./bin/activate
 module load parallel_sql
+echo $(which perl)
+conda list pyarrow
+which python3
 #Put here commands to load other modules (e.g. matlab etc.)
 #Below command means that parallel_sql will get tasks from the database
 #and run them on the node (in parallel). So a 16 core node will have

View File

@@ -2,7 +2,7 @@
 #!/usr/bin/env bash
 echo "#!/usr/bin/bash" > job_script.sh
-echo "source $(pwd)/../bin/activate" >> job_script.sh
+#echo "source $(pwd)/../bin/activate" >> job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
 srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh

View File

@@ -8,8 +8,6 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
-globstr_base = "/gscratch/comdata/reddit_dumps/comments/RC_20*"
 def parse_comment(comment, names= None):
     if names is None:
         names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
@@ -48,15 +46,15 @@ def parse_comment(comment, names= None):
 # conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
-files = list(find_dumps(dumpdir, base_pattern="RC_20*.*"))
+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
+files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
 pool = Pool(28)
 stream = open_fileset(files)
-N = 100000
+N = int(1e4)
 rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
@@ -80,13 +78,38 @@ schema = pa.schema([
     pa.field('error', pa.string(), nullable=True),
 ])
-with pq.ParquetWriter("/gscratch/comdata/output/reddit_comments.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
-    while True:
-        chunk = islice(rows,N)
-        pddf = pd.DataFrame(chunk, columns=schema.names)
-        table = pa.Table.from_pandas(pddf,schema=schema)
-        if table.shape[0] == 0:
-            break
-        writer.write_table(table)
+from pathlib import Path
+p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+if not p.is_dir():
+    if p.exists():
+        p.unlink()
+    p.mkdir()
+else:
+    list(map(Path.unlink,p.glob('*')))
+part_size = int(1e7)
+part = 1
+n_output = 0
+writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+while True:
+    if n_output > part_size:
+        if part > 1:
+            writer.close()
+        part = part + 1
+        n_output = 0
+        writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+    n_output += N
+    chunk = islice(rows,N)
+    pddf = pd.DataFrame(chunk, columns=schema.names)
+    table = pa.Table.from_pandas(pddf,schema=schema)
+    if table.shape[0] == 0:
+        break
+    writer.write_table(table)
+writer.close()
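
A small sketch (assuming pyarrow, as imported above) of how the part_<n>.parquet files written by this loop read back as a single dataset; the Spark step in the next file does the equivalent with spark.read.parquet on the directory.

import pyarrow.dataset as ds

# Pointing a dataset at the directory picks up every part_<n>.parquet file,
# so the chunked writer stays transparent to downstream readers.
dataset = ds.dataset("/gscratch/comdata/output/reddit_comments.parquet_temp2", format='parquet')
n_rows = 0
for batch in dataset.to_batches(columns=['id', 'subreddit']):
    n_rows += batch.num_rows
print(n_rows)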

View File

@@ -7,7 +7,7 @@ from pyspark.sql import SparkSession
 spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
@@ -21,9 +21,9 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.repartition('subreddit')
 df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
 df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
+df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
 df = df.repartition('author')
 df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
 df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
+df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
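
The repartition/sortWithinPartitions layout above is what makes the new outputs cheap to read one subreddit (or author) at a time; a hedged sketch of a downstream read (the subreddit name and column casing here are illustrative):

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new")
# Rows for each subreddit are clustered and sorted within partitions, so
# parquet row-group statistics let a filter like this skip most of the data.
print(df.filter(f.lower(f.col("subreddit")) == "askreddit").count())
spark.stop()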

View File

@@ -14,7 +14,7 @@ def find_dumps(dumpdir, base_pattern):
         fname, ext = path.splitext(fpath)
         dumpext[fname].append(ext)
-    ext_priority = ['.zst','.xz','.bz2']
+    ext_priority = ['.zst','.xz','.bz2','.gz']
     for base, exts in dumpext.items():
         ext = [ext for ext in ext_priority if ext in exts][0]
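
A tiny illustration of the priority rule above: when the same dump exists in several compressions, the first matching extension in ext_priority wins, and adding '.gz' means gzip-only dumps no longer fall through.

ext_priority = ['.zst', '.xz', '.bz2', '.gz']

# dump present as both .zst and .gz -> the .zst copy is chosen
print([ext for ext in ext_priority if ext in ['.gz', '.zst']][0])   # '.zst'

# dump present only as .gz -> now picked up instead of raising IndexError
print([ext for ext in ext_priority if ext in ['.gz']][0])           # '.gz'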

View File

@@ -8,7 +8,7 @@ import pandas as pd
 import fire
 from itertools import islice
 from pathlib import Path
-from similarities_helper import build_cosine_similarities
+from similarities_helper import cosine_similarities
 spark = SparkSession.builder.getOrCreate()
 conf = spark.sparkContext.getConf()
@@ -57,12 +57,11 @@ https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get
     sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
-    spark.stop()
     #instead of toLocalMatrix() why not read as entries and put strait into numpy
     sim_entries = pd.read_parquet(output_parquet)
     df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+    spark.stop()
     df['subreddit_id_new'] = df['subreddit_id_new'] - 1
     df = df.sort_values('subreddit_id_new').reset_index(drop=True)
     df = df.set_index('subreddit_id_new')

View File

@@ -1,11 +1,11 @@
 #!/usr/bin/env python3
+import pandas as pd
 import pyarrow as pa
 import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 from itertools import groupby, islice, chain
 import fire
 from collections import Counter
-import pandas as pd
 import os
 import datetime
 import re
@@ -22,7 +22,6 @@ urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-
 # compute term frequencies for comments in each subreddit by week
 def weekly_tf(partition, mwe_pass = 'first'):
     dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
     if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
         os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
@@ -31,8 +30,9 @@ def weekly_tf(partition, mwe_pass = 'first'):
     ngram_output = partition.replace("parquet","txt")
-    if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
-        os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+    if mwe_pass == 'first':
+        if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+            os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
 batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
@@ -167,14 +167,20 @@ def weekly_tf(partition, mwe_pass = 'first'):
             pddf = pddf.loc[pddf.is_token == True, schema.names]
             author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
             author_pddf = author_pddf.loc[:,author_schema.names]
             table = pa.Table.from_pandas(pddf,schema=schema)
             author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
-            if table.shape[0] == 0:
+            do_break = True
+            if table.shape[0] != 0:
+                writer.write_table(table)
+                do_break = False
+            if author_table.shape[0] != 0:
+                author_writer.write_table(author_table)
+                do_break = False
+            if do_break:
                 break
-            writer.write_table(table)
-            author_writer.write_table(author_table)
         writer.close()
         author_writer.close()

View File

@@ -1,19 +1,19 @@
 from pyspark.sql import SparkSession
 from similarities_helper import build_tfidf_dataset
+## TODO:need to exclude automoderator / bot posts.
+## TODO:need to exclude better handle hyperlinks.
 spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/part-00000-d61007de-9cbe-4970-857f-b9fd4b35b741-c000.snappy.parquet")
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
 include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
 include_subs = {s.strip('\n') for s in include_subs}
+# remove [deleted] and AutoModerator (TODO remove other bots)
 df = df.filter(df.author != '[deleted]')
 df = df.filter(df.author != 'AutoModerator')
 df = build_tfidf_dataset(df, include_subs, 'author')
-df.cache()
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
+spark.stop()
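
A hedged sketch of the "remove other bots" TODO noted above: the same filtering can be driven by a configurable exclusion list (the list here is a placeholder, not the repo's actual set of excluded accounts).

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")

# Placeholder exclusion list; extend with other known bot accounts as needed.
bots = ['[deleted]', 'AutoModerator']
df = df.filter(~df.author.isin(bots))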

View File

@@ -15,3 +15,4 @@ include_subs = {s.strip('\n') for s in include_subs}
 df = build_tfidf_dataset(df, include_subs, 'term')
 df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
+spark.stop()