Compare commits

...

15 Commits

Author SHA1 Message Date
df18d6e280 Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:03:07 -07:00
3af71f03e0 Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:02:48 -07:00
1d5a9b53b8 Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:02:05 -07:00
cc551eef6e Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:01:52 -07:00
4c77c0f12e Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:01:07 -07:00
915c864ee5 add more variables and support for persistence 2018-08-31 16:00:47 -07:00
3d12865c4e add more variables and support for persistence 2018-08-31 15:57:48 -07:00
bc1f5428f0 add spark program for running group by users 2018-08-31 20:40:22 +00:00
ff689c71dd Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-14 14:44:37 -07:00
39b4e5698f Use dask to parallelize and scale user level datasets 2018-08-14 14:44:21 -07:00
daf1851cbb Use dask to parallelize and scale user level datasets 2018-08-14 14:37:03 -07:00
418fa020e5 Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-12 21:34:12 -07:00
311810a36c refactor wikiq to seperate script from classes and functions. Code reuse in testing. 2018-08-12 21:33:19 -07:00
118b8b1722 move tests to test folder 2018-08-12 18:10:46 -07:00
f69e8b44a6 move tests to test folder 2018-08-12 18:05:59 -07:00
22 changed files with 3451449 additions and 298 deletions

83
bin/wikiq Executable file
View File

