Compare commits
11 Commits
user_level
...
advanced_p
| Author | SHA1 | Date | |
|---|---|---|---|
| f7f5bf8fd4 | |||
|
|
f784c77f60 | ||
| 317bafb50d | |||
| 7cd0bf3b9e | |||
| d93769c21f | |||
|
|
afd40c1a45 | ||
|
|
e4222c45dd | ||
|
|
829ffcffae | ||
|
|
776b73519a | ||
|
|
5b6aaad862 | ||
| f468d1a5b6 |
83
bin/wikiq
83
bin/wikiq
@@ -1,83 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
# original wikiq headers are: title articleid revid date_time anon
|
|
||||||
# editor editor_id minor text_size text_entropy text_md5 reversion
|
|
||||||
# additions_size deletions_size
|
|
||||||
import argparse
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
sys.path.append("..")
|
|
||||||
from wikiq_util import calculate_persistence
|
|
||||||
from wikiq_util import WikiqIterator
|
|
||||||
from wikiq_util import WikiqPage
|
|
||||||
from wikiq_util import WikiqParser
|
|
||||||
from wikiq_util import open_input_file
|
|
||||||
from wikiq_util import open_output_file
|
|
||||||
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimitted data.')
|
|
||||||
|
|
||||||
# arguments for the input direction
|
|
||||||
parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
|
|
||||||
help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
|
|
||||||
|
|
||||||
parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
|
|
||||||
help="Directory for output files.")
|
|
||||||
|
|
||||||
parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
|
|
||||||
help="Write output to standard out (do not create dump file)")
|
|
||||||
|
|
||||||
parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
|
|
||||||
help="Operate only on the final revision made by user a user within all sequences of consecutive edits made by a user. This can be useful for addressing issues with text persistence measures.")
|
|
||||||
|
|
||||||
parser.add_argument('-p', '--persistence', dest="persist", action="store_true",
|
|
||||||
help="Compute and report measures of content persistent: (1) persistent token revisions, (2) tokens added, and (3) number of revision used in computing the first measure.")
|
|
||||||
|
|
||||||
parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
|
|
||||||
help="Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
|
|
||||||
|
|
||||||
parser.add_argument('--persistence-legacy', dest="persist_legacy", action="store_true",
|
|
||||||
help="Legacy behavior for persistence calculation. Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
if len(args.dumpfiles) > 0:
|
|
||||||
for filename in args.dumpfiles:
|
|
||||||
input_file = open_input_file(filename)
|
|
||||||
|
|
||||||
# open directory for output
|
|
||||||
if args.output_dir:
|
|
||||||
output_dir = args.output_dir[0]
|
|
||||||
else:
|
|
||||||
output_dir = "."
|
|
||||||
|
|
||||||
print("Processing file: %s" % filename, file=sys.stderr)
|
|
||||||
|
|
||||||
if args.stdout:
|
|
||||||
output_file = sys.stdout
|
|
||||||
else:
|
|
||||||
filename = os.path.join(output_dir, os.path.basename(filename))
|
|
||||||
output_file = open_output_file(filename)
|
|
||||||
|
|
||||||
wikiq = WikiqParser(input_file, output_file,
|
|
||||||
collapse_user=args.collapse_user,
|
|
||||||
persist=args.persist,
|
|
||||||
persist_legacy=args.persist_legacy,
|
|
||||||
urlencode=args.urlencode)
|
|
||||||
|
|
||||||
|
|
||||||
wikiq.process()
|
|
||||||
|
|
||||||
# close things
|
|
||||||
input_file.close()
|
|
||||||
output_file.close()
|
|
||||||
else:
|
|
||||||
wikiq = WikiqParser(sys.stdin, sys.stdout,
|
|
||||||
collapse_user=args.collapse_user,
|
|
||||||
persist=args.persist,
|
|
||||||
persist_legacy=args.persist_legacy,
|
|
||||||
urlencode=args.urlencode)
|
|
||||||
wikiq.process()
|
|
||||||
|
|
||||||
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
|
|
||||||
# stop_words = stop_words.split(",")
|
|
||||||
144
bin/wikiq_users
144
bin/wikiq_users
@@ -1,144 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
import dask.dataframe as dd
|
|
||||||
import pandas as pd
|
|
||||||
import csv
|
|
||||||
import re
|
|
||||||
import os
|
|
||||||
import argparse
|
|
||||||
import fcntl
|
|
||||||
import sys
|
|
||||||
import errno
|
|
||||||
import time
|
|
||||||
import numpy as np
|
|
||||||
import struct
|
|
||||||
from urllib.parse import unquote
|
|
||||||
sys.path.append("..")
|
|
||||||
from hashlib import sha256
|
|
||||||
|
|
||||||
from wikiq_util import IPCheck
|
|
||||||
from wikiq_util import TO_ENCODE
|
|
||||||
from wikiq_util import try_unquote
|
|
||||||
|
|
||||||
def parse_args():
|
|
||||||
parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
|
|
||||||
parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
|
|
||||||
parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
|
|
||||||
parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
|
|
||||||
parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
|
|
||||||
parser.add_argument('--no-cluster', help="disable dask.distributed", action="store_true")
|
|
||||||
parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
|
|
||||||
args = parser.parse_args()
|
|
||||||
return(args)
|
|
||||||
|
|
||||||
# This script does not do some of the things that might be useful that Jeremy's script did.
|
|
||||||
# We don't remove bots
|
|
||||||
# We don't exit on Tech Wiki
|
|
||||||
# We don't accept an EDITOR_IGNORE_LIST
|
|
||||||
# We don't have a username-userid mapping file
|
|
||||||
# We don't remove anonymous editors (though we do indicate IP edits as anon.
|
|
||||||
# We don't remove any rows, including for malformed data
|
|
||||||
if __name__ == "__main__":
|
|
||||||
|
|
||||||
args = parse_args()
|
|
||||||
id_dict = {}
|
|
||||||
|
|
||||||
if not args.no_cluster:
|
|
||||||
# set up dask distributed
|
|
||||||
from dask.distributed import Client, LocalCluster
|
|
||||||
import multiprocessing as mp
|
|
||||||
cluster = LocalCluster(n_workers = mp.cpu_count(), processes=True)
|
|
||||||
client = Client(cluster)
|
|
||||||
|
|
||||||
input_file = args.input_file
|
|
||||||
d = dd.read_table(input_file, dtype={"anon":np.bool,
|
|
||||||
"articleid":int,
|
|
||||||
"deleted":bool,
|
|
||||||
"editor":str,
|
|
||||||
"minor":bool,
|
|
||||||
"namespace":np.int32,
|
|
||||||
"revert":bool,
|
|
||||||
"reverteds":str,
|
|
||||||
"revid":int,
|
|
||||||
"sha1":str,
|
|
||||||
"title":str},
|
|
||||||
true_values=["TRUE"],
|
|
||||||
false_values=["FALSE"],
|
|
||||||
parse_dates=["date_time"],
|
|
||||||
infer_datetime_format=True
|
|
||||||
)
|
|
||||||
|
|
||||||
if args.wiki is None:
|
|
||||||
wiki = re.match('(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
|
|
||||||
else:
|
|
||||||
wiki = args.wiki
|
|
||||||
|
|
||||||
d['wiki'] = wiki
|
|
||||||
|
|
||||||
for col in TO_ENCODE:
|
|
||||||
d[col+"old"] = d[col]
|
|
||||||
d[col] = d[col].apply(try_unquote, meta=(col,str))
|
|
||||||
|
|
||||||
d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor',str))
|
|
||||||
d['anon'] = (d['anon'] == True) | d['IPAnon']
|
|
||||||
d = d.drop('IPAnon',axis=1)
|
|
||||||
d['timestamp'] = (d['date_time'] - d['date_time'].min())/np.timedelta64(1,'s')
|
|
||||||
d['timestamp'] = d['timestamp'].astype(int)
|
|
||||||
# create a new unique identifier by hashing the editor name or editor ip
|
|
||||||
|
|
||||||
# first sort by editor
|
|
||||||
d = d.set_index(d["date_time"])
|
|
||||||
d = d.map_partitions(lambda x: x.sort_index())
|
|
||||||
|
|
||||||
d['editor_sha'] = d['editor'].apply(lambda x:
|
|
||||||
sha256(x.encode()).hexdigest()
|
|
||||||
if x is not None
|
|
||||||
else None,
|
|
||||||
meta=("editor_sha",str)
|
|
||||||
)
|
|
||||||
|
|
||||||
editor_groups = d.groupby('editor')
|
|
||||||
d['editor_nth_edit'] = editor_groups.cumcount()
|
|
||||||
d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit = df.date_time.diff(1)))
|
|
||||||
|
|
||||||
editor_wiki_groups = d.groupby(['editor_sha','wiki'])
|
|
||||||
d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
|
|
||||||
d = editor_wiki_groups.apply(lambda df:
|
|
||||||
df.assign(
|
|
||||||
tminus_editor_wiki_edit=df.date_time.diff(1)
|
|
||||||
))
|
|
||||||
|
|
||||||
editor_namespace_groups = d.groupby(['editor_sha','wiki','namespace'])
|
|
||||||
d['editor_nth_namespace_edit'] = editor_wiki_groups.cumcount()
|
|
||||||
d = editor_namespace_groups.apply(lambda df:
|
|
||||||
df.assign(
|
|
||||||
tminus_namespace_wiki_edit=df.date_time.diff(1)
|
|
||||||
))
|
|
||||||
|
|
||||||
editor_article_groups = d.groupby(['editor_sha','wiki','articleid'])
|
|
||||||
d['editor_nth_article_edit'] = editor_article_groups.cumcount()
|
|
||||||
d = editor_article_groups.apply(lambda df:
|
|
||||||
df.assign(tminus_editor_article_edit=df.date_time.diff(1)))
|
|
||||||
|
|
||||||
|
|
||||||
d = d.persist()
|
|
||||||
|
|
||||||
if not os.path.exists(args.output_dr):
|
|
||||||
os.mkdir(args.output_dir
|
|
||||||
)
|
|
||||||
|
|
||||||
if args.output_format == "csv":
|
|
||||||
d_csv = d
|
|
||||||
for col in TO_ENCODE:
|
|
||||||
d_csv = d_csv.drop(col,axis=1)
|
|
||||||
d_csv[col] = d_csv[col+'old']
|
|
||||||
d.to_csv()
|
|
||||||
else:
|
|
||||||
for col in TO_ENCODE:
|
|
||||||
d = d.drop(col + 'old', axis=1)
|
|
||||||
|
|
||||||
d.to_parquet("test_parquet/",object_encoding={"editor":"utf8","reverteds":"utf8","sha1":"utf8","title":"utf8","wiki":"utf8","namespace":"utf8","editor_sha":"utf8","revert":"bool"})
|
|
||||||
|
|
||||||
# for writing to csv we need to urlencode
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
||||||
@@ -1,3 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
pip3 install --user cloudpickle toolz dask partd fastparquet pyarrow
|
|
||||||
|
|
||||||
264
test/Wikiq_Unit_Test.py
Normal file
264
test/Wikiq_Unit_Test.py
Normal file
@@ -0,0 +1,264 @@
|
|||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
from shutil import copyfile
|
||||||
|
import pandas as pd
|
||||||
|
from pandas.util.testing import assert_frame_equal
|
||||||
|
from io import StringIO
|
||||||
|
|
||||||
|
# with / without pwr DONE
|
||||||
|
# with / without url encode DONE
|
||||||
|
# with / without collapse user DONE
|
||||||
|
# with output to sdtout DONE
|
||||||
|
# note that the persistence radius is 7 by default
|
||||||
|
# reading various file formats including
|
||||||
|
# 7z, gz, bz2, xml DONE
|
||||||
|
# wikia and wikipedia data DONE
|
||||||
|
# malformed xmls DONE
|
||||||
|
|
||||||
|
class Test_Wikipedia(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
if not os.path.exists("test_output"):
|
||||||
|
os.mkdir("test_output")
|
||||||
|
|
||||||
|
self.wiki = 'ikwiki-20180301-pages-meta-history'
|
||||||
|
self.wikiq_out_name = self.wiki + ".tsv"
|
||||||
|
self.test_output_dir = os.path.join(".", "test_output")
|
||||||
|
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
|
||||||
|
|
||||||
|
self.infile = "{0}.xml.bz2".format(self.wiki)
|
||||||
|
self.base_call = "../wikiq {0} -o {1}"
|
||||||
|
self.input_dir = "dumps"
|
||||||
|
self.input_file = os.path.join(".", self.input_dir,self.infile)
|
||||||
|
self.baseline_output_dir = "baseline_output"
|
||||||
|
|
||||||
|
def test_WP_url_encode(self):
|
||||||
|
test_filename = "url-encode_" + self.wikiq_out_name
|
||||||
|
test_file = os.path.join(self.test_output_dir, test_filename)
|
||||||
|
if os.path.exists(test_file):
|
||||||
|
os.remove(test_file)
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
call = call + " --url-encode"
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
proc.wait()
|
||||||
|
|
||||||
|
copyfile(self.call_output, test_file)
|
||||||
|
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
|
||||||
|
|
||||||
|
# as a test let's make sure that we get equal data frames
|
||||||
|
test = pd.read_table(test_file)
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
def test_WP_namespaces(self):
|
||||||
|
print(os.path.abspath('.'))
|
||||||
|
test_filename = "namespaces_" + self.wikiq_out_name
|
||||||
|
test_file = os.path.join(self.test_output_dir, test_filename)
|
||||||
|
if os.path.exists(test_file):
|
||||||
|
os.remove(test_file)
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
call = call + " -n 0 -n 1"
|
||||||
|
print(call)
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
proc.wait()
|
||||||
|
copyfile(self.call_output, test_file)
|
||||||
|
baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)
|
||||||
|
|
||||||
|
# as a test let's make sure that we get equal data frames
|
||||||
|
test = pd.read_table(test_file)
|
||||||
|
num_wrong_ns = sum(~ test.namespace.isin({0,1}))
|
||||||
|
self.assertEqual(num_wrong_ns, 0)
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
|
||||||
|
class Test_Basic(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
if not os.path.exists("test_output"):
|
||||||
|
os.mkdir("test_output")
|
||||||
|
|
||||||
|
self.wiki = 'sailormoon'
|
||||||
|
self.wikiq_out_name = self.wiki + ".tsv"
|
||||||
|
self.test_output_dir = os.path.join(".", "test_output")
|
||||||
|
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
|
||||||
|
|
||||||
|
self.infile = "{0}.xml.7z".format(self.wiki)
|
||||||
|
self.base_call = "../wikiq {0} -o {1}"
|
||||||
|
self.input_dir = "dumps"
|
||||||
|
self.input_file = os.path.join(".", self.input_dir,self.infile)
|
||||||
|
self.baseline_output_dir = "baseline_output"
|
||||||
|
|
||||||
|
def test_noargs(self):
|
||||||
|
|
||||||
|
test_filename = "noargs_" + self.wikiq_out_name
|
||||||
|
test_file = os.path.join(self.test_output_dir, test_filename)
|
||||||
|
if os.path.exists(test_file):
|
||||||
|
os.remove(test_file)
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
proc.wait()
|
||||||
|
|
||||||
|
copyfile(self.call_output, test_file)
|
||||||
|
|
||||||
|
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
|
||||||
|
|
||||||
|
test = pd.read_table(test_file)
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
|
||||||
|
def test_collapse_user(self):
|
||||||
|
test_filename = "collapse-user_" + self.wikiq_out_name
|
||||||
|
test_file = os.path.join(self.test_output_dir, test_filename)
|
||||||
|
if os.path.exists(test_file):
|
||||||
|
os.remove(test_file)
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
call = call + " --collapse-user"
|
||||||
|
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
proc.wait()
|
||||||
|
|
||||||
|
copyfile(self.call_output, test_file)
|
||||||
|
|
||||||
|
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
|
||||||
|
test = pd.read_table(test_file)
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
def test_pwr_segment(self):
|
||||||
|
test_filename = "persistence_segment_" + self.wikiq_out_name
|
||||||
|
test_file = os.path.join(self.test_output_dir, test_filename)
|
||||||
|
if os.path.exists(test_file):
|
||||||
|
os.remove(test_file)
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
call = call + " --persistence segment"
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
proc.wait()
|
||||||
|
|
||||||
|
|
||||||
|
copyfile(self.call_output, test_file)
|
||||||
|
|
||||||
|
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
|
||||||
|
|
||||||
|
test = pd.read_table(test_file)
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
def test_pwr_legacy(self):
|
||||||
|
test_filename = "persistence_legacy_" + self.wikiq_out_name
|
||||||
|
test_file = os.path.join(self.test_output_dir, test_filename)
|
||||||
|
if os.path.exists(test_file):
|
||||||
|
os.remove(test_file)
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
call = call + " --persistence legacy"
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
proc.wait()
|
||||||
|
|
||||||
|
|
||||||
|
copyfile(self.call_output, test_file)
|
||||||
|
|
||||||
|
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
|
||||||
|
|
||||||
|
test = pd.read_table(test_file)
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
def test_pwr(self):
|
||||||
|
test_filename = "persistence_" + self.wikiq_out_name
|
||||||
|
test_file = os.path.join(self.test_output_dir, test_filename)
|
||||||
|
if os.path.exists(test_file):
|
||||||
|
os.remove(test_file)
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
call = call + " --persistence"
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
proc.wait()
|
||||||
|
|
||||||
|
|
||||||
|
copyfile(self.call_output, test_file)
|
||||||
|
|
||||||
|
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
|
||||||
|
|
||||||
|
test = pd.read_table(test_file)
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
|
||||||
|
def test_url_encode(self):
|
||||||
|
test_filename = "url-encode_" + self.wikiq_out_name
|
||||||
|
|
||||||
|
test_file = os.path.join(self.test_output_dir, test_filename)
|
||||||
|
if os.path.exists(test_file):
|
||||||
|
os.remove(test_file)
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
call = call + " --url-encode"
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
proc.wait()
|
||||||
|
|
||||||
|
copyfile(self.call_output, test_file)
|
||||||
|
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
|
||||||
|
test = pd.read_table(test_file)
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
|
||||||
|
class Test_Malformed(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
if not os.path.exists("test_output"):
|
||||||
|
os.mkdir("test_output")
|
||||||
|
|
||||||
|
self.wiki = 'twinpeaks'
|
||||||
|
self.wikiq_out_name = self.wiki + ".tsv"
|
||||||
|
self.test_output_dir = os.path.join(".", "test_output")
|
||||||
|
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
|
||||||
|
|
||||||
|
self.infile = "{0}.xml.7z".format(self.wiki)
|
||||||
|
self.base_call = "../wikiq {0} -o {1}"
|
||||||
|
self.input_dir = "dumps"
|
||||||
|
self.input_file = os.path.join(".", self.input_dir,self.infile)
|
||||||
|
|
||||||
|
|
||||||
|
def test_malformed_noargs(self):
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file, self.test_output_dir)
|
||||||
|
proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
|
||||||
|
proc.wait()
|
||||||
|
outs, errs = proc.communicate()
|
||||||
|
errlines = str(errs).split("\\n")
|
||||||
|
self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
|
||||||
|
|
||||||
|
class Test_Stdout(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.wiki = 'sailormoon'
|
||||||
|
self.wikiq_out_name = self.wiki + ".tsv"
|
||||||
|
|
||||||
|
self.infile = "{0}.xml.7z".format(self.wiki)
|
||||||
|
self.base_call = "../wikiq {0} --stdout"
|
||||||
|
self.input_dir = "dumps"
|
||||||
|
self.input_file = os.path.join(".", self.input_dir,self.infile)
|
||||||
|
self.baseline_output_dir = "baseline_output"
|
||||||
|
|
||||||
|
def test_noargs(self):
|
||||||
|
|
||||||
|
call = self.base_call.format(self.input_file)
|
||||||
|
proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
|
||||||
|
outs = proc.stdout.decode("utf8")
|
||||||
|
|
||||||
|
test_file = "noargs_" + self.wikiq_out_name
|
||||||
|
baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
|
||||||
|
print(baseline_file)
|
||||||
|
test = pd.read_table(StringIO(outs))
|
||||||
|
baseline = pd.read_table(baseline_file)
|
||||||
|
assert_frame_equal(test,baseline)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
15421
test/baseline_output/namespaces_ikwiki-20180301-pages-meta-history.tsv
Normal file
15421
test/baseline_output/namespaces_ikwiki-20180301-pages-meta-history.tsv
Normal file
File diff suppressed because it is too large
Load Diff
|
Can't render this file because it is too large.
|
|
Can't render this file because it is too large.
|
|
Can't render this file because it is too large.
|
4652
test/baseline_output/persistence_segment_sailormoon.tsv
Normal file
4652
test/baseline_output/persistence_segment_sailormoon.tsv
Normal file
File diff suppressed because it is too large
Load Diff
|
Can't render this file because it is too large.
|
|
Can't render this file because it is too large.
|
@@ -1,144 +0,0 @@
|
|||||||
import unittest
|
|
||||||
import os
|
|
||||||
import subprocess
|
|
||||||
from shutil import copyfile
|
|
||||||
import pandas as pd
|
|
||||||
from pandas.util.testing import assert_frame_equal
|
|
||||||
from io import StringIO
|
|
||||||
|
|
||||||
# with / without pwr DONE
|
|
||||||
# with / without url encode DONE
|
|
||||||
# with / without collapse user DONE
|
|
||||||
# with output to sdtout DONE
|
|
||||||
# note that the persistence radius is 7 by default
|
|
||||||
# reading various file formats including
|
|
||||||
# 7z, gz, bz2, xml DONE
|
|
||||||
# wikia and wikipedia data DONE
|
|
||||||
# malformed xmls DONE
|
|
||||||
|
|
||||||
class Test_Wikiq(unittest.TestCase):
|
|
||||||
|
|
||||||
def mkoutputdir(self):
|
|
||||||
if not os.path.exists("test_output"):
|
|
||||||
os.mkdir("test_output")
|
|
||||||
|
|
||||||
def setuptoutputfiles(self, suffix="xml.7z"):
|
|
||||||
self.wikiq_out_name = self.wiki + ".tsv"
|
|
||||||
self.test_output_dir = os.path.join(".", "test_output")
|
|
||||||
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
|
|
||||||
self.infile = "{0}.{1}".format(self.wiki,suffix)
|
|
||||||
self.input_dir = "dumps"
|
|
||||||
self.input_file = os.path.join(".", self.input_dir,self.infile)
|
|
||||||
self.baseline_output_dir = "baseline_output"
|
|
||||||
|
|
||||||
def run_and_check_output(self, call, test_filename):
|
|
||||||
test_file = os.path.join(self.test_output_dir, test_filename)
|
|
||||||
if os.path.exists(test_file):
|
|
||||||
os.remove(test_file)
|
|
||||||
|
|
||||||
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
|
|
||||||
proc.wait()
|
|
||||||
|
|
||||||
copyfile(self.call_output, test_file)
|
|
||||||
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
|
|
||||||
|
|
||||||
# as a test let's make sure that we get equal data frames
|
|
||||||
test = pd.read_table(test_file)
|
|
||||||
baseline = pd.read_table(baseline_file)
|
|
||||||
assert_frame_equal(test,baseline)
|
|
||||||
|
|
||||||
class Test_Wikipedia(Test_Wikiq):
|
|
||||||
def setUp(self):
|
|
||||||
print(os.path.abspath("."))
|
|
||||||
self.mkoutputdir()
|
|
||||||
self.wiki = 'ikwiki-20180301-pages-meta-history'
|
|
||||||
self.setuptoutputfiles(suffix="xml.bz2")
|
|
||||||
self.base_call = "../bin/wikiq {0} -o {1}"
|
|
||||||
|
|
||||||
def test_WP_url_encode(self):
|
|
||||||
test_filename = "url-encode_" + self.wikiq_out_name
|
|
||||||
call = self.base_call.format(self.input_file, self.test_output_dir)
|
|
||||||
call = call + " --url-encode"
|
|
||||||
self.run_and_check_output(call, test_filename)
|
|
||||||
|
|
||||||
|
|
||||||
class Test_Basic(Test_Wikiq):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.mkoutputdir()
|
|
||||||
self.wiki="sailormoon"
|
|
||||||
self.setuptoutputfiles()
|
|
||||||
self.base_call = "../bin/wikiq {0} -o {1}"
|
|
||||||
|
|
||||||
def test_noargs(self):
|
|
||||||
test_filename = "noargs_" + self.wikiq_out_name
|
|
||||||
|
|
||||||
call = self.base_call.format(self.input_file, self.test_output_dir)
|
|
||||||
print(call)
|
|
||||||
self.run_and_check_output(call, test_filename)
|
|
||||||
|
|
||||||
def test_collapse_user(self):
|
|
||||||
test_filename = "collapse-user_" + self.wikiq_out_name
|
|
||||||
|
|
||||||
call = self.base_call.format(self.input_file, self.test_output_dir)
|
|
||||||
call = call + " --collapse-user"
|
|
||||||
|
|
||||||
self.run_and_check_output(call, test_filename)
|
|
||||||
|
|
||||||
def test_pwr_legacy(self):
|
|
||||||
test_filename = "persistence_legacy_" + self.wikiq_out_name
|
|
||||||
|
|
||||||
call = self.base_call.format(self.input_file, self.test_output_dir)
|
|
||||||
call = call + " --persistence-legacy"
|
|
||||||
self.run_and_check_output(call, test_filename)
|
|
||||||
|
|
||||||
def test_pwr(self):
|
|
||||||
test_filename = "persistence_" + self.wikiq_out_name
|
|
||||||
|
|
||||||
call = self.base_call.format(self.input_file, self.test_output_dir)
|
|
||||||
call = call + " --persistence"
|
|
||||||
self.run_and_check_output(call, test_filename)
|
|
||||||
|
|
||||||
def test_url_encode(self):
|
|
||||||
test_filename = "url-encode_" + self.wikiq_out_name
|
|
||||||
|
|
||||||
call = self.base_call.format(self.input_file, self.test_output_dir)
|
|
||||||
call = call + " --url-encode"
|
|
||||||
self.run_and_check_output(call, test_filename)
|
|
||||||
|
|
||||||
class Test_Malformed(Test_Wikiq):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.mkoutputdir()
|
|
||||||
self.wiki="twinpeaks"
|
|
||||||
self.setuptoutputfiles()
|
|
||||||
self.base_call = "../bin/wikiq {0} -o {1}"
|
|
||||||
|
|
||||||
def test_malformed_noargs(self):
|
|
||||||
call = self.base_call.format(self.input_file, self.test_output_dir)
|
|
||||||
proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
|
|
||||||
proc.wait()
|
|
||||||
outs, errs = proc.communicate()
|
|
||||||
errlines = str(errs).split("\\n")
|
|
||||||
self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
|
|
||||||
|
|
||||||
class Test_Stdout(Test_Wikiq):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.mkoutputdir()
|
|
||||||
self.wiki = 'sailormoon'
|
|
||||||
self.setuptoutputfiles()
|
|
||||||
|
|
||||||
|
|
||||||
def test_noargs(self):
|
|
||||||
self.base_call = ["../bin/wikiq", self.input_file, "--stdout"]
|
|
||||||
proc = subprocess.Popen(self.base_call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
|
|
||||||
outs = proc.stdout
|
|
||||||
test_file = "noargs_" + self.wikiq_out_name
|
|
||||||
baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
|
|
||||||
test = pd.read_table(outs)
|
|
||||||
baseline = pd.read_table(baseline_file)
|
|
||||||
assert_frame_equal(test,baseline)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
||||||
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
164
wikiq_util.py → wikiq
Normal file → Executable file
164
wikiq_util.py → wikiq
Normal file → Executable file
@@ -1,25 +1,34 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# original wikiq headers are: title articleid revid date_time anon
|
||||||
|
# editor editor_id minor text_size text_entropy text_md5 reversion
|
||||||
|
# additions_size deletions_size
|
||||||
|
|
||||||
|
import argparse
|
||||||
import sys
|
import sys
|
||||||
|
import os, os.path
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from subprocess import Popen, PIPE
|
from subprocess import Popen, PIPE
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
from deltas.tokenizers import wikitext_split
|
|
||||||
from mwxml import Dump
|
from mwxml import Dump
|
||||||
|
|
||||||
|
from deltas.tokenizers import wikitext_split
|
||||||
import mwpersistence
|
import mwpersistence
|
||||||
import mwreverts
|
import mwreverts
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
from urllib.parse import unquote
|
|
||||||
from deltas import SequenceMatcher
|
|
||||||
|
|
||||||
TO_ENCODE = ('title', 'editor')
|
TO_ENCODE = ('title', 'editor')
|
||||||
PERSISTENCE_RADIUS = 7
|
PERSISTENCE_RADIUS=7
|
||||||
|
from deltas import SequenceMatcher
|
||||||
|
from deltas import SegmentMatcher
|
||||||
|
|
||||||
def try_unquote(obj):
|
class PersistMethod:
|
||||||
if type(obj) is str:
|
none = 0
|
||||||
obj = unquote(obj)
|
sequence = 1
|
||||||
return obj.strip('\"')
|
segment = 2
|
||||||
else:
|
legacy = 3
|
||||||
return
|
|
||||||
|
|
||||||
def calculate_persistence(tokens_added):
|
def calculate_persistence(tokens_added):
|
||||||
return(sum([(len(x.revisions)-1) for x in tokens_added]),
|
return(sum([(len(x.revisions)-1) for x in tokens_added]),
|
||||||
@@ -46,7 +55,6 @@ class WikiqIterator():
|
|||||||
def __next__(self):
|
def __next__(self):
|
||||||
return next(self._pages)
|
return next(self._pages)
|
||||||
|
|
||||||
|
|
||||||
class WikiqPage():
|
class WikiqPage():
|
||||||
__slots__ = ('id', 'title', 'namespace', 'redirect',
|
__slots__ = ('id', 'title', 'namespace', 'redirect',
|
||||||
'restrictions', 'mwpage', '__revisions',
|
'restrictions', 'mwpage', '__revisions',
|
||||||
@@ -55,6 +63,11 @@ class WikiqPage():
|
|||||||
def __init__(self, page, namespace_map, collapse_user=False):
|
def __init__(self, page, namespace_map, collapse_user=False):
|
||||||
self.id = page.id
|
self.id = page.id
|
||||||
self.namespace = page.namespace
|
self.namespace = page.namespace
|
||||||
|
# following mwxml, we assume namespace 0 in cases where
|
||||||
|
# page.namespace is inconsistent with namespace_map
|
||||||
|
if page.namespace not in namespace_map:
|
||||||
|
self.title = page.title
|
||||||
|
page.namespace = 0
|
||||||
if page.namespace != 0:
|
if page.namespace != 0:
|
||||||
self.title = ':'.join([namespace_map[page.namespace], page.title])
|
self.title = ':'.join([namespace_map[page.namespace], page.title])
|
||||||
else:
|
else:
|
||||||
@@ -115,16 +128,23 @@ class WikiqPage():
|
|||||||
|
|
||||||
class WikiqParser():
|
class WikiqParser():
|
||||||
|
|
||||||
def __init__(self, input_file, output_file, collapse_user=False, persist=False, urlencode=False, persist_legacy=False):
|
def __init__(self, input_file, output_file, collapse_user=False, persist=None, urlencode=False, namespaces = None):
|
||||||
|
"""
|
||||||
|
Parameters:
|
||||||
|
persist : what persistence method to use. Takes a PersistMethod value
|
||||||
|
"""
|
||||||
|
|
||||||
self.input_file = input_file
|
self.input_file = input_file
|
||||||
self.output_file = output_file
|
self.output_file = output_file
|
||||||
self.collapse_user = collapse_user
|
self.collapse_user = collapse_user
|
||||||
self.persist = persist
|
self.persist = persist
|
||||||
self.persist_legacy = persist_legacy
|
|
||||||
self.printed_header = False
|
self.printed_header = False
|
||||||
self.namespaces = []
|
self.namespaces = []
|
||||||
self.urlencode = urlencode
|
self.urlencode = urlencode
|
||||||
|
if namespaces is not None:
|
||||||
|
self.namespace_filter = set(namespaces)
|
||||||
|
else:
|
||||||
|
self.namespace_filter = None
|
||||||
|
|
||||||
def __get_namespace_from_title(self, title):
|
def __get_namespace_from_title(self, title):
|
||||||
default_ns = None
|
default_ns = None
|
||||||
@@ -160,15 +180,27 @@ class WikiqParser():
|
|||||||
|
|
||||||
# Iterate through pages
|
# Iterate through pages
|
||||||
for page in dump:
|
for page in dump:
|
||||||
|
namespace = page.namespace if page.namespace is not None else self.__get_namespace_from_title(page.title)
|
||||||
|
|
||||||
|
# skip namespaces not in the filter
|
||||||
|
if self.namespace_filter is not None:
|
||||||
|
if namespace not in self.namespace_filter:
|
||||||
|
continue
|
||||||
|
|
||||||
rev_detector = mwreverts.Detector()
|
rev_detector = mwreverts.Detector()
|
||||||
|
|
||||||
if self.persist or self.persist_legacy:
|
if self.persist != PersistMethod.none:
|
||||||
window = deque(maxlen=PERSISTENCE_RADIUS)
|
window = deque(maxlen=PERSISTENCE_RADIUS)
|
||||||
|
|
||||||
if not self.persist_legacy:
|
if self.persist == PersistMethod.sequence:
|
||||||
state = mwpersistence.DiffState(SequenceMatcher(tokenizer = wikitext_split),
|
state = mwpersistence.DiffState(SequenceMatcher(tokenizer = wikitext_split),
|
||||||
revert_radius=PERSISTENCE_RADIUS)
|
revert_radius=PERSISTENCE_RADIUS)
|
||||||
|
|
||||||
|
elif self.persist == PersistMethod.segment:
|
||||||
|
state = mwpersistence.DiffState(SegmentMatcher(tokenizer = wikitext_split),
|
||||||
|
revert_radius=PERSISTENCE_RADIUS)
|
||||||
|
|
||||||
|
# self.persist == PersistMethod.legacy
|
||||||
else:
|
else:
|
||||||
from mw.lib import persistence
|
from mw.lib import persistence
|
||||||
state = persistence.State()
|
state = persistence.State()
|
||||||
@@ -181,7 +213,7 @@ class WikiqParser():
|
|||||||
'articleid' : page.id,
|
'articleid' : page.id,
|
||||||
'editor_id' : "" if rev.deleted.user == True or rev.user.id is None else rev.user.id,
|
'editor_id' : "" if rev.deleted.user == True or rev.user.id is None else rev.user.id,
|
||||||
'title' : '"' + page.title + '"',
|
'title' : '"' + page.title + '"',
|
||||||
'namespace' : page.namespace if page.namespace is not None else self.__get_namespace_from_title(page.title),
|
'namespace' : namespace,
|
||||||
'deleted' : "TRUE" if rev.deleted.text else "FALSE" }
|
'deleted' : "TRUE" if rev.deleted.text else "FALSE" }
|
||||||
|
|
||||||
# if revisions are deleted, /many/ things will be missing
|
# if revisions are deleted, /many/ things will be missing
|
||||||
@@ -241,14 +273,13 @@ class WikiqParser():
|
|||||||
if self.collapse_user:
|
if self.collapse_user:
|
||||||
rev_data['collapsed_revs'] = rev.collapsed_revs
|
rev_data['collapsed_revs'] = rev.collapsed_revs
|
||||||
|
|
||||||
if self.persist or self.persist_legacy:
|
if self.persist != PersistMethod.none:
|
||||||
if rev.deleted.text:
|
if rev.deleted.text:
|
||||||
|
|
||||||
for k in ["token_revs", "tokens_added", "tokens_removed", "tokens_window"]:
|
for k in ["token_revs", "tokens_added", "tokens_removed", "tokens_window"]:
|
||||||
old_rev_data[k] = None
|
old_rev_data[k] = None
|
||||||
else:
|
else:
|
||||||
|
|
||||||
if not self.persist_legacy:
|
if self.persist != PersistMethod.legacy:
|
||||||
_, tokens_added, tokens_removed = state.update(rev.text, rev.id)
|
_, tokens_added, tokens_removed = state.update(rev.text, rev.id)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@@ -273,7 +304,7 @@ class WikiqParser():
|
|||||||
|
|
||||||
rev_count += 1
|
rev_count += 1
|
||||||
|
|
||||||
if self.persist or self.persist_legacy:
|
if self.persist != PersistMethod.none:
|
||||||
# print out metadata for the last RADIUS revisions
|
# print out metadata for the last RADIUS revisions
|
||||||
for i, item in enumerate(window):
|
for i, item in enumerate(window):
|
||||||
# if the window was full, we've already printed item 0
|
# if the window was full, we've already printed item 0
|
||||||
@@ -332,22 +363,87 @@ def open_output_file(input_filename):
|
|||||||
|
|
||||||
return output_file
|
return output_file
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimitted data.')
|
||||||
|
|
||||||
class IPCheck(object):
|
# arguments for the input direction
|
||||||
|
parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
|
||||||
|
help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
|
||||||
|
|
||||||
# IP address regexes taken from https://gist.github.com/mnordhoff/2213179
|
parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
|
||||||
ipv4_address = re.compile('^(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$')
|
help="Directory for output files.")
|
||||||
|
|
||||||
ipv6_address_or_addrz = re.compile('^(?:(?:[0-9A-Fa-f]{1,4}:){6}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9A-Fa-f]{1,4}:){5}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){4}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){3}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,2}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){2}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,3}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}:(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,4}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,5}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}|(?:(?:[0-9A-Fa-f]{1,4}:){,6}[0-9A-Fa-f]{1,4})?::)(?:%25(?:[A-Za-z0-9\\-._~]|%[0-9A-Fa-f]{2})+)?$')
|
parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
|
||||||
|
help="Write output to standard out (do not create dump file)")
|
||||||
|
|
||||||
@staticmethod
|
parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
|
||||||
def is_ip(username):
|
help="Operate only on the final revision made by user a user within all sequences of consecutive edits made by a user. This can be useful for addressing issues with text persistence measures.")
|
||||||
if not type(username) is str:
|
|
||||||
return False
|
|
||||||
|
|
||||||
'''Check if a username is an ip (v4 or v6) address. We use this as
|
parser.add_argument('-p', '--persistence', dest="persist", default=None, const='', type=str, choices = ['','segment','sequence','legacy'], nargs='?',
|
||||||
a marker of whether the user is anonymous.'''
|
help="Compute and report measures of content persistent: (1) persistent token revisions, (2) tokens added, and (3) number of revision used in computing the first measure. This may by slow. Use -p=segment for advanced persistence calculation method that is robust to content moves. This might be very slow. Use -p=legacy for legacy behavior.")
|
||||||
if IPCheck.ipv4_address.match(username) or IPCheck.ipv6_address_or_addrz.match(username):
|
|
||||||
return True
|
parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
|
||||||
|
help="Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
|
||||||
|
|
||||||
|
parser.add_argument('-n', '--namespace-include', dest="namespace_filter", type=int, action='append',
|
||||||
|
help="Id number of namspace to include.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# set persistence method
|
||||||
|
|
||||||
|
if args.persist is None:
|
||||||
|
persist = PersistMethod.none
|
||||||
|
elif args.persist == "segment":
|
||||||
|
persist = PersistMethod.segment
|
||||||
|
elif args.persist == "legacy":
|
||||||
|
persist = PersistMethod.legacy
|
||||||
|
else:
|
||||||
|
persist = PersistMethod.sequence
|
||||||
|
|
||||||
|
if args.namespace_filter is not None:
|
||||||
|
namespaces = args.namespace_filter
|
||||||
|
else:
|
||||||
|
namespaces = None
|
||||||
|
|
||||||
|
if len(args.dumpfiles) > 0:
|
||||||
|
for filename in args.dumpfiles:
|
||||||
|
input_file = open_input_file(filename)
|
||||||
|
|
||||||
|
# open directory for output
|
||||||
|
if args.output_dir:
|
||||||
|
output_dir = args.output_dir[0]
|
||||||
else:
|
else:
|
||||||
return False
|
output_dir = "."
|
||||||
|
|
||||||
|
print("Processing file: %s" % filename, file=sys.stderr)
|
||||||
|
|
||||||
|
if args.stdout:
|
||||||
|
output_file = sys.stdout
|
||||||
|
else:
|
||||||
|
filename = os.path.join(output_dir, os.path.basename(filename))
|
||||||
|
output_file = open_output_file(filename)
|
||||||
|
|
||||||
|
wikiq = WikiqParser(input_file, output_file,
|
||||||
|
collapse_user=args.collapse_user,
|
||||||
|
persist=persist,
|
||||||
|
urlencode=args.urlencode,
|
||||||
|
namespaces = namespaces)
|
||||||
|
|
||||||
|
wikiq.process()
|
||||||
|
|
||||||
|
# close things
|
||||||
|
input_file.close()
|
||||||
|
output_file.close()
|
||||||
|
else:
|
||||||
|
wikiq = WikiqParser(sys.stdin, sys.stdout,
|
||||||
|
collapse_user=args.collapse_user,
|
||||||
|
persist=persist,
|
||||||
|
persist_legacy=args.persist_legacy,
|
||||||
|
urlencode=args.urlencode,
|
||||||
|
namespaces = namespaces)
|
||||||
|
wikiq.process()
|
||||||
|
|
||||||
|
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
|
||||||
|
# stop_words = stop_words.split(",")
|
||||||
@@ -1,2 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
spark-submit --master spark://n0649:18899 wikiq_users_spark.py --output-format parquet -i "/com/output/wikiq-enwiki-persist-sequence-20180301/enwiki/enwiki-20180301-pages-meta-history*.tsv" -o "/com/output/wikiq-users-enwiki-20180301-parquet/" --num-partitions 500 --schema-opt persistence+collapse
|
|
||||||
@@ -1,2 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
spark-submit benchmark_spark.py --output-format csv -i "../mediawiki_dump_tools/tests/tsvs/*.tsv" -o "./out.tsv" --num-partitions 2
|
|
||||||
@@ -1,163 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
Builds a user level dataset. Requires a functional spark installation.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys
|
|
||||||
# add pyspark to your python path e.g.
|
|
||||||
#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
|
|
||||||
#sys.path.append("/home/nathante/sparkstuff/spark/python/")
|
|
||||||
from pyspark import SparkConf
|
|
||||||
from pyspark.sql import SparkSession, SQLContext
|
|
||||||
from pyspark.sql import Window
|
|
||||||
import pyspark.sql.functions as f
|
|
||||||
from pyspark.sql import types
|
|
||||||
import argparse
|
|
||||||
import glob
|
|
||||||
from os import mkdir
|
|
||||||
from os import path
|
|
||||||
from wikiq_util import PERSISTENCE_RADIUS
|
|
||||||
#read a table
|
|
||||||
|
|
||||||
def parse_args():
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
|
|
||||||
parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
|
|
||||||
parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
|
|
||||||
# parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
|
|
||||||
parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
|
|
||||||
parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
|
|
||||||
parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
|
|
||||||
parser.add_argument('--schema-opt', help = 'Options for the input schema.', choices = ["basic","persistence","collapse","persistence+collapse"])
|
|
||||||
# parser.add_argument('--nodes', help = "how many hyak nodes to use", default=0, type=int)
|
|
||||||
args = parser.parse_args()
|
|
||||||
return(args)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
args = parse_args()
|
|
||||||
conf = SparkConf().setAppName("Wiki Users Spark")
|
|
||||||
spark = SparkSession.builder.getOrCreate()
|
|
||||||
|
|
||||||
# test file with persistence: "../tests/tsvs/persistence_sailormoon.tsv"
|
|
||||||
files = glob.glob(args.input_file)
|
|
||||||
files = [path.abspath(p) for p in files]
|
|
||||||
|
|
||||||
read_persistence = args.schema_opt in ["persistence", "persistence+collapse"]
|
|
||||||
read_collapse = args.schema_opt in ["collapse", "persistence+collapse"]
|
|
||||||
|
|
||||||
# going to have to do some coercing of the schema
|
|
||||||
|
|
||||||
# build a schema
|
|
||||||
struct = types.StructType().add("anon",types.StringType(),True)
|
|
||||||
struct = struct.add("articleid",types.LongType(),True)
|
|
||||||
|
|
||||||
if read_collapse is True:
|
|
||||||
struct = struct.add("collapsed_revs", types.IntegerType(), True)
|
|
||||||
|
|
||||||
struct = struct.add("date_time",types.TimestampType(), True)
|
|
||||||
struct = struct.add("deleted",types.BooleanType(), True)
|
|
||||||
struct = struct.add("editor",types.StringType(),True)
|
|
||||||
struct = struct.add("editor_id",types.LongType(), True)
|
|
||||||
struct = struct.add("minor", types.BooleanType(), True)
|
|
||||||
struct = struct.add("namespace", types.LongType(), True)
|
|
||||||
struct = struct.add("revert", types.BooleanType(), True)
|
|
||||||
struct = struct.add("reverteds", types.StringType(), True)
|
|
||||||
struct = struct.add("revid", types.LongType(), True)
|
|
||||||
struct = struct.add("sha1", types.StringType(), True)
|
|
||||||
struct = struct.add("text_chars", types.LongType(), True)
|
|
||||||
struct = struct.add("title",types.StringType(), True)
|
|
||||||
|
|
||||||
if read_persistence is True:
|
|
||||||
struct = struct.add("token_revs", types.IntegerType(),True)
|
|
||||||
struct = struct.add("tokens_added", types.IntegerType(),True)
|
|
||||||
struct = struct.add("tokens_removed", types.IntegerType(),True)
|
|
||||||
struct = struct.add("tokens_window", types.IntegerType(),True)
|
|
||||||
|
|
||||||
|
|
||||||
reader = spark.read
|
|
||||||
|
|
||||||
df = reader.csv(files,
|
|
||||||
sep='\t',
|
|
||||||
inferSchema=False,
|
|
||||||
header=True,
|
|
||||||
mode="PERMISSIVE",
|
|
||||||
schema = struct)
|
|
||||||
df = df.repartition(args.num_partitions)
|
|
||||||
|
|
||||||
# replace na editor ids
|
|
||||||
df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))
|
|
||||||
|
|
||||||
# sort by datetime
|
|
||||||
df = df.orderBy(df.date_time.asc())
|
|
||||||
|
|
||||||
# create our window_specs
|
|
||||||
ed_win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
|
|
||||||
art_win = Window.orderBy("date_time").partitionBy("articleid")
|
|
||||||
|
|
||||||
# assign which edit reverted what edit
|
|
||||||
reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds','editor_id_or_ip','date_time'])
|
|
||||||
reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
|
|
||||||
reverteds_df = reverteds_df.drop("reverteds")
|
|
||||||
reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
|
|
||||||
reverteds_df = reverteds_df.withColumn("editor_nth_revert_action", f.rank().over(ed_win))
|
|
||||||
|
|
||||||
reverteds_df_explode = reverteds_df.select(reverteds_df.revid.alias('reverted_by'), f.explode(reverteds_df.reverteds).alias('reverted_id'))
|
|
||||||
|
|
||||||
|
|
||||||
df = df.join(reverteds_df_explode, df.revid == reverteds_df_explode.reverted_id, how='left_outer')
|
|
||||||
df = df.drop("reverted_id")
|
|
||||||
del(reverteds_df_explode)
|
|
||||||
|
|
||||||
reverteds_df = reverteds_df.select("revid","editor_nth_revert_action")
|
|
||||||
df = df.join(reverteds_df, on= ["revid"], how='left_outer')
|
|
||||||
|
|
||||||
del(reverteds_df)
|
|
||||||
|
|
||||||
# count reverts
|
|
||||||
reverts_df = df.filter(df.revert==True).select('revid','articleid','editor_id_or_ip','date_time','revert')
|
|
||||||
reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(ed_win))
|
|
||||||
|
|
||||||
# articles total reverts
|
|
||||||
reverts_df = reverts_df.withColumn('article_nth_revert',f.rank().over(art_win))
|
|
||||||
|
|
||||||
# some kind of bad work around a bug
|
|
||||||
# see https://issues.apache.org/jira/browse/SPARK-14948
|
|
||||||
reverts_df = reverts_df.select(reverts_df.revid.alias("r_revid"),'editor_nth_revert','article_nth_revert')
|
|
||||||
df = df.join(reverts_df, df.revid == reverts_df.r_revid, how='left_outer')
|
|
||||||
df = df.drop("r_revid")
|
|
||||||
del(reverts_df)
|
|
||||||
|
|
||||||
# count edits
|
|
||||||
df = df.withColumn('year', f.year(df.date_time))
|
|
||||||
df = df.withColumn('month',f.month(df.date_time))
|
|
||||||
|
|
||||||
if not read_collapse:
|
|
||||||
df = df.withColumn('editor_nth_edit', f.rank().over(ed_win))
|
|
||||||
df = df.withColumn('article_nth_edit', f.rank().over(art_win))
|
|
||||||
else:
|
|
||||||
df = df.withColumn('editor_nth_edit', f.sum("collapsed_revs").over(ed_win))
|
|
||||||
df = df.withColumn('article_nth_edit', f.sum("collapsed_revs").over(art_win))
|
|
||||||
df = df.withColumn('editor_nth_collapsed_edit', f.rank().over(ed_win))
|
|
||||||
df = df.withColumn('article_nth_collapsed_edit', f.rank().over(art_win))
|
|
||||||
|
|
||||||
# total editor's token_revs
|
|
||||||
if read_persistence:
|
|
||||||
df = df.withColumn("token_revs_upper", df.token_revs + df.tokens_added * (PERSISTENCE_RADIUS - df.tokens_window - 1))
|
|
||||||
df = df.withColumn('editor_cum_token_revs_lower', f.sum("token_revs").over(ed_win))
|
|
||||||
df = df.withColumn('editor_cum_token_revs_upper', f.sum("token_revs_upper").over(ed_win))
|
|
||||||
df = df.withColumn('article_cum_token_revs_lower', f.sum("token_revs").over(art_win))
|
|
||||||
df = df.withColumn('article_cum_token_revs_upper', f.sum("token_revs_upper").over(art_win))
|
|
||||||
df = df.withColumn('editor_cum_tokens_added', f.sum("tokens_added").over(ed_win))
|
|
||||||
df = df.withColumn('article_cum_tokens_removed', f.sum("tokens_removed").over(art_win))
|
|
||||||
|
|
||||||
# output
|
|
||||||
if not path.exists(args.output_dir):
|
|
||||||
mkdir(args.output_dir)
|
|
||||||
if args.output_format == "csv" or args.output_format == "tsv":
|
|
||||||
df.write.csv(args.output_dir, sep='\t', mode='overwrite',header=True,timestampFormat="yyyy-MM-dd HH:mm:ss")
|
|
||||||
# format == "parquet"
|
|
||||||
else:
|
|
||||||
df.write.parquet(args.output_dir, mode='overwrite')
|
|
||||||
|
|
||||||
# for writing to csv we need to urlencode
|
|
||||||
Reference in New Issue
Block a user