update examples with working streaming
This commit is contained in:
parent
40d4563770
commit
e22ddf23da
@ -1,8 +1,8 @@
|
|||||||
import pyarrow.dataset as ds
|
import pyarrow.dataset as ds
|
||||||
import pyarrow as pa
|
|
||||||
# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory.
|
# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory.
|
||||||
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
|
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
|
||||||
dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/', format='parquet', partitioning='hive')
|
dataset = ds.dataset('/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/', format='parquet')
|
||||||
|
|
||||||
# let's get all the comments to two subreddits:
|
# let's get all the comments to two subreddits:
|
||||||
subreddits_to_pull = ['seattle','seattlewa']
|
subreddits_to_pull = ['seattle','seattlewa']
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
import pyarrow.dataset as ds
|
pimport pyarrow.dataset as ds
|
||||||
from itertools import chain, groupby, islice
|
from itertools import chain, groupby, islice
|
||||||
|
|
||||||
# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory.
|
# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read dataa into memory.
|
||||||
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
|
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
|
||||||
dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet', partitioning='hive')
|
dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet')
|
||||||
|
|
||||||
# let's get all the comments to two subreddits:
|
# let's get all the comments to two subreddits:
|
||||||
subreddits_to_pull = ['seattlewa','seattle']
|
subreddits_to_pull = ['seattlewa','seattle']
|
||||||
@ -11,22 +11,28 @@ subreddits_to_pull = ['seattlewa','seattle']
|
|||||||
# instead of loading the data into a pandas dataframe all at once we can stream it. This lets us start working with it while it is read.
|
# instead of loading the data into a pandas dataframe all at once we can stream it. This lets us start working with it while it is read.
|
||||||
scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
|
scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
|
||||||
|
|
||||||
# simple function to execute scantasks and create a stream of pydict rows
|
# simple function to execute scantasks and create a stream of rows
|
||||||
def execute_scan_task(st):
|
def iterate_rows(scan_tasks):
|
||||||
# an executed scan task yields an iterator of record_batches
|
for st in scan_tasks:
|
||||||
def unroll_record_batch(rb):
|
|
||||||
df = rb.to_pandas()
|
|
||||||
return df.itertuples()
|
|
||||||
|
|
||||||
for rb in st.execute():
|
for rb in st.execute():
|
||||||
yield unroll_record_batch(rb)
|
df = rb.to_pandas()
|
||||||
|
for t in df.itertuples():
|
||||||
|
yield t
|
||||||
|
|
||||||
|
row_iter = iterate_rows(scan_tasks)
|
||||||
# now we just need to flatten and we have our iterator
|
|
||||||
row_iter = chain.from_iterable(chain.from_iterable(map(lambda st: execute_scan_task(st), scan_tasks)))
|
|
||||||
|
|
||||||
# now we can use python's groupby function to read one author at a time
|
# now we can use python's groupby function to read one author at a time
|
||||||
# note that the same author can appear more than once since the record batches may not be in the correct order.
|
# note that the same author can appear more than once since the record batches may not be in the correct order.
|
||||||
author_submissions = groupby(row_iter, lambda row: row.author)
|
author_submissions = groupby(row_iter, lambda row: row.author)
|
||||||
|
|
||||||
|
count_dict = {}
|
||||||
|
|
||||||
for auth, posts in author_submissions:
|
for auth, posts in author_submissions:
|
||||||
print(f"{auth} has {len(list(posts))} posts")
|
if auth in count_dict:
|
||||||
|
count_dict[auth] = count_dict[auth] + 1
|
||||||
|
else:
|
||||||
|
count_dict[auth] = 1
|
||||||
|
|
||||||
|
# since it's partitioned and sorted by author, we get one group for each author
|
||||||
|
any([ v != 1 for k,v in count_dict.items()])
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user