18
0

32 Commits

Author SHA1 Message Date
33150243cd datasets/: split parquet scripts; share logic in dumps_helper.py
Follows the helper-module pattern used in similarities/. Replaces
parquet_part1.py and parquet_part2.py (the merged single-file versions
from the previous commit) with:

- dumps_helper.py — schemas, simdjson parser, a generic parse_record
  loop with per-field handler dispatch, and parse_dump / gen_task_list
  / sort_and_write workers. The only per-type code is the field-handler
  dicts and the type-config dicts (COMMENTS, SUBMISSIONS) at the top.
- comments_part1.py, submissions_part1.py — thin Part 1 entry points
  with fire CLIs (parse_dump, gen_task_list).
- comments_part2.py, submissions_part2.py — thin Part 2 entry points
  for the Spark sort. pyspark is imported lazily inside sort_and_write
  so Part 1 callers don't pay the import cost.

Unifies on simdjson for both types (drops the json import), which is
faster on the comments dumps. Field-handler dicts make adding a new
type or field a one-place edit.

Also fixes a latent bug in the original: the FIELDS lists didn't
include time_edited (only the schema did), so error-path rows were
short by one element vs. the schema and would have failed pandas /
pyarrow alignment for any row that hit a JSON parse error. The new
FIELDS lists match the schemas exactly, and the _edited handler
returns a (edited, time_edited) tuple that the generic parse loop
expands.

Runners and README updated for the new CLIs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 16:51:41 -07:00
8965a251b6 refactor datasets/ pipeline; add build/add-month workflows
Replace the four per-type scripts (comments/submissions x part1/part2)
with two merged scripts that share all of their plumbing — only the
schema and JSON parser differ between types. Drop the per-source part
rolling; one parquet per input zst, since Spark handles big parquet
files via internal row groups.

Add two thin runner scripts for the two common workflows:
build_from_scratch.sh wipes the temp dirs and processes everything,
add_new_month.sh takes YYYY-MM and parses just that month before
re-running the Spark sort. Every step in the runners is a separate
command so individual stages can be copied out and run standalone
for debugging.

Also fixes several lurking bugs in the original code: the hardcoded
/gscratch/comdata/users/nathante/ output path in comments Part 2;
the df2 = df.sortWithinPartitions typo in submissions Part 2 that
threw away the preceding global sort; references to a missing
parse_submissions.sh in the old .sh runners; and the asymmetry where
comments_2_parquet_part1.py wasn't per-file/fire-driven the way
submissions_2_parquet_part1.py was.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 16:30:54 -07:00
d201930951 rewrite README, remove dead pushshift scripts and old/
Pushshift's files.pushshift.io archive is gone since Reddit cut off
third-party API access in 2023, so the dumps/ pull and SHA-check scripts
no longer work. The old/ directory of pre-refactor scripts was likewise
superseded by current versions in similarities/.

README rewritten to credit Nate as original developer, name current
maintainers, document the directory layout, point at the CDSC wiki for
the ArcticShift/torrent-based workflow, fix several stale script paths,
and correct an incorrect tf-normalization formula (max, not sum).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 15:53:33 -07:00
53f5b8c03c add note to try other tf normalization strategies. 2022-03-31 12:17:16 -07:00
14ab979f59 Merge branch 'master' of code:cdsc_reddit 2021-08-03 15:03:40 -07:00
Nate E TeBlunthuis
c6122bb429 Merge branch 'master' of code:cdsc_reddit 2021-07-28 15:32:21 -07:00
Nate E TeBlunthuis
596e1ff339 no longer do we need to get daily dumps 2021-07-28 15:32:04 -07:00
Nate E TeBlunthuis
6a3bfa26ee bugfix 2021-04-26 22:31:05 -07:00
Nate E TeBlunthuis
3a758f1fc8 Merge branch 'charliepatch' of code:cdsc_reddit into charliepatch 2021-04-26 13:58:25 -07:00
Nate E TeBlunthuis
806cfc948f support passing in list of tfidf vectors.
Also lowercases included subreddits.
2021-04-26 13:20:43 -07:00
Nate E TeBlunthuis
0fe120e4ab support passing in list of tfidf vectors.
Also lowercases included subreddits.
2021-04-26 11:44:56 -07:00
Nate E TeBlunthuis
f20365c07e Merge branch 'master' of code:cdsc_reddit 2021-04-22 10:46:26 -07:00
Nate E TeBlunthuis
34e0a0a30d version of weekly_cosine_similarities.py from klone 2021-04-22 10:38:10 -07:00
Nate E TeBlunthuis
003a48aea5 bugfix in weekly similarities 2021-04-22 10:37:04 -07:00
Nate E TeBlunthuis
37dd0ef55f bugfixes in clustering selection. 2021-04-21 16:56:25 -07:00
Nate E TeBlunthuis
ac06a8757a calculate some user-level attributes to detect bots 2021-04-20 11:34:36 -07:00
Nate E TeBlunthuis
01a4c35358 grid sweep selection for clustering hyperparameters 2021-04-20 11:33:54 -07:00
Nate E TeBlunthuis
628a70734b Merge branch 'master' of code:cdsc_reddit 2021-04-05 23:21:35 -07:00
Nate E TeBlunthuis
f0176d9f0d Changes for cosine similarities on klone. 2021-04-05 23:21:06 -07:00
Nate E TeBlunthuis
36cb0a5546 add code for pulling activity time series from parquet. 2021-03-24 16:08:57 -07:00
Nate E TeBlunthuis
06430903f0 add included_subreddits parameter to cosine similarities. 2021-02-22 18:38:34 -08:00
Nate E TeBlunthuis
4dc949de5f Changes from hyak. 2021-02-22 16:03:48 -08:00
Nate E TeBlunthuis
140d1bdd17 fix bug in viz. 2021-01-27 20:26:15 -08:00
Nate E TeBlunthuis
554660275f add visualization for 10000 subreddits based on author-tf similarities. 2021-01-27 20:22:24 -08:00
Nate E TeBlunthuis
b4dd9acbd8 Merge branch 'master' of code:cdsc_reddit 2021-01-27 20:09:23 -08:00
dbe4c87f8b add cluster selection to visualization 2021-01-27 20:08:07 -08:00
Nate E TeBlunthuis
3155600514 remove nsfw subs from topN 2020-12-28 21:11:44 -08:00
Nate E TeBlunthuis
4e20dce188 Updating to support wang-style user overlaps. 2020-12-24 22:38:04 -08:00
Nate E TeBlunthuis
56269deee3 Some improvements to run affinity clustering on larger dataset and
compute density.
2020-12-12 20:42:47 -08:00
Nate E TeBlunthuis
e6294b5b90 Refactor and reorganze. 2020-12-08 17:32:20 -08:00
Nate E TeBlunthuis
a60747292e Add code for running tf-idf at the weekly level. 2020-12-01 22:54:48 -08:00
db5879d6c9 refactor visualization code. 2020-11-17 16:46:49 -08:00
58 changed files with 2088 additions and 941 deletions

246
README.md
View File

