Use dask to parallelize and scale user-level datasets

This commit is contained in:
Nathan TeBlunthuis 2018-08-14 14:37:03 -07:00
parent 418fa020e5
commit daf1851cbb
3 changed files with 27927 additions and 0 deletions

bin/wikiq_users (Executable file, 144 additions)

@@ -0,0 +1,144 @@
#!/usr/bin/env python3
import dask.dataframe as dd
import pandas as pd
import csv
import re
import os
import argparse
import fcntl
import sys
import errno
import time
import numpy as np
import struct
from urllib.parse import unquote
sys.path.append("..")
from hashlib import sha256
from wikiq_util import IPCheck
from wikiq_util import TO_ENCODE
from wikiq_util import try_unquote

def parse_args():
    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards.', required=True, type=str)
    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
    parser.add_argument('--urlencode', help="whether we need to decode urls", action="store_true")
    parser.add_argument('--no-cluster', help="disable dask.distributed", action="store_true")
    parser.add_argument('--output-format', help="[csv, parquet] format to output", type=str)
    args = parser.parse_args()
    return args

# This script does not do some of the potentially useful things that Jeremy's script did.
# We don't remove bots.
# We don't exit on Tech Wiki.
# We don't accept an EDITOR_IGNORE_LIST.
# We don't have a username-userid mapping file.
# We don't remove anonymous editors (though we do indicate IP edits as anon).
# We don't remove any rows, including for malformed data.

if __name__ == "__main__":
    args = parse_args()
    id_dict = {}

    if not args.no_cluster:
        # set up dask.distributed with a local cluster, one worker process per core
        from dask.distributed import Client, LocalCluster
        import multiprocessing as mp
        cluster = LocalCluster(n_workers=mp.cpu_count(), processes=True)
        client = Client(cluster)

    input_file = args.input_file
    d = dd.read_table(input_file,
                      dtype={"anon": bool,
                             "articleid": int,
                             "deleted": bool,
                             "editor": str,
                             "minor": bool,
                             "namespace": np.int32,
                             "revert": bool,
                             "reverteds": str,
                             "revid": int,
                             "sha1": str,
                             "title": str},
                      true_values=["TRUE"],
                      false_values=["FALSE"],
                      parse_dates=["date_time"],
                      infer_datetime_format=True)

    if args.wiki is None:
        wiki = re.match(r'(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
    else:
        wiki = args.wiki
    d['wiki'] = wiki

    # keep the url-encoded columns around and add decoded versions
    for col in TO_ENCODE:
        d[col + "old"] = d[col]
        d[col] = d[col].apply(try_unquote, meta=(col, str))

    # flag edits from IP addresses as anonymous
    d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor', str))
    d['anon'] = (d['anon'] == True) | d['IPAnon']
    d = d.drop('IPAnon', axis=1)

    # seconds since the earliest edit in the dataset
    d['timestamp'] = (d['date_time'] - d['date_time'].min()) / np.timedelta64(1, 's')
    d['timestamp'] = d['timestamp'].astype(int)

    # create a new unique identifier by hashing the editor name or editor ip
    # first index and sort by date_time so per-editor counts and deltas come out in order
    d = d.set_index(d["date_time"])
    d = d.map_partitions(lambda x: x.sort_index())
    d['editor_sha'] = d['editor'].apply(lambda x: sha256(x.encode()).hexdigest()
                                        if x is not None else None,
                                        meta=("editor_sha", str))

    editor_groups = d.groupby('editor')
    d['editor_nth_edit'] = editor_groups.cumcount()
    d = editor_groups.apply(lambda df:
                            df.assign(tminus_editor_edit=df.date_time.diff(1)))

    editor_wiki_groups = d.groupby(['editor_sha', 'wiki'])
    d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
    d = editor_wiki_groups.apply(lambda df:
                                 df.assign(tminus_editor_wiki_edit=df.date_time.diff(1)))

    editor_namespace_groups = d.groupby(['editor_sha', 'wiki', 'namespace'])
    d['editor_nth_namespace_edit'] = editor_namespace_groups.cumcount()
    d = editor_namespace_groups.apply(lambda df:
                                      df.assign(tminus_namespace_wiki_edit=df.date_time.diff(1)))

    editor_article_groups = d.groupby(['editor_sha', 'wiki', 'articleid'])
    d['editor_nth_article_edit'] = editor_article_groups.cumcount()
    d = editor_article_groups.apply(lambda df:
                                    df.assign(tminus_editor_article_edit=df.date_time.diff(1)))

    d = d.persist()

    if not os.path.exists(args.output_dir):
        os.mkdir(args.output_dir)

    if args.output_format == "csv":
        # for writing to csv we keep the url-encoded versions of the TO_ENCODE columns
        d_csv = d
        for col in TO_ENCODE:
            d_csv = d_csv.drop(col, axis=1)
            d_csv[col] = d_csv[col + 'old']
        # one csv file per partition; the filename pattern here is arbitrary
        d_csv.to_csv(os.path.join(args.output_dir, "wikiq_users_*.csv"), index=False)
    else:
        for col in TO_ENCODE:
            d = d.drop(col + 'old', axis=1)
        d.to_parquet("test_parquet/",
                     object_encoding={"editor": "utf8",
                                      "reverteds": "utf8",
                                      "sha1": "utf8",
                                      "title": "utf8",
                                      "wiki": "utf8",
                                      "namespace": "utf8",
                                      "editor_sha": "utf8",
                                      "revert": "bool"})
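
For orientation, here is a minimal sketch (plain pandas, not part of this commit) of what the groupby/cumcount/diff pattern above computes per editor: the editor's nth edit and the time since that editor's previous edit. The toy data are made up; only the column names mirror the script.

# Minimal pandas sketch (not part of this commit) of the per-editor columns
# derived above: nth edit per editor and time since that editor's last edit.
import pandas as pd

edits = pd.DataFrame({
    "editor": ["alice", "bob", "alice", "alice", "bob"],
    "date_time": pd.to_datetime(["2018-01-01 10:00", "2018-01-01 10:05",
                                 "2018-01-02 09:00", "2018-01-03 12:00",
                                 "2018-01-05 08:00"]),
}).sort_values("date_time")

groups = edits.groupby("editor")
edits["editor_nth_edit"] = groups.cumcount()               # 0, 0, 1, 2, 1
edits["tminus_editor_edit"] = groups["date_time"].diff(1)  # NaT, NaT, 23h, 27h, ...
print(edits)

The script reaches the same delta through editor_groups.apply(...) with df.assign(...), a route that works on a dask dataframe.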

install.dask.sh (Normal file, 3 additions)

@@ -0,0 +1,3 @@
#!/usr/bin/env bash
pip3 install --user cloudpickle toolz dask partd fastparquet pyarrow
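
After running the install script, a quick sanity check along these lines (not part of the commit) confirms that dask and its dataframe API are importable and can run a small groupby:

# Quick sanity check (not part of this commit): build a tiny dask dataframe
# and run a groupby aggregation, exercising the pieces wikiq_users relies on.
import dask
import dask.dataframe as dd
import pandas as pd

print("dask", dask.__version__)
pdf = pd.DataFrame({"editor": ["a", "b", "a"], "edits": [1, 2, 3]})
ddf = dd.from_pandas(pdf, npartitions=2)
print(ddf.groupby("editor")["edits"].sum().compute())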

File diff suppressed because it is too large.