From c3d2834110f2564ae78422257a921fca6ba1fae3 Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Fri, 6 Dec 2024 08:09:02 -0800
Subject: [PATCH] use pyarrow instead of spark to write data

---
 ngrams/top_comment_phrases.py | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/ngrams/top_comment_phrases.py b/ngrams/top_comment_phrases.py
index d6807a4..61ba52e 100755
--- a/ngrams/top_comment_phrases.py
+++ b/ngrams/top_comment_phrases.py
@@ -4,7 +4,7 @@ from pyspark.sql import Window
 from pyspark.sql import SparkSession
 import numpy as np
 
-spark = SparkSession.builder.config(map={'spark.executor.memory':'900g','spark.executor.cores':128}).getOrCreate()
+spark = SparkSession.builder.config(map={'spark.executor.memory':'900g','spark.executor.cores':128,'spark.sql.execution.arrow.pyspark.enabled':False}).getOrCreate()
 df = spark.read.text("/gscratch/comdata/output/reddit_ngrams/reddit_comment_ngrams_10p_sample/")
 df2 = spark.read.text("/gscratch/comdata/output/reddit_ngrams/reddit_post_ngrams_10p_sample/")
 df = df.union(df2)
@@ -50,12 +50,11 @@ df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_
 
 df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none')
 
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet")
-df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI')
+import pyarrow.parquet as pq
+import pyarrow.feather as feather
+from pyarrow import csv
 
-# choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions.
-#
-df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3)
-df = df.toPandas()
-df.to_feather("/gscratch/comdata/output/reddit_ngrams/reddit_multiword_expressions.feather")
-df.to_csv("/gscratch/comdata/output/reddit_ngrams/reddit_multiword_expressions.csv")
+table = pq.read_table("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet", filters = [[('phraseCount','>', 3500),('phrasePWMI','>',3)]], columns=['phrase','phraseCount','phraseLogProb','phrasePWMI'])
+
+feather.write_feather(table,"/gscratch/comdata/output/reddit_ngrams/reddit_multiword_expressions.feather")
+csv.write_csv(table,"/gscratch/comdata/output/reddit_ngrams/reddit_multiword_expressions.csv")
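
Note: the core change here is replacing Spark's filter/toPandas/to_feather round-trip with a single `pq.read_table` call whose `filters=` argument pushes the predicate down into the Parquet reader, then writing the Arrow table directly. The sketch below illustrates that same pattern in isolation; the tiny example table and the `example_*` file names are hypothetical stand-ins, not paths from this repo.

```python
# Minimal self-contained sketch of the pyarrow read/filter/write pattern
# used in this patch. All data and file names here are illustrative only.
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.feather as feather
from pyarrow import csv

# Build a tiny table and write it to Parquet so the example runs on its own.
table = pa.table({
    "phrase": ["a b", "c d", "e f"],
    "phraseCount": [5000, 100, 4000],
    "phrasePWMI": [4.0, 1.0, 2.5],
})
pq.write_table(table, "example_pwmi.parquet")

# read_table pushes the filters down to the Parquet reader, so rows failing
# the predicate are never materialized in memory. The nested-list filter
# syntax is disjunctive normal form: tuples inside an inner list are ANDed,
# and the outer lists are ORed together.
filtered = pq.read_table(
    "example_pwmi.parquet",
    filters=[[("phraseCount", ">", 3500), ("phrasePWMI", ">", 3)]],
    columns=["phrase", "phraseCount", "phrasePWMI"],
)

# Write the filtered Arrow table directly, with no pandas round-trip.
feather.write_feather(filtered, "example_expressions.feather")
csv.write_csv(filtered, "example_expressions.csv")
```

Skipping the pandas conversion avoids an extra full copy of the filtered data, which is presumably why the patch also disables `spark.sql.execution.arrow.pyspark.enabled`: the final export no longer goes through Spark at all.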