1
0

changes for archiving.

This commit is contained in:
2023-05-23 17:18:19 -07:00
parent 811a0d87c4
commit 07b0dff9bc
47 changed files with 396 additions and 1549 deletions

25
ngrams/Makefile Normal file
View File

@@ -0,0 +1,25 @@
# Pipeline for computing reddit-comment term and author frequencies (ngrams).
# Stages: generate task list -> per-partition term/author counts via a slurm
# array job -> sort the author counts with spark.
# NOTE: recipe lines below are tab-indented, as GNU make requires.

outputdir=../../data/reddit_ngrams/
inputdir=../../data/reddit_comments_by_subreddit.parquet
authors_tfdir=${outputdir}/comment_authors.parquet
srun=sbatch --wait --verbose run_job.sbatch

# `all` names a goal, not a file it creates.
.PHONY: all
all: ${outputdir}/comment_authors_sorted.parquet/_SUCCESS

# First pass: write one weekly_tf command per input partition into $@.
tf_task_list_1: tf_comments.py
	${srun} bash -c "python3 tf_comments.py gen_task_list --mwe_pass='first' --outputdir=${outputdir} --tf_task_list=$@ --inputdir=${inputdir}"

# Run one slurm array task per line of the task list (offset 0).
${outputdir}/comment_terms.parquet: tf_task_list_1
	mkdir -p sbatch_log
	sbatch --wait --verbose --array=1-$(shell cat $< | wc -l) run_array.sbatch 0 $<

# comment_authors.parquet is produced as a side effect of the same array run;
# the lone `-` is an empty ignore-errors recipe so make just tracks the dependency.
${outputdir}/comment_authors.parquet: ${outputdir}/comment_terms.parquet
	-

${outputdir}/comment_authors_sorted.parquet: ${outputdir}/comment_authors.parquet sort_tf_comments.py
	../start_spark_and_run.sh 3 sort_tf_comments.py --inparquet=$< --outparquet=$@ --colname=author

# _SUCCESS marker is written by spark inside the sorted parquet directory.
${outputdir}/comment_authors_sorted.parquet/_SUCCESS: ${outputdir}/comment_authors_sorted.parquet

# Build the input dataset if it is missing.
${inputdir}:
	$(MAKE) -C ../datasets

19
ngrams/run_array.sbatch Executable file
View File

@@ -0,0 +1,19 @@
#!/bin/bash
# Slurm array worker: each array task executes one line of a task-list file.
# Usage: sbatch --array=1-N run_array.sbatch <offset> <task_list_file>
#   $1 = offset added to SLURM_ARRAY_TASK_ID (presumably so a task list longer
#        than the array-size limit can be run in chunks — confirm with callers)
#   $2 = file containing one shell command per line
#SBATCH --job-name=reddit_comment_term_frequencies
#SBATCH --account=comdata
#SBATCH --partition=compute-bigmem
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=9g
#SBATCH --ntasks=1
#SBATCH --export=ALL
#SBATCH --time=48:00:00
#SBATCH --chdir=/gscratch/comdata/users/nathante/partitioning_reddit/dataverse/cdsc_reddit/ngrams
#SBATCH --error="sbatch_log/%A_%a.out"
#SBATCH --output="sbatch_log/%A_%a.out"
# 1-based line number in the task list that this array task is responsible for.
TASK_NUM=$(($SLURM_ARRAY_TASK_ID + $1))
# Extract that line (a complete shell command) from the task-list file.
TASK_CALL=$(sed -n ${TASK_NUM}p $2)
# Execute it; intentionally unquoted so the line word-splits into command + args.
${TASK_CALL}

18
ngrams/run_job.sbatch Normal file
View File

@@ -0,0 +1,18 @@
#!/bin/bash
# Generic single-task slurm wrapper: echoes its arguments, then executes them
# as a command. Invoked from the Makefile as:
#   sbatch --wait --verbose run_job.sbatch <command> [args...]
# NOTE(review): the job name "simulate measurement error models" looks copied
# from another project; confirm or rename.
#SBATCH --job-name="simulate measurement error models"
## Allocation Definition
#SBATCH --account=comdata
#SBATCH --partition=compute-bigmem
## Resources
#SBATCH --nodes=1
## Walltime (4 hours)
#SBATCH --time=4:00:00
## Memory per node
#SBATCH --mem=4G
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=1
#SBATCH --chdir /gscratch/comdata/users/nathante/partitioning_reddit/dataverse/cdsc_reddit/ngrams/
#SBATCH --output=sbatch_log/%A_%a.out
#SBATCH --error=sbatch_log/%A_%a.err
# Log the command being run, then run it; quoting preserves argument boundaries.
echo "$@"
"$@"

View File