@ -0,0 +1,83 @@
#!/usr/bin/env python3
# original wikiq headers are: title articleid revid date_time anon
# editor editor_id minor text_size text_entropy text_md5 reversion
# additions_size deletions_size
import argparse
import sys
import os
sys.path.append("..")
from wikiq_util import calculate_persistence
from wikiq_util import WikiqIterator
from wikiq_util import WikiqPage
from wikiq_util import WikiqParser
from wikiq_util import open_input_file
from wikiq_util import open_output_file
parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimitted data.')
# arguments for the input direction
parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
help="Directory for output files.")
parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
help="Write output to standard out (do not create dump file)")
parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
help="Operate only on the final revision made by user a user within all sequences of consecutive edits made by a user. This can be useful for addressing issues with text persistence measures.")
parser.add_argument('-p', '--persistence', dest="persist", action="store_true",
help="Compute and report measures of content persistent: (1) persistent token revisions, (2) tokens added, and (3) number of revision used in computing the first measure.")
parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
help="Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
parser.add_argument('--persistence-legacy', dest="persist_legacy", action="store_true",
help="Legacy behavior for persistence calculation. Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
args = parser.parse_args()
if len(args.dumpfiles) > 0:
for filename in args.dumpfiles:
input_file = open_input_file(filename)
# open directory for output
if args.output_dir:
output_dir = args.output_dir[0]
else:
output_dir = "."
print("Processing file: %s" % filename, file=sys.stderr)
if args.stdout:
output_file = sys.stdout
else:
filename = os.path.join(output_dir, os.path.basename(filename))
output_file = open_output_file(filename)
wikiq = WikiqParser(input_file, output_file,
collapse_user=args.collapse_user,
persist=args.persist,
persist_legacy=args.persist_legacy,
urlencode=args.urlencode)
wikiq.process()
# close things
input_file.close()
output_file.close()
else:
wikiq = WikiqParser(sys.stdin, sys.stdout,
collapse_user=args.collapse_user,
persist=args.persist,
persist_legacy=args.persist_legacy,
urlencode=args.urlencode)
wikiq.process()
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
# stop_words = stop_words.split(",")

144
bin/wikiq_users Executable file
View File

@ -0,0 +1,144 @@
#!/usr/bin/env python3
import dask.dataframe as dd
import pandas as pd
import csv
import re
import os
import argparse
import fcntl
import sys
import errno
import time
import numpy as np
import struct
from urllib.parse import unquote
sys.path.append("..")
from hashlib import sha256
from wikiq_util import IPCheck
from wikiq_util import TO_ENCODE
from wikiq_util import try_unquote
def parse_args():
parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
parser.add_argument('--no-cluster', help="disable dask.distributed", action="store_true")
parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
args = parser.parse_args()
return(args)
# This script does not do some of the things that might be useful that Jeremy's script did.
# We don't remove bots
# We don't exit on Tech Wiki
# We don't accept an EDITOR_IGNORE_LIST
# We don't have a username-userid mapping file
# We don't remove anonymous editors (though we do indicate IP edits as anon.
# We don't remove any rows, including for malformed data
if __name__ == "__main__":
args = parse_args()
id_dict = {}
if not args.no_cluster:
# set up dask distributed
from dask.distributed import Client, LocalCluster
import multiprocessing as mp
cluster = LocalCluster(n_workers = mp.cpu_count(), processes=True)
client = Client(cluster)
input_file = args.input_file
d = dd.read_table(input_file, dtype={"anon":np.bool,
"articleid":int,
"deleted":bool,
"editor":str,
"minor":bool,
"namespace":np.int32,
"revert":bool,
"reverteds":str,
"revid":int,
"sha1":str,
"title":str},
true_values=["TRUE"],
false_values=["FALSE"],
parse_dates=["date_time"],
infer_datetime_format=True
)
if args.wiki is None:
wiki = re.match('(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
else:
wiki = args.wiki
d['wiki'] = wiki
for col in TO_ENCODE:
d[col+"old"] = d[col]
d[col] = d[col].apply(try_unquote, meta=(col,str))
d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor',str))
d['anon'] = (d['anon'] == True) | d['IPAnon']
d = d.drop('IPAnon',axis=1)
d['timestamp'] = (d['date_time'] - d['date_time'].min())/np.timedelta64(1,'s')
d['timestamp'] = d['timestamp'].astype(int)
# create a new unique identifier by hashing the editor name or editor ip
# first sort by editor
d = d.set_index(d["date_time"])
d = d.map_partitions(lambda x: x.sort_index())
d['editor_sha'] = d['editor'].apply(lambda x:
sha256(x.encode()).hexdigest()
if x is not None
else None,
meta=("editor_sha",str)
)
editor_groups = d.groupby('editor')
d['editor_nth_edit'] = editor_groups.cumcount()
d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit = df.date_time.diff(1)))
editor_wiki_groups = d.groupby(['editor_sha','wiki'])
d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
d = editor_wiki_groups.apply(lambda df:
df.assign(
tminus_editor_wiki_edit=df.date_time.diff(1)
))
editor_namespace_groups = d.groupby(['editor_sha','wiki','namespace'])
d['editor_nth_namespace_edit'] = editor_wiki_groups.cumcount()
d = editor_namespace_groups.apply(lambda df:
df.assign(
tminus_namespace_wiki_edit=df.date_time.diff(1)
))
editor_article_groups = d.groupby(['editor_sha','wiki','articleid'])
d['editor_nth_article_edit'] = editor_article_groups.cumcount()
d = editor_article_groups.apply(lambda df:
df.assign(tminus_editor_article_edit=df.date_time.diff(1)))
d = d.persist()
if not os.path.exists(args.output_dr):
os.mkdir(args.output_dir
)
if args.output_format == "csv":
d_csv = d
for col in TO_ENCODE:
d_csv = d_csv.drop(col,axis=1)
d_csv[col] = d_csv[col+'old']
d.to_csv()
else:
for col in TO_ENCODE:
d = d.drop(col + 'old', axis=1)
d.to_parquet("test_parquet/",object_encoding={"editor":"utf8","reverteds":"utf8","sha1":"utf8","title":"utf8","wiki":"utf8","namespace":"utf8","editor_sha":"utf8","revert":"bool"})
# for writing to csv we need to urlencode
if __name__ == '__main__':
main()

3
install.dask.sh Normal file
View File

@ -0,0 +1,3 @@
#!/usr/bin/env bash
pip3 install --user cloudpickle toolz dask partd fastparquet pyarrow

