Compare commits
3 Commits
53f5b8c03c
...
33150243cd
| Author | SHA1 | Date | |
|---|---|---|---|
| 33150243cd | |||
| 8965a251b6 | |||
| d201930951 |
246
README.md
246
README.md
@@ -2,51 +2,111 @@
|
||||
title: Utilities for Reddit Data Science
|
||||
---
|
||||
|
||||
`cdsc_reddit` is a collection of tools for working with Reddit data on the
|
||||
Hyak super computing system at the University of Washington. It is built
|
||||
around [PySpark](https://spark.apache.org/docs/latest/api/python/index.html)
|
||||
and [pyarrow](https://arrow.apache.org/docs/python/) so that the underlying
|
||||
pipelines scale to the full Pushshift archive.
|
||||
|
||||
The reddit_cdsc project contains tools for working with Reddit data. The project is designed for the hyak super computing system at The University of Washington. It consists of a set of python and bash scripts and uses the [Pyspark](https://spark.apache.org/docs/latest/api/python/index.html "Pyspark documentation") and [pyarrow](https://arrow.apache.org/docs/python/ "documentation of python arrow bindings") to process large datasets. As of November 1st 2020, the project is under active development by [Nate TeBlunthuis](https://wiki.communitydata.science/People#Nathan_TeBlunthuis_.28University_of_Washington.29 "Nate's profile on the Community Data Science Collective Wiki") and provides scripts for:
|
||||
The project was originally developed by [Nate
|
||||
TeBlunthuis](https://wiki.communitydata.science/People#Nathan_TeBlunthuis_.28University_of_Texas_at_Austin.29)
|
||||
and is now maintained by a rotating set of researchers in the Community
|
||||
Data Science Collective, including Benjamin Mako Hill, Madelyn Douglas, and
|
||||
others.
|
||||
|
||||
- Pulling and updating dumps from [Pushshift](https://pushshift.io "Pushshift.io") in `pull_pushshift_comments.sh` and `pull_pushshift_submissions.sh`.
|
||||
- Uncompressing and parsing the dumps into [Parquet](https://parquet.apache.org/ "apahce parquet website") [datasets](https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets "Wikilink to documentation on the Reddit parquet datasets").
|
||||
- Running text analysis based on [TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf "Wikipedia article on tf-idf") including
|
||||
- Extracting terms from Reddit comments in `tf_comments.py`
|
||||
- Detecting common phrases based on [Pointwise mutual information](https://en.wikipedia.org/wiki/Pointwise_mutual_information) "Wikipedia article on pointwise mutual information")
|
||||
- Building TF-IDF vectors for each subreddit `idf_comments.py` and (more experimentally) at the subreddit-week level `idf_comments_weekly.py`
|
||||
- Computing cosine similarities between subreddits based on TF-IDF `term_cosine_similarity.py`.
|
||||
At a high level, the codebase covers four kinds of work:
|
||||
|
||||
Right now, two steps are still in earlier stages of progress:
|
||||
- **Ingest.** Turning Pushshift comment and submission dumps into
|
||||
partitioned Parquet datasets that are fast to query by subreddit or by
|
||||
author.
|
||||
- **Text features.** Building per-subreddit TF-IDF vectors over comment
|
||||
text, including a phrase-detection pass based on pointwise mutual
|
||||
information.
|
||||
- **Similarity, clustering, and density.** Computing cosine similarities
|
||||
between subreddits (by terms or by overlapping authors), clustering the
|
||||
resulting similarity matrices, and summarizing how dense each
|
||||
neighborhood is.
|
||||
- **Time series and visualization.** Pulling activity time series per
|
||||
subreddit and producing t-SNE plots of the clustering output.
|
||||
|
||||
- Approach comparable to tf-idf for similarity between subreddits in terms of comment authors.
|
||||
- Clustering subreddits based on cosine-similarities using [power iteration clustering (PIC)](http://www.cs.cmu.edu/~wcohen/postscript/icml2010-pic-final.pdf "Paper on power iteration clustering")
|
||||
Several pieces are still rough — the user interfaces for many of the
|
||||
scripts assume familiarity with the project, and the TF-IDF pipeline does
|
||||
not yet strip hyperlinks or bot comments, so subreddits with similar
|
||||
automod messages can look misleadingly similar.
|
||||
|
||||
The TF-IDF for comments still has some kinks to iron out to remove hyper links and bot comments. Right now subreddits that have similar automoderation messages appear very similar.
|
||||
## Repository layout
|
||||
|
||||
The user interfaces for most of the scripts are pretty crappy and need to be refined for re-use by others.
|
||||
| Directory | What's in it |
|
||||
|---|---|
|
||||
| `datasets/` | Scripts that convert the raw dumps into partitioned, sorted Parquet datasets. |
|
||||
| `ngrams/` | Term extraction from comments, phrase detection via PMI, and supporting batch scripts. |
|
||||
| `similarities/` | TF-IDF construction and cosine-similarity computation, for both terms and authors, including a weekly variant. |
|
||||
| `clustering/` | Affinity-propagation clustering of the similarity matrices and t-SNE fits for visualization. |
|
||||
| `density/` | Per-subreddit overlap density measures derived from the similarity matrices. |
|
||||
| `timeseries/` | Per-subreddit activity time series, plus tooling for choosing among clustering runs. |
|
||||
| `visualization/` | Altair-based interactive plots of subreddit clusters. |
|
||||
| `bots/` | Heuristics for flagging likely bot accounts. |
|
||||
| `examples/` | Small standalone examples using pyarrow. |
|
||||
|
||||
## Pulling data from [Pushshift](https://pushshift.io "Pushshift.io") ##
|
||||
## Sourcing the dumps
|
||||
|
||||
- `pull_pushshift_comments.sh` uses wget to download comment dumps to `/gscratch/comdata/raw_data/reddit_dumps/comments`. It doesn't download files that already exists and runs `check_comments_shas.sh` to verify the files downloaded correctly.
|
||||
Pushshift was effectively wound down after Reddit cut off third-party API
|
||||
access in 2023, and the original `files.pushshift.io` archive is gone.
|
||||
Collection of new Reddit comment and submission data has since been
|
||||
picked up by [ArcticShift](https://github.com/ArthurHeitmann/arctic_shift),
|
||||
which publishes both the historical Pushshift archive and the new data
|
||||
it continues to collect, with monthly updates redistributed as academic
|
||||
torrents by Reddit users `u/Watchful1` and `u/RaiderBDev`. Fetching the
|
||||
dumps from a torrent client is a manual prerequisite to running the rest
|
||||
of this pipeline; step-by-step instructions for the current CDSC
|
||||
workflow — including which torrents to pull and how to stage the `.zst`
|
||||
files on Hyak — live on the CDSC wiki at
|
||||
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit).
|
||||
The earlier `dumps/` directory of `pull_pushshift_*.sh` and SHA-check
|
||||
scripts has been removed since the URLs they pointed at no longer
|
||||
resolve.
|
||||
|
||||
- `pull_pushshift_submissions.sh` does the same for submissions and puts them in `/gscratch/comdata/raw_data/reddit_dumps/comments`.
|
||||
## Building Parquet datasets
|
||||
|
||||
## Building Parquet Datasets ##
|
||||
The raw dumps are huge compressed JSON files with a lot of metadata that
|
||||
we usually don't need. They aren't indexed, so it's expensive to pull data
|
||||
for just a handful of subreddits, and they are awkward to read directly
|
||||
into Spark. Extracting the useful fields and rewriting the data as
|
||||
Parquet makes everything downstream cheaper. The conversion happens in
|
||||
two steps:
|
||||
|
||||
Pushshift dumps are huge compressed json files with a lot of metadata that we may not need. It isn't indexed so it's expensive to pull data from just a handful of subreddits. It also turns out that it's a pain to read these compressed files straight into spark. Extracting useful variables from the dumps and building parquet datasets will make them easier to work with. This happens in two steps:
|
||||
1. Extracting JSON into temporary, unpartitioned Parquet files using
|
||||
pyarrow (`comments_2_parquet_part1.py`,
|
||||
`submissions_2_parquet_part1.py`).
|
||||
2. Repartitioning and sorting the data using PySpark
|
||||
(`comments_2_parquet_part2.py`, `submissions_2_parquet_part2.py`).
|
||||
|
||||
1. Extracting json into (temporary, unpartitioned) parquet files using pyarrow.
|
||||
2. Repartitioning and sorting the data using pyspark.
|
||||
The final datasets live in `/gscratch/comdata/output/`:
|
||||
|
||||
The final datasets are in `/gscratch/comdata/output.`
|
||||
- `reddit_comments_by_author.parquet` — comments partitioned and sorted by
|
||||
author (lowercase).
|
||||
- `reddit_comments_by_subreddit.parquet` — comments partitioned and sorted
|
||||
by subreddit (lowercase).
|
||||
- `reddit_submissions_by_author.parquet` — submissions partitioned and
|
||||
sorted by author (lowercase).
|
||||
- `reddit_submissions_by_subreddit.parquet` — submissions partitioned and
|
||||
sorted by subreddit (lowercase).
|
||||
|
||||
- `reddit_comments_by_author.parquet` has comments partitioned and sorted by username (lowercase).
|
||||
- `reddit_comments_by_subreddit.parquet` has comments partitioned and sorted by subreddit name (lowercase).
|
||||
- `reddit_submissions_by_author.parquet` has submissions partitioned and sorted by username (lowercase).
|
||||
- `reddit_submissions_by_subreddit.parquet` has submissions partitioned and sorted by subreddit name (lowercase).
|
||||
Splitting the work this way lets us decompress and parse the dumps in the
|
||||
Hyak backfill queue and then sort them in Spark. Partitioning makes it
|
||||
possible to read data for specific subreddits or authors efficiently, and
|
||||
sorting makes per-subreddit or per-user aggregations cheap. More
|
||||
documentation on using these files lives on the [CDSC
|
||||
wiki](https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets).
|
||||
|
||||
Breaking this down into two steps is useful because it allows us to decompress and parse the dumps in the backfill queue and then sort them in spark. Partitioning the data makes it possible to efficiently read data for specific subreddits or authors. Sorting it means that you can efficiently compute agreggations at the subreddit or user level. More documentation on using these files is available [here](https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets "Wikilink to documentation on the Reddit parquet datasets").
|
||||
## TF-IDF subreddit similarity
|
||||
|
||||
## TF-IDF Subreddit Similarity ##
|
||||
|
||||
[TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf "Wikipedia article on tf-idf") is common and simple information retrieval technique that we can use to quantify the topic of a subreddit. The goal of TF-IDF is to build a vector for each subreddit that scores every term (or phrase) according to how characteristic it is of the overall lexicon used in that subreddit. For example, the most characteristic terms in the subreddit /r/christianity in the current version of the TF-IDF model are:
|
||||
[TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) is a simple
|
||||
information-retrieval technique we use to quantify the topic of a
|
||||
subreddit. The goal is to build a vector for each subreddit that scores
|
||||
every term (or phrase) according to how characteristic it is of the
|
||||
lexicon used there. For example, the most characteristic terms in
|
||||
`/r/christianity` in the current model are:
|
||||
|
||||
| Term | tf_idf |
|
||||
|:------------:|:------:|
|
||||
@@ -56,61 +116,121 @@ Breaking this down into two steps is useful because it allows us to decompress a
|
||||
| bible | 0.557 |
|
||||
| scripture | 0.55 |
|
||||
|
||||
TF-IDF stands for "term frequency - inverse document frequency" because it is the product of two terms "term frequency" and "inverse document frequency." Term frequency quantifies the amount that a term appears in a subreddit (document). Inverse document frequency quantifies how much that term appears in other subreddits (documents). As you can see on the Wikipedia page, there are many possible ways of constructing and combining these terms.
|
||||
TF-IDF is the product of two pieces: *term frequency* (how often a term
|
||||
appears in a subreddit) and *inverse document frequency* (how rare the
|
||||
term is across other subreddits). There are many ways to construct and
|
||||
combine these; the [Wikipedia
|
||||
page](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) catalogs the common
|
||||
variants.
|
||||
|
||||
$x + y = z_{1,d}$
|
||||
We normalize term frequency by the maximum raw term frequency for each
|
||||
subreddit:
|
||||
|
||||
I chose to normalize term frequency by the maximum (raw) term frequency for each subreddit:
|
||||
$\mathrm{tf}_{t,d} = \frac{f_{t,d}}{\sum_{t^{'} \in d}{f_{t^{'},d}}}$
|
||||
$$\mathrm{tf}_{t,d} = \frac{f_{t,d}}{\max_{t^{'} \in d}{f_{t^{'},d}}}$$
|
||||
|
||||
I use the log inverse document frequency:
|
||||
$\mathrm{idf}_{t} = log\frac{N}{| {d \in D : t \in d} |}$
|
||||
and use the log inverse document frequency:
|
||||
|
||||
I then combine them using some smoothing to get:
|
||||
$$\mathrm{idf}_{t} = \log\frac{N}{|\{d \in D : t \in d\}|}$$
|
||||
|
||||
$\mathrm{tfidf}_{t,d} = (0.5 + 0.5 \cdot \mathrm{tf}_{t,d}) \cdot \mathrm{idf}_{t}$
|
||||
combined with a smoothing term:
|
||||
|
||||
### Building TF-IDF vectors ###
|
||||
$$\mathrm{tfidf}_{t,d} = (0.5 + 0.5 \cdot \mathrm{tf}_{t,d}) \cdot \mathrm{idf}_{t}$$
|
||||
|
||||
The process for building TF-IDF vectors has four steps:
|
||||
(Other normalization strategies are worth trying — see the note in
|
||||
`similarities/TODO`.)
|
||||
|
||||
1. Extracting terms using `tf_comments.py`
|
||||
2. Detecting common phrases using `top_comment_phrases.py`
|
||||
3. Extracting terms and common phrases using `tf_comments.py --mwe-pass='second'`
|
||||
4. Building idf and tf-idf scores in `idf_comments.py`
|
||||
### Building TF-IDF vectors
|
||||
|
||||
#### Running `tf_comments.py` on the backfill queue ####
|
||||
The pipeline has four steps:
|
||||
|
||||
The main reason that I did it in 4 steps instead of one is to take advantage of the backfill queue for running `tf_comments.py`. This step requires reading all of the text in every comment and converting it to a bag of words at the subreddit-level. This is a lot of computation that is easily parallelizable. The script `run_tf_jobs.sh` partially automates running steps 1 (or 3) on the backfill queue.
|
||||
1. Extract terms with `ngrams/tf_comments.py`.
|
||||
2. Detect common phrases with `ngrams/top_comment_phrases.py`.
|
||||
3. Re-extract terms together with detected phrases via
|
||||
`ngrams/tf_comments.py --mwe-pass=second`.
|
||||
4. Compute IDF and TF-IDF scores in `similarities/tfidf.py`.
|
||||
|
||||
#### Phrase detection using Pointwise Mutual Information ####
|
||||
#### Running `tf_comments.py` on the backfill queue
|
||||
|
||||
TF-IDF is simple, but only uses single words (unigrams). Sequences of multiple words can be important to account for how words have different meanings in different contexts or how sequences of words refer to distinct things like names. Dealing with context or longer sequences of words is a common challenge in natural language processing since the number of possible n-grams grows like crazy as n gets bigger. Phrase detection helps this problem by limiting the set of n-grams to those most informative.
|
||||
The main reason for the four-step layout is that `tf_comments.py` is
|
||||
trivially parallel — it reads every comment and rewrites each subreddit
|
||||
as a bag of words — so it benefits from being farmed out to the Hyak
|
||||
backfill queue. `ngrams/run_tf_jobs.sh` partially automates the dispatch.
|
||||
|
||||
But how do we detect phrases? I implemented [Pointwise mutual information](https://en.wikipedia.org/wiki/Pointwise_mutual_information) "Wikipedia article on pointwise mutual information"), which is a pretty simple way, but seems to work pretty well.
|
||||
#### Phrase detection using pointwise mutual information
|
||||
|
||||
PMI is an quantity derived from information theory. The intuition is that if two words occur together quite frequently compared to how often they appear separately then the cooccurrance is likely to be informative.
|
||||
TF-IDF over unigrams misses the fact that sequences of words often carry
|
||||
distinct meaning (names, fixed expressions, in-jokes). Considering every
|
||||
possible n-gram is prohibitive because the candidate set explodes with
|
||||
`n`, so we use phrase detection to limit ourselves to informative
|
||||
n-grams.
|
||||
|
||||
$\operatorname{pmi}(x;y) \equiv \log\frac{p(x,y)}{p(x)p(y)} = \log\frac{p(x|y)}{p(x)} = \log\frac{p(y|x)}{p(y)}.$
|
||||
We use [pointwise mutual
|
||||
information](https://en.wikipedia.org/wiki/Pointwise_mutual_information)
|
||||
(PMI), which is simple and works well in practice. The intuition is that
|
||||
if two words co-occur much more often than their marginal frequencies
|
||||
would predict, the pair is probably meaningful:
|
||||
|
||||
In `tf_comments.py` if `--mwe-pass=first` then a 10\% sample of 1-4-grams (sequences of terms up to length 4) will be written to a file to be consumed by `top_comment_phrases.py`. `top_comment_phrases.py` computes the PMI for these possible phrases and writes those that occur at least 3500 times in the sample of n-grams and have a PWMI of at least 3 (about 65000 expressions).
|
||||
$$\operatorname{pmi}(x;y) \equiv \log\frac{p(x,y)}{p(x)\,p(y)} = \log\frac{p(x|y)}{p(x)} = \log\frac{p(y|x)}{p(y)}$$
|
||||
|
||||
`tf_comments.py --mwe-pass=second` then uses the detected phrases and adds them to the term frequency data.
|
||||
When `tf_comments.py` is run with `--mwe-pass=first`, it writes a 10%
|
||||
sample of 1- to 4-grams to a file. `top_comment_phrases.py` then
|
||||
computes PMI over that sample and keeps phrases that occur at least
|
||||
3,500 times and have PMI of at least 3 — roughly 65,000 expressions.
|
||||
A second pass of `tf_comments.py --mwe-pass=second` folds those phrases
|
||||
back into the term-frequency data.
|
||||
|
||||
### Cosine Similarity ###
|
||||
### Cosine similarity
|
||||
|
||||
Once the tf-idf vectors are built, making a similarity score between two subreddits is straightforward using cosine similarity.
|
||||
Once the TF-IDF vectors are built, computing a similarity score between
|
||||
two subreddits is straightforward with cosine similarity:
|
||||
|
||||
$\text{similarity} = \cos(\theta) = {\mathbf{A} \cdot \mathbf{B} \over \|\mathbf{A}\| \|\mathbf{B}\|} = \frac{ \sum\limits_{i=1}^{n}{A_i B_i} }{ \sqrt{\sum\limits_{i=1}^{n}{A_i^2}} \sqrt{\sum\limits_{i=1}^{n}{B_i^2}} }$
|
||||
$$\text{similarity} = \cos(\theta) = \frac{\mathbf{A} \cdot \mathbf{B}}{\|\mathbf{A}\|\,\|\mathbf{B}\|} = \frac{\sum_{i=1}^{n}{A_i B_i}}{\sqrt{\sum_{i=1}^{n}{A_i^2}}\,\sqrt{\sum_{i=1}^{n}{B_i^2}}}$$
|
||||
|
||||
Intuitively, we represent two subreddits as lines in a high-dimensional space (tf-idf vectors).
|
||||
In linear algebra, the dot product ($\cdot$) between two vectors takes their weighted sum (e.g. linear regression is a dot product of a vector of covariates and a vector of weights).
|
||||
The vectors might have different lengths like if one subreddit has words in comments than the other, so in cosine similarity the dot product is normalized by the magnitude (lengths) of the vectors.
|
||||
It turns out that this is equivalent to taking the cosine of the two vectors. So cosine similarity in essence quantifies the angle between the two lines in high-dimensional space. If the cosine similarity between two subreddits is greater then their tf-idf vectors are more correlated.
|
||||
Each subreddit is a vector in a high-dimensional term space. The dot
|
||||
product gives a weighted sum of shared terms, and dividing by the
|
||||
vector magnitudes removes the effect of differing vocabulary size — what
|
||||
remains is the cosine of the angle between the two vectors. Cosine
|
||||
similarity with TF-IDF is popular (and has been used on Reddit several
|
||||
times in prior research) because it captures correlation between the
|
||||
*most characteristic* terms of two communities.
|
||||
|
||||
Cosine similarity with tf-idf is popular (indeed it has been applied to Reddit in research several times before) because it quantifies the correlation between the most characteristic terms for two communities.
|
||||
Compared to approaches based on word embeddings or topic models, this
|
||||
method can struggle with polysemy, synonymy, and correlations between
|
||||
related terms. Phrase detection helps a little. The trade-off is
|
||||
simplicity and scalability. Adding [latent semantic
|
||||
analysis](https://en.wikipedia.org/wiki/Latent_semantic_analysis) as an
|
||||
intermediate step is on the wish-list for improving on raw TF-IDF
|
||||
similarities.
|
||||
|
||||
Compared to other approach to similarity like those using word embeddings or topic models it may struggle to handle polysemy, synonymy, or correlations between different terms. Using phrase detection helps with this a little bit. The advantages of this approach are simplicity and scalability. I'm thinking about using [Latent Semantic Analysis](https://en.wikipedia.org/wiki/Latent_semantic_analysis "Wikipedia article on Latent semantic analysis") as an intermediate step to improve upon similarities based on raw tf-idfs.
|
||||
Even with these simplifications, similarity between a large number of
|
||||
subreddits is expensive — naively $n^2$ dot-products. Passing
|
||||
`--similarity-threshold=X` (with `X>0`) to the similarity scripts lets
|
||||
Spark's built-in matrix library use the DIMSUM approximation, which is
|
||||
the same algorithm Twitter and Google have used for large-scale
|
||||
similarity scoring.
|
||||
|
||||
Even still, computing similarities between a large number of subreddits is computationally expensive and requires $n^2$ dot-product evaluations.
|
||||
This can be sped up by passing `similarity-threshold=X` where $X>0$ into `term_comment_similarity.py`. I used a cosine similarity function that's built into the spark matrix library which supports the `DIMSUM` algorithm for approximating matrix-matrix products. This algorithm is commonly used in industry (i.e. at Twitter, Google) for large-scale similarity scoring.
|
||||
## Clustering, density, and time series
|
||||
|
||||
The similarity matrices feed three follow-on analyses:
|
||||
|
||||
- `clustering/clustering.py` clusters a similarity matrix using
|
||||
affinity propagation; `clustering/selection.py` and
|
||||
`clustering/fit_tsne.py` are supporting scripts for hyperparameter
|
||||
selection and 2-D embeddings.
|
||||
- `density/overlap_density.py` computes a per-subreddit overlap density
|
||||
measure from the similarity matrix.
|
||||
- `timeseries/cluster_timeseries.py` and `timeseries/choose_clusters.py`
|
||||
pull subreddit-level activity time series and join them against
|
||||
clustering output.
|
||||
|
||||
`visualization/tsne_vis.py` renders interactive Altair plots of the
|
||||
clustering output — see the prebuilt HTML files in `visualization/` for
|
||||
examples.
|
||||
|
||||
## Bot detection
|
||||
|
||||
`bots/good_bad_bot.py` computes user-level features (compression rate
|
||||
of comment text, frequency of self-identification as a bot, etc.) that
|
||||
are useful for filtering bot accounts out of downstream analyses. This
|
||||
is preliminary work; nothing in the pipeline currently consumes it
|
||||
automatically.
|
||||
|
||||
77
datasets/README.md
Normal file
77
datasets/README.md
Normal file
@@ -0,0 +1,77 @@
|
||||
# Reddit dumps → sorted parquet datasets
|
||||
|
||||
This directory holds the pipeline that turns compressed Reddit dump files
|
||||
(`RC_YYYY-MM.zst` for comments, `RS_YYYY-MM.zst` for submissions) into the
|
||||
sorted, repartitioned parquet datasets that the rest of the project
|
||||
consumes.
|
||||
|
||||
The pipeline has two stages:
|
||||
|
||||
| Stage | What it does |
|
||||
|---|---|
|
||||
| Part 1 | Reads one compressed dump and writes one parquet file. Per-file, parallelizable. Runs without Spark. |
|
||||
| Part 2 | Reads the directory of per-file parquets in Spark, sorts and repartitions by subreddit, then by author, and writes the final `reddit_*_by_*.parquet` datasets. Always re-sorts the full corpus. |
|
||||
|
||||
Each stage has a thin entry-point script per dump type:
|
||||
|
||||
| Script | Notes |
|
||||
|---|---|
|
||||
| `comments_part1.py`, `submissions_part1.py` | Per-file parse. `parse_dump <file>` and `gen_task_list` subcommands via fire. |
|
||||
| `comments_part2.py`, `submissions_part2.py` | Spark sort. Launched via `start_spark_and_run.sh`. |
|
||||
| `dumps_helper.py` | Shared module: schemas, simdjson parser, generic parse loop, parse_dump / gen_task_list / sort_and_write workers. The only per-type code is the two field-handler dicts and the configuration dicts at the top. |
|
||||
|
||||
## The two workflows
|
||||
|
||||
There are two ways to run the pipeline; pick the one that matches your
|
||||
situation.
|
||||
|
||||
### Build from scratch — `build_from_scratch.sh`
|
||||
|
||||
Use this when there is no existing parquet output, or when the upstream
|
||||
data has changed in a way that requires reparsing everything. Wipes the
|
||||
per-source temp directories, processes every `RC_*` / `RS_*` dump in the
|
||||
raw dumps directory through Part 1, then runs the Part 2 Spark sort.
|
||||
|
||||
### Add a new month — `add_new_month.sh YYYY-MM`
|
||||
|
||||
Use this when one or more months of new dump files have arrived and you
|
||||
just want to bring the existing datasets up to date. Processes only the
|
||||
specified month's `RC_<MONTH>.zst` and `RS_<MONTH>.zst` files through
|
||||
Part 1 (the existing per-source parquet files are left in place), then
|
||||
re-runs the Part 2 Spark sort over the full temp directory so the final
|
||||
datasets pick up the new data.
|
||||
|
||||
The Part 2 sort is global and not incremental, so each monthly add
|
||||
re-sorts the entire corpus. That's fine for a monthly cadence; it would
|
||||
need a rearchitecture if the cost became a problem.
|
||||
|
||||
## Running steps individually
|
||||
|
||||
Both `.sh` runners are written so that every meaningful step is a separate,
|
||||
self-contained command. If something fails partway through, or you want
|
||||
to inspect intermediate state, you can copy any single line out of the
|
||||
runner and execute it standalone. For example:
|
||||
|
||||
```sh
|
||||
# parse one specific file (skipping the rest of the workflow)
|
||||
python3 comments_part1.py parse_dump RC_2025-03.zst
|
||||
|
||||
# override default dump/output paths from the CLI
|
||||
python3 comments_part1.py parse_dump RC_2025-03.zst \
|
||||
--dumpdir=/tmp/test --outdir=/tmp/out
|
||||
|
||||
# regenerate just the task list
|
||||
python3 submissions_part1.py gen_task_list
|
||||
```
|
||||
|
||||
The Spark Part 2 step is launched via `start_spark_and_run.sh` (a
|
||||
Hyak-provided wrapper not included in this repo); see the wiki for the
|
||||
launch convention.
|
||||
|
||||
## See also
|
||||
|
||||
The CDSC wiki page
|
||||
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit)
|
||||
documents the surrounding workflow — where the raw dump files come from
|
||||
(currently ArcticShift via academic torrents), how to stage them on
|
||||
Hyak, and how to run Spark jobs on the cluster.
|
||||
48
datasets/add_new_month.sh
Executable file
48
datasets/add_new_month.sh
Executable file
@@ -0,0 +1,48 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Add a single new month of dumps to the existing parquet datasets.
|
||||
#
|
||||
# Processes only the RC_<month>.zst and RS_<month>.zst files (Part 1),
|
||||
# leaving the existing per-source temp parquet files untouched, then
|
||||
# re-runs the Part 2 Spark sort + repartition over the full temp dir so
|
||||
# the final by_subreddit / by_author datasets pick up the new data.
|
||||
#
|
||||
# Usage:
|
||||
# add_new_month.sh YYYY-MM
|
||||
#
|
||||
# Example:
|
||||
# add_new_month.sh 2025-03
|
||||
#
|
||||
# Every command below is independently runnable — to debug, copy a line
|
||||
# out and run it directly. For a full rebuild instead, see
|
||||
# build_from_scratch.sh.
|
||||
#
|
||||
# Note on cost: Part 2 always re-sorts the full corpus (the sort is global,
|
||||
# not incremental), so this gets slightly slower each month. For the
|
||||
# monthly cadence this is fine; if the sort becomes a bottleneck we'd
|
||||
# need to rearchitect Part 2 to merge-append instead of re-sort.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
MONTH="${1:-}"
|
||||
if [ -z "$MONTH" ]; then
|
||||
echo "Usage: $0 YYYY-MM" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# --- Part 1: parse the new month's dumps (no wipe) -------------------------
|
||||
|
||||
# parse the new comments file
|
||||
python3 comments_part1.py parse_dump "RC_${MONTH}.zst"
|
||||
|
||||
# parse the new submissions file
|
||||
python3 submissions_part1.py parse_dump "RS_${MONTH}.zst"
|
||||
|
||||
# --- Part 2: re-sort the full corpus including the new data ---------------
|
||||
|
||||
# sort comments and overwrite reddit_comments_by_{subreddit,author}.parquet
|
||||
start_spark_and_run.sh 1 comments_part2.py
|
||||
|
||||
# sort submissions and overwrite reddit_submissions_by_{subreddit,author}.parquet
|
||||
start_spark_and_run.sh 1 submissions_part2.py
|
||||
56
datasets/build_from_scratch.sh
Executable file
56
datasets/build_from_scratch.sh
Executable file
@@ -0,0 +1,56 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# Build the sorted, partitioned Reddit parquet datasets from scratch.
|
||||
#
|
||||
# Wipes the per-source temp directories, processes every RC_* and RS_* dump
|
||||
# in the raw_data dumps directory through Part 1 (per-file, parallel), then
|
||||
# runs the Part 2 Spark sort + repartition for both comments and submissions.
|
||||
#
|
||||
# Every command below is independently runnable — to debug a single stage,
|
||||
# copy the line out and run it directly. Run the whole script end-to-end
|
||||
# only when you trust each step.
|
||||
#
|
||||
# Prerequisites:
|
||||
# - raw .zst dumps already staged in the dumpdir locations (see the
|
||||
# defaults in dumps_helper.py, or override via --dumpdir)
|
||||
# - GNU parallel installed
|
||||
# - start_spark_and_run.sh on PATH (Hyak-provided wrapper)
|
||||
#
|
||||
# To add one new month to an existing build instead of rebuilding from
|
||||
# scratch, use add_new_month.sh.
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
TEMP_COMMENTS="/gscratch/comdata/output/temp/reddit_comments.parquet"
|
||||
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/reddit_submissions.parquet"
|
||||
|
||||
# --- Part 1a: comments ------------------------------------------------------
|
||||
|
||||
# wipe any existing comments temp output
|
||||
rm -rf "$TEMP_COMMENTS"
|
||||
|
||||
# generate the per-file parse task list
|
||||
python3 comments_part1.py gen_task_list
|
||||
|
||||
# run all comments parse tasks in parallel
|
||||
parallel --joblog comments_joblog.txt --results comments_logs < parse_comments_task_list
|
||||
|
||||
# --- Part 1b: submissions ---------------------------------------------------
|
||||
|
||||
# wipe any existing submissions temp output
|
||||
rm -rf "$TEMP_SUBMISSIONS"
|
||||
|
||||
# generate the per-file parse task list
|
||||
python3 submissions_part1.py gen_task_list
|
||||
|
||||
# run all submissions parse tasks in parallel
|
||||
parallel --joblog submissions_joblog.txt --results submissions_logs < parse_submissions_task_list
|
||||
|
||||
# --- Part 2: spark sort + repartition --------------------------------------
|
||||
|
||||
# sort comments and write reddit_comments_by_{subreddit,author}.parquet
|
||||
start_spark_and_run.sh 1 comments_part2.py
|
||||
|
||||
# sort submissions and write reddit_submissions_by_{subreddit,author}.parquet
|
||||
start_spark_and_run.sh 1 submissions_part2.py
|
||||
@@ -1,10 +0,0 @@
|
||||
## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
|
||||
|
||||
#!/usr/bin/env bash
|
||||
echo "#!/usr/bin/bash" > job_script.sh
|
||||
#echo "source $(pwd)/../bin/activate" >> job_script.sh
|
||||
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
|
||||
|
||||
srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
|
||||
|
||||
start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py
|
||||
@@ -1,115 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
from datetime import datetime
|
||||
from multiprocessing import Pool
|
||||
from itertools import islice
|
||||
from helper import find_dumps, open_fileset
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
def parse_comment(comment, names= None):
|
||||
if names is None:
|
||||
names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
|
||||
|
||||
try:
|
||||
comment = json.loads(comment)
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
print(e)
|
||||
print(comment)
|
||||
row = [None for _ in names]
|
||||
row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,comment)
|
||||
return tuple(row)
|
||||
|
||||
row = []
|
||||
for name in names:
|
||||
if name == 'created_utc':
|
||||
row.append(datetime.fromtimestamp(int(comment['created_utc']),tz=None))
|
||||
elif name == 'edited':
|
||||
val = comment[name]
|
||||
if type(val) == bool:
|
||||
row.append(val)
|
||||
row.append(None)
|
||||
else:
|
||||
row.append(True)
|
||||
row.append(datetime.fromtimestamp(int(val),tz=None))
|
||||
elif name == "time_edited":
|
||||
continue
|
||||
elif name not in comment:
|
||||
row.append(None)
|
||||
|
||||
else:
|
||||
row.append(comment[name])
|
||||
|
||||
return tuple(row)
|
||||
|
||||
|
||||
# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
|
||||
|
||||
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
|
||||
|
||||
files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
|
||||
|
||||
pool = Pool(28)
|
||||
|
||||
stream = open_fileset(files)
|
||||
|
||||
N = int(1e4)
|
||||
|
||||
rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
|
||||
|
||||
schema = pa.schema([
|
||||
pa.field('id', pa.string(), nullable=True),
|
||||
pa.field('subreddit', pa.string(), nullable=True),
|
||||
pa.field('link_id', pa.string(), nullable=True),
|
||||
pa.field('parent_id', pa.string(), nullable=True),
|
||||
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('author', pa.string(), nullable=True),
|
||||
pa.field('ups', pa.int64(), nullable=True),
|
||||
pa.field('downs', pa.int64(), nullable=True),
|
||||
pa.field('score', pa.int64(), nullable=True),
|
||||
pa.field('edited', pa.bool_(), nullable=True),
|
||||
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('subreddit_type', pa.string(), nullable=True),
|
||||
pa.field('subreddit_id', pa.string(), nullable=True),
|
||||
pa.field('stickied', pa.bool_(), nullable=True),
|
||||
pa.field('is_submitter', pa.bool_(), nullable=True),
|
||||
pa.field('body', pa.string(), nullable=True),
|
||||
pa.field('error', pa.string(), nullable=True),
|
||||
])
|
||||
|
||||
from pathlib import Path
|
||||
p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
|
||||
|
||||
if not p.is_dir():
|
||||
if p.exists():
|
||||
p.unlink()
|
||||
p.mkdir()
|
||||
|
||||
else:
|
||||
list(map(Path.unlink,p.glob('*')))
|
||||
|
||||
part_size = int(1e7)
|
||||
part = 1
|
||||
n_output = 0
|
||||
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
|
||||
|
||||
while True:
|
||||
if n_output > part_size:
|
||||
if part > 1:
|
||||
writer.close()
|
||||
|
||||
part = part + 1
|
||||
n_output = 0
|
||||
|
||||
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
|
||||
|
||||
n_output += N
|
||||
chunk = islice(rows,N)
|
||||
pddf = pd.DataFrame(chunk, columns=schema.names)
|
||||
table = pa.Table.from_pandas(pddf,schema=schema)
|
||||
if table.shape[0] == 0:
|
||||
break
|
||||
writer.write_table(table)
|
||||
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# spark script to make sorted, and partitioned parquet files
|
||||
|
||||
from pyspark.sql import functions as f
|
||||
from pyspark.sql import SparkSession
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
|
||||
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
|
||||
|
||||
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
|
||||
df = df.drop('subreddit')
|
||||
df = df.withColumnRenamed('subreddit_2','subreddit')
|
||||
|
||||
df = df.withColumnRenamed("created_utc","CreatedAt")
|
||||
df = df.withColumn("Month",f.month(f.col("CreatedAt")))
|
||||
df = df.withColumn("Year",f.year(f.col("CreatedAt")))
|
||||
df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
|
||||
|
||||
df = df.repartition('subreddit')
|
||||
df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
|
||||
df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
|
||||
df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
|
||||
|
||||
df = df.repartition('author')
|
||||
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
|
||||
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
|
||||
df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
|
||||
24
datasets/comments_part1.py
Executable file
24
datasets/comments_part1.py
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Part 1 for comments: parse one RC_*.zst dump into a parquet file.
|
||||
|
||||
CLI:
|
||||
comments_part1.py parse_dump RC_2018-08.zst
|
||||
comments_part1.py gen_task_list
|
||||
comments_part1.py parse_dump RC_2018-08.zst --dumpdir=/tmp/in --outdir=/tmp/out
|
||||
"""
|
||||
|
||||
import fire
|
||||
from dumps_helper import COMMENTS, parse_dump, gen_task_list
|
||||
|
||||
|
||||
def _parse_dump(partition, dumpdir=None, outdir=None):
|
||||
parse_dump(COMMENTS, partition, dumpdir=dumpdir, outdir=outdir)
|
||||
|
||||
|
||||
def _gen_task_list(dumpdir=None, tasklist=None):
|
||||
gen_task_list(COMMENTS, 'comments_part1.py', dumpdir=dumpdir, tasklist=tasklist)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
fire.Fire({'parse_dump': _parse_dump,
|
||||
'gen_task_list': _gen_task_list})
|
||||
14
datasets/comments_part2.py
Executable file
14
datasets/comments_part2.py
Executable file
@@ -0,0 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Part 2 for comments: Spark sort + repartition the per-source parquets
|
||||
produced by comments_part1.py into the final by_subreddit / by_author
|
||||
datasets.
|
||||
|
||||
Launched via the Hyak-provided start_spark_and_run.sh wrapper:
|
||||
start_spark_and_run.sh 1 comments_part2.py
|
||||
"""
|
||||
|
||||
from dumps_helper import COMMENTS, sort_and_write
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sort_and_write(COMMENTS)
|
||||
286
datasets/dumps_helper.py
Normal file
286
datasets/dumps_helper.py
Normal file
@@ -0,0 +1,286 @@
|
||||
"""Shared logic for the comments and submissions dump-to-parquet pipeline.
|
||||
|
||||
Used by comments_part1.py / submissions_part1.py (Part 1: one compressed
|
||||
dump file → one parquet file) and comments_part2.py / submissions_part2.py
|
||||
(Part 2: Spark sort + repartition of the per-source parquets).
|
||||
|
||||
The two dump types only differ in their schemas and a handful of
|
||||
field-specific extractors. The parse loop, the file I/O wrapping, the
|
||||
task-list generator, and the Spark sort are all shared here.
|
||||
"""
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
from itertools import islice
|
||||
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import simdjson
|
||||
|
||||
from helper import find_dumps, open_fileset
|
||||
|
||||
|
||||
_json = simdjson.Parser()
|
||||
|
||||
|
||||
# --- field-level extractors ------------------------------------------------
|
||||
|
||||
def _ts(name):
|
||||
"""Extractor for a unix-timestamp field (or None if missing)."""
|
||||
def handler(record):
|
||||
val = record.get(name)
|
||||
if val is None:
|
||||
return None
|
||||
return datetime.fromtimestamp(int(val), tz=None)
|
||||
return handler
|
||||
|
||||
|
||||
def _edited(record):
|
||||
"""Returns (edited, time_edited). The dump packs both into one `edited`
|
||||
field that is either a bool (never edited / unknown timestamp) or a
|
||||
unix timestamp."""
|
||||
val = record.get('edited')
|
||||
if isinstance(val, bool):
|
||||
return (val, None)
|
||||
if val is None:
|
||||
return (None, None)
|
||||
return (True, datetime.fromtimestamp(int(val), tz=None))
|
||||
|
||||
|
||||
def _has_media(record):
|
||||
"""Submissions don't have a `has_media` field directly — derive it."""
|
||||
return record.get('media') is not None
|
||||
|
||||
|
||||
# --- generic parse loop ----------------------------------------------------
|
||||
|
||||
def parse_record(line, fields, handlers):
|
||||
"""Parse one JSON line into a tuple aligned with `fields`.
|
||||
|
||||
`handlers` maps field name → callable(record) returning either a single
|
||||
value (one column) or a tuple of values (multiple consecutive columns,
|
||||
consuming the next len(tuple)-1 entries in `fields`).
|
||||
Fields without a handler are pulled from the record by name, with
|
||||
missing keys yielding None.
|
||||
The last field in `fields` is reserved for an error message string
|
||||
and is set to None on success.
|
||||
"""
|
||||
try:
|
||||
record = _json.parse(line)
|
||||
except (ValueError, KeyError) as e:
|
||||
row = [None] * len(fields)
|
||||
row[-1] = f"parse error|{e}|{line}"
|
||||
return tuple(row)
|
||||
|
||||
row = []
|
||||
skip_next = 0
|
||||
for name in fields:
|
||||
if skip_next > 0:
|
||||
skip_next -= 1
|
||||
continue
|
||||
handler = handlers.get(name)
|
||||
if handler is None:
|
||||
try:
|
||||
row.append(record[name])
|
||||
except KeyError:
|
||||
row.append(None)
|
||||
else:
|
||||
result = handler(record)
|
||||
if isinstance(result, tuple):
|
||||
row.extend(result)
|
||||
skip_next = len(result) - 1
|
||||
else:
|
||||
row.append(result)
|
||||
return tuple(row)
|
||||
|
||||
|
||||
# --- comments schema -------------------------------------------------------
|
||||
|
||||
COMMENT_FIELDS = [
|
||||
'id', 'subreddit', 'link_id', 'parent_id', 'created_utc', 'author',
|
||||
'ups', 'downs', 'score', 'edited', 'time_edited', 'subreddit_type',
|
||||
'subreddit_id', 'stickied', 'is_submitter', 'body', 'error',
|
||||
]
|
||||
|
||||
COMMENT_SCHEMA = pa.schema([
|
||||
pa.field('id', pa.string(), nullable=True),
|
||||
pa.field('subreddit', pa.string(), nullable=True),
|
||||
pa.field('link_id', pa.string(), nullable=True),
|
||||
pa.field('parent_id', pa.string(), nullable=True),
|
||||
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('author', pa.string(), nullable=True),
|
||||
pa.field('ups', pa.int64(), nullable=True),
|
||||
pa.field('downs', pa.int64(), nullable=True),
|
||||
pa.field('score', pa.int64(), nullable=True),
|
||||
pa.field('edited', pa.bool_(), nullable=True),
|
||||
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('subreddit_type', pa.string(), nullable=True),
|
||||
pa.field('subreddit_id', pa.string(), nullable=True),
|
||||
pa.field('stickied', pa.bool_(), nullable=True),
|
||||
pa.field('is_submitter', pa.bool_(), nullable=True),
|
||||
pa.field('body', pa.string(), nullable=True),
|
||||
pa.field('error', pa.string(), nullable=True),
|
||||
])
|
||||
|
||||
COMMENT_HANDLERS = {
|
||||
'created_utc': _ts('created_utc'),
|
||||
'edited': _edited,
|
||||
}
|
||||
|
||||
|
||||
# --- submissions schema ----------------------------------------------------
|
||||
|
||||
SUBMISSION_FIELDS = [
|
||||
'id', 'author', 'subreddit', 'title', 'created_utc', 'permalink', 'url',
|
||||
'domain', 'score', 'ups', 'downs', 'over_18', 'has_media', 'selftext',
|
||||
'retrieved_on', 'num_comments', 'gilded', 'edited', 'time_edited',
|
||||
'subreddit_type', 'subreddit_id', 'subreddit_subscribers', 'name',
|
||||
'is_self', 'stickied', 'quarantine', 'error',
|
||||
]
|
||||
|
||||
SUBMISSION_SCHEMA = pa.schema([
|
||||
pa.field('id', pa.string(), nullable=True),
|
||||
pa.field('author', pa.string(), nullable=True),
|
||||
pa.field('subreddit', pa.string(), nullable=True),
|
||||
pa.field('title', pa.string(), nullable=True),
|
||||
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('permalink', pa.string(), nullable=True),
|
||||
pa.field('url', pa.string(), nullable=True),
|
||||
pa.field('domain', pa.string(), nullable=True),
|
||||
pa.field('score', pa.int64(), nullable=True),
|
||||
pa.field('ups', pa.int64(), nullable=True),
|
||||
pa.field('downs', pa.int64(), nullable=True),
|
||||
pa.field('over_18', pa.bool_(), nullable=True),
|
||||
pa.field('has_media', pa.bool_(), nullable=True),
|
||||
pa.field('selftext', pa.string(), nullable=True),
|
||||
pa.field('retrieved_on', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('num_comments', pa.int64(), nullable=True),
|
||||
pa.field('gilded', pa.int64(), nullable=True),
|
||||
pa.field('edited', pa.bool_(), nullable=True),
|
||||
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
|
||||
pa.field('subreddit_type', pa.string(), nullable=True),
|
||||
pa.field('subreddit_id', pa.string(), nullable=True),
|
||||
pa.field('subreddit_subscribers', pa.int64(), nullable=True),
|
||||
pa.field('name', pa.string(), nullable=True),
|
||||
pa.field('is_self', pa.bool_(), nullable=True),
|
||||
pa.field('stickied', pa.bool_(), nullable=True),
|
||||
pa.field('quarantine', pa.bool_(), nullable=True),
|
||||
pa.field('error', pa.string(), nullable=True),
|
||||
])
|
||||
|
||||
SUBMISSION_HANDLERS = {
|
||||
'created_utc': _ts('created_utc'),
|
||||
'retrieved_on': _ts('retrieved_on'),
|
||||
'edited': _edited,
|
||||
'has_media': _has_media,
|
||||
}
|
||||
|
||||
|
||||
# --- per-type configuration ------------------------------------------------
|
||||
|
||||
# Defaults that the entry-point scripts pass through, exposed here so the
|
||||
# field/schema/handler triplet, the canonical paths, and the dump filename
|
||||
# pattern all live in one place.
|
||||
COMMENTS = {
|
||||
'fields': COMMENT_FIELDS,
|
||||
'schema': COMMENT_SCHEMA,
|
||||
'handlers': COMMENT_HANDLERS,
|
||||
'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/comments",
|
||||
'outdir': "/gscratch/comdata/output/temp/reddit_comments.parquet",
|
||||
'file_pattern': 'RC_20*.*',
|
||||
'task_list': 'parse_comments_task_list',
|
||||
'output_by_subreddit': "/gscratch/comdata/output/reddit_comments_by_subreddit.parquet",
|
||||
'output_by_author': "/gscratch/comdata/output/reddit_comments_by_author.parquet",
|
||||
'subreddit_sort_keys': ["subreddit", "CreatedAt", "link_id", "parent_id", "Year", "Month", "Day"],
|
||||
'author_sort_keys': ["author", "CreatedAt", "subreddit", "link_id", "parent_id", "Year", "Month", "Day"],
|
||||
'app_name': "Reddit comments to parquet",
|
||||
}
|
||||
|
||||
SUBMISSIONS = {
|
||||
'fields': SUBMISSION_FIELDS,
|
||||
'schema': SUBMISSION_SCHEMA,
|
||||
'handlers': SUBMISSION_HANDLERS,
|
||||
'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/submissions",
|
||||
'outdir': "/gscratch/comdata/output/temp/reddit_submissions.parquet",
|
||||
'file_pattern': 'RS_20*.*',
|
||||
'task_list': 'parse_submissions_task_list',
|
||||
'output_by_subreddit': "/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet",
|
||||
'output_by_author': "/gscratch/comdata/output/reddit_submissions_by_author.parquet",
|
||||
'subreddit_sort_keys': ["subreddit", "CreatedAt", "id"],
|
||||
'author_sort_keys': ["author", "CreatedAt", "id"],
|
||||
'app_name': "Reddit submissions to parquet",
|
||||
}
|
||||
|
||||
|
||||
# --- Part 1: parse one dump file -> one parquet ----------------------------
|
||||
|
||||
def parse_dump(config, partition, dumpdir=None, outdir=None, chunk_size=10000):
|
||||
"""Read one compressed dump from `dumpdir/partition` and write a parquet
|
||||
file to `outdir/<basename>.parquet`. Streams chunks of `chunk_size`
|
||||
rows so memory stays bounded."""
|
||||
dumpdir = dumpdir or config['dumpdir']
|
||||
outdir = outdir or config['outdir']
|
||||
schema = config['schema']
|
||||
fields = config['fields']
|
||||
handlers = config['handlers']
|
||||
|
||||
stream = open_fileset([os.path.join(dumpdir, partition)])
|
||||
rows = (parse_record(line, fields, handlers) for line in stream)
|
||||
|
||||
os.makedirs(outdir, exist_ok=True)
|
||||
outfile = os.path.join(outdir, os.path.splitext(partition)[0] + ".parquet")
|
||||
|
||||
with pq.ParquetWriter(outfile, schema=schema, compression='snappy', flavor='spark') as writer:
|
||||
while True:
|
||||
chunk = list(islice(rows, chunk_size))
|
||||
if not chunk:
|
||||
break
|
||||
pddf = pd.DataFrame(chunk, columns=schema.names)
|
||||
table = pa.Table.from_pandas(pddf, schema=schema)
|
||||
writer.write_table(table)
|
||||
|
||||
|
||||
def gen_task_list(config, script_name, dumpdir=None, tasklist=None):
|
||||
"""Write a parallel-friendly task list of `script_name parse_dump <file>`
|
||||
lines, one per dump file found under `dumpdir`."""
|
||||
dumpdir = dumpdir or config['dumpdir']
|
||||
tasklist = tasklist or config['task_list']
|
||||
files = list(find_dumps(dumpdir, base_pattern=config['file_pattern']))
|
||||
with open(tasklist, 'w') as of:
|
||||
for fpath in files:
|
||||
partition = os.path.split(fpath)[1]
|
||||
of.write(f'python3 {script_name} parse_dump {partition}\n')
|
||||
|
||||
|
||||
# --- Part 2: spark sort + repartition --------------------------------------
|
||||
|
||||
def sort_and_write(config):
|
||||
"""Read the directory of per-source parquets, sort and repartition
|
||||
twice (once by subreddit, once by author), and write the two final
|
||||
datasets. Pyspark is imported lazily so Part 1 callers don't pay the
|
||||
Spark startup cost."""
|
||||
from pyspark.sql import SparkSession, functions as f
|
||||
|
||||
spark = SparkSession.builder.appName(config['app_name']).getOrCreate()
|
||||
|
||||
df = spark.read.parquet(config['outdir'], compression='snappy')
|
||||
|
||||
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
|
||||
df = df.drop('subreddit')
|
||||
df = df.withColumnRenamed('subreddit_2', 'subreddit')
|
||||
|
||||
df = df.withColumnRenamed("created_utc", "CreatedAt")
|
||||
df = df.withColumn("Month", f.month(f.col("CreatedAt")))
|
||||
df = df.withColumn("Year", f.year(f.col("CreatedAt")))
|
||||
df = df.withColumn("Day", f.dayofmonth(f.col("CreatedAt")))
|
||||
|
||||
sub_keys = config['subreddit_sort_keys']
|
||||
df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
|
||||
df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
|
||||
df_sub.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy')
|
||||
|
||||
auth_keys = config['author_sort_keys']
|
||||
df_auth = df.repartition('author').sort(auth_keys, ascending=True)
|
||||
df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
|
||||
df_auth.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy')
|
||||
@@ -1,4 +0,0 @@
|
||||
#!/usr/bin/bash
|
||||
start_spark_cluster.sh
|
||||
spark-submit --master spark://$(hostname):18899 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/users/nathante/subreddit_term_similarity_weekly_5000.parquet --topN=5000
|
||||
stop-all.sh
|
||||
@@ -1,9 +0,0 @@
|
||||
## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
|
||||
|
||||
#!/usr/bin/env bash
|
||||
|
||||
./parse_submissions.sh
|
||||
|
||||
start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py
|
||||
|
||||
|
||||
@@ -1,118 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# two stages:
|
||||
# 1. from gz to arrow parquet (this script)
|
||||
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
|
||||
|
||||
from datetime import datetime
|
||||
from multiprocessing import Pool
|
||||
from itertools import islice
|
||||
from helper import find_dumps, open_fileset
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import simdjson
|
||||
import fire
|
||||
import os
|
||||
|
||||
parser = simdjson.Parser()
|
||||
|
||||
def parse_submission(post, names = None):
|
||||
if names is None:
|
||||
names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
|
||||
|
||||
try:
|
||||
post = parser.parse(post)
|
||||
except (ValueError) as e:
|
||||
# print(e)
|
||||
# print(post)
|
||||
row = [None for _ in names]
|
||||
row[-1] = "Error parsing json|{0}|{1}".format(e,post)
|
||||
return tuple(row)
|
||||
|
||||
row = []
|
||||
|
||||
for name in names:
|
||||
if name == 'created_utc' or name == 'retrieved_on':
|
||||
val = post.get(name,None)
|
||||
if val is not None:
|
||||
row.append(datetime.fromtimestamp(int(post[name]),tz=None))
|
||||
else:
|
||||
row.append(None)
|
||||
elif name == 'edited':
|
||||
val = post[name]
|
||||
if type(val) == bool:
|
||||
row.append(val)
|
||||
row.append(None)
|
||||
else:
|
||||
row.append(True)
|
||||
row.append(datetime.fromtimestamp(int(val),tz=None))
|
||||
elif name == "time_edited":
|
||||
continue
|
||||
elif name == 'has_media':
|
||||
row.append(post.get('media',None) is not None)
|
||||
|
||||
elif name not in post:
|
||||
row.append(None)
|
||||
else:
|
||||
row.append(post[name])
|
||||
return tuple(row)
|
||||
|
||||
def parse_dump(partition):
|
||||
|
||||
N=10000
|
||||
stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
|
||||
rows = map(parse_submission,stream)
|
||||
schema = pa.schema([
|
||||
pa.field('id', pa.string(),nullable=True),
|
||||
pa.field('author', pa.string(),nullable=True),
|
||||
pa.field('subreddit', pa.string(),nullable=True),
|
||||
pa.field('title', pa.string(),nullable=True),
|
||||
pa.field('created_utc', pa.timestamp('ms'),nullable=True),
|
||||
pa.field('permalink', pa.string(),nullable=True),
|
||||
pa.field('url', pa.string(),nullable=True),
|
||||
pa.field('domain', pa.string(),nullable=True),
|
||||
pa.field('score', pa.int64(),nullable=True),
|
||||
pa.field('ups', pa.int64(),nullable=True),
|
||||
pa.field('downs', pa.int64(),nullable=True),
|
||||
pa.field('over_18', pa.bool_(),nullable=True),
|
||||
pa.field('has_media',pa.bool_(),nullable=True),
|
||||
pa.field('selftext',pa.string(),nullable=True),
|
||||
pa.field('retrieved_on', pa.timestamp('ms'),nullable=True),
|
||||
pa.field('num_comments', pa.int64(),nullable=True),
|
||||
pa.field('gilded',pa.int64(),nullable=True),
|
||||
pa.field('edited',pa.bool_(),nullable=True),
|
||||
pa.field('time_edited',pa.timestamp('ms'),nullable=True),
|
||||
pa.field('subreddit_type',pa.string(),nullable=True),
|
||||
pa.field('subreddit_id',pa.string(),nullable=True),
|
||||
pa.field('subreddit_subscribers',pa.int64(),nullable=True),
|
||||
pa.field('name',pa.string(),nullable=True),
|
||||
pa.field('is_self',pa.bool_(),nullable=True),
|
||||
pa.field('stickied',pa.bool_(),nullable=True),
|
||||
pa.field('quarantine',pa.bool_(),nullable=True),
|
||||
pa.field('error',pa.string(),nullable=True)])
|
||||
|
||||
if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
|
||||
os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
|
||||
|
||||
with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
|
||||
while True:
|
||||
chunk = islice(rows,N)
|
||||
pddf = pd.DataFrame(chunk, columns=schema.names)
|
||||
table = pa.Table.from_pandas(pddf,schema=schema)
|
||||
if table.shape[0] == 0:
|
||||
break
|
||||
writer.write_table(table)
|
||||
|
||||
writer.close()
|
||||
|
||||
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
|
||||
files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
|
||||
with open("parse_submissions_task_list",'w') as of:
|
||||
for fpath in files:
|
||||
partition = os.path.split(fpath)[1]
|
||||
of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
|
||||
|
||||
if __name__ == "__main__":
|
||||
fire.Fire({'parse_dump':parse_dump,
|
||||
'gen_task_list':gen_task_list})
|
||||
@@ -1,42 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# spark script to make sorted, and partitioned parquet files
|
||||
|
||||
import pyspark
|
||||
from pyspark.sql import functions as f
|
||||
from pyspark.sql import SparkSession
|
||||
import os
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
|
||||
sc = spark.sparkContext
|
||||
|
||||
conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
|
||||
conf = conf.set("spark.sql.shuffle.partitions",2000)
|
||||
conf = conf.set('spark.sql.crossJoin.enabled',"true")
|
||||
conf = conf.set('spark.debug.maxToStringFields',200)
|
||||
sqlContext = pyspark.SQLContext(sc)
|
||||
|
||||
df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
|
||||
|
||||
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
|
||||
df = df.drop('subreddit')
|
||||
df = df.withColumnRenamed('subreddit_2','subreddit')
|
||||
df = df.withColumnRenamed("created_utc","CreatedAt")
|
||||
df = df.withColumn("Month",f.month(f.col("CreatedAt")))
|
||||
df = df.withColumn("Year",f.year(f.col("CreatedAt")))
|
||||
df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
|
||||
df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
|
||||
|
||||
# next we gotta resort it all.
|
||||
df = df.repartition("subreddit")
|
||||
df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
|
||||
df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
|
||||
df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
|
||||
|
||||
|
||||
# # we also want to have parquet files sorted by author then reddit.
|
||||
df = df.repartition("author")
|
||||
df3 = df.sort(["author","CreatedAt","id"],ascending=True)
|
||||
df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
|
||||
df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
|
||||
24
datasets/submissions_part1.py
Executable file
24
datasets/submissions_part1.py
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Part 1 for submissions: parse one RS_*.zst dump into a parquet file.
|
||||
|
||||
CLI:
|
||||
submissions_part1.py parse_dump RS_2018-08.zst
|
||||
submissions_part1.py gen_task_list
|
||||
submissions_part1.py parse_dump RS_2018-08.zst --dumpdir=/tmp/in --outdir=/tmp/out
|
||||
"""
|
||||
|
||||
import fire
|
||||
from dumps_helper import SUBMISSIONS, parse_dump, gen_task_list
|
||||
|
||||
|
||||
def _parse_dump(partition, dumpdir=None, outdir=None):
|
||||
parse_dump(SUBMISSIONS, partition, dumpdir=dumpdir, outdir=outdir)
|
||||
|
||||
|
||||
def _gen_task_list(dumpdir=None, tasklist=None):
|
||||
gen_task_list(SUBMISSIONS, 'submissions_part1.py', dumpdir=dumpdir, tasklist=tasklist)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
fire.Fire({'parse_dump': _parse_dump,
|
||||
'gen_task_list': _gen_task_list})
|
||||
14
datasets/submissions_part2.py
Executable file
14
datasets/submissions_part2.py
Executable file
@@ -0,0 +1,14 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Part 2 for submissions: Spark sort + repartition the per-source parquets
|
||||
produced by submissions_part1.py into the final by_subreddit / by_author
|
||||
datasets.
|
||||
|
||||
Launched via the Hyak-provided start_spark_and_run.sh wrapper:
|
||||
start_spark_and_run.sh 1 submissions_part2.py
|
||||
"""
|
||||
|
||||
from dumps_helper import SUBMISSIONS, sort_and_write
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sort_and_write(SUBMISSIONS)
|
||||
@@ -1,33 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# run from a build_machine
|
||||
|
||||
import requests
|
||||
from os import path
|
||||
import hashlib
|
||||
|
||||
shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
|
||||
shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
|
||||
|
||||
shasums = shasums1 + shasums2
|
||||
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
|
||||
|
||||
for l in shasums.strip().split('\n'):
|
||||
sha256_hash = hashlib.sha256()
|
||||
parts = l.split(' ')
|
||||
|
||||
correct_sha256 = parts[0]
|
||||
filename = parts[-1]
|
||||
print(f"checking {filename}")
|
||||
fpath = path.join(dumpdir,filename)
|
||||
if path.isfile(fpath):
|
||||
with open(fpath,'rb') as f:
|
||||
for byte_block in iter(lambda: f.read(4096),b""):
|
||||
sha256_hash.update(byte_block)
|
||||
|
||||
if sha256_hash.hexdigest() == correct_sha256:
|
||||
print(f"{filename} checks out")
|
||||
else:
|
||||
print(f"ERROR! {filename} has the wrong hash. Redownload and recheck!")
|
||||
else:
|
||||
print(f"Skipping {filename} as it doesn't exist")
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# run from a build_machine
|
||||
|
||||
import requests
|
||||
from os import path
|
||||
import hashlib
|
||||
|
||||
file1 = requests.get("https://files.pushshift.io/reddit/submissions/sha256sums.txt").text
|
||||
file2 = requests.get("https://files.pushshift.io/reddit/submissions/old_v1_data/sha256sums.txt").text
|
||||
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
|
||||
|
||||
for l in file1.strip().split('\n') + file2.strip().split('\n'):
|
||||
sha256_hash = hashlib.sha256()
|
||||
parts = l.split(' ')
|
||||
|
||||
correct_sha256 = parts[0]
|
||||
filename = parts[-1]
|
||||
print(f"checking {filename}")
|
||||
fpath = path.join(dumpdir,filename)
|
||||
if path.isfile(fpath):
|
||||
with open(fpath,'rb') as f:
|
||||
for byte_block in iter(lambda: f.read(4096),b""):
|
||||
sha256_hash.update(byte_block)
|
||||
|
||||
if sha256_hash.hexdigest() == correct_sha256:
|
||||
print(f"{filename} checks out")
|
||||
else:
|
||||
print(f"ERROR! {filename} has the wrong hash. Redownload and recheck!")
|
||||
else:
|
||||
print(f"Skipping {filename} as it doesn't exist")
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
user_agent='nathante teblunthuis <nathante@uw.edu>'
|
||||
output_dir='/gscratch/comdata/raw_data/reddit_dumps/comments'
|
||||
base_url='https://files.pushshift.io/reddit/comments/'
|
||||
|
||||
wget -r --no-parent -A 'RC_201*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
|
||||
wget -r --no-parent -A 'RC_201*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
|
||||
wget -r --no-parent -A 'RC_201*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
|
||||
|
||||
|
||||
./check_comments_shas.py
|
||||
@@ -1,14 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
user_agent='nathante teblunthuis <nathante@uw.edu>'
|
||||
output_dir='/gscratch/comdata/raw_data/reddit_dumps/submissions'
|
||||
base_url='https://files.pushshift.io/reddit/submissions/'
|
||||
|
||||
wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
|
||||
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
|
||||
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
|
||||
wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
|
||||
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
|
||||
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
|
||||
|
||||
./check_submission_shas.py
|
||||
@@ -1,21 +0,0 @@
|
||||
from pyspark.sql import SparkSession
|
||||
from similarities_helper import build_tfidf_dataset
|
||||
import pandas as pd
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
|
||||
df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
|
||||
|
||||
include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
|
||||
|
||||
include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
|
||||
|
||||
# remove [deleted] and AutoModerator (TODO remove other bots)
|
||||
df = df.filter(df.author != '[deleted]')
|
||||
df = df.filter(df.author != 'AutoModerator')
|
||||
|
||||
df = build_tfidf_dataset(df, include_subs, 'author')
|
||||
|
||||
df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',mode='overwrite',compression='snappy')
|
||||
|
||||
spark.stop()
|
||||
@@ -1,27 +0,0 @@
|
||||
from pyspark.sql import functions as f
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql import Window
|
||||
from similarities_helper import build_weekly_tfidf_dataset
|
||||
import pandas as pd
|
||||
|
||||
|
||||
## TODO:need to exclude automoderator / bot posts.
|
||||
## TODO:need to exclude better handle hyperlinks.
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
|
||||
|
||||
include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
|
||||
|
||||
include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
|
||||
|
||||
# remove [deleted] and AutoModerator (TODO remove other bots)
|
||||
# df = df.filter(df.author != '[deleted]')
|
||||
# df = df.filter(df.author != 'AutoModerator')
|
||||
|
||||
df = build_weekly_tfidf_dataset(df, include_subs, 'term')
|
||||
|
||||
|
||||
df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy')
|
||||
spark.stop()
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
from pyspark.sql import functions as f
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql import Window
|
||||
import numpy as np
|
||||
import pyarrow
|
||||
import pandas as pd
|
||||
import fire
|
||||
from itertools import islice
|
||||
from pathlib import Path
|
||||
from similarities_helper import *
|
||||
|
||||
#tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/subreddit_terms.parquet')
|
||||
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
conf = spark.sparkContext.getConf()
|
||||
print(outfile)
|
||||
tfidf = spark.read.parquet(tfidf_path)
|
||||
|
||||
if included_subreddits is None:
|
||||
included_subreddits = select_topN_subreddits(topN)
|
||||
|
||||
else:
|
||||
included_subreddits = set(open(included_subreddits))
|
||||
|
||||
print("creating temporary parquet with matrix indicies")
|
||||
tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, included_subreddits)
|
||||
|
||||
tfidf = spark.read.parquet(tempdir.name)
|
||||
|
||||
# the ids can change each week.
|
||||
subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
|
||||
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
|
||||
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
|
||||
spark.stop()
|
||||
|
||||
weeks = list(subreddit_names.week.drop_duplicates())
|
||||
for week in weeks:
|
||||
print("loading matrix")
|
||||
mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
|
||||
print('computing similarities')
|
||||
sims = column_similarities(mat)
|
||||
del mat
|
||||
|
||||
names = subreddit_names.loc[subreddit_names.week==week]
|
||||
|
||||
sims = sims.rename({i:sr for i, sr in enumerate(names.subreddit.values)},axis=1)
|
||||
sims['subreddit'] = names.subreddit.values
|
||||
write_weekly_similarities(outfile, sims, week)
|
||||
|
||||
|
||||
|
||||
def cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500):
|
||||
'''
|
||||
Compute similarities between subreddits based on tfi-idf vectors of author comments
|
||||
|
||||
included_subreddits : string
|
||||
Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
|
||||
|
||||
min_df : int (default = 0.1 * (number of included_subreddits)
|
||||
exclude terms that appear in fewer than this number of documents.
|
||||
|
||||
outfile: string
|
||||
where to output csv and feather outputs
|
||||
'''
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
conf = spark.sparkContext.getConf()
|
||||
print(outfile)
|
||||
|
||||
tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet')
|
||||
|
||||
if included_subreddits is None:
|
||||
included_subreddits = select_topN_subreddits(topN)
|
||||
|
||||
else:
|
||||
included_subreddits = set(open(included_subreddits))
|
||||
|
||||
print("creating temporary parquet with matrix indicies")
|
||||
tempdir = prep_tfidf_entries(tfidf, 'author', min_df, included_subreddits)
|
||||
tfidf = spark.read.parquet(tempdir.name)
|
||||
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
|
||||
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
|
||||
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
|
||||
spark.stop()
|
||||
|
||||
print("loading matrix")
|
||||
mat = read_tfidf_matrix(tempdir.name,'author')
|
||||
print('computing similarities')
|
||||
sims = column_similarities(mat)
|
||||
del mat
|
||||
|
||||
sims = pd.DataFrame(sims.todense())
|
||||
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
|
||||
sims['subreddit'] = subreddit_names.subreddit.values
|
||||
|
||||
p = Path(outfile)
|
||||
|
||||
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
|
||||
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
|
||||
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
|
||||
|
||||
sims.to_feather(outfile)
|
||||
tempdir.cleanup()
|
||||
|
||||
if __name__ == '__main__':
|
||||
fire.Fire(author_cosine_similarities)
|
||||
@@ -1,61 +0,0 @@
|
||||
from pyspark.sql import functions as f
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql import Window
|
||||
from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
|
||||
import numpy as np
|
||||
import pyarrow
|
||||
import pandas as pd
|
||||
import fire
|
||||
from itertools import islice
|
||||
from pathlib import Path
|
||||
from similarities_helper import prep_tfidf_entries, read_tfidf_matrix, column_similarities, select_topN
|
||||
import scipy
|
||||
|
||||
# outfile='test_similarities_500.feather';
|
||||
# min_df = None;
|
||||
# included_subreddits=None; topN=100; exclude_phrases=True;
|
||||
def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
conf = spark.sparkContext.getConf()
|
||||
print(outfile)
|
||||
print(exclude_phrases)
|
||||
|
||||
tfidf = spark.read.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_terms.parquet')
|
||||
|
||||
if included_subreddits is None:
|
||||
included_subreddits = select_topN_subreddits(topN)
|
||||
else:
|
||||
included_subreddits = set(open(included_subreddits))
|
||||
|
||||
if exclude_phrases == True:
|
||||
tfidf = tfidf.filter(~f.col(term).contains("_"))
|
||||
|
||||
print("creating temporary parquet with matrix indicies")
|
||||
tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
|
||||
tfidf = spark.read.parquet(tempdir.name)
|
||||
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
|
||||
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
|
||||
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
|
||||
spark.stop()
|
||||
|
||||
print("loading matrix")
|
||||
mat = read_tfidf_matrix(tempdir.name,'term')
|
||||
print('computing similarities')
|
||||
sims = column_similarities(mat)
|
||||
del mat
|
||||
|
||||
sims = pd.DataFrame(sims.todense())
|
||||
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
|
||||
sims['subreddit'] = subreddit_names.subreddit.values
|
||||
|
||||
p = Path(outfile)
|
||||
|
||||
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
|
||||
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
|
||||
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
|
||||
|
||||
sims.to_feather(outfile)
|
||||
tempdir.cleanup()
|
||||
|
||||
if __name__ == '__main__':
|
||||
fire.Fire(term_cosine_similarities)
|
||||
@@ -1,21 +0,0 @@
|
||||
from pyspark.sql import SparkSession
|
||||
from similarities_helper import build_tfidf_dataset
|
||||
import pandas as pd
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
|
||||
df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
|
||||
|
||||
include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
|
||||
|
||||
include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
|
||||
|
||||
# remove [deleted] and AutoModerator (TODO remove other bots)
|
||||
df = df.filter(df.author != '[deleted]')
|
||||
df = df.filter(df.author != 'AutoModerator')
|
||||
|
||||
df = build_tfidf_dataset(df, include_subs, 'author')
|
||||
|
||||
df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf/subreddit_comment_authors.parquet',mode='overwrite',compression='snappy')
|
||||
|
||||
spark.stop()
|
||||
@@ -1,21 +0,0 @@
|
||||
from pyspark.sql import SparkSession
|
||||
from similarities_helper import build_weekly_tfidf_dataset
|
||||
import pandas as pd
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
|
||||
df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet")
|
||||
|
||||
include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
|
||||
|
||||
include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
|
||||
|
||||
# remove [deleted] and AutoModerator (TODO remove other bots)
|
||||
df = df.filter(df.author != '[deleted]')
|
||||
df = df.filter(df.author != 'AutoModerator')
|
||||
|
||||
df = build_weekly_tfidf_dataset(df, include_subs, 'author')
|
||||
|
||||
df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet', mode='overwrite', compression='snappy')
|
||||
|
||||
spark.stop()
|
||||
@@ -1,18 +0,0 @@
|
||||
from pyspark.sql import functions as f
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql import Window
|
||||
from similarities_helper import build_tfidf_dataset
|
||||
|
||||
## TODO:need to exclude automoderator / bot posts.
|
||||
## TODO:need to exclude better handle hyperlinks.
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
|
||||
|
||||
include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
|
||||
include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
|
||||
|
||||
df = build_tfidf_dataset(df, include_subs, 'term')
|
||||
|
||||
df.write.parquet('/gscratch/comdata/output/reddit_similarity/reddit_similarity/subreddit_terms.parquet',mode='overwrite',compression='snappy')
|
||||
spark.stop()
|
||||
@@ -1,27 +0,0 @@
|
||||
from pyspark.sql import functions as f
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql import Window
|
||||
from similarities_helper import build_weekly_tfidf_dataset
|
||||
import pandas as pd
|
||||
|
||||
|
||||
## TODO:need to exclude automoderator / bot posts.
|
||||
## TODO:need to exclude better handle hyperlinks.
|
||||
|
||||
spark = SparkSession.builder.getOrCreate()
|
||||
df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
|
||||
|
||||
include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
|
||||
|
||||
include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
|
||||
|
||||
# remove [deleted] and AutoModerator (TODO remove other bots)
|
||||
# df = df.filter(df.author != '[deleted]')
|
||||
# df = df.filter(df.author != 'AutoModerator')
|
||||
|
||||
df = build_weekly_tfidf_dataset(df, include_subs, 'term')
|
||||
|
||||
|
||||
df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy')
|
||||
spark.stop()
|
||||
|
||||
Reference in New Issue
Block a user