diff --git a/ngrams/term_frequencies.py b/ngrams/term_frequencies.py index 4bf5497..ef2404f 100755 --- a/ngrams/term_frequencies.py +++ b/ngrams/term_frequencies.py @@ -100,8 +100,11 @@ def tf_posts(subreddit_weeks, mwe_pass, mwe_tokenize, stopWords): tfs = Counter([]) authors = Counter([]) for post in posts: - tokens = my_tokenizer(post.title, mwe_pass, mwe_tokenize, stopWords) - tfs.update(tokens) + title_tokens = my_tokenizer(post.title, mwe_pass, mwe_tokenize, stopWords) + tfs.update(title_tokens) + if post.body is not None and post.body != "": + body_tokens = my_tokenizer(post.body, mwe_pass, mwe_tokenize, stopWords) + tfs.update(body_tokens) authors.update([post.author]) for term, tf in tfs.items(): @@ -144,7 +147,7 @@ def weekly_tf(partition, if reddit_dataset == 'comments': batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author']) if reddit_dataset == 'posts': - batches = dataset.to_batches(columns=['CreatedAt','subreddit','title','author']) + batches = dataset.to_batches(columns=['CreatedAt','subreddit','title','body','author']) schema = pa.schema([pa.field('subreddit', pa.string(), nullable=nullable_schema), pa.field('term', pa.string(), nullable=nullable_schema), diff --git a/ngrams/top_comment_phrases.py b/ngrams/top_comment_phrases.py old mode 100644 new mode 100755 index 031cba5..2884eec --- a/ngrams/top_comment_phrases.py +++ b/ngrams/top_comment_phrases.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 from pyspark.sql import functions as f from pyspark.sql import Window from pyspark.sql import SparkSession @@ -5,6 +6,8 @@ import numpy as np spark = SparkSession.builder.getOrCreate() df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/") +df2 = spark.read.text("/gscratch/comdata/users/nathante/reddit_post_ngrams_10p_sample/") +df = df.union(df2) df = df.withColumnRenamed("value","phrase")