View File

@ -1,223 +0,0 @@
import unittest
import os
import subprocess
from shutil import copyfile
import pandas as pd
from pandas.util.testing import assert_frame_equal
from io import StringIO
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to sdtout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
class Test_Wikipedia(unittest.TestCase):
def setUp(self):
if not os.path.exists("test_output"):
os.mkdir("test_output")
self.wiki = 'ikwiki-20180301-pages-meta-history'
self.wikiq_out_name = self.wiki + ".tsv"
self.test_output_dir = os.path.join(".", "test_output")
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
self.infile = "{0}.xml.bz2".format(self.wiki)
self.base_call = "../wikiq {0} -o {1}"
self.input_dir = "dumps"
self.input_file = os.path.join(".", self.input_dir,self.infile)
self.baseline_output_dir = "baseline_output"
def test_WP_url_encode(self):
test_filename = "url-encode_" + self.wikiq_out_name
test_file = os.path.join(self.test_output_dir, test_filename)
if os.path.exists(test_file):
os.remove(test_file)
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --url-encode"
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
proc.wait()
copyfile(self.call_output, test_file)
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
# as a test let's make sure that we get equal data frames
test = pd.read_table(test_file)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
class Test_Basic(unittest.TestCase):
def setUp(self):
if not os.path.exists("test_output"):
os.mkdir("test_output")
self.wiki = 'sailormoon'
self.wikiq_out_name = self.wiki + ".tsv"
self.test_output_dir = os.path.join(".", "test_output")
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
self.infile = "{0}.xml.7z".format(self.wiki)
self.base_call = "../wikiq {0} -o {1}"
self.input_dir = "dumps"
self.input_file = os.path.join(".", self.input_dir,self.infile)
self.baseline_output_dir = "baseline_output"
def test_noargs(self):
test_filename = "noargs_" + self.wikiq_out_name
test_file = os.path.join(self.test_output_dir, test_filename)
if os.path.exists(test_file):
os.remove(test_file)
call = self.base_call.format(self.input_file, self.test_output_dir)
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
proc.wait()
copyfile(self.call_output, test_file)
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
test = pd.read_table(test_file)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
def test_collapse_user(self):
test_filename = "collapse-user_" + self.wikiq_out_name
test_file = os.path.join(self.test_output_dir, test_filename)
if os.path.exists(test_file):
os.remove(test_file)
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --collapse-user"
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
proc.wait()
copyfile(self.call_output, test_file)
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
test = pd.read_table(test_file)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
def test_pwr_legacy(self):
test_filename = "persistence_legacy_" + self.wikiq_out_name
test_file = os.path.join(self.test_output_dir, test_filename)
if os.path.exists(test_file):
os.remove(test_file)
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --persistence-legacy"
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
proc.wait()
copyfile(self.call_output, test_file)
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
test = pd.read_table(test_file)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
def test_pwr(self):
test_filename = "persistence_" + self.wikiq_out_name
test_file = os.path.join(self.test_output_dir, test_filename)
if os.path.exists(test_file):
os.remove(test_file)
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --persistence"
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
proc.wait()
copyfile(self.call_output, test_file)
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
test = pd.read_table(test_file)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
def test_url_encode(self):
test_filename = "url-encode_" + self.wikiq_out_name
test_file = os.path.join(self.test_output_dir, test_filename)
if os.path.exists(test_file):
os.remove(test_file)
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --url-encode"
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
proc.wait()
copyfile(self.call_output, test_file)
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
test = pd.read_table(test_file)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
class Test_Malformed(unittest.TestCase):
def setUp(self):
if not os.path.exists("test_output"):
os.mkdir("test_output")
self.wiki = 'twinpeaks'
self.wikiq_out_name = self.wiki + ".tsv"
self.test_output_dir = os.path.join(".", "test_output")
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
self.infile = "{0}.xml.7z".format(self.wiki)
self.base_call = "../wikiq {0} -o {1}"
self.input_dir = "dumps"
self.input_file = os.path.join(".", self.input_dir,self.infile)
def test_malformed_noargs(self):
call = self.base_call.format(self.input_file, self.test_output_dir)
proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
proc.wait()
outs, errs = proc.communicate()
errlines = str(errs).split("\\n")
self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
class Test_Stdout(unittest.TestCase):
def setUp(self):
self.wiki = 'sailormoon'
self.wikiq_out_name = self.wiki + ".tsv"
self.infile = "{0}.xml.7z".format(self.wiki)
self.base_call = "../wikiq {0} --stdout"
self.input_dir = "dumps"
self.input_file = os.path.join(".", self.input_dir,self.infile)
self.baseline_output_dir = "baseline_output"
def test_noargs(self):
call = self.base_call.format(self.input_file)
proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
outs = proc.stdout.decode("utf8")
test_file = "noargs_" + self.wikiq_out_name
baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
print(baseline_file)
test = pd.read_table(StringIO(outs))
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
if __name__ == '__main__':
unittest.main()