@@ -2,51 +2,111 @@
title: Utilities for Reddit Data Science
---
`cdsc_reddit` is a collection of tools for working with Reddit data on the
Hyak super computing system at the University of Washington. It is built
around [PySpark](https://spark.apache.org/docs/latest/api/python/index.html)
and [pyarrow](https://arrow.apache.org/docs/python/) so that the underlying
pipelines scale to the full Pushshift archive.
The reddit_cdsc project contains tools for working with Reddit data. The project is designed for the hyak super computing system at The University of Washington. It consists of a set of python and bash scripts and uses the [Pyspark](https://spark.apache.org/docs/latest/api/python/index.html "Pyspark documentation") and [pyarrow](https://arrow.apache.org/docs/python/ "documentation of python arrow bindings") to process large datasets. As of November 1st 2020, the project is under active development by [Nate TeBlunthuis](https://wiki.communitydata.science/People#Nathan_TeBlunthuis_.28University_of_Washington.29 "Nate's profile on the Community Data Science Collective Wiki") and provides scripts for:
The project was originally developed by [Nate
TeBlunthuis](https://wiki.communitydata.science/People#Nathan_TeBlunthuis_.28University_of_Texas_at_Austin.29)
and is now maintained by a rotating set of researchers in the Community
Data Science Collective, including Benjamin Mako Hill, Madelyn Douglas, and
others.
- Pulling and updating dumps from [Pushshift](https://pushshift.io "Pushshift.io") in `pull_pushshift_comments.sh` and `pull_pushshift_submissions.sh`.
- Uncompressing and parsing the dumps into [Parquet](https://parquet.apache.org/ "apahce parquet website") [datasets](https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets "Wikilink to documentation on the Reddit parquet datasets").
- Running text analysis based on [TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf "Wikipedia article on tf-idf") including
- Extracting terms from Reddit comments in `tf_comments.py`
- Detecting common phrases based on [Pointwise mutual information](https://en.wikipedia.org/wiki/Pointwise_mutual_information) "Wikipedia article on pointwise mutual information")
- Building TF-IDF vectors for each subreddit `idf_comments.py` and (more experimentally) at the subreddit-week level `idf_comments_weekly.py`
- Computing cosine similarities between subreddits based on TF-IDF `term_cosine_similarity.py`.
At a high level, the codebase covers four kinds of work:
Right now, two steps are still in earlier stages of progress:
- **Ingest.** Turning Pushshift comment and submission dumps into
partitioned Parquet datasets that are fast to query by subreddit or by
author.
- **Text features.** Building per-subreddit TF-IDF vectors over comment
text, including a phrase-detection pass based on pointwise mutual
information.
- **Similarity, clustering, and density.** Computing cosine similarities
between subreddits (by terms or by overlapping authors), clustering the
resulting similarity matrices, and summarizing how dense each
neighborhood is.
- **Time series and visualization.** Pulling activity time series per
subreddit and producing t-SNE plots of the clustering output.
- Approach comparable to tf-idf for similarity between subreddits in terms of comment authors.
- Clustering subreddits based on cosine-similarities using [power iteration clustering (PIC)](http://www.cs.cmu.edu/~wcohen/postscript/icml2010-pic-final.pdf "Paper on power iteration clustering")
Several pieces are still rough — the user interfaces for many of the
scripts assume familiarity with the project, and the TF-IDF pipeline does
not yet strip hyperlinks or bot comments, so subreddits with similar
automod messages can look misleadingly similar.
The TF-IDF for comments still has some kinks to iron out to remove hyper links and bot comments. Right now subreddits that have similar automoderation messages appear very similar.
## Repository layout
The user interfaces for most of the scripts are pretty crappy and need to be refined for re-use by others.
| Directory | What's in it |
|---|---|
| `datasets/` | Scripts that convert the raw dumps into partitioned, sorted Parquet datasets. |
| `ngrams/` | Term extraction from comments, phrase detection via PMI, and supporting batch scripts. |
| `similarities/` | TF-IDF construction and cosine-similarity computation, for both terms and authors, including a weekly variant. |
| `clustering/` | Affinity-propagation clustering of the similarity matrices and t-SNE fits for visualization. |
| `density/` | Per-subreddit overlap density measures derived from the similarity matrices. |
| `timeseries/` | Per-subreddit activity time series, plus tooling for choosing among clustering runs. |
| `visualization/` | Altair-based interactive plots of subreddit clusters. |
| `bots/` | Heuristics for flagging likely bot accounts. |
| `examples/` | Small standalone examples using pyarrow. |
## Pulling data from [Pushshift](https://pushshift.io "Pushshift.io") ##
## Sourcing the dumps
- `pull_pushshift_comments.sh` uses wget to download comment dumps to `/gscratch/comdata/raw_data/reddit_dumps/comments`. It doesn't download files that already exists and runs `check_comments_shas.sh` to verify the files downloaded correctly.
Pushshift was effectively wound down after Reddit cut off third-party API
access in 2023, and the original `files.pushshift.io` archive is gone.
Collection of new Reddit comment and submission data has since been
picked up by [ArcticShift](https://github.com/ArthurHeitmann/arctic_shift),
which publishes both the historical Pushshift archive and the new data
it continues to collect, with monthly updates redistributed as academic
torrents by Reddit users `u/Watchful1` and `u/RaiderBDev`. Fetching the
dumps from a torrent client is a manual prerequisite to running the rest
of this pipeline; step-by-step instructions for the current CDSC
workflow — including which torrents to pull and how to stage the `.zst`
files on Hyak — live on the CDSC wiki at
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit).
The earlier `dumps/` directory of `pull_pushshift_*.sh` and SHA-check
scripts has been removed since the URLs they pointed at no longer
resolve.
- `pull_pushshift_submissions.sh` does the same for submissions and puts them in `/gscratch/comdata/raw_data/reddit_dumps/comments`.
## Building Parquet datasets
## Building Parquet Datasets ##
The raw dumps are huge compressed JSON files with a lot of metadata that
we usually don't need. They aren't indexed, so it's expensive to pull data
for just a handful of subreddits, and they are awkward to read directly
into Spark. Extracting the useful fields and rewriting the data as
Parquet makes everything downstream cheaper. The conversion happens in
two steps:
Pushshift dumps are huge compressed json files with a lot of metadata that we may not need. It isn't indexed so it's expensive to pull data from just a handful of subreddits. It also turns out that it's a pain to read these compressed files straight into spark. Extracting useful variables from the dumps and building parquet datasets will make them easier to work with. This happens in two steps:
1. Extracting JSON into temporary, unpartitioned Parquet files using
pyarrow (`comments_2_parquet_part1.py`,
`submissions_2_parquet_part1.py`).
2. Repartitioning and sorting the data using PySpark
(`comments_2_parquet_part2.py`, `submissions_2_parquet_part2.py`).
1. Extracting json into (temporary, unpartitioned) parquet files using pyarrow.
2. Repartitioning and sorting the data using pyspark.
The final datasets live in `/gscratch/comdata/output/`:
The final datasets are in `/gscratch/comdata/output.`
- `reddit_comments_by_author.parquet` — comments partitioned and sorted by
author (lowercase).
- `reddit_comments_by_subreddit.parquet` — comments partitioned and sorted
by subreddit (lowercase).
- `reddit_submissions_by_author.parquet` — submissions partitioned and
sorted by author (lowercase).
- `reddit_submissions_by_subreddit.parquet` — submissions partitioned and
sorted by subreddit (lowercase).
- `reddit_comments_by_author.parquet` has comments partitioned and sorted by username (lowercase).
- `reddit_comments_by_subreddit.parquet` has comments partitioned and sorted by subreddit name (lowercase).
- `reddit_submissions_by_author.parquet` has submissions partitioned and sorted by username (lowercase).
- `reddit_submissions_by_subreddit.parquet` has submissions partitioned and sorted by subreddit name (lowercase).
Splitting the work this way lets us decompress and parse the dumps in the
Hyak backfill queue and then sort them in Spark. Partitioning makes it
possible to read data for specific subreddits or authors efficiently, and
sorting makes per-subreddit or per-user aggregations cheap. More
documentation on using these files lives on the [CDSC
wiki](https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets).
Breaking this down into two steps is useful because it allows us to decompress and parse the dumps in the backfill queue and then sort them in spark. Partitioning the data makes it possible to efficiently read data for specific subreddits or authors. Sorting it means that you can efficiently compute agreggations at the subreddit or user level. More documentation on using these files is available [here](https://wiki.communitydata.science/CommunityData:Hyak_Datasets#Reading_Reddit_parquet_datasets "Wikilink to documentation on the Reddit parquet datasets").
## TF-IDF subreddit similarity
## TF-IDF Subreddit Similarity ##
[TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf "Wikipedia article on tf-idf") is common and simple information retrieval technique that we can use to quantify the topic of a subreddit. The goal of TF-IDF is to build a vector for each subreddit that scores every term (or phrase) according to how characteristic it is of the overall lexicon used in that subreddit. For example, the most characteristic terms in the subreddit /r/christianity in the current version of the TF-IDF model are:
[TF-IDF](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) is a simple
information-retrieval technique we use to quantify the topic of a
subreddit. The goal is to build a vector for each subreddit that scores
every term (or phrase) according to how characteristic it is of the
lexicon used there. For example, the most characteristic terms in
`/r/christianity` in the current model are:
| Term | tf_idf |
|:------------:|:------:|
@@ -56,61 +116,121 @@ Breaking this down into two steps is useful because it allows us to decompress a
| bible | 0.557 |
| scripture | 0.55 |
TF-IDF stands for "term frequency - inverse document frequency" because it is the product of two terms "term frequency" and "inverse document frequency." Term frequency quantifies the amount that a term appears in a subreddit (document). Inverse document frequency quantifies how much that term appears in other subreddits (documents). As you can see on the Wikipedia page, there are many possible ways of constructing and combining these terms.
TF-IDF is the product of two pieces: *term frequency* (how often a term
appears in a subreddit) and *inverse document frequency* (how rare the
term is across other subreddits). There are many ways to construct and
combine these; the [Wikipedia
page](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) catalogs the common
variants.
$x + y = z_{1,d}$
We normalize term frequency by the maximum raw term frequency for each
subreddit:
I chose to normalize term frequency by the maximum (raw) term frequency for each subreddit:
$\mathrm{tf}_{t,d} = \frac{f_{t,d}}{\sum_{t^{'} \in d}{f_{t^{'},d}}}$
$$\mathrm{tf}_{t,d} = \frac{f_{t,d}}{\max_{t^{'} \in d}{f_{t^{'},d}}}$$
I use the log inverse document frequency:
$\mathrm{idf}_{t} = log\frac{N}{| {d \in D : t \in d} |}$
and use the log inverse document frequency:
I then combine them using some smoothing to get:
$$\mathrm{idf}_{t} = \log\frac{N}{|\{d \in D : t \in d\}|}$$
$\mathrm{tfidf}_{t,d} = (0.5 + 0.5 \cdot \mathrm{tf}_{t,d}) \cdot \mathrm{idf}_{t}$
combined with a smoothing term:
### Building TF-IDF vectors ###
$$\mathrm{tfidf}_{t,d} = (0.5 + 0.5 \cdot \mathrm{tf}_{t,d}) \cdot \mathrm{idf}_{t}$$
The process for building TF-IDF vectors has four steps:
(Other normalization strategies are worth trying — see the note in
`similarities/TODO`.)
1. Extracting terms using `tf_comments.py`
2. Detecting common phrases using `top_comment_phrases.py`
3. Extracting terms and common phrases using `tf_comments.py --mwe-pass='second'`
4. Building idf and tf-idf scores in `idf_comments.py`
### Building TF-IDF vectors
#### Running `tf_comments.py` on the backfill queue ####
The pipeline has four steps:
The main reason that I did it in 4 steps instead of one is to take advantage of the backfill queue for running `tf_comments.py`. This step requires reading all of the text in every comment and converting it to a bag of words at the subreddit-level. This is a lot of computation that is easily parallelizable. The script `run_tf_jobs.sh` partially automates running steps 1 (or 3) on the backfill queue.
1. Extract terms with `ngrams/tf_comments.py`.
2. Detect common phrases with `ngrams/top_comment_phrases.py`.
3. Re-extract terms together with detected phrases via
`ngrams/tf_comments.py --mwe-pass=second`.
4. Compute IDF and TF-IDF scores in `similarities/tfidf.py`.
#### Phrase detection using Pointwise Mutual Information ####
#### Running `tf_comments.py` on the backfill queue
TF-IDF is simple, but only uses single words (unigrams). Sequences of multiple words can be important to account for how words have different meanings in different contexts or how sequences of words refer to distinct things like names. Dealing with context or longer sequences of words is a common challenge in natural language processing since the number of possible n-grams grows like crazy as n gets bigger. Phrase detection helps this problem by limiting the set of n-grams to those most informative.
The main reason for the four-step layout is that `tf_comments.py` is
trivially parallel — it reads every comment and rewrites each subreddit
as a bag of words — so it benefits from being farmed out to the Hyak
backfill queue. `ngrams/run_tf_jobs.sh` partially automates the dispatch.
But how do we detect phrases? I implemented [Pointwise mutual information](https://en.wikipedia.org/wiki/Pointwise_mutual_information) "Wikipedia article on pointwise mutual information"), which is a pretty simple way, but seems to work pretty well.
#### Phrase detection using pointwise mutual information
PMI is an quantity derived from information theory. The intuition is that if two words occur together quite frequently compared to how often they appear separately then the cooccurrance is likely to be informative.
TF-IDF over unigrams misses the fact that sequences of words often carry
distinct meaning (names, fixed expressions, in-jokes). Considering every
possible n-gram is prohibitive because the candidate set explodes with
`n`, so we use phrase detection to limit ourselves to informative
n-grams.
$\operatorname{pmi}(x;y) \equiv \log\frac{p(x,y)}{p(x)p(y)} = \log\frac{p(x|y)}{p(x)} = \log\frac{p(y|x)}{p(y)}.$
We use [pointwise mutual
information](https://en.wikipedia.org/wiki/Pointwise_mutual_information)
(PMI), which is simple and works well in practice. The intuition is that
if two words co-occur much more often than their marginal frequencies
would predict, the pair is probably meaningful:
In `tf_comments.py` if `--mwe-pass=first` then a 10\% sample of 1-4-grams (sequences of terms up to length 4) will be written to a file to be consumed by `top_comment_phrases.py`. `top_comment_phrases.py` computes the PMI for these possible phrases and writes those that occur at least 3500 times in the sample of n-grams and have a PWMI of at least 3 (about 65000 expressions).
$$\operatorname{pmi}(x;y) \equiv \log\frac{p(x,y)}{p(x)\,p(y)} = \log\frac{p(x|y)}{p(x)} = \log\frac{p(y|x)}{p(y)}$$
`tf_comments.py --mwe-pass=second` then uses the detected phrases and adds them to the term frequency data.
When `tf_comments.py` is run with `--mwe-pass=first`, it writes a 10%
sample of 1- to 4-grams to a file. `top_comment_phrases.py` then
computes PMI over that sample and keeps phrases that occur at least
3,500 times and have PMI of at least 3 — roughly 65,000 expressions.
A second pass of `tf_comments.py --mwe-pass=second` folds those phrases
back into the term-frequency data.
### Cosine Similarity ###
### Cosine similarity
Once the tf-idf vectors are built, making a similarity score between two subreddits is straightforward using cosine similarity.
Once the TF-IDF vectors are built, computing a similarity score between
two subreddits is straightforward with cosine similarity:
$\text{similarity} = \cos(\theta) = {\mathbf{A} \cdot \mathbf{B} \over \|\mathbf{A}\| \|\mathbf{B}\|} = \frac{ \sum\limits_{i=1}^{n}{A_i B_i} }{ \sqrt{\sum\limits_{i=1}^{n}{A_i^2}} \sqrt{\sum\limits_{i=1}^{n}{B_i^2}} }$
$$\text{similarity} = \cos(\theta) = \frac{\mathbf{A} \cdot \mathbf{B}}{\|\mathbf{A}\|\,\|\mathbf{B}\|} = \frac{\sum_{i=1}^{n}{A_i B_i}}{\sqrt{\sum_{i=1}^{n}{A_i^2}}\,\sqrt{\sum_{i=1}^{n}{B_i^2}}}$$
Intuitively, we represent two subreddits as lines in a high-dimensional space (tf-idf vectors).
In linear algebra, the dot product ($\cdot$) between two vectors takes their weighted sum (e.g. linear regression is a dot product of a vector of covariates and a vector of weights).
The vectors might have different lengths like if one subreddit has words in comments than the other, so in cosine similarity the dot product is normalized by the magnitude (lengths) of the vectors.
It turns out that this is equivalent to taking the cosine of the two vectors. So cosine similarity in essence quantifies the angle between the two lines in high-dimensional space. If the cosine similarity between two subreddits is greater then their tf-idf vectors are more correlated.
Each subreddit is a vector in a high-dimensional term space. The dot
product gives a weighted sum of shared terms, and dividing by the
vector magnitudes removes the effect of differing vocabulary size — what
remains is the cosine of the angle between the two vectors. Cosine
similarity with TF-IDF is popular (and has been used on Reddit several
times in prior research) because it captures correlation between the
*most characteristic* terms of two communities.
Cosine similarity with tf-idf is popular (indeed it has been applied to Reddit in research several times before) because it quantifies the correlation between the most characteristic terms for two communities.
Compared to approaches based on word embeddings or topic models, this
method can struggle with polysemy, synonymy, and correlations between
related terms. Phrase detection helps a little. The trade-off is
simplicity and scalability. Adding [latent semantic
analysis](https://en.wikipedia.org/wiki/Latent_semantic_analysis) as an
intermediate step is on the wish-list for improving on raw TF-IDF
similarities.
Compared to other approach to similarity like those using word embeddings or topic models it may struggle to handle polysemy, synonymy, or correlations between different terms. Using phrase detection helps with this a little bit. The advantages of this approach are simplicity and scalability. I'm thinking about using [Latent Semantic Analysis](https://en.wikipedia.org/wiki/Latent_semantic_analysis "Wikipedia article on Latent semantic analysis") as an intermediate step to improve upon similarities based on raw tf-idfs.
Even with these simplifications, similarity between a large number of
subreddits is expensive — naively $n^2$ dot-products. Passing
`--similarity-threshold=X` (with `X>0`) to the similarity scripts lets
Spark's built-in matrix library use the DIMSUM approximation, which is
the same algorithm Twitter and Google have used for large-scale
similarity scoring.
Even still, computing similarities between a large number of subreddits is computationally expensive and requires $n^2$ dot-product evaluations.
This can be sped up by passing `similarity-threshold=X` where $X>0$ into `term_comment_similarity.py`. I used a cosine similarity function that's built into the spark matrix library which supports the `DIMSUM` algorithm for approximating matrix-matrix products. This algorithm is commonly used in industry (i.e. at Twitter, Google) for large-scale similarity scoring.
## Clustering, density, and time series
The similarity matrices feed three follow-on analyses:
- `clustering/clustering.py` clusters a similarity matrix using
affinity propagation; `clustering/selection.py` and
`clustering/fit_tsne.py` are supporting scripts for hyperparameter
selection and 2-D embeddings.
- `density/overlap_density.py` computes a per-subreddit overlap density
measure from the similarity matrix.
- `timeseries/cluster_timeseries.py` and `timeseries/choose_clusters.py`
pull subreddit-level activity time series and join them against
clustering output.
`visualization/tsne_vis.py` renders interactive Altair plots of the
clustering output — see the prebuilt HTML files in `visualization/` for
examples.
## Bot detection
`bots/good_bad_bot.py` computes user-level features (compression rate
of comment text, frequency of self-identification as a bot, etc.) that
are useful for filtering bot accounts out of downstream analyses. This
is preliminary work; nothing in the pipeline currently consumes it
automatically.

View File

@@ -1,79 +0,0 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import pyarrow
import pandas as pd
import fire
from itertools import islice
from pathlib import Path
from similarities_helper import cosine_similarities
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
'''
Compute similarities between subreddits based on tfi-idf vectors of author comments
included_subreddits : string
Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
similarity_threshold : double (default = 0)
set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
min_df : int (default = 0.1 * (number of included_subreddits)
exclude terms that appear in fewer than this number of documents.
outfile: string
where to output csv and feather outputs
'''
print(outfile)
tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
if included_subreddits is None:
included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
included_subreddits = {s.strip('\n') for s in included_subreddits}
else:
included_subreddits = set(open(included_subreddits))
sim_dist, tfidf = cosine_similarities(tfidf, 'author', min_df, included_subreddits, similarity_threshold)
p = Path(outfile)
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
sim_dist = sim_dist.entries.toDF()
sim_dist = sim_dist.repartition(1)
sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
#instead of toLocalMatrix() why not read as entries and put strait into numpy
sim_entries = pd.read_parquet(output_parquet)
df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
spark.stop()
df['subreddit_id_new'] = df['subreddit_id_new'] - 1
df = df.sort_values('subreddit_id_new').reset_index(drop=True)
df = df.set_index('subreddit_id_new')
similarities = sim_entries.join(df, on='i')
similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
similarities = similarities.join(df, on='j')
similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
similarities.to_feather(output_feather)
similarities.to_csv(output_csv)
return similarities
if __name__ == '__main__':
fire.Fire(author_cosine_similarities)

74
bots/good_bad_bot.py Normal file
View File

@@ -0,0 +1,74 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import FloatType
import zlib
def zlib_entropy_rate(s):
sb = s.encode()
if len(sb) == 0:
return None
else:
return len(zlib.compress(s.encode(),level=6))/len(s.encode())
zlib_entropy_rate_udf = f.udf(zlib_entropy_rate,FloatType())
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet",compression='snappy')
df = df.withColumn("saidbot",f.lower(f.col("body")).like("%bot%"))
# df = df.filter(df.subreddit=='seattle')
# df = df.cache()
botreplies = df.filter(f.lower(df.body).rlike(".*[good|bad] bot.*"))
botreplies = botreplies.select([f.col("parent_id").substr(4,100).alias("bot_comment_id"),f.lower(f.col("body")).alias("good_bad_bot"),f.col("link_id").alias("gbbb_link_id")])
botreplies = botreplies.groupby(['bot_comment_id']).agg(f.count('good_bad_bot').alias("N_goodbad_votes"),
f.sum((f.lower(f.col('good_bad_bot')).like('%good bot%').astype("double"))).alias("n_good_votes"),
f.sum((f.lower(f.col('good_bad_bot')).like('%bad bot%').astype("double"))).alias("n_bad_votes"))
comments_by_author = df.select(['author','id','saidbot']).groupBy('author').agg(f.count('id').alias("N_comments"),
f.mean(f.col('saidbot').astype("double")).alias("prop_saidbot"),
f.sum(f.col('saidbot').astype("double")).alias("n_saidbot"))
# pd_comments_by_author = comments_by_author.toPandas()
# pd_comments_by_author['frac'] = 500 / pd_comments_by_author['N_comments']
# pd_comments_by_author.loc[pd_comments_by_author.frac > 1, 'frac'] = 1
# fractions = pd_comments_by_author.loc[:,['author','frac']]
# fractions = fractions.set_index('author').to_dict()['frac']
# sampled_author_comments = df.sampleBy("author",fractions).groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))
df = df.withColumn("randn",f.randn(seed=1968))
win = Window.partitionBy("author").orderBy("randn")
df = df.withColumn("randRank",f.rank().over(win))
sampled_author_comments = df.filter(f.col("randRank") <= 1000)
sampled_author_comments = sampled_author_comments.groupBy('author').agg(f.concat_ws(" ", f.collect_list('body')).alias('comments'))
author_entropy_rates = sampled_author_comments.select(['author',zlib_entropy_rate_udf(f.col('comments')).alias("entropy_rate")])
parents = df.join(botreplies, on=df.id==botreplies.bot_comment_id,how='right_outer')
win1 = Window.partitionBy("author")
parents = parents.withColumn("first_bot_reply",f.min(f.col("CreatedAt")).over(win1))
first_bot_reply = parents.filter(f.col("first_bot_reply")==f.col("CreatedAt"))
first_bot_reply = first_bot_reply.withColumnRenamed("CreatedAt","FB_CreatedAt")
first_bot_reply = first_bot_reply.withColumnRenamed("id","FB_id")
comments_since_first_bot_reply = df.join(first_bot_reply,on = 'author',how='right_outer').filter(f.col("CreatedAt")>=f.col("first_bot_reply"))
comments_since_first_bot_reply = comments_since_first_bot_reply.groupBy("author").agg(f.count("id").alias("N_comments_since_firstbot"))
bots = parents.groupby(['author']).agg(f.sum('N_goodbad_votes').alias("N_goodbad_votes"),
f.sum(f.col('n_good_votes')).alias("n_good_votes"),
f.sum(f.col('n_bad_votes')).alias("n_bad_votes"),
f.count(f.col('author')).alias("N_bot_posts"))
bots = bots.join(comments_by_author,on="author",how='left_outer')
bots = bots.join(comments_since_first_bot_reply,on="author",how='left_outer')
bots = bots.join(author_entropy_rates,on='author',how='left_outer')
bots = bots.orderBy("N_goodbad_votes",ascending=False)
bots = bots.repartition(1)
bots.write.parquet("/gscratch/comdata/output/reddit_good_bad_bot.parquet",mode='overwrite')

View File

@@ -1,33 +0,0 @@
#!/usr/bin/env python3
# run from a build_machine
import requests
from os import path
import hashlib
shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
shasums = shasums1 + shasums2
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
for l in shasums.strip().split('\n'):
sha256_hash = hashlib.sha256()
parts = l.split(' ')
correct_sha256 = parts[0]
filename = parts[-1]
print(f"checking {filename}")
fpath = path.join(dumpdir,filename)
if path.isfile(fpath):
with open(fpath,'rb') as f:
for byte_block in iter(lambda: f.read(4096),b""):
sha256_hash.update(byte_block)
if sha256_hash.hexdigest() == correct_sha256:
print(f"{filename} checks out")
else:
print(f"ERROR! {filename} has the wrong hash. Redownload and recheck!")
else:
print(f"Skipping {filename} as it doesn't exist")

View File

@@ -1,31 +0,0 @@
#!/usr/bin/env python3
# run from a build_machine
import requests
from os import path
import hashlib
file1 = requests.get("https://files.pushshift.io/reddit/submissions/sha256sums.txt").text
file2 = requests.get("https://files.pushshift.io/reddit/submissions/old_v1_data/sha256sums.txt").text
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
for l in file1.strip().split('\n') + file2.strip().split('\n'):
sha256_hash = hashlib.sha256()
parts = l.split(' ')
correct_sha256 = parts[0]
filename = parts[-1]
print(f"checking {filename}")
fpath = path.join(dumpdir,filename)
if path.isfile(fpath):
with open(fpath,'rb') as f:
for byte_block in iter(lambda: f.read(4096),b""):
sha256_hash.update(byte_block)
if sha256_hash.hexdigest() == correct_sha256:
print(f"{filename} checks out")
else:
print(f"ERROR! {filename} has the wrong hash. Redownload and recheck!")
else:
print(f"Skipping {filename} as it doesn't exist")

55
clustering/Makefile Normal file
View File

@@ -0,0 +1,55 @@
#srun_cdsc='srun -p comdata-int -A comdata --time=300:00:00 --time-min=00:15:00 --mem=100G --ntasks=1 --cpus-per-task=28'
srun_singularity=source /gscratch/comdata/users/nathante/cdsc_reddit/bin/activate && srun_singularity.sh
similarity_data=/gscratch/comdata/output/reddit_similarity
clustering_data=/gscratch/comdata/output/reddit_clustering
selection_grid="--max_iter=3000 --convergence_iter=15,30,100 --damping=0.5,0.6,0.7,0.8,0.85,0.9,0.95,0.97,0.99, --preference_quantile=0.1,0.3,0.5,0.7,0.9"
#selection_grid="--max_iter=3000 --convergence_iter=[15] --preference_quantile=[0.5] --damping=[0.99]"
all:$(clustering_data)/subreddit_comment_authors_10k/selection_data.csv $(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv $(clustering_data)/subreddit_comment_terms_10k/selection_data.csv
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS $(clustering_data)/subreddit_authors-tf_similarities_30k.feather/SUCCESS
# $(clustering_data)/subreddit_comment_terms_30k.feather/SUCCESS
$(clustering_data)/subreddit_comment_authors_10k/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_authors_10k.feather clustering.py
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_10k.feather $(clustering_data)/subreddit_comment_authors_10k $(clustering_data)/subreddit_comment_authors_10k/selection_data.csv $(selection_grid) -J 20
$(clustering_data)/subreddit_comment_terms_10k/selection_data.csv:selection.py $(similarity_data)/subreddit_comment_terms_10k.feather clustering.py
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_terms_10k.feather $(clustering_data)/subreddit_comment_terms_10k $(clustering_data)/subreddit_comment_terms_10k/selection_data.csv $(selection_grid) -J 20
$(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv:clustering.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather
$(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors-tf_10k.feather $(clustering_data)/subreddit_comment_authors-tf_10k $(clustering_data)/subreddit_comment_authors-tf_10k/selection_data.csv $(selection_grid) -J 20
# $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_authors_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors_30k.feather $(clustering_data)/subreddit_comment_authors_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_authors_30k.feather/SUCCESS
# $(clustering_data)/subreddit_comment_terms_30k.feather/SUCCESS:selection.py $(similarity_data)/subreddit_comment_terms_30k.feather clustering.py
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_terms_30k.feather $(clustering_data)/subreddit_comment_terms_30k $(selection_grid) -J 10 && touch $(clustering_data)/subreddit_comment_terms_30k.feather/SUCCESS
# $(clustering_data)/subreddit_authors-tf_similarities_30k.feather/SUCCESS:clustering.py $(similarity_data)/subreddit_comment_authors-tf_30k.feather
# $(srun_singularity) python3 selection.py $(similarity_data)/subreddit_comment_authors-tf_30k.feather $(clustering_data)/subreddit_comment_authors-tf_30k $(selection_grid) -J 8 && touch $(clustering_data)/subreddit_authors-tf_similarities_30k.feather/SUCCESS
# $(clustering_data)/subreddit_comment_authors_100k.feather:clustering.py $(similarity_data)/subreddit_comment_authors_100k.feather
# $(srun_singularity) python3 clustering.py $(similarity_data)/subreddit_comment_authors_100k.feather $(clustering_data)/subreddit_comment_authors_100k.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.85 --damping=0.85
# $(clustering_data)/comment_terms_100k.feather:clustering.py $(similarity_data)/subreddit_comment_terms_100k.feather
# $(srun_singularity) python3 clustering.py $(similarity_data)/comment_terms_10000.feather $(clustering_data)/comment_terms_10000.feather ---max_iter=1000 --convergence_iter=15 --preference_quantile=0.9 --damping=0.5
# $(clustering_data)/subreddit_comment_author-tf_100k.feather:clustering.py $(similarity_data)/subreddit_comment_author-tf_100k.feather
# $(srun_singularity) python3 clustering.py $(similarity_data)/subreddit_comment_author-tf_100k.parquet $(clustering_data)/subreddit_comment_author-tf_100k.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.5 --damping=0.85
# it's pretty difficult to get a result that isn't one huge megacluster. A sign that it's bullcrap
# /gscratch/comdata/output/reddit_clustering/wang_similarity_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather
# ./clustering.py /gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather /gscratch/comdata/output/reddit_clustering/wang_similarity_10000.feather ---max_iter=400 --convergence_iter=15 --preference_quantile=0.9 --damping=0.85
# /gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather:fit_tsne.py /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
# start_spark_and_run.sh 1 fit_tsne.py --similarities=/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet --output=/gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather
# /gscratch/comdata/output/reddit_tsne/wang_similarity_10000.feather:fit_tsne.py /gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather
# python3 fit_tsne.py --similarities=/gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather --output=/gscratch/comdata/output/reddit_tsne/wang_similarity_10000.feather
# /gscratch/comdata/output/reddit_tsne/comment_authors_10000.feather:clustering.py /gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
# # $srun_cdsc python3
# start_spark_and_run.sh 1 fit_tsne.py --similarities=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather --output=/gscratch/comdata/output/reddit_tsne/comment_authors_10000.feather

33
clustering.py → clustering/clustering.py Normal file → Executable file
View File

@@ -1,27 +1,43 @@
#!/usr/bin/env python3
# TODO: replace prints with logging.
import sys
import pandas as pd
import numpy as np
from sklearn.cluster import AffinityPropagation
import fire
from pathlib import Path
def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968):
def read_similarity_mat(similarities, use_threads=True):
df = pd.read_feather(similarities, use_threads=use_threads)
mat = np.array(df.drop('_subreddit',1))
n = mat.shape[0]
mat[range(n),range(n)] = 1
return (df._subreddit,mat)
def affinity_clustering(similarities, *args, **kwargs):
subreddits, mat = read_similarity_mat(similarities)
return _affinity_clustering(mat, subreddits, *args, **kwargs)
def _affinity_clustering(mat, subreddits, output, damping=0.9, max_iter=100000, convergence_iter=30, preference_quantile=0.5, random_state=1968, verbose=True):
'''
similarities: feather file with a dataframe of similarity scores
preference_quantile: parameter controlling how many clusters to make. higher values = more clusters. 0.85 is a good value with 3000 subreddits.
damping: parameter controlling how iterations are merged. Higher values make convergence faster and more dependable. 0.85 is a good value for the 10000 subreddits by author.
'''
df = pd.read_feather(similarities)
n = df.shape[0]
mat = np.array(df.drop('subreddit',1))
mat[range(n),range(n)] = 1
print(f"damping:{damping}; convergenceIter:{convergence_iter}; preferenceQuantile:{preference_quantile}")
preference = np.quantile(mat,preference_quantile)
print(f"preference is {preference}")
print("data loaded")
sys.stdout.flush()
clustering = AffinityPropagation(damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
copy=False,
preference=preference,
affinity='precomputed',
verbose=verbose,
random_state=random_state).fit(mat)
@@ -30,7 +46,7 @@ def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, conv
print(f"found {len(set(clusters))} clusters")
cluster_data = pd.DataFrame({'subreddit': df.subreddit,'cluster':clustering.labels_})
cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
cluster_sizes = cluster_data.groupby("cluster").count()
print(f"the largest cluster has {cluster_sizes.subreddit.max()} members")
@@ -39,7 +55,10 @@ def affinity_clustering(similarities, output, damping=0.5, max_iter=100000, conv
print(f"{(cluster_sizes.subreddit==1).sum()} clusters have 1 member")
sys.stdout.flush()
cluster_data.to_feather(output)
print(f"saved {output}")
return clustering
if __name__ == "__main__":
fire.Fire(affinity_clustering)

View File

@@ -5,7 +5,7 @@ from numpy import random
import numpy as np
from sklearn.manifold import TSNE
similarities = "term_similarities_10000.feather"
similarities = "/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet"
def fit_tsne(similarities, output, learning_rate=750, perplexity=50, n_iter=10000, early_exaggeration=20):
'''

101
clustering/selection.py Normal file
View File

@@ -0,0 +1,101 @@
from sklearn.metrics import silhouette_score
from sklearn.cluster import AffinityPropagation
from functools import partial
from clustering import _affinity_clustering, read_similarity_mat
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count, Array, Process
from pathlib import Path
from itertools import product, starmap
import numpy as np
import pandas as pd
import fire
import sys
# silhouette is the only one that doesn't need the feature matrix. So it's probably the only one that's worth trying.
@dataclass
class clustering_result:
outpath:Path
damping:float
max_iter:int
convergence_iter:int
preference_quantile:float
silhouette_score:float
alt_silhouette_score:float
name:str
def sim_to_dist(mat):
dist = 1-mat
dist[dist < 0] = 0
np.fill_diagonal(dist,0)
return dist
def do_clustering(damping, convergence_iter, preference_quantile, name, mat, subreddits, max_iter, outdir:Path, random_state, verbose, alt_mat, overwrite=False):
if name is None:
name = f"damping-{damping}_convergenceIter-{convergence_iter}_preferenceQuantile-{preference_quantile}"
print(name)
sys.stdout.flush()
outpath = outdir / (str(name) + ".feather")
print(outpath)
clustering = _affinity_clustering(mat, subreddits, outpath, damping, max_iter, convergence_iter, preference_quantile, random_state, verbose)
mat = sim_to_dist(clustering.affinity_matrix_)
score = silhouette_score(mat, clustering.labels_, metric='precomputed')
if alt_mat is not None:
alt_distances = sim_to_dist(alt_mat)
alt_score = silhouette_score(alt_mat, clustering.labels_, metric='precomputed')
res = clustering_result(outpath=outpath,
damping=damping,
max_iter=max_iter,
convergence_iter=convergence_iter,
preference_quantile=preference_quantile,
silhouette_score=score,
alt_silhouette_score=score,
name=str(name))
return res
# alt similiarities is for checking the silhouette coefficient of an alternative measure of similarity (e.g., topic similarities for user clustering).
def select_affinity_clustering(similarities, outdir, outinfo, damping=[0.9], max_iter=100000, convergence_iter=[30], preference_quantile=[0.5], random_state=1968, verbose=True, alt_similarities=None, J=None):
damping = list(map(float,damping))
convergence_iter = convergence_iter = list(map(int,convergence_iter))
preference_quantile = list(map(float,preference_quantile))
if type(outdir) is str:
outdir = Path(outdir)
outdir.mkdir(parents=True,exist_ok=True)
subreddits, mat = read_similarity_mat(similarities,use_threads=True)
if alt_similarities is not None:
alt_mat = read_similarity_mat(alt_similarities,use_threads=True)
else:
alt_mat = None
if J is None:
J = cpu_count()
pool = Pool(J)
# get list of tuples: the combinations of hyperparameters
hyper_grid = product(damping, convergence_iter, preference_quantile)
hyper_grid = (t + (str(i),) for i, t in enumerate(hyper_grid))
_do_clustering = partial(do_clustering, mat=mat, subreddits=subreddits, outdir=outdir, max_iter=max_iter, random_state=random_state, verbose=verbose, alt_mat=alt_mat)
# similarities = Array('d', mat)
# call pool.starmap
print("running clustering selection")
clustering_data = pool.starmap(_do_clustering, hyper_grid)
clustering_data = pd.DataFrame(list(clustering_data))
clustering_data.to_csv(outinfo)
return clustering_data
if __name__ == "__main__":
x = fire.Fire(select_affinity_clustering)

View File

@@ -1,10 +0,0 @@
## needs to be run by hand since i don't have a nice way of waiting on a parallel-sql job to complete
#!/usr/bin/env bash
echo "#!/usr/bin/bash" > job_script.sh
#echo "source $(pwd)/../bin/activate" >> job_script.sh
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
start_spark_and_run.sh 1 $(pwd)/comments_2_parquet_part2.py

View File

@@ -1,115 +0,0 @@
#!/usr/bin/env python3
import json
from datetime import datetime
from multiprocessing import Pool
from itertools import islice
from helper import find_dumps, open_fileset
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
def parse_comment(comment, names= None):
if names is None:
names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
try:
comment = json.loads(comment)
except json.decoder.JSONDecodeError as e:
print(e)
print(comment)
row = [None for _ in names]
row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,comment)
return tuple(row)
row = []
for name in names:
if name == 'created_utc':
row.append(datetime.fromtimestamp(int(comment['created_utc']),tz=None))
elif name == 'edited':
val = comment[name]
if type(val) == bool:
row.append(val)
row.append(None)
else:
row.append(True)
row.append(datetime.fromtimestamp(int(val),tz=None))
elif name == "time_edited":
continue
elif name not in comment:
row.append(None)
else:
row.append(comment[name])
return tuple(row)
# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
pool = Pool(28)
stream = open_fileset(files)
N = int(1e4)
rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
schema = pa.schema([
pa.field('id', pa.string(), nullable=True),
pa.field('subreddit', pa.string(), nullable=True),
pa.field('link_id', pa.string(), nullable=True),
pa.field('parent_id', pa.string(), nullable=True),
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
pa.field('author', pa.string(), nullable=True),
pa.field('ups', pa.int64(), nullable=True),
pa.field('downs', pa.int64(), nullable=True),
pa.field('score', pa.int64(), nullable=True),
pa.field('edited', pa.bool_(), nullable=True),
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
pa.field('subreddit_type', pa.string(), nullable=True),
pa.field('subreddit_id', pa.string(), nullable=True),
pa.field('stickied', pa.bool_(), nullable=True),
pa.field('is_submitter', pa.bool_(), nullable=True),
pa.field('body', pa.string(), nullable=True),
pa.field('error', pa.string(), nullable=True),
])
from pathlib import Path
p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
if not p.is_dir():
if p.exists():
p.unlink()
p.mkdir()
else:
list(map(Path.unlink,p.glob('*')))
part_size = int(1e7)
part = 1
n_output = 0
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
while True:
if n_output > part_size:
if part > 1:
writer.close()
part = part + 1
n_output = 0
writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
n_output += N
chunk = islice(rows,N)
pddf = pd.DataFrame(chunk, columns=schema.names)
table = pa.Table.from_pandas(pddf,schema=schema)
if table.shape[0] == 0:
break
writer.write_table(table)

View File

@@ -1,29 +0,0 @@
#!/usr/bin/env python3
# spark script to make sorted, and partitioned parquet files
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2",compression='snappy')
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
df = df.withColumnRenamed('subreddit_2','subreddit')
df = df.withColumnRenamed("created_utc","CreatedAt")
df = df.withColumn("Month",f.month(f.col("CreatedAt")))
df = df.withColumn("Year",f.year(f.col("CreatedAt")))
df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
df = df.repartition('subreddit')
df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
df = df.repartition('author')
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')

77
datasets/README.md Normal file
View File

@@ -0,0 +1,77 @@
# Reddit dumps → sorted parquet datasets
This directory holds the pipeline that turns compressed Reddit dump files
(`RC_YYYY-MM.zst` for comments, `RS_YYYY-MM.zst` for submissions) into the
sorted, repartitioned parquet datasets that the rest of the project
consumes.
The pipeline has two stages:
| Stage | What it does |
|---|---|
| Part 1 | Reads one compressed dump and writes one parquet file. Per-file, parallelizable. Runs without Spark. |
| Part 2 | Reads the directory of per-file parquets in Spark, sorts and repartitions by subreddit, then by author, and writes the final `reddit_*_by_*.parquet` datasets. Always re-sorts the full corpus. |
Each stage has a thin entry-point script per dump type:
| Script | Notes |
|---|---|
| `comments_part1.py`, `submissions_part1.py` | Per-file parse. `parse_dump <file>` and `gen_task_list` subcommands via fire. |
| `comments_part2.py`, `submissions_part2.py` | Spark sort. Launched via `start_spark_and_run.sh`. |
| `dumps_helper.py` | Shared module: schemas, simdjson parser, generic parse loop, parse_dump / gen_task_list / sort_and_write workers. The only per-type code is the two field-handler dicts and the configuration dicts at the top. |
## The two workflows
There are two ways to run the pipeline; pick the one that matches your
situation.
### Build from scratch — `build_from_scratch.sh`
Use this when there is no existing parquet output, or when the upstream
data has changed in a way that requires reparsing everything. Wipes the
per-source temp directories, processes every `RC_*` / `RS_*` dump in the
raw dumps directory through Part 1, then runs the Part 2 Spark sort.
### Add a new month — `add_new_month.sh YYYY-MM`
Use this when one or more months of new dump files have arrived and you
just want to bring the existing datasets up to date. Processes only the
specified month's `RC_<MONTH>.zst` and `RS_<MONTH>.zst` files through
Part 1 (the existing per-source parquet files are left in place), then
re-runs the Part 2 Spark sort over the full temp directory so the final
datasets pick up the new data.
The Part 2 sort is global and not incremental, so each monthly add
re-sorts the entire corpus. That's fine for a monthly cadence; it would
need a rearchitecture if the cost became a problem.
## Running steps individually
Both `.sh` runners are written so that every meaningful step is a separate,
self-contained command. If something fails partway through, or you want
to inspect intermediate state, you can copy any single line out of the
runner and execute it standalone. For example:
```sh
# parse one specific file (skipping the rest of the workflow)
python3 comments_part1.py parse_dump RC_2025-03.zst
# override default dump/output paths from the CLI
python3 comments_part1.py parse_dump RC_2025-03.zst \
--dumpdir=/tmp/test --outdir=/tmp/out
# regenerate just the task list
python3 submissions_part1.py gen_task_list
```
The Spark Part 2 step is launched via `start_spark_and_run.sh` (a
Hyak-provided wrapper not included in this repo); see the wiki for the
launch convention.
## See also
The CDSC wiki page
[CommunityData:CDSC_Reddit](https://wiki.communitydata.science/CommunityData:CDSC_Reddit)
documents the surrounding workflow — where the raw dump files come from
(currently ArcticShift via academic torrents), how to stage them on
Hyak, and how to run Spark jobs on the cluster.

48
datasets/add_new_month.sh Executable file
View File

@@ -0,0 +1,48 @@
#!/usr/bin/env bash
#
# Add a single new month of dumps to the existing parquet datasets.
#
# Processes only the RC_<month>.zst and RS_<month>.zst files (Part 1),
# leaving the existing per-source temp parquet files untouched, then
# re-runs the Part 2 Spark sort + repartition over the full temp dir so
# the final by_subreddit / by_author datasets pick up the new data.
#
# Usage:
# add_new_month.sh YYYY-MM
#
# Example:
# add_new_month.sh 2025-03
#
# Every command below is independently runnable — to debug, copy a line
# out and run it directly. For a full rebuild instead, see
# build_from_scratch.sh.
#
# Note on cost: Part 2 always re-sorts the full corpus (the sort is global,
# not incremental), so this gets slightly slower each month. For the
# monthly cadence this is fine; if the sort becomes a bottleneck we'd
# need to rearchitect Part 2 to merge-append instead of re-sort.
set -e
cd "$(dirname "$0")"
MONTH="${1:-}"
if [ -z "$MONTH" ]; then
echo "Usage: $0 YYYY-MM" >&2
exit 1
fi
# --- Part 1: parse the new month's dumps (no wipe) -------------------------
# parse the new comments file
python3 comments_part1.py parse_dump "RC_${MONTH}.zst"
# parse the new submissions file
python3 submissions_part1.py parse_dump "RS_${MONTH}.zst"
# --- Part 2: re-sort the full corpus including the new data ---------------
# sort comments and overwrite reddit_comments_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 comments_part2.py
# sort submissions and overwrite reddit_submissions_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 submissions_part2.py

56
datasets/build_from_scratch.sh Executable file
View File

@@ -0,0 +1,56 @@
#!/usr/bin/env bash
#
# Build the sorted, partitioned Reddit parquet datasets from scratch.
#
# Wipes the per-source temp directories, processes every RC_* and RS_* dump
# in the raw_data dumps directory through Part 1 (per-file, parallel), then
# runs the Part 2 Spark sort + repartition for both comments and submissions.
#
# Every command below is independently runnable — to debug a single stage,
# copy the line out and run it directly. Run the whole script end-to-end
# only when you trust each step.
#
# Prerequisites:
# - raw .zst dumps already staged in the dumpdir locations (see the
# defaults in dumps_helper.py, or override via --dumpdir)
# - GNU parallel installed
# - start_spark_and_run.sh on PATH (Hyak-provided wrapper)
#
# To add one new month to an existing build instead of rebuilding from
# scratch, use add_new_month.sh.
set -e
cd "$(dirname "$0")"
TEMP_COMMENTS="/gscratch/comdata/output/temp/reddit_comments.parquet"
TEMP_SUBMISSIONS="/gscratch/comdata/output/temp/reddit_submissions.parquet"
# --- Part 1a: comments ------------------------------------------------------
# wipe any existing comments temp output
rm -rf "$TEMP_COMMENTS"
# generate the per-file parse task list
python3 comments_part1.py gen_task_list
# run all comments parse tasks in parallel
parallel --joblog comments_joblog.txt --results comments_logs < parse_comments_task_list
# --- Part 1b: submissions ---------------------------------------------------
# wipe any existing submissions temp output
rm -rf "$TEMP_SUBMISSIONS"
# generate the per-file parse task list
python3 submissions_part1.py gen_task_list
# run all submissions parse tasks in parallel
parallel --joblog submissions_joblog.txt --results submissions_logs < parse_submissions_task_list
# --- Part 2: spark sort + repartition --------------------------------------
# sort comments and write reddit_comments_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 comments_part2.py
# sort submissions and write reddit_submissions_by_{subreddit,author}.parquet
start_spark_and_run.sh 1 submissions_part2.py

24
datasets/comments_part1.py Executable file
View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
"""Part 1 for comments: parse one RC_*.zst dump into a parquet file.
CLI:
comments_part1.py parse_dump RC_2018-08.zst
comments_part1.py gen_task_list
comments_part1.py parse_dump RC_2018-08.zst --dumpdir=/tmp/in --outdir=/tmp/out
"""
import fire
from dumps_helper import COMMENTS, parse_dump, gen_task_list
def _parse_dump(partition, dumpdir=None, outdir=None):
parse_dump(COMMENTS, partition, dumpdir=dumpdir, outdir=outdir)
def _gen_task_list(dumpdir=None, tasklist=None):
gen_task_list(COMMENTS, 'comments_part1.py', dumpdir=dumpdir, tasklist=tasklist)
if __name__ == "__main__":
fire.Fire({'parse_dump': _parse_dump,
'gen_task_list': _gen_task_list})

14
datasets/comments_part2.py Executable file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""Part 2 for comments: Spark sort + repartition the per-source parquets
produced by comments_part1.py into the final by_subreddit / by_author
datasets.
Launched via the Hyak-provided start_spark_and_run.sh wrapper:
start_spark_and_run.sh 1 comments_part2.py
"""
from dumps_helper import COMMENTS, sort_and_write
if __name__ == "__main__":
sort_and_write(COMMENTS)

286
datasets/dumps_helper.py Normal file
View File

@@ -0,0 +1,286 @@
"""Shared logic for the comments and submissions dump-to-parquet pipeline.
Used by comments_part1.py / submissions_part1.py (Part 1: one compressed
dump file → one parquet file) and comments_part2.py / submissions_part2.py
(Part 2: Spark sort + repartition of the per-source parquets).
The two dump types only differ in their schemas and a handful of
field-specific extractors. The parse loop, the file I/O wrapping, the
task-list generator, and the Spark sort are all shared here.
"""
import os
from datetime import datetime
from itertools import islice
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import simdjson
from helper import find_dumps, open_fileset
_json = simdjson.Parser()
# --- field-level extractors ------------------------------------------------
def _ts(name):
"""Extractor for a unix-timestamp field (or None if missing)."""
def handler(record):
val = record.get(name)
if val is None:
return None
return datetime.fromtimestamp(int(val), tz=None)
return handler
def _edited(record):
"""Returns (edited, time_edited). The dump packs both into one `edited`
field that is either a bool (never edited / unknown timestamp) or a
unix timestamp."""
val = record.get('edited')
if isinstance(val, bool):
return (val, None)
if val is None:
return (None, None)
return (True, datetime.fromtimestamp(int(val), tz=None))
def _has_media(record):
"""Submissions don't have a `has_media` field directly — derive it."""
return record.get('media') is not None
# --- generic parse loop ----------------------------------------------------
def parse_record(line, fields, handlers):
"""Parse one JSON line into a tuple aligned with `fields`.
`handlers` maps field name → callable(record) returning either a single
value (one column) or a tuple of values (multiple consecutive columns,
consuming the next len(tuple)-1 entries in `fields`).
Fields without a handler are pulled from the record by name, with
missing keys yielding None.
The last field in `fields` is reserved for an error message string
and is set to None on success.
"""
try:
record = _json.parse(line)
except (ValueError, KeyError) as e:
row = [None] * len(fields)
row[-1] = f"parse error|{e}|{line}"
return tuple(row)
row = []
skip_next = 0
for name in fields:
if skip_next > 0:
skip_next -= 1
continue
handler = handlers.get(name)
if handler is None:
try:
row.append(record[name])
except KeyError:
row.append(None)
else:
result = handler(record)
if isinstance(result, tuple):
row.extend(result)
skip_next = len(result) - 1
else:
row.append(result)
return tuple(row)
# --- comments schema -------------------------------------------------------
COMMENT_FIELDS = [
'id', 'subreddit', 'link_id', 'parent_id', 'created_utc', 'author',
'ups', 'downs', 'score', 'edited', 'time_edited', 'subreddit_type',
'subreddit_id', 'stickied', 'is_submitter', 'body', 'error',
]
COMMENT_SCHEMA = pa.schema([
pa.field('id', pa.string(), nullable=True),
pa.field('subreddit', pa.string(), nullable=True),
pa.field('link_id', pa.string(), nullable=True),
pa.field('parent_id', pa.string(), nullable=True),
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
pa.field('author', pa.string(), nullable=True),
pa.field('ups', pa.int64(), nullable=True),
pa.field('downs', pa.int64(), nullable=True),
pa.field('score', pa.int64(), nullable=True),
pa.field('edited', pa.bool_(), nullable=True),
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
pa.field('subreddit_type', pa.string(), nullable=True),
pa.field('subreddit_id', pa.string(), nullable=True),
pa.field('stickied', pa.bool_(), nullable=True),
pa.field('is_submitter', pa.bool_(), nullable=True),
pa.field('body', pa.string(), nullable=True),
pa.field('error', pa.string(), nullable=True),
])
COMMENT_HANDLERS = {
'created_utc': _ts('created_utc'),
'edited': _edited,
}
# --- submissions schema ----------------------------------------------------
SUBMISSION_FIELDS = [
'id', 'author', 'subreddit', 'title', 'created_utc', 'permalink', 'url',
'domain', 'score', 'ups', 'downs', 'over_18', 'has_media', 'selftext',
'retrieved_on', 'num_comments', 'gilded', 'edited', 'time_edited',
'subreddit_type', 'subreddit_id', 'subreddit_subscribers', 'name',
'is_self', 'stickied', 'quarantine', 'error',
]
SUBMISSION_SCHEMA = pa.schema([
pa.field('id', pa.string(), nullable=True),
pa.field('author', pa.string(), nullable=True),
pa.field('subreddit', pa.string(), nullable=True),
pa.field('title', pa.string(), nullable=True),
pa.field('created_utc', pa.timestamp('ms'), nullable=True),
pa.field('permalink', pa.string(), nullable=True),
pa.field('url', pa.string(), nullable=True),
pa.field('domain', pa.string(), nullable=True),
pa.field('score', pa.int64(), nullable=True),
pa.field('ups', pa.int64(), nullable=True),
pa.field('downs', pa.int64(), nullable=True),
pa.field('over_18', pa.bool_(), nullable=True),
pa.field('has_media', pa.bool_(), nullable=True),
pa.field('selftext', pa.string(), nullable=True),
pa.field('retrieved_on', pa.timestamp('ms'), nullable=True),
pa.field('num_comments', pa.int64(), nullable=True),
pa.field('gilded', pa.int64(), nullable=True),
pa.field('edited', pa.bool_(), nullable=True),
pa.field('time_edited', pa.timestamp('ms'), nullable=True),
pa.field('subreddit_type', pa.string(), nullable=True),
pa.field('subreddit_id', pa.string(), nullable=True),
pa.field('subreddit_subscribers', pa.int64(), nullable=True),
pa.field('name', pa.string(), nullable=True),
pa.field('is_self', pa.bool_(), nullable=True),
pa.field('stickied', pa.bool_(), nullable=True),
pa.field('quarantine', pa.bool_(), nullable=True),
pa.field('error', pa.string(), nullable=True),
])
SUBMISSION_HANDLERS = {
'created_utc': _ts('created_utc'),
'retrieved_on': _ts('retrieved_on'),
'edited': _edited,
'has_media': _has_media,
}
# --- per-type configuration ------------------------------------------------
# Defaults that the entry-point scripts pass through, exposed here so the
# field/schema/handler triplet, the canonical paths, and the dump filename
# pattern all live in one place.
COMMENTS = {
'fields': COMMENT_FIELDS,
'schema': COMMENT_SCHEMA,
'handlers': COMMENT_HANDLERS,
'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/comments",
'outdir': "/gscratch/comdata/output/temp/reddit_comments.parquet",
'file_pattern': 'RC_20*.*',
'task_list': 'parse_comments_task_list',
'output_by_subreddit': "/gscratch/comdata/output/reddit_comments_by_subreddit.parquet",
'output_by_author': "/gscratch/comdata/output/reddit_comments_by_author.parquet",
'subreddit_sort_keys': ["subreddit", "CreatedAt", "link_id", "parent_id", "Year", "Month", "Day"],
'author_sort_keys': ["author", "CreatedAt", "subreddit", "link_id", "parent_id", "Year", "Month", "Day"],
'app_name': "Reddit comments to parquet",
}
SUBMISSIONS = {
'fields': SUBMISSION_FIELDS,
'schema': SUBMISSION_SCHEMA,
'handlers': SUBMISSION_HANDLERS,
'dumpdir': "/gscratch/comdata/raw_data/reddit_dumps/submissions",
'outdir': "/gscratch/comdata/output/temp/reddit_submissions.parquet",
'file_pattern': 'RS_20*.*',
'task_list': 'parse_submissions_task_list',
'output_by_subreddit': "/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet",
'output_by_author': "/gscratch/comdata/output/reddit_submissions_by_author.parquet",
'subreddit_sort_keys': ["subreddit", "CreatedAt", "id"],
'author_sort_keys': ["author", "CreatedAt", "id"],
'app_name': "Reddit submissions to parquet",
}
# --- Part 1: parse one dump file -> one parquet ----------------------------
def parse_dump(config, partition, dumpdir=None, outdir=None, chunk_size=10000):
"""Read one compressed dump from `dumpdir/partition` and write a parquet
file to `outdir/<basename>.parquet`. Streams chunks of `chunk_size`
rows so memory stays bounded."""
dumpdir = dumpdir or config['dumpdir']
outdir = outdir or config['outdir']
schema = config['schema']
fields = config['fields']
handlers = config['handlers']
stream = open_fileset([os.path.join(dumpdir, partition)])
rows = (parse_record(line, fields, handlers) for line in stream)
os.makedirs(outdir, exist_ok=True)
outfile = os.path.join(outdir, os.path.splitext(partition)[0] + ".parquet")
with pq.ParquetWriter(outfile, schema=schema, compression='snappy', flavor='spark') as writer:
while True:
chunk = list(islice(rows, chunk_size))
if not chunk:
break
pddf = pd.DataFrame(chunk, columns=schema.names)
table = pa.Table.from_pandas(pddf, schema=schema)
writer.write_table(table)
def gen_task_list(config, script_name, dumpdir=None, tasklist=None):
"""Write a parallel-friendly task list of `script_name parse_dump <file>`
lines, one per dump file found under `dumpdir`."""
dumpdir = dumpdir or config['dumpdir']
tasklist = tasklist or config['task_list']
files = list(find_dumps(dumpdir, base_pattern=config['file_pattern']))
with open(tasklist, 'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
of.write(f'python3 {script_name} parse_dump {partition}\n')
# --- Part 2: spark sort + repartition --------------------------------------
def sort_and_write(config):
"""Read the directory of per-source parquets, sort and repartition
twice (once by subreddit, once by author), and write the two final
datasets. Pyspark is imported lazily so Part 1 callers don't pay the
Spark startup cost."""
from pyspark.sql import SparkSession, functions as f
spark = SparkSession.builder.appName(config['app_name']).getOrCreate()
df = spark.read.parquet(config['outdir'], compression='snappy')
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
df = df.withColumnRenamed('subreddit_2', 'subreddit')
df = df.withColumnRenamed("created_utc", "CreatedAt")
df = df.withColumn("Month", f.month(f.col("CreatedAt")))
df = df.withColumn("Year", f.year(f.col("CreatedAt")))
df = df.withColumn("Day", f.dayofmonth(f.col("CreatedAt")))
sub_keys = config['subreddit_sort_keys']
df_sub = df.repartition('subreddit').sort(sub_keys, ascending=True)
df_sub = df_sub.sortWithinPartitions(sub_keys, ascending=True)
df_sub.write.parquet(config['output_by_subreddit'], mode='overwrite', compression='snappy')
auth_keys = config['author_sort_keys']
df_auth = df.repartition('author').sort(auth_keys, ascending=True)
df_auth = df_auth.sortWithinPartitions(auth_keys, ascending=True)
df_auth.write.parquet(config['output_by_author'], mode='overwrite', compression='snappy')

24
datasets/submissions_part1.py Executable file
View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
"""Part 1 for submissions: parse one RS_*.zst dump into a parquet file.
CLI:
submissions_part1.py parse_dump RS_2018-08.zst
submissions_part1.py gen_task_list
submissions_part1.py parse_dump RS_2018-08.zst --dumpdir=/tmp/in --outdir=/tmp/out
"""
import fire
from dumps_helper import SUBMISSIONS, parse_dump, gen_task_list
def _parse_dump(partition, dumpdir=None, outdir=None):
parse_dump(SUBMISSIONS, partition, dumpdir=dumpdir, outdir=outdir)
def _gen_task_list(dumpdir=None, tasklist=None):
gen_task_list(SUBMISSIONS, 'submissions_part1.py', dumpdir=dumpdir, tasklist=tasklist)
if __name__ == "__main__":
fire.Fire({'parse_dump': _parse_dump,
'gen_task_list': _gen_task_list})

14
datasets/submissions_part2.py Executable file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""Part 2 for submissions: Spark sort + repartition the per-source parquets
produced by submissions_part1.py into the final by_subreddit / by_author
datasets.
Launched via the Hyak-provided start_spark_and_run.sh wrapper:
start_spark_and_run.sh 1 submissions_part2.py
"""
from dumps_helper import SUBMISSIONS, sort_and_write
if __name__ == "__main__":
sort_and_write(SUBMISSIONS)

10
density/Makefile Normal file
View File

@@ -0,0 +1,10 @@
all: /gscratch/comdata/output/reddit_density/comment_terms_10000.feather /gscratch/comdata/output/reddit_density/comment_authors_10000.feather /gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather
/gscratch/comdata/output/reddit_density/comment_terms_10000.feather:overlap_density.py /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather /gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
start_spark_and_run.sh 1 overlap_density.py terms --inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather" --outpath="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/comment_authors_10000.feather:overlap_density.py /gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather" --outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather" --agg=pd.DataFrame.sum
/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather: overlap_density.py /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet
start_spark_and_run.sh 1 overlap_density.py authors --inpath="/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet" --outpath="/gscratch/comdata/output/reddit_density/subreddit_author_tf_similarities_10000.feather" --agg=pd.DataFrame.sum

4
density/job_script.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 overlap_density.py authors --inpath=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather --outpath=/gscratch/comdata/output/reddit_density/comment_authors_10000.feather --agg=pd.DataFrame.sum
stop-all.sh

View File

@@ -0,0 +1,76 @@
import pandas as pd
from pandas.core.groupby import DataFrameGroupBy as GroupBy
import fire
import numpy as np
import sys
sys.path.append("..")
sys.path.append("../similarities")
from similarities.similarities_helper import reindex_tfidf, reindex_tfidf_time_interval
# this is the mean of the ratio of the overlap to the focal size.
# mean shared membership per focal community member
# the input is the author tf-idf matrix
def overlap_density(inpath, outpath, agg = pd.DataFrame.sum):
df = pd.read_feather(inpath)
df = df.drop('subreddit',1)
np.fill_diagonal(df.values,0)
df = agg(df, 0).reset_index()
df = df.rename({0:'overlap_density'},axis='columns')
df.to_feather(outpath)
return df
def overlap_density_weekly(inpath, outpath, agg = GroupBy.sum):
df = pd.read_parquet(inpath)
# exclude the diagonal
df = df.loc[df.subreddit != df.variable]
res = agg(df.groupby(['subreddit','week'])).reset_index()
res.to_feather(outpath)
return res
# inpath="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet";
# min_df=1;
# included_subreddits=None;
# topN=10000;
# outpath="/gscratch/comdata/output/reddit_density/wang_overlaps_10000.feather"
# to_date=2019-10-28
def author_overlap_density(inpath="/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather",
outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather", agg=pd.DataFrame.sum):
if type(agg) == str:
agg = eval(agg)
overlap_density(inpath, outpath, agg)
def term_overlap_density(inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather",
outpath="/gscratch/comdata/output/reddit_density/comment_term_similarity_10000.feather", agg=pd.DataFrame.sum):
if type(agg) == str:
agg = eval(agg)
overlap_density(inpath, outpath, agg)
def author_overlap_density_weekly(inpath="/gscratch/comdata/output/reddit_similarity/subreddit_authors_10000_weekly.parquet",
outpath="/gscratch/comdata/output/reddit_density/comment_authors_10000_weekly.feather", agg=GroupBy.sum):
if type(agg) == str:
agg = eval(agg)
overlap_density_weekly(inpath, outpath, agg)
def term_overlap_density_weekly(inpath="/gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet",
outpath="/gscratch/comdata/output/reddit_density/comment_terms_10000_weekly.parquet", agg=GroupBy.sum):
if type(agg) == str:
agg = eval(agg)
overlap_density_weekly(inpath, outpath, agg)
if __name__ == "__main__":
fire.Fire({'authors':author_overlap_density,
'terms':term_overlap_density,
'author_weekly':author_overlap_density_weekly,
'term_weekly':term_overlap_density_weekly})

View File

View File

@@ -0,0 +1,26 @@
#!/bin/bash
## parallel_sql_job.sh
#SBATCH --job-name=tf_subreddit_comments
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1
## Walltime (12 hours)
#SBATCH --time=12:00:00
## Memory per node
#SBATCH --mem=32G
#SBATCH --cpus-per-task=4
#SBATCH --ntasks=1
#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
source ./bin/activate
module load parallel_sql
echo $(which perl)
conda list pyarrow
which python3
#Put here commands to load other modules (e.g. matlab etc.)
#Below command means that parallel_sql will get tasks from the database
#and run them on the node (in parallel). So a 16 core node will have
#16 tasks running at one time.
parallel-sql --sql -a parallel --exit-on-term --jobs 4

View File

@@ -7,7 +7,6 @@ from itertools import groupby, islice, chain
import fire
from collections import Counter
import os
import datetime
import re
from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize
from nltk.corpus import stopwords
@@ -31,8 +30,8 @@ def weekly_tf(partition, mwe_pass = 'first'):
ngram_output = partition.replace("parquet","txt")
if mwe_pass == 'first':
if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
if os.path.exists(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}"):
os.remove(f"/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}")
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
@@ -67,7 +66,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
if mwe_pass != 'first':
mwe_dataset = pd.read_feather(f'/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather')
mwe_dataset = pd.read_feather(f'/gscratch/comdata/output/reddit_ngrams/multiword_expressions.feather')
mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False)
mwe_phrases = list(mwe_dataset.phrase)
mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases]
@@ -88,7 +87,6 @@ def weekly_tf(partition, mwe_pass = 'first'):
new_sentence.append(new_token)
return new_sentence
stopWords = set(stopwords.words('english'))
# we follow the approach described in datta, phelan, adar 2017
@@ -121,7 +119,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
for sentence in sentences:
if random() <= 0.1:
grams = list(chain(*map(lambda i : ngrams(sentence,i),range(4))))
with open(f'/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
with open(f'/gscratch/comdata/output/reddit_ngrams/comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
for ng in grams:
gram_file.write(' '.join(ng) + '\n')
for token in sentence:
@@ -156,7 +154,7 @@ def weekly_tf(partition, mwe_pass = 'first'):
outchunksize = 10000
with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
with pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
while True:

View File

@@ -1,14 +0,0 @@
#!/bin/bash
user_agent='nathante teblunthuis <nathante@uw.edu>'
output_dir='/gscratch/comdata/raw_data/reddit_dumps/comments'
base_url='https://files.pushshift.io/reddit/comments/'
wget -r --no-parent -A 'RC_201*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_201*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RC_201*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
# starting in 2020 we use daily dumps not monthly dumps
wget -r --no-parent -A 'RC_202*.gz' -U $user_agent -P $output_dir -nd -nc $base_url/daily/
./check_comments_shas.py

View File

@@ -1,14 +0,0 @@
#!/bin/bash
user_agent='nathante teblunthuis <nathante@uw.edu>'
output_dir='/gscratch/comdata/raw_data/reddit_dumps/submissions'
base_url='https://files.pushshift.io/reddit/submissions/'
wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
./check_submission_shas.py

View File

@@ -0,0 +1,24 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from similarities_helper import build_weekly_tfidf_dataset
import pandas as pd
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet")
include_subs = pd.read_csv("/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv")
include_subs = set(include_subs.loc[include_subs.comments_rank <= 25000]['subreddit'])
# remove [deleted] and AutoModerator (TODO remove other bots)
# df = df.filter(df.author != '[deleted]')
# df = df.filter(df.author != 'AutoModerator')
df = build_weekly_tfidf_dataset(df, include_subs, 'term')
df.write.parquet('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet', mode='overwrite', compression='snappy')
spark.stop()

25
similarities/Makefile Normal file
View File

@@ -0,0 +1,25 @@
all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms.parquet
# all: /gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_10000.parquet /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet
# /gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
# start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_authors_25000.feather
/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_terms.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
start_spark_and_run.sh 1 tfidf.py terms --topN=10000
/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet: tfidf.py similarities_helper.py /gscratch/comdata/output/reddit_ngrams/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv
start_spark_and_run.sh 1 tfidf.py authors --topN=10000
/gscratch/comdata/output/reddit_similarity/comment_authors_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
start_spark_and_run.sh 1 cosine_similarities.py author --outfile=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather
/gscratch/comdata/output/reddit_similarity/comment_terms.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet
start_spark_and_run.sh 1 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
# /gscratch/comdata/output/reddit_similarity/comment_terms_10000_weekly.parquet: cosine_similarities.py /gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet
# start_spark_and_run.sh 1 weekly_cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_comment_terms_10000_weely.parquet
/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet: cosine_similarities.py similarities_helper.py /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet /gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet
start_spark_and_run.sh 1 cosine_similarities.py author-tf --outfile=/gscratch/comdata/output/reddit_similarity/subreddit_author_tf_similarities_10000.parquet

1
similarities/TODO Normal file
View File

@@ -0,0 +1 @@
Try normalizing tf by the mean or std instead of the max to avoid penalizing subreddits with very active users.

View File

@@ -0,0 +1,57 @@
import pandas as pd
import fire
from pathlib import Path
from similarities_helper import similarities, column_similarities
def cosine_similarities(infile, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
return similarities(infile=infile, simfunc=column_similarities, term_colname=term_colname, outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases,from_date=from_date, to_date=to_date, tfidf_colname=tfidf_colname)
def term_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms_100k.parquet', min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
return cosine_similarities(infile,
'term',
outfile,
min_df,
max_df,
included_subreddits,
topN,
exclude_phrases,
from_date,
to_date
)
def author_cosine_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet', min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None):
return cosine_similarities(infile,
'author',
outfile,
min_df,
max_df,
included_subreddits,
topN,
exclude_phrases=False,
from_date=from_date,
to_date=to_date
)
def author_tf_similarities(outfile, infile='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors_100k.parquet', min_df=2, max_df=None, included_subreddits=None, topN=10000, from_date=None, to_date=None):
return cosine_similarities(infile,
'author',
outfile,
min_df,
max_df,
included_subreddits,
topN,
exclude_phrases=False,
from_date=from_date,
to_date=to_date,
tfidf_colname='relative_tf'
)
if __name__ == "__main__":
fire.Fire({'term':term_cosine_similarities,
'author':author_cosine_similarities,
'author-tf':author_tf_similarities})

4
similarities/job_script.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/bash
start_spark_cluster.sh
spark-submit --master spark://$(hostname):18899 cosine_similarities.py term --outfile=/gscratch/comdata/output/reddit_similarity/comment_terms_10000.feather
stop-all.sh

View File

@@ -0,0 +1,394 @@
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from scipy.sparse import csr_matrix, issparse
import pandas as pd
import numpy as np
import pathlib
from datetime import datetime
from pathlib import Path
class tf_weight(Enum):
MaxTF = 1
Norm05 = 2
infile = "/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet"
def reindex_tfidf_time_interval(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(exclude_phrases)
tfidf_weekly = spark.read.parquet(infile)
# create the time interval
if from_date is not None:
if type(from_date) is str:
from_date = datetime.fromisoformat(from_date)
tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week >= from_date)
if to_date is not None:
if type(to_date) is str:
to_date = datetime.fromisoformat(to_date)
tfidf_weekly = tfidf_weekly.filter(tfidf_weekly.week < to_date)
tfidf = tfidf_weekly.groupBy(["subreddit","week", term_id, term]).agg(f.sum("tf").alias("tf"))
tfidf = _calc_tfidf(tfidf, term_colname, tf_weight.Norm05)
tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
tfidf = spark.read_parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
return(tempdir, subreddit_names)
def reindex_tfidf(infile, term_colname, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(exclude_phrases)
tfidf = spark.read.parquet(infile)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(map(str.strip,map(str.lower,open(included_subreddits))))
if exclude_phrases == True:
tfidf = tfidf.filter(~f.col(term_colname).contains("_"))
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
return (tempdir, subreddit_names)
def similarities(infile, simfunc, term_colname, outfile, min_df=None, max_df=None, included_subreddits=None, topN=500, exclude_phrases=False, from_date=None, to_date=None, tfidf_colname='tf_idf'):
'''
tfidf_colname: set to 'relative_tf' to use normalized term frequency instead of tf-idf, which can be useful for author-based similarities.
'''
if from_date is not None or to_date is not None:
tempdir, subreddit_names = reindex_tfidf_time_interval(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False, from_date=from_date, to_date=to_date)
else:
tempdir, subreddit_names = reindex_tfidf(infile, term_colname=term_colname, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=False)
print("loading matrix")
# mat = read_tfidf_matrix("term_tfidf_entries7ejhvnvl.parquet", term_colname)
mat = read_tfidf_matrix(tempdir.name, term_colname, tfidf_colname)
print(f'computing similarities on mat. mat.shape:{mat.shape}')
print(f"size of mat is:{mat.data.nbytes}")
sims = simfunc(mat)
del mat
if issparse(sims):
sims = sims.todense()
print(f"shape of sims:{sims.shape}")
print(f"len(subreddit_names.subreddit.values):{len(subreddit_names.subreddit.values)}")
sims = pd.DataFrame(sims)
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)}, axis=1)
sims['subreddit'] = subreddit_names.subreddit.values
p = Path(outfile)
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
sims.to_feather(outfile)
tempdir.cleanup()
def read_tfidf_matrix_weekly(path, term_colname, week, tfidf_colname='tf_idf'):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
entries = dataset.to_table(columns=[tfidf_colname,'subreddit_id_new', term_id_new],filter=ds.field('week')==week).to_pandas()
return(csr_matrix((entries[tfidf_colname], (entries[term_id_new]-1, entries.subreddit_id_new-1))))
def read_tfidf_matrix(path, term_colname, tfidf_colname='tf_idf'):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
print(f"tfidf_colname:{tfidf_colname}")
entries = dataset.to_table(columns=[tfidf_colname, 'subreddit_id_new',term_id_new]).to_pandas()
return(csr_matrix((entries[tfidf_colname],(entries[term_id_new]-1, entries.subreddit_id_new-1))))
def write_weekly_similarities(path, sims, week, names):
sims['week'] = week
p = pathlib.Path(path)
if not p.is_dir():
p.mkdir()
# reformat as a pairwise list
sims = sims.melt(id_vars=['subreddit','week'],value_vars=names.subreddit.values)
sims.to_parquet(p / week.isoformat())
def column_overlaps(mat):
non_zeros = (mat != 0).astype('double')
intersection = non_zeros.T @ non_zeros
card1 = non_zeros.sum(axis=0)
den = np.add.outer(card1,card1) - intersection
return intersection / den
def column_similarities(mat):
norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
mat = mat.multiply(1/norm)
sims = mat.T @ mat
return(sims)
def prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col('count') >= min_df)
if max_df is not None:
tfidf = tfidf.filter(f.col('count') <= max_df)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# we might not have the same terms or subreddits each week, so we need to make unique ids for each week.
sub_ids = tfidf.select(['subreddit_id','week']).distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.partitionBy('week').orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,['subreddit_id','week'])
# only use terms in at least min_df included subreddits in a given week
new_count = tfidf.groupBy([term_id,'week']).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,[term_id,'week'],how='inner')
# reset the term ids
term_ids = tfidf.select([term_id,'week']).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.partitionBy('week').orderBy(term_id)))
tfidf = tfidf.join(term_ids,[term_id,'week'])
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf = tfidf.repartition('week')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return(tempdir)
def prep_tfidf_entries(tfidf, term_colname, min_df, max_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col('count') >= min_df)
if max_df is not None:
tfidf = tfidf.filter(f.col('count') <= max_df)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new", f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return tempdir
# try computing cosine similarities using spark
def spark_cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
tfidf = tfidf.cache()
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
# step 1 make an rdd of entires
# sorted by (dense) spark subreddit id
n_partitions = int(len(included_subreddits)*2 / 5)
entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
# put like 10 subredis in each partition
# step 2 make it into a distributed.RowMatrix
coordMat = CoordinateMatrix(entries)
coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))
# this needs to be an IndexedRowMatrix()
mat = coordMat.toRowMatrix()
#goal: build a matrix of subreddit columns and tf-idfs rows
sim_dist = mat.columnSimilarities(threshold=similarity_threshold)
return (sim_dist, tfidf)
def build_weekly_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
term = term_colname
term_id = term + '_id'
# aggregate counts by week. now subreddit-term is distinct
df = df.filter(df.subreddit.isin(include_subs))
df = df.groupBy(['subreddit',term,'week']).agg(f.sum('tf').alias('tf'))
max_subreddit_terms = df.groupby(['subreddit','week']).max('tf') # subreddits are unique
max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
df = df.join(max_subreddit_terms, on=['subreddit','week'])
df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
# group by term. term is unique
idf = df.groupby([term,'week']).count()
N_docs = df.select(['subreddit','week']).distinct().groupby(['week']).agg(f.count("subreddit").alias("subreddits_in_week"))
idf = idf.join(N_docs, on=['week'])
# add a little smoothing to the idf
idf = idf.withColumn('idf',f.log(idf.subreddits_in_week) / (1+f.col('count'))+1)
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select([term,'week']).distinct() # terms are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.partitionBy('week').orderBy(term))) # term ids are distinct
# make subreddit ids
subreddits = df.select(['subreddit','week']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.partitionBy("week").orderBy("subreddit")))
df = df.join(subreddits,on=['subreddit','week'])
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on=[term,'week']) # subreddit-term-id is unique
idf = idf.join(terms,on=[term,'week'])
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term,'week'])
# agg terms by subreddit to make sparse tf/df vectors
if tf_family == tf_weight.MaxTF:
df = df.withColumn("tf_idf", df.relative_tf * df.idf)
else: # tf_fam = tf_weight.Norm05
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
return df
def _calc_tfidf(df, term_colname, tf_family):
term = term_colname
term_id = term + '_id'
max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
df = df.join(max_subreddit_terms, on='subreddit')
df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
# group by term. term is unique
idf = df.groupby([term]).count()
N_docs = df.select('subreddit').distinct().count()
# add a little smoothing to the idf
idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select(term).distinct() # terms are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
# make subreddit ids
subreddits = df.select(['subreddit']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))
df = df.join(subreddits,on='subreddit')
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on=term) # subreddit-term-id is unique
idf = idf.join(terms,on=term)
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term])
# agg terms by subreddit to make sparse tf/df vectors
if tf_family == tf_weight.MaxTF:
df = df.withColumn("tf_idf", df.relative_tf * df.idf)
else: # tf_fam = tf_weight.Norm05
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
return df
def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
term = term_colname
term_id = term + '_id'
# aggregate counts by week. now subreddit-term is distinct
df = df.filter(df.subreddit.isin(include_subs))
df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
df = _calc_tfidf(df, term_colname, tf_family)
return df
def select_topN_subreddits(topN, path="/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments_nonsfw.csv"):
rankdf = pd.read_csv(path)
included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
return included_subreddits

84
similarities/tfidf.py Normal file
View File

@@ -0,0 +1,84 @@
import fire
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from similarities_helper import build_tfidf_dataset, build_weekly_tfidf_dataset, select_topN_subreddits
def _tfidf_wrapper(func, inpath, outpath, topN, term_colname, exclude, included_subreddits):
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet(inpath)
df = df.filter(~ f.col(term_colname).isin(exclude))
if included_subreddits is not None:
include_subs = set(map(str.strip,map(str.lower, open(included_subreddits))))
else:
include_subs = select_topN_subreddits(topN)
df = func(df, include_subs, term_colname)
df.write.parquet(outpath,mode='overwrite',compression='snappy')
spark.stop()
def tfidf(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_weekly(inpath, outpath, topN, term_colname, exclude, included_subreddits):
return _tfidf_wrapper(build_weekly_tfidf_dataset, inpath, outpath, topN, term_colname, exclude, included_subreddits)
def tfidf_authors(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet',
topN=25000,
included_subreddits=None):
return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath,
topN,
'author',
['[deleted]','AutoModerator'],
included_subreddits=included_subreddits
)
def tfidf_terms(outpath='/gscratch/comdata/output/reddit_similarity/tfidf/comment_terms.parquet',
topN=25000,
included_subreddits=None):
return tfidf("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath,
topN,
'term',
[],
included_subreddits=included_subreddits
)
def tfidf_authors_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
topN=25000,
included_subreddits=None):
return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_authors.parquet",
outpath,
topN,
'author',
['[deleted]','AutoModerator'],
included_subreddits=included_subreddits
)
def tfidf_terms_weekly(outpath='/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
topN=25000,
included_subreddits=None):
return tfidf_weekly("/gscratch/comdata/output/reddit_ngrams/comment_terms.parquet",
outpath,
topN,
'term',
[],
included_subreddits=included_subreddits
)
if __name__ == "__main__":
fire.Fire({'authors':tfidf_authors,
'terms':tfidf_terms,
'authors_weekly':tfidf_authors_weekly,
'terms_weekly':tfidf_terms_weekly})

View File

@@ -1,18 +1,14 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
import numpy as np
import pyarrow
import pandas as pd
import fire
from itertools import islice
from pathlib import Path
from similarities_helper import cosine_similarities
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
submissions = spark.read.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet")
prop_nsfw = submissions.select(['subreddit','over_18']).groupby('subreddit').agg(f.mean(f.col('over_18').astype('double')).alias('prop_nsfw'))
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
# remove /u/ pages
@@ -20,6 +16,9 @@ df = df.filter(~df.subreddit.like("u_%"))
df = df.groupBy('subreddit').agg(f.count('id').alias("n_comments"))
df = df.join(prop_nsfw,on='subreddit')
df = df.filter(df.prop_nsfw < 0.5)
win = Window.orderBy(f.col('n_comments').desc())
df = df.withColumn('comments_rank', f.rank().over(win))
@@ -27,4 +26,4 @@ df = df.toPandas()
df = df.sort_values("n_comments")
df.to_csv('/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv',index=False)
df.to_csv('/gscratch/comdata/output/reddit_similarity/subreddits_by_num_comments.csv', index=False)

View File

@@ -0,0 +1,18 @@
from similarities_helper import similarities
import numpy as np
import fire
def wang_similarity(mat):
non_zeros = (mat != 0).astype(np.float32)
intersection = non_zeros.T @ non_zeros
return intersection
infile="/gscratch/comdata/output/reddit_similarity/tfidf/comment_authors.parquet"; outfile="/gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather"; min_df=1; included_subreddits=None; topN=10000; exclude_phrases=False; from_date=None; to_date=None
def wang_overlaps(infile, outfile="/gscratch/comdata/output/reddit_similarity/wang_similarity_10000.feather", min_df=1, max_df=None, included_subreddits=None, topN=10000, exclude_phrases=False, from_date=None, to_date=None):
return similarities(infile=infile, simfunc=wang_similarity, term_colname='author', outfile=outfile, min_df=min_df, max_df=max_df, included_subreddits=included_subreddits, topN=topN, exclude_phrases=exclude_phrases, from_date=from_date, to_date=to_date)
if __name__ == "__main__":
fire.Fire(wang_overlaps)

View File

@@ -0,0 +1,81 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
import numpy as np
import pyarrow
import pandas as pd
import fire
from itertools import islice
from pathlib import Path
from similarities_helper import *
from multiprocessing import Pool, cpu_count
def _week_similarities(tempdir, term_colname, week):
print(f"loading matrix: {week}")
mat = read_tfidf_matrix_weekly(tempdir.name, term_colname, week)
print('computing similarities')
sims = column_similarities(mat)
del mat
names = subreddit_names.loc[subreddit_names.week == week]
sims = pd.DataFrame(sims.todense())
sims = sims.rename({i: sr for i, sr in enumerate(names.subreddit.values)}, axis=1)
sims['_subreddit'] = names.subreddit.values
write_weekly_similarities(outfile, sims, week, names)
#tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_weekly.parquet')
def cosine_similarities_weekly(tfidf_path, outfile, term_colname, min_df = None, included_subreddits = None, topN = 500):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(outfile)
tfidf = spark.read.parquet(tfidf_path)
if included_subreddits is None:
included_subreddits = select_topN_subreddits(topN)
else:
included_subreddits = set(open(included_subreddits))
print(f"computing weekly similarities for {len(included_subreddits)} subreddits")
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries_weekly(tfidf, term_colname, min_df, max_df=None, included_subreddits=included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
# the ids can change each week.
subreddit_names = tfidf.select(['subreddit','subreddit_id_new','week']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
weeks = sorted(list(subreddit_names.week.drop_duplicates()))
# do this step in parallel if we have the memory for it.
# should be doable with pool.map
def week_similarities_helper(week):
_week_similarities(tempdir, term_colname, week)
with Pool(cpu_count()) as pool: # maybe it can be done with 40 cores on the huge machine?
list(pool.map(week_similarities_helper,weeks))
def author_cosine_similarities_weekly(outfile, min_df=2 , included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_authors.parquet',
outfile,
'author',
min_df,
included_subreddits,
topN)
def term_cosine_similarities_weekly(outfile, min_df=None, included_subreddits=None, topN=500):
return cosine_similarities_weekly('/gscratch/comdata/output/reddit_similarity/tfidf_weekly/comment_terms.parquet',
outfile,
'term',
min_df,
included_subreddits,
topN)
if __name__ == "__main__":
fire.Fire({'authors':author_cosine_similarities_weekly,
'terms':term_cosine_similarities_weekly})

View File

@@ -1,172 +0,0 @@
from pyspark.sql import Window
from pyspark.sql import functions as f
from enum import Enum
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from tempfile import TemporaryDirectory
import pyarrow
import pyarrow.dataset as ds
from scipy.sparse import csr_matrix
import pandas as pd
import numpy as np
class tf_weight(Enum):
MaxTF = 1
Norm05 = 2
def read_tfidf_matrix(path,term_colname):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
dataset = ds.dataset(path,format='parquet')
entries = dataset.to_table(columns=['tf_idf','subreddit_id_new',term_id_new]).to_pandas()
return(csr_matrix((entries.tf_idf,(entries[term_id_new]-1, entries.subreddit_id_new-1))))
def column_similarities(mat):
norm = np.matrix(np.power(mat.power(2).sum(axis=0),0.5,dtype=np.float32))
mat = mat.multiply(1/norm)
sims = mat.T @ mat
return(sims)
def prep_tfidf_entries(tfidf, term_colname, min_df, included_subreddits):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
# new_count = new_count.filter(f.col('new_count') >= min_df)
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
# tfidf = tfidf.withColumnRenamed("idf","idf_old")
# tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
tfidf = tfidf.withColumn("tf_idf", (tfidf.relative_tf * tfidf.idf).cast('float'))
tempdir =TemporaryDirectory(suffix='.parquet',prefix='term_tfidf_entries',dir='.')
tfidf.write.parquet(tempdir.name,mode='overwrite',compression='snappy')
return tempdir
def cosine_similarities(tfidf, term_colname, min_df, included_subreddits, similarity_threshold):
term = term_colname
term_id = term + '_id'
term_id_new = term + '_id_new'
if min_df is None:
min_df = 0.1 * len(included_subreddits)
tfidf = tfidf.filter(f.col("subreddit").isin(included_subreddits))
tfidf = tfidf.cache()
# reset the subreddit ids
sub_ids = tfidf.select('subreddit_id').distinct()
sub_ids = sub_ids.withColumn("subreddit_id_new",f.row_number().over(Window.orderBy("subreddit_id")))
tfidf = tfidf.join(sub_ids,'subreddit_id')
# only use terms in at least min_df included subreddits
new_count = tfidf.groupBy(term_id).agg(f.count(term_id).alias('new_count'))
# new_count = new_count.filter(f.col('new_count') >= min_df)
tfidf = tfidf.join(new_count,term_id,how='inner')
# reset the term ids
term_ids = tfidf.select([term_id]).distinct()
term_ids = term_ids.withColumn(term_id_new,f.row_number().over(Window.orderBy(term_id)))
tfidf = tfidf.join(term_ids,term_id)
tfidf = tfidf.withColumnRenamed("tf_idf","tf_idf_old")
# tfidf = tfidf.withColumnRenamed("idf","idf_old")
# tfidf = tfidf.withColumn("idf",f.log(25000/f.col("count")))
tfidf = tfidf.withColumn("tf_idf", tfidf.relative_tf * tfidf.idf)
# step 1 make an rdd of entires
# sorted by (dense) spark subreddit id
# entries = tfidf.filter((f.col('subreddit') == 'asoiaf') | (f.col('subreddit') == 'gameofthrones') | (f.col('subreddit') == 'christianity')).select(f.col("term_id_new")-1,f.col("subreddit_id_new")-1,"tf_idf").rdd
n_partitions = int(len(included_subreddits)*2 / 5)
entries = tfidf.select(f.col(term_id_new)-1,f.col("subreddit_id_new")-1,"tf_idf").rdd.repartition(n_partitions)
# put like 10 subredis in each partition
# step 2 make it into a distributed.RowMatrix
coordMat = CoordinateMatrix(entries)
coordMat = CoordinateMatrix(coordMat.entries.repartition(n_partitions))
# this needs to be an IndexedRowMatrix()
mat = coordMat.toRowMatrix()
#goal: build a matrix of subreddit columns and tf-idfs rows
sim_dist = mat.columnSimilarities(threshold=similarity_threshold)
return (sim_dist, tfidf)
def build_tfidf_dataset(df, include_subs, term_colname, tf_family=tf_weight.Norm05):
term = term_colname
term_id = term + '_id'
# aggregate counts by week. now subreddit-term is distinct
df = df.filter(df.subreddit.isin(include_subs))
df = df.groupBy(['subreddit',term]).agg(f.sum('tf').alias('tf'))
max_subreddit_terms = df.groupby(['subreddit']).max('tf') # subreddits are unique
max_subreddit_terms = max_subreddit_terms.withColumnRenamed('max(tf)','sr_max_tf')
df = df.join(max_subreddit_terms, on='subreddit')
df = df.withColumn("relative_tf", df.tf / df.sr_max_tf)
# group by term. term is unique
idf = df.groupby([term]).count()
N_docs = df.select('subreddit').distinct().count()
# add a little smoothing to the idf
idf = idf.withColumn('idf',f.log(N_docs/(1+f.col('count')))+1)
# collect the dictionary to make a pydict of terms to indexes
terms = idf.select(term).distinct() # terms are distinct
terms = terms.withColumn(term_id,f.row_number().over(Window.orderBy(term))) # term ids are distinct
# make subreddit ids
subreddits = df.select(['subreddit']).distinct()
subreddits = subreddits.withColumn('subreddit_id',f.row_number().over(Window.orderBy("subreddit")))
df = df.join(subreddits,on='subreddit')
# map terms to indexes in the tfs and the idfs
df = df.join(terms,on=term) # subreddit-term-id is unique
idf = idf.join(terms,on=term)
# join on subreddit/term to create tf/dfs indexed by term
df = df.join(idf, on=[term_id, term])
# agg terms by subreddit to make sparse tf/df vectors
if tf_family == tf_weight.MaxTF:
df = df.withColumn("tf_idf", df.relative_tf * df.idf)
else: # tf_fam = tf_weight.Norm05
df = df.withColumn("tf_idf", (0.5 + 0.5 * df.relative_tf) * df.idf)
return df

View File

@@ -1,9 +0,0 @@
## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
#!/usr/bin/env bash
./parse_submissions.sh
start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py

View File

@@ -1,118 +0,0 @@
#!/usr/bin/env python3
# two stages:
# 1. from gz to arrow parquet (this script)
# 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
from datetime import datetime
from multiprocessing import Pool
from itertools import islice
from helper import find_dumps, open_fileset
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import simdjson
import fire
import os
parser = simdjson.Parser()
def parse_submission(post, names = None):
if names is None:
names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
try:
post = parser.parse(post)
except (ValueError) as e:
# print(e)
# print(post)
row = [None for _ in names]
row[-1] = "Error parsing json|{0}|{1}".format(e,post)
return tuple(row)
row = []
for name in names:
if name == 'created_utc' or name == 'retrieved_on':
val = post.get(name,None)
if val is not None:
row.append(datetime.fromtimestamp(int(post[name]),tz=None))
else:
row.append(None)
elif name == 'edited':
val = post[name]
if type(val) == bool:
row.append(val)
row.append(None)
else:
row.append(True)
row.append(datetime.fromtimestamp(int(val),tz=None))
elif name == "time_edited":
continue
elif name == 'has_media':
row.append(post.get('media',None) is not None)
elif name not in post:
row.append(None)
else:
row.append(post[name])
return tuple(row)
def parse_dump(partition):
N=10000
stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
rows = map(parse_submission,stream)
schema = pa.schema([
pa.field('id', pa.string(),nullable=True),
pa.field('author', pa.string(),nullable=True),
pa.field('subreddit', pa.string(),nullable=True),
pa.field('title', pa.string(),nullable=True),
pa.field('created_utc', pa.timestamp('ms'),nullable=True),
pa.field('permalink', pa.string(),nullable=True),
pa.field('url', pa.string(),nullable=True),
pa.field('domain', pa.string(),nullable=True),
pa.field('score', pa.int64(),nullable=True),
pa.field('ups', pa.int64(),nullable=True),
pa.field('downs', pa.int64(),nullable=True),
pa.field('over_18', pa.bool_(),nullable=True),
pa.field('has_media',pa.bool_(),nullable=True),
pa.field('selftext',pa.string(),nullable=True),
pa.field('retrieved_on', pa.timestamp('ms'),nullable=True),
pa.field('num_comments', pa.int64(),nullable=True),
pa.field('gilded',pa.int64(),nullable=True),
pa.field('edited',pa.bool_(),nullable=True),
pa.field('time_edited',pa.timestamp('ms'),nullable=True),
pa.field('subreddit_type',pa.string(),nullable=True),
pa.field('subreddit_id',pa.string(),nullable=True),
pa.field('subreddit_subscribers',pa.int64(),nullable=True),
pa.field('name',pa.string(),nullable=True),
pa.field('is_self',pa.bool_(),nullable=True),
pa.field('stickied',pa.bool_(),nullable=True),
pa.field('quarantine',pa.bool_(),nullable=True),
pa.field('error',pa.string(),nullable=True)])
if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
while True:
chunk = islice(rows,N)
pddf = pd.DataFrame(chunk, columns=schema.names)
table = pa.Table.from_pandas(pddf,schema=schema)
if table.shape[0] == 0:
break
writer.write_table(table)
writer.close()
def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
with open("parse_submissions_task_list",'w') as of:
for fpath in files:
partition = os.path.split(fpath)[1]
of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
if __name__ == "__main__":
fire.Fire({'parse_dump':parse_dump,
'gen_task_list':gen_task_list})

View File

@@ -1,42 +0,0 @@
#!/usr/bin/env python3
# spark script to make sorted, and partitioned parquet files
import pyspark
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
import os
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
conf = conf.set("spark.sql.shuffle.partitions",2000)
conf = conf.set('spark.sql.crossJoin.enabled',"true")
conf = conf.set('spark.debug.maxToStringFields',200)
sqlContext = pyspark.SQLContext(sc)
df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
df = df.withColumnRenamed('subreddit_2','subreddit')
df = df.withColumnRenamed("created_utc","CreatedAt")
df = df.withColumn("Month",f.month(f.col("CreatedAt")))
df = df.withColumn("Year",f.year(f.col("CreatedAt")))
df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
# next we gotta resort it all.
df = df.repartition("subreddit")
df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
# # we also want to have parquet files sorted by author then reddit.
df = df.repartition("author")
df3 = df.sort(["author","CreatedAt","id"],ascending=True)
df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')

View File

@@ -1,127 +0,0 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.mllib.linalg.distributed import RowMatrix, CoordinateMatrix
import numpy as np
import pyarrow
import pandas as pd
import fire
from itertools import islice
from pathlib import Path
from similarities_helper import cosine_similarities, prep_tfidf_entries, read_tfidf_matrix, column_similarities
import scipy
# outfile='test_similarities_500.feather';
# min_df = None;
# included_subreddits=None; topN=100; exclude_phrases=True;
def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, topN=500, exclude_phrases=False):
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
print(outfile)
print(exclude_phrases)
tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
if included_subreddits is None:
rankdf = pd.read_csv("/gscratch/comdata/users/nathante/cdsc-reddit/subreddits_by_num_comments.csv")
included_subreddits = set(rankdf.loc[rankdf.comments_rank <= topN,'subreddit'].values)
else:
included_subreddits = set(open(included_subreddits))
if exclude_phrases == True:
tfidf = tfidf.filter(~f.col(term).contains("_"))
print("creating temporary parquet with matrix indicies")
tempdir = prep_tfidf_entries(tfidf, 'term', min_df, included_subreddits)
tfidf = spark.read.parquet(tempdir.name)
subreddit_names = tfidf.select(['subreddit','subreddit_id_new']).distinct().toPandas()
subreddit_names = subreddit_names.sort_values("subreddit_id_new")
subreddit_names['subreddit_id_new'] = subreddit_names['subreddit_id_new'] - 1
spark.stop()
print("loading matrix")
mat = read_tfidf_matrix(tempdir.name,'term')
print('computing similarities')
sims = column_similarities(mat)
del mat
sims = pd.DataFrame(sims.todense())
sims = sims.rename({i:sr for i, sr in enumerate(subreddit_names.subreddit.values)},axis=1)
sims['subreddit'] = subreddit_names.subreddit.values
p = Path(outfile)
output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
sims.to_feather(outfile)
tempdir.cleanup()
path = "term_tfidf_entriesaukjy5gv.parquet"
# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
# def term_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
# '''
# Compute similarities between subreddits based on tfi-idf vectors of comment texts
# included_subreddits : string
# Text file containing a list of subreddits to include (one per line) if included_subreddits is None then do the top 500 subreddits
# similarity_threshold : double (default = 0)
# set > 0 for large numbers of subreddits to get an approximate solution using the DIMSUM algorithm
# https://stanford.edu/~rezab/papers/dimsum.pdf. If similarity_threshold=0 we get an exact solution using an O(N^2) algorithm.
# min_df : int (default = 0.1 * (number of included_subreddits)
# exclude terms that appear in fewer than this number of documents.
# outfile: string
# where to output csv and feather outputs
# '''
# print(outfile)
# print(exclude_phrases)
# tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet')
# if included_subreddits is None:
# included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
# included_subreddits = {s.strip('\n') for s in included_subreddits}
# else:
# included_subreddits = set(open(included_subreddits))
# if exclude_phrases == True:
# tfidf = tfidf.filter(~f.col(term).contains("_"))
# sim_dist, tfidf = cosine_similarities(tfidf, 'term', min_df, included_subreddits, similarity_threshold)
# p = Path(outfile)
# output_feather = Path(str(p).replace("".join(p.suffixes), ".feather"))
# output_csv = Path(str(p).replace("".join(p.suffixes), ".csv"))
# output_parquet = Path(str(p).replace("".join(p.suffixes), ".parquet"))
# sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
# #instead of toLocalMatrix() why not read as entries and put strait into numpy
# sim_entries = pd.read_parquet(output_parquet)
# df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
# spark.stop()
# df['subreddit_id_new'] = df['subreddit_id_new'] - 1
# df = df.sort_values('subreddit_id_new').reset_index(drop=True)
# df = df.set_index('subreddit_id_new')
# similarities = sim_entries.join(df, on='i')
# similarities = similarities.rename(columns={'subreddit':"subreddit_i"})
# similarities = similarities.join(df, on='j')
# similarities = similarities.rename(columns={'subreddit':"subreddit_j"})
# similarities.to_feather(output_feather)
# similarities.to_csv(output_csv)
# return similarities
if __name__ == '__main__':
fire.Fire(term_cosine_similarities)

View File

@@ -1,19 +0,0 @@
from pyspark.sql import SparkSession
from similarities_helper import build_tfidf_dataset
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
include_subs = {s.strip('\n') for s in include_subs}
# remove [deleted] and AutoModerator (TODO remove other bots)
df = df.filter(df.author != '[deleted]')
df = df.filter(df.author != 'AutoModerator')
df = build_tfidf_dataset(df, include_subs, 'author')
df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
spark.stop()

View File

@@ -1,18 +0,0 @@
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql import Window
from similarities_helper import build_tfidf_dataset
## TODO:need to exclude automoderator / bot posts.
## TODO:need to exclude better handle hyperlinks.
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp")
include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
include_subs = {s.strip('\n') for s in include_subs}
df = build_tfidf_dataset(df, include_subs, 'term')
df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
spark.stop()

View File

@@ -0,0 +1,96 @@
from pyarrow import dataset as ds
import numpy as np
import pandas as pd
import plotnine as pn
random = np.random.RandomState(1968)
def load_densities(term_density_file="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
author_density_file="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather"):
term_density = pd.read_feather(term_density_file)
author_density = pd.read_feather(author_density_file)
term_density.rename({'overlap_density':'term_density','index':'subreddit'},axis='columns',inplace=True)
author_density.rename({'overlap_density':'author_density','index':'subreddit'},axis='columns',inplace=True)
density = term_density.merge(author_density,on='subreddit',how='inner')
return density
def load_clusters(term_clusters_file="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
author_clusters_file="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather"):
term_clusters = pd.read_feather(term_clusters_file)
author_clusters = pd.read_feather(author_clusters_file)
# rename, join and return
term_clusters.rename({'cluster':'term_cluster'},axis='columns',inplace=True)
author_clusters.rename({'cluster':'author_cluster'},axis='columns',inplace=True)
clusters = term_clusters.merge(author_clusters,on='subreddit',how='inner')
return clusters
if __name__ == '__main__':
df = load_densities()
cl = load_clusters()
df['td_rank'] = df.term_density.rank()
df['ad_rank'] = df.author_density.rank()
df['td_percentile'] = df.td_rank / df.shape[0]
df['ad_percentile'] = df.ad_rank / df.shape[0]
df = df.merge(cl, on='subreddit',how='inner')
term_cluster_density = df.groupby('term_cluster').agg({'td_rank':['mean','min','max'],
'ad_rank':['mean','min','max'],
'td_percentile':['mean','min','max'],
'ad_percentile':['mean','min','max'],
'subreddit':['count']})
author_cluster_density = df.groupby('author_cluster').agg({'td_rank':['mean','min','max'],
'ad_rank':['mean','min','max'],
'td_percentile':['mean','min','max'],
'ad_percentile':['mean','min','max'],
'subreddit':['count']})
# which clusters have the most term_density?
term_cluster_density.iloc[term_cluster_density.td_rank['mean'].sort_values().index]
# which clusters have the most author_density?
term_cluster_density.iloc[term_cluster_density.ad_rank['mean'].sort_values(ascending=False).index].loc[term_cluster_density.subreddit['count'] >= 5][0:20]
high_density_term_clusters = term_cluster_density.loc[(term_cluster_density.td_percentile['mean'] > 0.75) & (term_cluster_density.subreddit['count'] > 5)]
# let's just use term density instead of author density for now. We can do a second batch with author density next.
chosen_clusters = high_density_term_clusters.sample(3,random_state=random)
cluster_info = df.loc[df.term_cluster.isin(chosen_clusters.index.values)]
chosen_subreddits = cluster_info.subreddit.values
dataset = ds.dataset("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet",format='parquet')
comments = dataset.to_table(filter=ds.field("subreddit").isin(chosen_subreddits),columns=['id','subreddit','author','CreatedAt'])
comments = comments.to_pandas()
comments['week'] = comments.CreatedAt.dt.date - pd.to_timedelta(comments['CreatedAt'].dt.dayofweek, unit='d')
author_timeseries = comments.loc[:,['subreddit','author','week']].drop_duplicates().groupby(['subreddit','week']).count().reset_index()
for clid in chosen_clusters.index.values:
ts = pd.read_feather(f"data/ts_term_cluster_{clid}.feather")
pn.options.figure_size = (11.7,8.27)
p = pn.ggplot(ts)
p = p + pn.geom_line(pn.aes('week','value',group='subreddit'))
p = p + pn.facet_wrap('~ subreddit')
p.save(f"plots/ts_term_cluster_{clid}.png")
fig, ax = pyplot.subplots(figsize=(11.7,8.27))
g = sns.FacetGrid(ts,row='subreddit')
g.map_dataframe(sns.scatterplot,'week','value',data=ts,ax=ax)

View File

@@ -0,0 +1,37 @@
import pandas as pd
import numpy as np
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from choose_clusters import load_clusters, load_densities
import fire
from pathlib import Path
def main(term_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_terms_10000.feather",
author_clusters_path="/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather",
term_densities_path="/gscratch/comdata/output/reddit_density/comment_terms_10000.feather",
author_densities_path="/gscratch/comdata/output/reddit_density/comment_authors_10000.feather",
output="data/subreddit_timeseries.parquet"):
clusters = load_clusters(term_clusters_path, author_clusters_path)
densities = load_densities(term_densities_path, author_densities_path)
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet")
df = df.withColumn('week', f.date_trunc('week', f.col("CreatedAt")))
# time of unique authors by series by week
ts = df.select(['subreddit','week','author']).distinct().groupby(['subreddit','week']).count()
ts = ts.repartition('subreddit')
spk_clusters = spark.createDataFrame(clusters)
ts = ts.join(spk_clusters, on='subreddit', how='inner')
spk_densities = spark.createDataFrame(densities)
ts = ts.join(spk_densities, on='subreddit', how='inner')
ts.write.parquet(output, mode='overwrite')
if __name__ == "__main__":
fire.Fire(main)

11
visualization/Makefile Normal file
View File

@@ -0,0 +1,11 @@
all: subreddit_author_tf_similarities_10000.html #comment_authors_10000.html
# wang_tsne_10000.html
# wang_tsne_10000.html:/gscratch/comdata/output/reddit_tsne/wang_similarity_10000.feather /gscratch/comdata/output/reddit_clustering/wang_similarity_10000.feather tsne_vis.py
# python3 tsne_vis.py --tsne_data=/gscratch/comdata/output/reddit_tsne/wang_similarity_10000.feather --clusters=/gscratch/comdata/output/reddit_clustering/wang_similarity_10000.feather --output=wang_tsne_10000.html
# comment_authors_10000.html:/gscratch/comdata/output/reddit_tsne/comment_authors_10000.feather /gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather tsne_vis.py
# python3 tsne_vis.py --tsne_data=/gscratch/comdata/output/reddit_similarity/comment_authors_10000.feather --clusters=/gscratch/comdata/output/reddit_clustering/comment_authors_10000.feather --output=comment_authors_10000.html
subreddit_author_tf_similarities_10000.html:/gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather /gscratch/comdata/output/reddit_clustering/subreddit_author_tf_similarities_10000.feather tsne_vis.py
start_spark_and_run.sh 1 tsne_vis.py --tsne_data=/gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather --clusters=/gscratch/comdata/output/reddit_clustering/subreddit_author_tf_similarities_10000.feather --output=subreddit_author_tf_similarities_10000.html

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -5,21 +5,44 @@ alt.data_transformers.enable('default')
from sklearn.neighbors import NearestNeighbors
import pandas as pd
from numpy import random
import fire
import numpy as np
def base_plot(plot_data):
# base = base.encode(alt.Color(field='color',type='nominal',scale=alt.Scale(scheme='category10')))
cluster_dropdown = alt.binding_select(options=[str(c) for c in sorted(set(plot_data.cluster))])
# subreddit_dropdown = alt.binding_select(options=sorted(plot_data.subreddit))
cluster_click_select = alt.selection_single(on='click',fields=['cluster'], bind=cluster_dropdown, name=' ')
# cluster_select = alt.selection_single(fields=['cluster'], bind=cluster_dropdown, name='cluster')
# cluster_select_and = cluster_click_select & cluster_select
#
# subreddit_select = alt.selection_single(on='click',fields=['subreddit'],bind=subreddit_dropdown,name='subreddit_click')
color = alt.condition(cluster_click_select ,
alt.Color(field='color',type='nominal',scale=alt.Scale(scheme='category10')),
alt.value("lightgray"))
base = alt.Chart(plot_data).mark_text().encode(
alt.X('x',axis=alt.Axis(grid=False),scale=alt.Scale(domain=(-65,65))),
alt.Y('y',axis=alt.Axis(grid=False),scale=alt.Scale(domain=(-65,65))),
color=color,
text='subreddit')
base = base.add_selection(cluster_click_select)
return base
def zoom_plot(plot_data):
chart = base_plot(plot_data)
chart = chart.encode(alt.Color(field='color',type='nominal',scale=alt.Scale(scheme='category10')))
chart = chart.interactive()
chart = chart.properties(width=1275,height=1000)
chart = chart.properties(width=1275,height=800)
return chart
@@ -51,7 +74,7 @@ def viewport_plot(plot_data):
alt.Y('y',axis=alt.Axis(grid=False),scale=alt.Scale(domain=selectory2))
)
sr = sr.encode(alt.Color(field='color',type='nominal',scale=alt.Scale(scheme='category10')))
sr = sr.properties(width=1275,height=600)
@@ -70,15 +93,29 @@ def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
distances = np.empty(shape=(centroids.shape[0],centroids.shape[0]))
groups = tsne_data.groupby('cluster')
for centroid in centroids.itertuples():
c_dists = groups.apply(lambda r: min(np.sqrt(np.square(centroid.x - r.x) + np.square(centroid.y-r.y))))
distances[:,centroid.Index] = c_dists
points = np.array(tsne_data.loc[:,['x','y']])
centers = np.array(centroids.loc[:,['x','y']])
# point x centroid
point_center_distances = np.linalg.norm((points[:,None,:] - centers[None,:,:]),axis=-1)
# distances is cluster x point
for gid, group in groups:
c_dists = point_center_distances[group.index.values,:].min(axis=0)
distances[group.cluster.values[0],] = c_dists
# nbrs = NearestNeighbors(n_neighbors=n_neighbors).fit(centroids)
# distances, indices = nbrs.kneighbors()
nbrs = NearestNeighbors(n_neighbors=n_neighbors,metric='precomputed').fit(distances)
distances, indices = nbrs.kneighbors()
nearest = distances.argpartition(n_neighbors,0)
indices = nearest[:n_neighbors,:].T
# neighbor_distances = np.copy(distances)
# neighbor_distances.sort(0)
# neighbor_distances = neighbor_distances[0:n_neighbors,:]
# nbrs = NearestNeighbors(n_neighbors=n_neighbors,metric='precomputed').fit(distances)
# distances, indices = nbrs.kneighbors()
color_assignments = np.repeat(-1,len(centroids))
@@ -100,26 +137,39 @@ def assign_cluster_colors(tsne_data, clusters, n_colors, n_neighbors = 4):
tsne_data = tsne_data.merge(colors,on='cluster')
return(tsne_data)
term_data = pd.read_feather("tsne_subreddit_fit.feather")
clusters = pd.read_feather("term_3000_clusters.feather")
def build_visualization(tsne_data, clusters, output):
tsne_data = assign_cluster_colors(term_data,clusters,10,8)
# tsne_data = "/gscratch/comdata/output/reddit_tsne/subreddit_author_tf_similarities_10000.feather"
# clusters = "/gscratch/comdata/output/reddit_clustering/subreddit_author_tf_similarities_10000.feather"
tsne_data = pd.read_feather(tsne_data)
clusters = pd.read_feather(clusters)
tsne_data = assign_cluster_colors(tsne_data,clusters,10,8)
# sr_per_cluster = tsne_data.groupby('cluster').subreddit.count().reset_index()
# sr_per_cluster = sr_per_cluster.rename(columns={'subreddit':'cluster_size'})
tsne_data = tsne_data.merge(sr_per_cluster,on='cluster')
term_zoom_plot = zoom_plot(tsne_data)
term_zoom_plot.save("subreddit_terms_tsne_3000.html")
term_zoom_plot.save(output)
term_viewport_plot = viewport_plot(tsne_data)
term_viewport_plot.save("subreddit_terms_tsne_3000_viewport.html")
term_viewport_plot.save(output.replace(".html","_viewport.html"))
commenter_data = pd.read_feather("tsne_author_fit.feather")
clusters = pd.read_feather('author_3000_clusters.feather')
commenter_data = assign_cluster_colors(commenter_data,clusters,10,8)
commenter_zoom_plot = zoom_plot(commenter_data)
commenter_viewport_plot = viewport_plot(commenter_data)
commenter_zoom_plot.save("subreddit_commenters_tsne_3000.html")
commenter_viewport_plot.save("subreddit_commenters_tsne_3000_viewport.html")
if __name__ == "__main__":
fire.Fire(build_visualization)
# commenter_data = pd.read_feather("tsne_author_fit.feather")
# clusters = pd.read_feather('author_3000_clusters.feather')
# commenter_data = assign_cluster_colors(commenter_data,clusters,10,8)
# commenter_zoom_plot = zoom_plot(commenter_data)
# commenter_viewport_plot = viewport_plot(commenter_data)
# commenter_zoom_plot.save("subreddit_commenters_tsne_3000.html")
# commenter_viewport_plot.save("subreddit_commenters_tsne_3000_viewport.html")
# chart = chart.properties(width=10000,height=10000)
# chart.save("test_tsne_whole.svg")