11 Commits

24 changed files with 20469 additions and 3451459 deletions

View File

@@ -1,83 +0,0 @@
#!/usr/bin/env python3
# original wikiq headers are: title articleid revid date_time anon
# editor editor_id minor text_size text_entropy text_md5 reversion
# additions_size deletions_size
import argparse
import sys
import os
sys.path.append("..")
from wikiq_util import calculate_persistence
from wikiq_util import WikiqIterator
from wikiq_util import WikiqPage
from wikiq_util import WikiqParser
from wikiq_util import open_input_file
from wikiq_util import open_output_file
parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimited data.')

# arguments for the input direction
parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
                    help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")

parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
                    help="Directory for output files.")

parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
                    help="Write output to standard out (do not create dump file)")

parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
                    help="Operate only on the final revision in each sequence of consecutive edits made by the same user. This can be useful for addressing issues with text persistence measures.")

parser.add_argument('-p', '--persistence', dest="persist", action="store_true",
                    help="Compute and report measures of content persistence: (1) persistent token revisions, (2) tokens added, and (3) number of revisions used in computing the first measure.")

parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
                    help="Output URL-encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")

parser.add_argument('--persistence-legacy', dest="persist_legacy", action="store_true",
                    help="Use the legacy persistence calculation method.")
args = parser.parse_args()
if len(args.dumpfiles) > 0:
    for filename in args.dumpfiles:
        input_file = open_input_file(filename)

        # open directory for output
        if args.output_dir:
            output_dir = args.output_dir[0]
        else:
            output_dir = "."

        print("Processing file: %s" % filename, file=sys.stderr)

        if args.stdout:
            output_file = sys.stdout
        else:
            filename = os.path.join(output_dir, os.path.basename(filename))
            output_file = open_output_file(filename)

        wikiq = WikiqParser(input_file, output_file,
                            collapse_user=args.collapse_user,
                            persist=args.persist,
                            persist_legacy=args.persist_legacy,
                            urlencode=args.urlencode)

        wikiq.process()

        # close things
        input_file.close()
        output_file.close()
else:
    wikiq = WikiqParser(sys.stdin, sys.stdout,
                        collapse_user=args.collapse_user,
                        persist=args.persist,
                        persist_legacy=args.persist_legacy,
                        urlencode=args.urlencode)
    wikiq.process()
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
# stop_words = stop_words.split(",")
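An aside on the deleted CLI above: its core loop can also be driven programmatically. A minimal sketch, assuming wikiq_util is importable and using the helper names from the imports above; the dump path is hypothetical:

import sys
from wikiq_util import WikiqParser, open_input_file, open_output_file

# open a compressed dump; open_output_file derives an output name from the input
input_file = open_input_file("dump.xml.bz2")    # hypothetical path
output_file = open_output_file("dump.xml.bz2")

# in this legacy version, persist and persist_legacy are plain booleans
wikiq = WikiqParser(input_file, output_file,
                    collapse_user=False,
                    persist=True,
                    persist_legacy=False,
                    urlencode=True)
wikiq.process()

input_file.close()
output_file.close()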

View File

@@ -1,144 +0,0 @@
#!/usr/bin/env python3
import dask.dataframe as dd
import pandas as pd
import csv
import re
import os
import argparse
import fcntl
import sys
import errno
import time
import numpy as np
import struct
from urllib.parse import unquote
sys.path.append("..")
from hashlib import sha256
from wikiq_util import IPCheck
from wikiq_util import TO_ENCODE
from wikiq_util import try_unquote
def parse_args():
    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    parser.add_argument('-i', '--input-file', help='TSV file of wiki edits. Supports wildcards.', required=True, type=str)
    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
    parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
    parser.add_argument('--urlencode', help="Whether we need to decode URLs", action="store_true")
    parser.add_argument('--no-cluster', help="Disable dask.distributed", action="store_true")
    parser.add_argument('--output-format', help="[csv, parquet] format to output", type=str)
    args = parser.parse_args()
    return(args)
# This script skips some potentially useful things that Jeremy's script did:
# We don't remove bots.
# We don't exit on Tech Wiki.
# We don't accept an EDITOR_IGNORE_LIST.
# We don't have a username-userid mapping file.
# We don't remove anonymous editors (though we do flag IP edits as anon).
# We don't remove any rows, including for malformed data.
if __name__ == "__main__":
args = parse_args()
id_dict = {}
if not args.no_cluster:
# set up dask distributed
from dask.distributed import Client, LocalCluster
import multiprocessing as mp
cluster = LocalCluster(n_workers = mp.cpu_count(), processes=True)
client = Client(cluster)
input_file = args.input_file
d = dd.read_table(input_file, dtype={"anon":np.bool,
"articleid":int,
"deleted":bool,
"editor":str,
"minor":bool,
"namespace":np.int32,
"revert":bool,
"reverteds":str,
"revid":int,
"sha1":str,
"title":str},
true_values=["TRUE"],
false_values=["FALSE"],
parse_dates=["date_time"],
infer_datetime_format=True
)
    if args.wiki is None:
        wiki = re.match(r'(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
    else:
        wiki = args.wiki

    d['wiki'] = wiki

    for col in TO_ENCODE:
        d[col + "old"] = d[col]
        d[col] = d[col].apply(try_unquote, meta=(col, str))

    d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor', str))
    d['anon'] = (d['anon'] == True) | d['IPAnon']
    d = d.drop('IPAnon', axis=1)
    d['timestamp'] = (d['date_time'] - d['date_time'].min()) / np.timedelta64(1, 's')
    d['timestamp'] = d['timestamp'].astype(int)

    # create a new unique identifier by hashing the editor name or editor ip
    # first sort by date_time
    d = d.set_index(d["date_time"])
    d = d.map_partitions(lambda x: x.sort_index())
    d['editor_sha'] = d['editor'].apply(lambda x:
                                        sha256(x.encode()).hexdigest()
                                        if x is not None
                                        else None,
                                        meta=("editor_sha", str)
                                        )

    editor_groups = d.groupby('editor')
    d['editor_nth_edit'] = editor_groups.cumcount()
    d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit=df.date_time.diff(1)))

    editor_wiki_groups = d.groupby(['editor_sha', 'wiki'])
    d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
    d = editor_wiki_groups.apply(lambda df:
                                 df.assign(
                                     tminus_editor_wiki_edit=df.date_time.diff(1)
                                 ))

    editor_namespace_groups = d.groupby(['editor_sha', 'wiki', 'namespace'])
    # bug fix: the original counted over editor_wiki_groups here
    d['editor_nth_namespace_edit'] = editor_namespace_groups.cumcount()
    d = editor_namespace_groups.apply(lambda df:
                                      df.assign(
                                          tminus_namespace_wiki_edit=df.date_time.diff(1)
                                      ))

    editor_article_groups = d.groupby(['editor_sha', 'wiki', 'articleid'])
    d['editor_nth_article_edit'] = editor_article_groups.cumcount()
    d = editor_article_groups.apply(lambda df:
                                    df.assign(tminus_editor_article_edit=df.date_time.diff(1)))
    d = d.persist()

    if not os.path.exists(args.output_dir):
        os.mkdir(args.output_dir)

    if args.output_format == "csv":
        # for writing to csv we need the url-encoded columns
        d_csv = d
        for col in TO_ENCODE:
            d_csv = d_csv.drop(col, axis=1)
            d_csv[col] = d_csv[col + 'old']
        # the original called d.to_csv() with no path; the output pattern here is an assumption
        d_csv.to_csv(os.path.join(args.output_dir, "*.csv"))
    else:
        for col in TO_ENCODE:
            d = d.drop(col + 'old', axis=1)
        # originally hardcoded to "test_parquet/"; redirected to the output directory
        d.to_parquet(os.path.join(args.output_dir, "parquet"),
                     object_encoding={"editor": "utf8", "reverteds": "utf8", "sha1": "utf8",
                                      "title": "utf8", "wiki": "utf8", "namespace": "utf8",
                                      "editor_sha": "utf8", "revert": "bool"})
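All of the per-editor counters above (editor_nth_edit, tminus_editor_edit, and their wiki, namespace, and article variants) instantiate one pattern: sort by time, then take a cumulative count and a first-order time difference within each group. A minimal pandas sketch of that pattern on synthetic data (dask's groupby API mirrors it):

import pandas as pd

edits = pd.DataFrame({
    "editor": ["alice", "bob", "alice", "alice", "bob"],
    "date_time": pd.to_datetime(["2018-01-01", "2018-01-02",
                                 "2018-01-03", "2018-01-05", "2018-01-09"]),
}).sort_values("date_time")

groups = edits.groupby("editor")
# 0-based position of each edit within the editor's history
edits["editor_nth_edit"] = groups.cumcount()
# time since the editor's previous edit (NaT for their first edit)
edits["tminus_editor_edit"] = groups["date_time"].diff(1)
print(edits)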

View File

@@ -1,3 +0,0 @@
#!/usr/bin/env bash
pip3 install --user cloudpickle toolz dask partd fastparquet pyarrow

test/Wikiq_Unit_Test.py Normal file
View File

@@ -0,0 +1,264 @@
import unittest
import os
import subprocess
from shutil import copyfile
import pandas as pd
from pandas.util.testing import assert_frame_equal
from io import StringIO

# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
#   7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
class Test_Wikipedia(unittest.TestCase):
    def setUp(self):
        if not os.path.exists("test_output"):
            os.mkdir("test_output")

        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_WP_url_encode(self):
        test_filename = "url-encode_" + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --url-encode"
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        # as a test let's make sure that we get equal data frames
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_WP_namespaces(self):
        print(os.path.abspath('.'))
        test_filename = "namespaces_" + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " -n 0 -n 1"
        print(call)
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)

        # as a test let's make sure that we get equal data frames
        test = pd.read_table(test_file)
        num_wrong_ns = sum(~ test.namespace.isin({0, 1}))
        self.assertEqual(num_wrong_ns, 0)

        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)
class Test_Basic(unittest.TestCase):
    def setUp(self):
        if not os.path.exists("test_output"):
            os.mkdir("test_output")

        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        test_filename = "noargs_" + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_collapse_user(self):
        test_filename = "collapse-user_" + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --collapse-user"
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_pwr_segment(self):
        test_filename = "persistence_segment_" + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --persistence segment"
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_pwr_legacy(self):
        test_filename = "persistence_legacy_" + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --persistence legacy"
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_pwr(self):
        test_filename = "persistence_" + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --persistence"
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

    def test_url_encode(self):
        test_filename = "url-encode_" + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --url-encode"
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)
class Test_Malformed(unittest.TestCase):
    def setUp(self):
        if not os.path.exists("test_output"):
            os.mkdir("test_output")

        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

    def test_malformed_noargs(self):
        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        proc.wait()
        outs, errs = proc.communicate()
        errlines = str(errs).split("\\n")
        self.assertEqual(errlines[-2], 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
class Test_Stdout(unittest.TestCase):
    def setUp(self):
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        call = self.base_call.format(self.input_file)
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        outs = proc.stdout.decode("utf8")

        test_file = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
        print(baseline_file)
        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

if __name__ == '__main__':
    unittest.main()
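These tests compare wikiq output against checked-in baseline TSVs, so when behavior changes intentionally the baselines must be regenerated. A minimal sketch of that chore (paths follow the test fixtures above; regenerate_baseline is our illustrative name, not part of the test suite):

import os
import subprocess
from shutil import copyfile

def regenerate_baseline(wiki, flags, prefix):
    """Run wikiq on a test dump and store its output as the new baseline."""
    out_name = wiki + ".tsv"
    call = "../wikiq {0} -o test_output {1}".format(
        os.path.join("dumps", wiki + ".xml.7z"), flags)
    subprocess.run(call, shell=True, check=True)
    copyfile(os.path.join("test_output", out_name),
             os.path.join("baseline_output", prefix + "_" + out_name))

regenerate_baseline("sailormoon", "--collapse-user", "collapse-user")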

File diff suppressed because it is too large.

View File

Can't render this file because it is too large.

File diff suppressed because it is too large.

View File

@@ -1,144 +0,0 @@
import unittest
import os
import subprocess
from shutil import copyfile
import pandas as pd
from pandas.util.testing import assert_frame_equal
from io import StringIO

# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
#   7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
class Test_Wikiq(unittest.TestCase):

    def mkoutputdir(self):
        if not os.path.exists("test_output"):
            os.mkdir("test_output")

    def setuptoutputfiles(self, suffix="xml.7z"):
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.{1}".format(self.wiki, suffix)
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def run_and_check_output(self, call, test_filename):
        test_file = os.path.join(self.test_output_dir, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        proc = subprocess.Popen(call, stdout=subprocess.PIPE, shell=True)
        proc.wait()

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        # as a test let's make sure that we get equal data frames
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)
class Test_Wikipedia(Test_Wikiq):
    def setUp(self):
        print(os.path.abspath("."))
        self.mkoutputdir()
        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.setuptoutputfiles(suffix="xml.bz2")
        self.base_call = "../bin/wikiq {0} -o {1}"

    def test_WP_url_encode(self):
        test_filename = "url-encode_" + self.wikiq_out_name
        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --url-encode"
        self.run_and_check_output(call, test_filename)

class Test_Basic(Test_Wikiq):
    def setUp(self):
        self.mkoutputdir()
        self.wiki = "sailormoon"
        self.setuptoutputfiles()
        self.base_call = "../bin/wikiq {0} -o {1}"

    def test_noargs(self):
        test_filename = "noargs_" + self.wikiq_out_name
        call = self.base_call.format(self.input_file, self.test_output_dir)
        print(call)
        self.run_and_check_output(call, test_filename)

    def test_collapse_user(self):
        test_filename = "collapse-user_" + self.wikiq_out_name
        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --collapse-user"
        self.run_and_check_output(call, test_filename)

    def test_pwr_legacy(self):
        test_filename = "persistence_legacy_" + self.wikiq_out_name
        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --persistence-legacy"
        self.run_and_check_output(call, test_filename)

    def test_pwr(self):
        test_filename = "persistence_" + self.wikiq_out_name
        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --persistence"
        self.run_and_check_output(call, test_filename)

    def test_url_encode(self):
        test_filename = "url-encode_" + self.wikiq_out_name
        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + " --url-encode"
        self.run_and_check_output(call, test_filename)
class Test_Malformed(Test_Wikiq):
    def setUp(self):
        self.mkoutputdir()
        self.wiki = "twinpeaks"
        self.setuptoutputfiles()
        self.base_call = "../bin/wikiq {0} -o {1}"

    def test_malformed_noargs(self):
        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        proc.wait()
        outs, errs = proc.communicate()
        errlines = str(errs).split("\\n")
        self.assertEqual(errlines[-2], 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')

class Test_Stdout(Test_Wikiq):
    def setUp(self):
        self.mkoutputdir()
        self.wiki = 'sailormoon'
        self.setuptoutputfiles()

    def test_noargs(self):
        self.base_call = ["../bin/wikiq", self.input_file, "--stdout"]
        proc = subprocess.Popen(self.base_call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
        outs = proc.stdout
        test_file = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
        test = pd.read_table(outs)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline)

if __name__ == '__main__':
    unittest.main()

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large.

File diff suppressed because it is too large.

File diff suppressed because it is too large.

wikiq_util.py → wikiq Normal file → Executable file
View File

@@ -1,25 +1,34 @@
 #!/usr/bin/env python3
 # original wikiq headers are: title articleid revid date_time anon
 # editor editor_id minor text_size text_entropy text_md5 reversion
 # additions_size deletions_size
 import argparse
 import sys
 import os, os.path
 import re
 from subprocess import Popen, PIPE
 from collections import deque
 from hashlib import sha1
-from deltas.tokenizers import wikitext_split
 from mwxml import Dump
+from deltas.tokenizers import wikitext_split
 import mwpersistence
 import mwreverts
 from urllib.parse import quote
-from urllib.parse import unquote
-from deltas import SequenceMatcher
 TO_ENCODE = ('title', 'editor')
-PERSISTENCE_RADIUS = 7
+PERSISTENCE_RADIUS=7
+from deltas import SequenceMatcher
+from deltas import SegmentMatcher
-def try_unquote(obj):
-    if type(obj) is str:
-        obj = unquote(obj)
-        return obj.strip('\"')
-    else:
-        return
+class PersistMethod:
+    none = 0
+    sequence = 1
+    segment = 2
+    legacy = 3
 def calculate_persistence(tokens_added):
     return(sum([(len(x.revisions)-1) for x in tokens_added]),
@@ -46,7 +55,6 @@ class WikiqIterator():
     def __next__(self):
         return next(self._pages)

 class WikiqPage():
     __slots__ = ('id', 'title', 'namespace', 'redirect',
                  'restrictions', 'mwpage', '__revisions',
@@ -55,6 +63,11 @@ class WikiqPage():
     def __init__(self, page, namespace_map, collapse_user=False):
         self.id = page.id
         self.namespace = page.namespace
+        # following mwxml, we assume namespace 0 in cases where
+        # page.namespace is inconsistent with namespace_map
+        if page.namespace not in namespace_map:
+            self.title = page.title
+            page.namespace = 0
         if page.namespace != 0:
             self.title = ':'.join([namespace_map[page.namespace], page.title])
         else:
@@ -115,16 +128,23 @@ class WikiqPage():
 class WikiqParser():
-    def __init__(self, input_file, output_file, collapse_user=False, persist=False, urlencode=False, persist_legacy=False):
+    def __init__(self, input_file, output_file, collapse_user=False, persist=None, urlencode=False, namespaces=None):
+        """
+        Parameters:
+           persist : what persistence method to use. Takes a PersistMethod value.
+        """
         self.input_file = input_file
         self.output_file = output_file
         self.collapse_user = collapse_user
         self.persist = persist
-        self.persist_legacy = persist_legacy
         self.printed_header = False
         self.namespaces = []
         self.urlencode = urlencode
+        if namespaces is not None:
+            self.namespace_filter = set(namespaces)
+        else:
+            self.namespace_filter = None

     def __get_namespace_from_title(self, title):
         default_ns = None
@@ -160,15 +180,27 @@ class WikiqParser():
         # Iterate through pages
         for page in dump:
+            namespace = page.namespace if page.namespace is not None else self.__get_namespace_from_title(page.title)
+
+            # skip namespaces not in the filter
+            if self.namespace_filter is not None:
+                if namespace not in self.namespace_filter:
+                    continue
+
             rev_detector = mwreverts.Detector()

-            if self.persist or self.persist_legacy:
+            if self.persist != PersistMethod.none:
                 window = deque(maxlen=PERSISTENCE_RADIUS)

-                if not self.persist_legacy:
+                if self.persist == PersistMethod.sequence:
                     state = mwpersistence.DiffState(SequenceMatcher(tokenizer=wikitext_split),
                                                     revert_radius=PERSISTENCE_RADIUS)
+                elif self.persist == PersistMethod.segment:
+                    state = mwpersistence.DiffState(SegmentMatcher(tokenizer=wikitext_split),
+                                                    revert_radius=PERSISTENCE_RADIUS)
+                # self.persist == PersistMethod.legacy
                 else:
                     from mw.lib import persistence
                     state = persistence.State()
@@ -181,7 +213,7 @@ class WikiqParser():
                         'articleid' : page.id,
                         'editor_id' : "" if rev.deleted.user == True or rev.user.id is None else rev.user.id,
                         'title' : '"' + page.title + '"',
-                        'namespace' : page.namespace if page.namespace is not None else self.__get_namespace_from_title(page.title),
+                        'namespace' : namespace,
                         'deleted' : "TRUE" if rev.deleted.text else "FALSE" }

             # if revisions are deleted, /many/ things will be missing
@@ -241,14 +273,13 @@ class WikiqParser():
             if self.collapse_user:
                 rev_data['collapsed_revs'] = rev.collapsed_revs

-            if self.persist or self.persist_legacy:
+            if self.persist != PersistMethod.none:
                 if rev.deleted.text:
                     for k in ["token_revs", "tokens_added", "tokens_removed", "tokens_window"]:
                         old_rev_data[k] = None
                 else:
-                    if not self.persist_legacy:
+                    if self.persist != PersistMethod.legacy:
                         _, tokens_added, tokens_removed = state.update(rev.text, rev.id)
                     else:
@@ -273,7 +304,7 @@ class WikiqParser():
             rev_count += 1

-        if self.persist or self.persist_legacy:
+        if self.persist != PersistMethod.none:
             # print out metadata for the last RADIUS revisions
             for i, item in enumerate(window):
                 # if the window was full, we've already printed item 0
@@ -332,22 +363,87 @@ def open_output_file(input_filename):
     return output_file

-class IPCheck(object):
-    # IP address regexes taken from https://gist.github.com/mnordhoff/2213179
-    ipv4_address = re.compile('^(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$')
-    ipv6_address_or_addrz = re.compile('^(?:(?:[0-9A-Fa-f]{1,4}:){6}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9A-Fa-f]{1,4}:){5}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){4}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){3}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,2}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){2}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,3}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}:(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,4}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,5}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}|(?:(?:[0-9A-Fa-f]{1,4}:){,6}[0-9A-Fa-f]{1,4})?::)(?:%25(?:[A-Za-z0-9\\-._~]|%[0-9A-Fa-f]{2})+)?$')
-
-    @staticmethod
-    def is_ip(username):
-        if not type(username) is str:
-            return False
-        '''Check if a username is an ip (v4 or v6) address. We use this as
-        a marker of whether the user is anonymous.'''
-        if IPCheck.ipv4_address.match(username) or IPCheck.ipv6_address_or_addrz.match(username):
-            return True
-        else:
-            return False
+parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimited data.')
+
+# arguments for the input direction
+parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
+                    help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
+
+parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
+                    help="Directory for output files.")
+
+parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
+                    help="Write output to standard out (do not create dump file)")
+
+parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
+                    help="Operate only on the final revision in each sequence of consecutive edits made by the same user. This can be useful for addressing issues with text persistence measures.")
+
+parser.add_argument('-p', '--persistence', dest="persist", default=None, const='', type=str, choices=['', 'segment', 'sequence', 'legacy'], nargs='?',
+                    help="Compute and report measures of content persistence: (1) persistent token revisions, (2) tokens added, and (3) number of revisions used in computing the first measure. This may be slow. Use -p=segment for an advanced persistence calculation that is robust to content moves, but may be very slow. Use -p=legacy for legacy behavior.")
+
+parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
+                    help="Output URL-encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
+
+parser.add_argument('-n', '--namespace-include', dest="namespace_filter", type=int, action='append',
+                    help="Id number of namespace to include.")
+
+args = parser.parse_args()
+
+# set persistence method
+if args.persist is None:
+    persist = PersistMethod.none
+elif args.persist == "segment":
+    persist = PersistMethod.segment
+elif args.persist == "legacy":
+    persist = PersistMethod.legacy
+else:
+    persist = PersistMethod.sequence
+
+if args.namespace_filter is not None:
+    namespaces = args.namespace_filter
+else:
+    namespaces = None
+
+if len(args.dumpfiles) > 0:
+    for filename in args.dumpfiles:
+        input_file = open_input_file(filename)
+
+        # open directory for output
+        if args.output_dir:
+            output_dir = args.output_dir[0]
+        else:
+            output_dir = "."
+
+        print("Processing file: %s" % filename, file=sys.stderr)
+
+        if args.stdout:
+            output_file = sys.stdout
+        else:
+            filename = os.path.join(output_dir, os.path.basename(filename))
+            output_file = open_output_file(filename)
+
+        wikiq = WikiqParser(input_file, output_file,
+                            collapse_user=args.collapse_user,
+                            persist=persist,
+                            urlencode=args.urlencode,
+                            namespaces=namespaces)
+
+        wikiq.process()
+
+        # close things
+        input_file.close()
+        output_file.close()
+else:
+    wikiq = WikiqParser(sys.stdin, sys.stdout,
+                        collapse_user=args.collapse_user,
+                        persist=persist,
+                        urlencode=args.urlencode,
+                        namespaces=namespaces)
+    wikiq.process()
+
+# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
+# stop_words = stop_words.split(",")
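With the reworked interface above, the persistence method and namespace filter are selected on the command line: -p takes segment, sequence, or legacy (bare -p defaults to sequence), and -n may be repeated. A usage sketch mirroring how the unit tests invoke the script (paths are illustrative):

import subprocess

# segment-based persistence, namespaces 0 and 1 only, output under ./test_output
call = "./wikiq dumps/sailormoon.xml.7z -o test_output -p segment -n 0 -n 1"
subprocess.run(call, shell=True, check=True)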

View File

@@ -1,2 +0,0 @@
#!/usr/bin/env bash
spark-submit --master spark://n0649:18899 wikiq_users_spark.py --output-format parquet -i "/com/output/wikiq-enwiki-persist-sequence-20180301/enwiki/enwiki-20180301-pages-meta-history*.tsv" -o "/com/output/wikiq-users-enwiki-20180301-parquet/" --num-partitions 500 --schema-opt persistence+collapse

View File

@@ -1,2 +0,0 @@
#!/usr/bin/env bash
spark-submit benchmark_spark.py --output-format csv -i "../mediawiki_dump_tools/tests/tsvs/*.tsv" -o "./out.tsv" --num-partitions 2

View File

@@ -1,163 +0,0 @@
#!/usr/bin/env python3
"""
Builds a user level dataset. Requires a functional spark installation.
"""
import sys
# add pyspark to your python path e.g.
#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
#sys.path.append("/home/nathante/sparkstuff/spark/python/")
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window
import pyspark.sql.functions as f
from pyspark.sql import types
import argparse
import glob
from os import mkdir
from os import path
from wikiq_util import PERSISTENCE_RADIUS
#read a table
def parse_args():
    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    parser.add_argument('-i', '--input-file', help='TSV file of wiki edits. Supports wildcards.', required=True, type=str)
    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
    # parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
    parser.add_argument('--urlencode', help="Whether we need to decode URLs", action="store_true")
    parser.add_argument('--output-format', help="[csv, parquet] format to output", type=str)
    parser.add_argument('--num-partitions', help="Number of partitions to output", type=int, default=1)
    parser.add_argument('--schema-opt', help='Options for the input schema.', choices=["basic", "persistence", "collapse", "persistence+collapse"])
    # parser.add_argument('--nodes', help="How many hyak nodes to use", default=0, type=int)
    args = parser.parse_args()
    return(args)
if __name__ == "__main__":
args = parse_args()
conf = SparkConf().setAppName("Wiki Users Spark")
spark = SparkSession.builder.getOrCreate()
# test file with persistence: "../tests/tsvs/persistence_sailormoon.tsv"
files = glob.glob(args.input_file)
files = [path.abspath(p) for p in files]
read_persistence = args.schema_opt in ["persistence", "persistence+collapse"]
read_collapse = args.schema_opt in ["collapse", "persistence+collapse"]
# going to have to do some coercing of the schema
# build a schema
struct = types.StructType().add("anon",types.StringType(),True)
struct = struct.add("articleid",types.LongType(),True)
if read_collapse is True:
struct = struct.add("collapsed_revs", types.IntegerType(), True)
struct = struct.add("date_time",types.TimestampType(), True)
struct = struct.add("deleted",types.BooleanType(), True)
struct = struct.add("editor",types.StringType(),True)
struct = struct.add("editor_id",types.LongType(), True)
struct = struct.add("minor", types.BooleanType(), True)
struct = struct.add("namespace", types.LongType(), True)
struct = struct.add("revert", types.BooleanType(), True)
struct = struct.add("reverteds", types.StringType(), True)
struct = struct.add("revid", types.LongType(), True)
struct = struct.add("sha1", types.StringType(), True)
struct = struct.add("text_chars", types.LongType(), True)
struct = struct.add("title",types.StringType(), True)
if read_persistence is True:
struct = struct.add("token_revs", types.IntegerType(),True)
struct = struct.add("tokens_added", types.IntegerType(),True)
struct = struct.add("tokens_removed", types.IntegerType(),True)
struct = struct.add("tokens_window", types.IntegerType(),True)
reader = spark.read
df = reader.csv(files,
sep='\t',
inferSchema=False,
header=True,
mode="PERMISSIVE",
schema = struct)
    df = df.repartition(args.num_partitions)

    # replace na editor ids
    df = df.select('*', f.coalesce(df['editor_id'], df['editor']).alias('editor_id_or_ip'))

    # sort by datetime
    df = df.orderBy(df.date_time.asc())

    # create our window specs
    ed_win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
    art_win = Window.orderBy("date_time").partitionBy("articleid")

    # assign which edit reverted what edit
    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid', 'reverteds', 'editor_id_or_ip', 'date_time'])
    reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds, ',').alias("reverteds_new"))
    reverteds_df = reverteds_df.drop("reverteds")
    reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
    reverteds_df = reverteds_df.withColumn("editor_nth_revert_action", f.rank().over(ed_win))
    reverteds_df_explode = reverteds_df.select(reverteds_df.revid.alias('reverted_by'), f.explode(reverteds_df.reverteds).alias('reverted_id'))

    df = df.join(reverteds_df_explode, df.revid == reverteds_df_explode.reverted_id, how='left_outer')
    df = df.drop("reverted_id")
    del(reverteds_df_explode)

    reverteds_df = reverteds_df.select("revid", "editor_nth_revert_action")
    df = df.join(reverteds_df, on=["revid"], how='left_outer')
    del(reverteds_df)

    # count reverts
    reverts_df = df.filter(df.revert == True).select('revid', 'articleid', 'editor_id_or_ip', 'date_time', 'revert')
    reverts_df = reverts_df.withColumn('editor_nth_revert', f.rank().over(ed_win))

    # articles' total reverts
    reverts_df = reverts_df.withColumn('article_nth_revert', f.rank().over(art_win))

    # work around a Spark self-join bug after window functions
    # see https://issues.apache.org/jira/browse/SPARK-14948
    reverts_df = reverts_df.select(reverts_df.revid.alias("r_revid"), 'editor_nth_revert', 'article_nth_revert')
    df = df.join(reverts_df, df.revid == reverts_df.r_revid, how='left_outer')
    df = df.drop("r_revid")
    del(reverts_df)
    # count edits
    df = df.withColumn('year', f.year(df.date_time))
    df = df.withColumn('month', f.month(df.date_time))

    if not read_collapse:
        df = df.withColumn('editor_nth_edit', f.rank().over(ed_win))
        df = df.withColumn('article_nth_edit', f.rank().over(art_win))
    else:
        df = df.withColumn('editor_nth_edit', f.sum("collapsed_revs").over(ed_win))
        df = df.withColumn('article_nth_edit', f.sum("collapsed_revs").over(art_win))
        df = df.withColumn('editor_nth_collapsed_edit', f.rank().over(ed_win))
        df = df.withColumn('article_nth_collapsed_edit', f.rank().over(art_win))

    # total editor's token_revs
    if read_persistence:
        df = df.withColumn("token_revs_upper", df.token_revs + df.tokens_added * (PERSISTENCE_RADIUS - df.tokens_window - 1))
        df = df.withColumn('editor_cum_token_revs_lower', f.sum("token_revs").over(ed_win))
        df = df.withColumn('editor_cum_token_revs_upper', f.sum("token_revs_upper").over(ed_win))
        df = df.withColumn('article_cum_token_revs_lower', f.sum("token_revs").over(art_win))
        df = df.withColumn('article_cum_token_revs_upper', f.sum("token_revs_upper").over(art_win))
        df = df.withColumn('editor_cum_tokens_added', f.sum("tokens_added").over(ed_win))
        df = df.withColumn('article_cum_tokens_removed', f.sum("tokens_removed").over(art_win))

    # output
    if not path.exists(args.output_dir):
        mkdir(args.output_dir)

    if args.output_format == "csv" or args.output_format == "tsv":
        # for writing to csv we need to urlencode
        df.write.csv(args.output_dir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
    else:
        # format == "parquet"
        df.write.parquet(args.output_dir, mode='overwrite')
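The reverteds bookkeeping above splits the comma-separated reverteds column, explodes one row per reverted revision, and joins back on revid so each edit is labeled with the revision that undid it. The same shape in plain pandas, on synthetic data (column names follow the script):

import pandas as pd

df = pd.DataFrame({
    "revid": [1, 2, 3, 4],
    "reverteds": [None, None, None, "2,3"],   # revision 4 reverted 2 and 3
})

reverts = df.dropna(subset=["reverteds"]).copy()
reverts["reverteds"] = reverts["reverteds"].str.split(",")
exploded = reverts.explode("reverteds")
exploded = exploded.assign(reverted_id=exploded["reverteds"].astype(int))
exploded = exploded.rename(columns={"revid": "reverted_by"})

# label each revision with the revid of the edit that reverted it
df = df.merge(exploded[["reverted_by", "reverted_id"]],
              left_on="revid", right_on="reverted_id", how="left")
df = df.drop(columns=["reverted_id"])
print(df[["revid", "reverted_by"]])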