144
tests/Wikiq_Test.py Normal file
View File

@ -0,0 +1,144 @@
import unittest
import os
import subprocess
from shutil import copyfile
import pandas as pd
from pandas.util.testing import assert_frame_equal
from io import StringIO
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to sdtout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
class Test_Wikiq(unittest.TestCase):
def mkoutputdir(self):
if not os.path.exists("test_output"):
os.mkdir("test_output")
def setuptoutputfiles(self, suffix="xml.7z"):
self.wikiq_out_name = self.wiki + ".tsv"
self.test_output_dir = os.path.join(".", "test_output")
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
self.infile = "{0}.{1}".format(self.wiki,suffix)
self.input_dir = "dumps"
self.input_file = os.path.join(".", self.input_dir,self.infile)
self.baseline_output_dir = "baseline_output"
def run_and_check_output(self, call, test_filename):
test_file = os.path.join(self.test_output_dir, test_filename)
if os.path.exists(test_file):
os.remove(test_file)
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
proc.wait()
copyfile(self.call_output, test_file)
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
# as a test let's make sure that we get equal data frames
test = pd.read_table(test_file)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
class Test_Wikipedia(Test_Wikiq):
def setUp(self):
print(os.path.abspath("."))
self.mkoutputdir()
self.wiki = 'ikwiki-20180301-pages-meta-history'
self.setuptoutputfiles(suffix="xml.bz2")
self.base_call = "../bin/wikiq {0} -o {1}"
def test_WP_url_encode(self):
test_filename = "url-encode_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --url-encode"
self.run_and_check_output(call, test_filename)
class Test_Basic(Test_Wikiq):
def setUp(self):
self.mkoutputdir()
self.wiki="sailormoon"
self.setuptoutputfiles()
self.base_call = "../bin/wikiq {0} -o {1}"
def test_noargs(self):
test_filename = "noargs_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
print(call)
self.run_and_check_output(call, test_filename)
def test_collapse_user(self):
test_filename = "collapse-user_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --collapse-user"
self.run_and_check_output(call, test_filename)
def test_pwr_legacy(self):
test_filename = "persistence_legacy_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --persistence-legacy"
self.run_and_check_output(call, test_filename)
def test_pwr(self):
test_filename = "persistence_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --persistence"
self.run_and_check_output(call, test_filename)
def test_url_encode(self):
test_filename = "url-encode_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --url-encode"
self.run_and_check_output(call, test_filename)
class Test_Malformed(Test_Wikiq):
def setUp(self):
self.mkoutputdir()
self.wiki="twinpeaks"
self.setuptoutputfiles()
self.base_call = "../bin/wikiq {0} -o {1}"
def test_malformed_noargs(self):
call = self.base_call.format(self.input_file, self.test_output_dir)
proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
proc.wait()
outs, errs = proc.communicate()
errlines = str(errs).split("\\n")
self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
class Test_Stdout(Test_Wikiq):
def setUp(self):
self.mkoutputdir()
self.wiki = 'sailormoon'
self.setuptoutputfiles()
def test_noargs(self):
self.base_call = ["../bin/wikiq", self.input_file, "--stdout"]
proc = subprocess.Popen(self.base_call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
outs = proc.stdout
test_file = "noargs_" + self.wikiq_out_name
baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
test = pd.read_table(outs)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
if __name__ == '__main__':
unittest.main()

View File

Can't render this file because it is too large.

File diff suppressed because one or more lines are too long

1368
tests/dumps/twinpeaks.xml Normal file

File diff suppressed because it is too large Load Diff

5104
tests/dumps/twinpeaks.xml~ Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,2 @@
#!/usr/bin/env bash
spark-submit --master spark://n0649:18899 wikiq_users_spark.py --output-format parquet -i "/com/output/wikiq-enwiki-persist-sequence-20180301/enwiki/enwiki-20180301-pages-meta-history*.tsv" -o "/com/output/wikiq-users-enwiki-20180301-parquet/" --num-partitions 500 --schema-opt persistence+collapse

View File

@ -0,0 +1,2 @@
#!/usr/bin/env bash
spark-submit benchmark_spark.py --output-format csv -i "../mediawiki_dump_tools/tests/tsvs/*.tsv" -o "./out.tsv" --num-partitions 2

163
wikiq_users/wikiq_users_spark.py Executable file
View File

@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
Builds a user level dataset. Requires a functional spark installation.
"""
import sys
# add pyspark to your python path e.g.
#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
#sys.path.append("/home/nathante/sparkstuff/spark/python/")
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window
import pyspark.sql.functions as f
from pyspark.sql import types
import argparse
import glob
from os import mkdir
from os import path
from wikiq_util import PERSISTENCE_RADIUS
#read a table
def parse_args():
parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
# parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
parser.add_argument('--schema-opt', help = 'Options for the input schema.', choices = ["basic","persistence","collapse","persistence+collapse"])
# parser.add_argument('--nodes', help = "how many hyak nodes to use", default=0, type=int)
args = parser.parse_args()
return(args)
if __name__ == "__main__":
args = parse_args()
conf = SparkConf().setAppName("Wiki Users Spark")
spark = SparkSession.builder.getOrCreate()
# test file with persistence: "../tests/tsvs/persistence_sailormoon.tsv"
files = glob.glob(args.input_file)
files = [path.abspath(p) for p in files]
read_persistence = args.schema_opt in ["persistence", "persistence+collapse"]
read_collapse = args.schema_opt in ["collapse", "persistence+collapse"]
# going to have to do some coercing of the schema
# build a schema
struct = types.StructType().add("anon",types.StringType(),True)
struct = struct.add("articleid",types.LongType(),True)
if read_collapse is True:
struct = struct.add("collapsed_revs", types.IntegerType(), True)
struct = struct.add("date_time",types.TimestampType(), True)
struct = struct.add("deleted",types.BooleanType(), True)
struct = struct.add("editor",types.StringType(),True)
struct = struct.add("editor_id",types.LongType(), True)
struct = struct.add("minor", types.BooleanType(), True)
struct = struct.add("namespace", types.LongType(), True)
struct = struct.add("revert", types.BooleanType(), True)
struct = struct.add("reverteds", types.StringType(), True)
struct = struct.add("revid", types.LongType(), True)
struct = struct.add("sha1", types.StringType(), True)
struct = struct.add("text_chars", types.LongType(), True)
struct = struct.add("title",types.StringType(), True)
if read_persistence is True:
struct = struct.add("token_revs", types.IntegerType(),True)
struct = struct.add("tokens_added", types.IntegerType(),True)
struct = struct.add("tokens_removed", types.IntegerType(),True)
struct = struct.add("tokens_window", types.IntegerType(),True)
reader = spark.read
df = reader.csv(files,
sep='\t',
inferSchema=False,
header=True,
mode="PERMISSIVE",
schema = struct)
df = df.repartition(args.num_partitions)
# replace na editor ids
df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))
# sort by datetime
df = df.orderBy(df.date_time.asc())
# create our window_specs
ed_win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
art_win = Window.orderBy("date_time").partitionBy("articleid")
# assign which edit reverted what edit
reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds','editor_id_or_ip','date_time'])
reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
reverteds_df = reverteds_df.drop("reverteds")
reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
reverteds_df = reverteds_df.withColumn("editor_nth_revert_action", f.rank().over(ed_win))
reverteds_df_explode = reverteds_df.select(reverteds_df.revid.alias('reverted_by'), f.explode(reverteds_df.reverteds).alias('reverted_id'))
df = df.join(reverteds_df_explode, df.revid == reverteds_df_explode.reverted_id, how='left_outer')
df = df.drop("reverted_id")
del(reverteds_df_explode)
reverteds_df = reverteds_df.select("revid","editor_nth_revert_action")
df = df.join(reverteds_df, on= ["revid"], how='left_outer')
del(reverteds_df)
# count reverts
reverts_df = df.filter(df.revert==True).select('revid','articleid','editor_id_or_ip','date_time','revert')
reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(ed_win))
# articles total reverts
reverts_df = reverts_df.withColumn('article_nth_revert',f.rank().over(art_win))
# some kind of bad work around a bug
# see https://issues.apache.org/jira/browse/SPARK-14948
reverts_df = reverts_df.select(reverts_df.revid.alias("r_revid"),'editor_nth_revert','article_nth_revert')
df = df.join(reverts_df, df.revid == reverts_df.r_revid, how='left_outer')
df = df.drop("r_revid")
del(reverts_df)
# count edits
df = df.withColumn('year', f.year(df.date_time))
df = df.withColumn('month',f.month(df.date_time))
if not read_collapse:
df = df.withColumn('editor_nth_edit', f.rank().over(ed_win))
df = df.withColumn('article_nth_edit', f.rank().over(art_win))
else:
df = df.withColumn('editor_nth_edit', f.sum("collapsed_revs").over(ed_win))
df = df.withColumn('article_nth_edit', f.sum("collapsed_revs").over(art_win))
df = df.withColumn('editor_nth_collapsed_edit', f.rank().over(ed_win))
df = df.withColumn('article_nth_collapsed_edit', f.rank().over(art_win))
# total editor's token_revs
if read_persistence:
df = df.withColumn("token_revs_upper", df.token_revs + df.tokens_added * (PERSISTENCE_RADIUS - df.tokens_window - 1))
df = df.withColumn('editor_cum_token_revs_lower', f.sum("token_revs").over(ed_win))
df = df.withColumn('editor_cum_token_revs_upper', f.sum("token_revs_upper").over(ed_win))
df = df.withColumn('article_cum_token_revs_lower', f.sum("token_revs").over(art_win))
df = df.withColumn('article_cum_token_revs_upper', f.sum("token_revs_upper").over(art_win))
df = df.withColumn('editor_cum_tokens_added', f.sum("tokens_added").over(ed_win))
df = df.withColumn('article_cum_tokens_removed', f.sum("tokens_removed").over(art_win))
# output
if not path.exists(args.output_dir):
mkdir(args.output_dir)
if args.output_format == "csv" or args.output_format == "tsv":
df.write.csv(args.output_dir, sep='\t', mode='overwrite',header=True,timestampFormat="yyyy-MM-dd HH:mm:ss")
# format == "parquet"
else:
df.write.parquet(args.output_dir, mode='overwrite')
# for writing to csv we need to urlencode

101
wikiq → wikiq_util.py Executable file → Normal file
View File

@ -1,33 +1,30 @@
#!/usr/bin/env python3
# original wikiq headers are: title articleid revid date_time anon
# editor editor_id minor text_size text_entropy text_md5 reversion
# additions_size deletions_size
import pdb
import argparse
import sys
import os, os.path
import re
from subprocess import Popen, PIPE
from collections import deque
from hashlib import sha1
from mwxml import Dump
from deltas.tokenizers import wikitext_split
from mwxml import Dump
import mwpersistence
import mwreverts
from urllib.parse import quote
TO_ENCODE = ('title', 'editor')
PERSISTENCE_RADIUS=7
from urllib.parse import unquote
from deltas import SequenceMatcher
TO_ENCODE = ('title', 'editor')
PERSISTENCE_RADIUS = 7
def try_unquote(obj):
if type(obj) is str:
obj = unquote(obj)
return obj.strip('\"')
else:
return
def calculate_persistence(tokens_added):
return(sum([(len(x.revisions)-1) for x in tokens_added]),
len(tokens_added))
class WikiqIterator():
def __init__(self, fh, collapse_user=False):
self.fh = fh
@ -49,6 +46,7 @@ class WikiqIterator():
def __next__(self):
return next(self._pages)
class WikiqPage():
__slots__ = ('id', 'title', 'namespace', 'redirect',
'restrictions', 'mwpage', '__revisions',
@ -334,69 +332,22 @@ def open_output_file(input_filename):
return output_file
parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimitted data.')
# arguments for the input direction
parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
class IPCheck(object):
parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
help="Directory for output files.")
# IP address regexes taken from https://gist.github.com/mnordhoff/2213179
ipv4_address = re.compile('^(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$')
parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
help="Write output to standard out (do not create dump file)")
ipv6_address_or_addrz = re.compile('^(?:(?:[0-9A-Fa-f]{1,4}:){6}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9A-Fa-f]{1,4}:){5}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){4}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){3}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,2}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){2}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,3}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}:(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,4}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,5}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}|(?:(?:[0-9A-Fa-f]{1,4}:){,6}[0-9A-Fa-f]{1,4})?::)(?:%25(?:[A-Za-z0-9\\-._~]|%[0-9A-Fa-f]{2})+)?$')
parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
help="Operate only on the final revision made by user a user within all sequences of consecutive edits made by a user. This can be useful for addressing issues with text persistence measures.")
@staticmethod
def is_ip(username):
if not type(username) is str:
return False
parser.add_argument('-p', '--persistence', dest="persist", action="store_true",
help="Compute and report measures of content persistent: (1) persistent token revisions, (2) tokens added, and (3) number of revision used in computing the first measure.")
parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
help="Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
parser.add_argument('--persistence-legacy', dest="persist_legacy", action="store_true",
help="Legacy behavior for persistence calculation. Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
args = parser.parse_args()
if len(args.dumpfiles) > 0:
for filename in args.dumpfiles:
input_file = open_input_file(filename)
# open directory for output
if args.output_dir:
output_dir = args.output_dir[0]
'''Check if a username is an ip (v4 or v6) address. We use this as
a marker of whether the user is anonymous.'''
if IPCheck.ipv4_address.match(username) or IPCheck.ipv6_address_or_addrz.match(username):
return True
else:
output_dir = "."
print("Processing file: %s" % filename, file=sys.stderr)
if args.stdout:
output_file = sys.stdout
else:
filename = os.path.join(output_dir, os.path.basename(filename))
output_file = open_output_file(filename)
wikiq = WikiqParser(input_file, output_file,
collapse_user=args.collapse_user,
persist=args.persist,
persist_legacy=args.persist_legacy,
urlencode=args.urlencode)
wikiq.process()
# close things
input_file.close()
output_file.close()
else:
wikiq = WikiqParser(sys.stdin, sys.stdout,
collapse_user=args.collapse_user,
persist=args.persist,
persist_legacy=args.persist_legacy,
urlencode=args.urlencode)
wikiq.process()
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
# stop_words = stop_words.split(",")
return False