@@ -3,6 +3,7 @@ import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pyarrow.compute as pc
from itertools import groupby, islice, chain
import fire
from collections import Counter
@@ -15,11 +16,12 @@ import string
from random import random
from redditcleaner import clean
from pathlib import Path
from datetime import datetime
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', input_dir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", mwe_pass = 'first', excluded_users=None):
dataset = ds.dataset(Path(input_dir)/partition, format='parquet')
dataset = ds.dataset(Path(inputdir)/partition, format='parquet')
outputdir = Path(outputdir)
samppath = outputdir / "reddit_comment_ngrams_10p_sample"
@@ -37,7 +39,8 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
if mwe_pass == 'first':
if ngram_path.exists():
ngram_path.unlink()
dataset = dataset.filter(pc.field("CreatedAt") <= pa.scalar(datetime(2020,4,13)))
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
@@ -160,9 +163,9 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
outchunksize = 10000
termtf_outputdir = (outputdir / "comment_terms")
termtf_outputdir = (outputdir / "comment_terms.parquet")
termtf_outputdir.mkdir(parents=True, exist_ok=True)
authortf_outputdir = (outputdir / "comment_authors")
authortf_outputdir = (outputdir / "comment_authors.parquet")
authortf_outputdir.mkdir(parents=True, exist_ok=True)
termtf_path = termtf_outputdir / partition
authortf_path = authortf_outputdir / partition
@@ -196,12 +199,12 @@ def weekly_tf(partition, outputdir = '/gscratch/comdata/output/reddit_ngrams/',
author_writer.close()
def gen_task_list(mwe_pass='first', outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
def gen_task_list(mwe_pass='first', inputdir="/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/", outputdir='/gscratch/comdata/output/reddit_ngrams/', tf_task_list='tf_task_list', excluded_users_file=None):
files = os.listdir(inputdir)
with open(tf_task_list,'w') as outfile:
for f in files:
if f.endswith(".parquet"):
outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} --inputdir {inputdir} --outputdir {outputdir} --excluded_users {excluded_users_file} {f}\n")
if __name__ == "__main__":
fire.Fire({"gen_task_list":gen_task_list,

View File

@@ -1,69 +0,0 @@
#!/usr/bin/env python3
"""Find multiword expressions in a sample of reddit comment ngrams.

Scores each sampled phrase by pointwise mutual information (PWMI) between
the phrase and its constituent terms, writes the full ranking, and exports
a filtered set of high-count / high-PWMI expressions.
"""
from pyspark.sql import functions as f
from pyspark.sql import Window
from pyspark.sql import SparkSession
import numpy as np
import fire
from pathlib import Path


def main(ngram_dir="/gscratch/comdata/output/reddit_ngrams"):
    """Compute phrase PWMI from the 10% ngram sample under *ngram_dir*.

    Writes:
      - reddit_comment_ngrams_pwmi.parquet / .csv: phrases with counts,
        log-probabilities and PWMI, sorted by descending PWMI.
      - multiword_expressions.feather / .csv: phrases with count > 3500
        and PWMI > 3.
    """
    spark = SparkSession.builder.getOrCreate()
    ngram_dir = Path(ngram_dir)
    ngram_sample = ngram_dir / "reddit_comment_ngrams_10p_sample"
    df = spark.read.text(str(ngram_sample))
    df = df.withColumnRenamed("value", "phrase")

    # count phrase occurrences; drop rare phrases (<= 10 occurrences)
    phrases = df.groupby('phrase').count()
    phrases = phrases.withColumnRenamed('count', 'phraseCount')
    phrases = phrases.filter(phrases.phraseCount > 10)

    # total number of retained phrase occurrences -> log-probabilities
    N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount
    print(f'analyzing PMI on a sample of {N} phrases')
    logN = np.log(N)
    phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN)

    # count term occurrences: explode each phrase into terms and sum the
    # phrase counts per term over a window partitioned by term.
    phrases = phrases.withColumn('terms', f.split(f.col('phrase'), ' '))
    terms = phrases.select(['phrase', 'phraseCount', 'phraseLogProb', f.explode(phrases.terms).alias('term')])
    win = Window.partitionBy('term')
    terms = terms.withColumn('termCount', f.sum('phraseCount').over(win))
    # (removed a dead `withColumnRenamed('count', 'termCount')`: no column
    # named 'count' exists at this point, so the rename was a no-op)
    terms = terms.withColumn('termLogProb', f.log(f.col('termCount')) - logN)

    # PWMI = log P(phrase) - sum_i log P(term_i)
    terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb')
    terms = terms.withColumnRenamed('sum(termLogProb)', 'termsLogProb')
    terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb'))

    df = terms.select(['phrase', 'phraseCount', 'phraseLogProb', 'phrasePWMI'])
    # BUG FIX: DataFrame.sort has no `descending` kwarg — it was silently
    # ignored, so the output was sorted ascending. Use ascending=False as
    # the surrounding code clearly intends.
    df = df.sort('phrasePWMI', ascending=False)
    df = df.sortWithinPartitions('phrasePWMI', ascending=False)
    pwmi_dir = ngram_dir / "reddit_comment_ngrams_pwmi.parquet/"
    df.write.parquet(str(pwmi_dir), mode='overwrite', compression='snappy')

    # re-read the parquet output to export a CSV copy
    df = spark.read.parquet(str(pwmi_dir))
    df.write.csv(str(ngram_dir / "reddit_comment_ngrams_pwmi.csv/"), mode='overwrite', compression='none')

    df = spark.read.parquet(str(pwmi_dir))
    df = df.select('phrase', 'phraseCount', 'phraseLogProb', 'phrasePWMI')
    # choosing phrases occurring at least 3500 times in the 10% sample
    # (35000 times overall) and then with a PWMI of at least 3 yields
    # about 65000 expressions.
    df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI") > 3)
    df = df.toPandas()
    df.to_feather(ngram_dir / "multiword_expressions.feather")
    df.to_csv(ngram_dir / "multiword_expressions.csv")


if __name__ == '__main__':
    fire.Fire(main)