16 Commits

Author SHA1 Message Date
df18d6e280 Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:03:07 -07:00
3af71f03e0 Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:02:48 -07:00
1d5a9b53b8 Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:02:05 -07:00
cc551eef6e Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:01:52 -07:00
4c77c0f12e Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-31 16:01:07 -07:00
915c864ee5 add more variables and support for persistence 2018-08-31 16:00:47 -07:00
3d12865c4e add more variables and support for persistence 2018-08-31 15:57:48 -07:00
bc1f5428f0 add spark program for running group by users 2018-08-31 20:40:22 +00:00
ff689c71dd Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-14 14:44:37 -07:00
39b4e5698f Use dask to parallelize and scale user level datasets 2018-08-14 14:44:21 -07:00
daf1851cbb Use dask to parallelize and scale user level datasets 2018-08-14 14:37:03 -07:00
418fa020e5 Merge branch 'user_level_wikiq' of code.communitydata.cc:mediawiki_dump_tools into user_level_wikiq 2018-08-12 21:34:12 -07:00
311810a36c refactor wikiq to seperate script from classes and functions. Code reuse in testing. 2018-08-12 21:33:19 -07:00
118b8b1722 move tests to test folder 2018-08-12 18:10:46 -07:00
f69e8b44a6 move tests to test folder 2018-08-12 18:05:59 -07:00
bf396ad366 Prefix page titles with namespace names. 2018-07-09 22:11:17 -07:00
25 changed files with 3500521 additions and 100 deletions

3
.gitmodules vendored
View File

@@ -1,3 +0,0 @@
[submodule "Mediawiki-Utilities"]
path = Mediawiki-Utilities
url = https://github.com/halfak/Mediawiki-Utilities.git

View File

@@ -7,3 +7,7 @@ submodule like::
git submodule init git submodule init
git submodule update git submodule update
Wikimedia dumps are usually in a compressed format such as 7z (most common), gz, or bz2. Wikiq uses your computer's compression software to read these files. Therefore wikiq depends on
`7za`, `gzcat`, and `zcat`.

83
bin/wikiq Executable file
View File

@@ -0,0 +1,83 @@
#!/usr/bin/env python3
# original wikiq headers are: title articleid revid date_time anon
# editor editor_id minor text_size text_entropy text_md5 reversion
# additions_size deletions_size
import argparse
import sys
import os
sys.path.append("..")
from wikiq_util import calculate_persistence
from wikiq_util import WikiqIterator
from wikiq_util import WikiqPage
from wikiq_util import WikiqParser
from wikiq_util import open_input_file
from wikiq_util import open_output_file
parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimitted data.')
# arguments for the input direction
parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
help="Directory for output files.")
parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
help="Write output to standard out (do not create dump file)")
parser.add_argument('--collapse-user', dest="collapse_user", action="store_true",
help="Operate only on the final revision made by user a user within all sequences of consecutive edits made by a user. This can be useful for addressing issues with text persistence measures.")
parser.add_argument('-p', '--persistence', dest="persist", action="store_true",
help="Compute and report measures of content persistent: (1) persistent token revisions, (2) tokens added, and (3) number of revision used in computing the first measure.")
parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true",
help="Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
parser.add_argument('--persistence-legacy', dest="persist_legacy", action="store_true",
help="Legacy behavior for persistence calculation. Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
args = parser.parse_args()
if len(args.dumpfiles) > 0:
for filename in args.dumpfiles:
input_file = open_input_file(filename)
# open directory for output
if args.output_dir:
output_dir = args.output_dir[0]
else:
output_dir = "."
print("Processing file: %s" % filename, file=sys.stderr)
if args.stdout:
output_file = sys.stdout
else:
filename = os.path.join(output_dir, os.path.basename(filename))
output_file = open_output_file(filename)
wikiq = WikiqParser(input_file, output_file,
collapse_user=args.collapse_user,
persist=args.persist,
persist_legacy=args.persist_legacy,
urlencode=args.urlencode)
wikiq.process()
# close things
input_file.close()
output_file.close()
else:
wikiq = WikiqParser(sys.stdin, sys.stdout,
collapse_user=args.collapse_user,
persist=args.persist,
persist_legacy=args.persist_legacy,
urlencode=args.urlencode)
wikiq.process()
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
# stop_words = stop_words.split(",")

144
bin/wikiq_users Executable file
View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python3
import dask.dataframe as dd
import pandas as pd
import csv
import re
import os
import argparse
import fcntl
import sys
import errno
import time
import numpy as np
import struct
from urllib.parse import unquote
sys.path.append("..")
from hashlib import sha256
from wikiq_util import IPCheck
from wikiq_util import TO_ENCODE
from wikiq_util import try_unquote
def parse_args():
parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
parser.add_argument('--no-cluster', help="disable dask.distributed", action="store_true")
parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
args = parser.parse_args()
return(args)
# This script does not do some of the things that might be useful that Jeremy's script did.
# We don't remove bots
# We don't exit on Tech Wiki
# We don't accept an EDITOR_IGNORE_LIST
# We don't have a username-userid mapping file
# We don't remove anonymous editors (though we do indicate IP edits as anon.
# We don't remove any rows, including for malformed data
if __name__ == "__main__":
args = parse_args()
id_dict = {}
if not args.no_cluster:
# set up dask distributed
from dask.distributed import Client, LocalCluster
import multiprocessing as mp
cluster = LocalCluster(n_workers = mp.cpu_count(), processes=True)
client = Client(cluster)
input_file = args.input_file
d = dd.read_table(input_file, dtype={"anon":np.bool,
"articleid":int,
"deleted":bool,
"editor":str,
"minor":bool,
"namespace":np.int32,
"revert":bool,
"reverteds":str,
"revid":int,
"sha1":str,
"title":str},
true_values=["TRUE"],
false_values=["FALSE"],
parse_dates=["date_time"],
infer_datetime_format=True
)
if args.wiki is None:
wiki = re.match('(.*)\.tsv', os.path.split(args.input_file)[1]).group(1)
else:
wiki = args.wiki
d['wiki'] = wiki
for col in TO_ENCODE:
d[col+"old"] = d[col]
d[col] = d[col].apply(try_unquote, meta=(col,str))
d['IPAnon'] = d['editor'].apply(IPCheck.is_ip, meta=('editor',str))
d['anon'] = (d['anon'] == True) | d['IPAnon']
d = d.drop('IPAnon',axis=1)
d['timestamp'] = (d['date_time'] - d['date_time'].min())/np.timedelta64(1,'s')
d['timestamp'] = d['timestamp'].astype(int)
# create a new unique identifier by hashing the editor name or editor ip
# first sort by editor
d = d.set_index(d["date_time"])
d = d.map_partitions(lambda x: x.sort_index())
d['editor_sha'] = d['editor'].apply(lambda x:
sha256(x.encode()).hexdigest()
if x is not None
else None,
meta=("editor_sha",str)
)
editor_groups = d.groupby('editor')
d['editor_nth_edit'] = editor_groups.cumcount()
d = editor_groups.apply(lambda df: df.assign(tminus_editor_edit = df.date_time.diff(1)))
editor_wiki_groups = d.groupby(['editor_sha','wiki'])
d['editor_nth_wiki_edit'] = editor_wiki_groups.cumcount()
d = editor_wiki_groups.apply(lambda df:
df.assign(
tminus_editor_wiki_edit=df.date_time.diff(1)
))
editor_namespace_groups = d.groupby(['editor_sha','wiki','namespace'])
d['editor_nth_namespace_edit'] = editor_wiki_groups.cumcount()
d = editor_namespace_groups.apply(lambda df:
df.assign(
tminus_namespace_wiki_edit=df.date_time.diff(1)
))
editor_article_groups = d.groupby(['editor_sha','wiki','articleid'])
d['editor_nth_article_edit'] = editor_article_groups.cumcount()
d = editor_article_groups.apply(lambda df:
df.assign(tminus_editor_article_edit=df.date_time.diff(1)))
d = d.persist()
if not os.path.exists(args.output_dr):
os.mkdir(args.output_dir
)
if args.output_format == "csv":
d_csv = d
for col in TO_ENCODE:
d_csv = d_csv.drop(col,axis=1)
d_csv[col] = d_csv[col+'old']
d.to_csv()
else:
for col in TO_ENCODE:
d = d.drop(col + 'old', axis=1)
d.to_parquet("test_parquet/",object_encoding={"editor":"utf8","reverteds":"utf8","sha1":"utf8","title":"utf8","wiki":"utf8","namespace":"utf8","editor_sha":"utf8","revert":"bool"})
# for writing to csv we need to urlencode
if __name__ == '__main__':
main()

3
install.dask.sh Normal file
View File

@@ -0,0 +1,3 @@
#!/usr/bin/env bash
pip3 install --user cloudpickle toolz dask partd fastparquet pyarrow

1
mw
View File

@@ -1 +0,0 @@
Mediawiki-Utilities/mw

144
tests/Wikiq_Test.py Normal file
View File

@@ -0,0 +1,144 @@
import unittest
import os
import subprocess
from shutil import copyfile
import pandas as pd
from pandas.util.testing import assert_frame_equal
from io import StringIO
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to sdtout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
class Test_Wikiq(unittest.TestCase):
def mkoutputdir(self):
if not os.path.exists("test_output"):
os.mkdir("test_output")
def setuptoutputfiles(self, suffix="xml.7z"):
self.wikiq_out_name = self.wiki + ".tsv"
self.test_output_dir = os.path.join(".", "test_output")
self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
self.infile = "{0}.{1}".format(self.wiki,suffix)
self.input_dir = "dumps"
self.input_file = os.path.join(".", self.input_dir,self.infile)
self.baseline_output_dir = "baseline_output"
def run_and_check_output(self, call, test_filename):
test_file = os.path.join(self.test_output_dir, test_filename)
if os.path.exists(test_file):
os.remove(test_file)
proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
proc.wait()
copyfile(self.call_output, test_file)
baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
# as a test let's make sure that we get equal data frames
test = pd.read_table(test_file)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
class Test_Wikipedia(Test_Wikiq):
def setUp(self):
print(os.path.abspath("."))
self.mkoutputdir()
self.wiki = 'ikwiki-20180301-pages-meta-history'
self.setuptoutputfiles(suffix="xml.bz2")
self.base_call = "../bin/wikiq {0} -o {1}"
def test_WP_url_encode(self):
test_filename = "url-encode_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --url-encode"
self.run_and_check_output(call, test_filename)
class Test_Basic(Test_Wikiq):
def setUp(self):
self.mkoutputdir()
self.wiki="sailormoon"
self.setuptoutputfiles()
self.base_call = "../bin/wikiq {0} -o {1}"
def test_noargs(self):
test_filename = "noargs_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
print(call)
self.run_and_check_output(call, test_filename)
def test_collapse_user(self):
test_filename = "collapse-user_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --collapse-user"
self.run_and_check_output(call, test_filename)
def test_pwr_legacy(self):
test_filename = "persistence_legacy_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --persistence-legacy"
self.run_and_check_output(call, test_filename)
def test_pwr(self):
test_filename = "persistence_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --persistence"
self.run_and_check_output(call, test_filename)
def test_url_encode(self):
test_filename = "url-encode_" + self.wikiq_out_name
call = self.base_call.format(self.input_file, self.test_output_dir)
call = call + " --url-encode"
self.run_and_check_output(call, test_filename)
class Test_Malformed(Test_Wikiq):
def setUp(self):
self.mkoutputdir()
self.wiki="twinpeaks"
self.setuptoutputfiles()
self.base_call = "../bin/wikiq {0} -o {1}"
def test_malformed_noargs(self):
call = self.base_call.format(self.input_file, self.test_output_dir)
proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
proc.wait()
outs, errs = proc.communicate()
errlines = str(errs).split("\\n")
self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
class Test_Stdout(Test_Wikiq):
def setUp(self):
self.mkoutputdir()
self.wiki = 'sailormoon'
self.setuptoutputfiles()
def test_noargs(self):
self.base_call = ["../bin/wikiq", self.input_file, "--stdout"]
proc = subprocess.Popen(self.base_call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
outs = proc.stdout
test_file = "noargs_" + self.wikiq_out_name
baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
test = pd.read_table(outs)
baseline = pd.read_table(baseline_file)
assert_frame_equal(test,baseline)
if __name__ == '__main__':
unittest.main()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

1368
tests/dumps/twinpeaks.xml Normal file

File diff suppressed because it is too large Load Diff

Binary file not shown.

5104
tests/dumps/twinpeaks.xml~ Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,2 @@
#!/usr/bin/env bash
spark-submit --master spark://n0649:18899 wikiq_users_spark.py --output-format parquet -i "/com/output/wikiq-enwiki-persist-sequence-20180301/enwiki/enwiki-20180301-pages-meta-history*.tsv" -o "/com/output/wikiq-users-enwiki-20180301-parquet/" --num-partitions 500 --schema-opt persistence+collapse

View File

@@ -0,0 +1,2 @@
#!/usr/bin/env bash
spark-submit benchmark_spark.py --output-format csv -i "../mediawiki_dump_tools/tests/tsvs/*.tsv" -o "./out.tsv" --num-partitions 2

163
wikiq_users/wikiq_users_spark.py Executable file
View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
Builds a user level dataset. Requires a functional spark installation.
"""
import sys
# add pyspark to your python path e.g.
#sys.path.append("/home/nathante/sparkstuff/spark/python/pyspark")
#sys.path.append("/home/nathante/sparkstuff/spark/python/")
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window
import pyspark.sql.functions as f
from pyspark.sql import types
import argparse
import glob
from os import mkdir
from os import path
from wikiq_util import PERSISTENCE_RADIUS
#read a table
def parse_args():
parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
# parser.add_argument('--wiki', help="Wiki name. If not provided, we will guess based on the filename.", type=str)
parser.add_argument('--urlencode', help="whether we need to decode urls",action="store_true")
parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
parser.add_argument('--schema-opt', help = 'Options for the input schema.', choices = ["basic","persistence","collapse","persistence+collapse"])
# parser.add_argument('--nodes', help = "how many hyak nodes to use", default=0, type=int)
args = parser.parse_args()
return(args)
if __name__ == "__main__":
args = parse_args()
conf = SparkConf().setAppName("Wiki Users Spark")
spark = SparkSession.builder.getOrCreate()
# test file with persistence: "../tests/tsvs/persistence_sailormoon.tsv"
files = glob.glob(args.input_file)
files = [path.abspath(p) for p in files]
read_persistence = args.schema_opt in ["persistence", "persistence+collapse"]
read_collapse = args.schema_opt in ["collapse", "persistence+collapse"]
# going to have to do some coercing of the schema
# build a schema
struct = types.StructType().add("anon",types.StringType(),True)
struct = struct.add("articleid",types.LongType(),True)
if read_collapse is True:
struct = struct.add("collapsed_revs", types.IntegerType(), True)
struct = struct.add("date_time",types.TimestampType(), True)
struct = struct.add("deleted",types.BooleanType(), True)
struct = struct.add("editor",types.StringType(),True)
struct = struct.add("editor_id",types.LongType(), True)
struct = struct.add("minor", types.BooleanType(), True)
struct = struct.add("namespace", types.LongType(), True)
struct = struct.add("revert", types.BooleanType(), True)
struct = struct.add("reverteds", types.StringType(), True)
struct = struct.add("revid", types.LongType(), True)
struct = struct.add("sha1", types.StringType(), True)
struct = struct.add("text_chars", types.LongType(), True)
struct = struct.add("title",types.StringType(), True)
if read_persistence is True:
struct = struct.add("token_revs", types.IntegerType(),True)
struct = struct.add("tokens_added", types.IntegerType(),True)
struct = struct.add("tokens_removed", types.IntegerType(),True)
struct = struct.add("tokens_window", types.IntegerType(),True)
reader = spark.read
df = reader.csv(files,
sep='\t',
inferSchema=False,
header=True,
mode="PERMISSIVE",
schema = struct)
df = df.repartition(args.num_partitions)
# replace na editor ids
df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))
# sort by datetime
df = df.orderBy(df.date_time.asc())
# create our window_specs
ed_win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
art_win = Window.orderBy("date_time").partitionBy("articleid")
# assign which edit reverted what edit
reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds','editor_id_or_ip','date_time'])
reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
reverteds_df = reverteds_df.drop("reverteds")
reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
reverteds_df = reverteds_df.withColumn("editor_nth_revert_action", f.rank().over(ed_win))
reverteds_df_explode = reverteds_df.select(reverteds_df.revid.alias('reverted_by'), f.explode(reverteds_df.reverteds).alias('reverted_id'))
df = df.join(reverteds_df_explode, df.revid == reverteds_df_explode.reverted_id, how='left_outer')
df = df.drop("reverted_id")
del(reverteds_df_explode)
reverteds_df = reverteds_df.select("revid","editor_nth_revert_action")
df = df.join(reverteds_df, on= ["revid"], how='left_outer')
del(reverteds_df)
# count reverts
reverts_df = df.filter(df.revert==True).select('revid','articleid','editor_id_or_ip','date_time','revert')
reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(ed_win))
# articles total reverts
reverts_df = reverts_df.withColumn('article_nth_revert',f.rank().over(art_win))
# some kind of bad work around a bug
# see https://issues.apache.org/jira/browse/SPARK-14948
reverts_df = reverts_df.select(reverts_df.revid.alias("r_revid"),'editor_nth_revert','article_nth_revert')
df = df.join(reverts_df, df.revid == reverts_df.r_revid, how='left_outer')
df = df.drop("r_revid")
del(reverts_df)
# count edits
df = df.withColumn('year', f.year(df.date_time))
df = df.withColumn('month',f.month(df.date_time))
if not read_collapse:
df = df.withColumn('editor_nth_edit', f.rank().over(ed_win))
df = df.withColumn('article_nth_edit', f.rank().over(art_win))
else:
df = df.withColumn('editor_nth_edit', f.sum("collapsed_revs").over(ed_win))
df = df.withColumn('article_nth_edit', f.sum("collapsed_revs").over(art_win))
df = df.withColumn('editor_nth_collapsed_edit', f.rank().over(ed_win))
df = df.withColumn('article_nth_collapsed_edit', f.rank().over(art_win))
# total editor's token_revs
if read_persistence:
df = df.withColumn("token_revs_upper", df.token_revs + df.tokens_added * (PERSISTENCE_RADIUS - df.tokens_window - 1))
df = df.withColumn('editor_cum_token_revs_lower', f.sum("token_revs").over(ed_win))
df = df.withColumn('editor_cum_token_revs_upper', f.sum("token_revs_upper").over(ed_win))
df = df.withColumn('article_cum_token_revs_lower', f.sum("token_revs").over(art_win))
df = df.withColumn('article_cum_token_revs_upper', f.sum("token_revs_upper").over(art_win))
df = df.withColumn('editor_cum_tokens_added', f.sum("tokens_added").over(ed_win))
df = df.withColumn('article_cum_tokens_removed', f.sum("tokens_removed").over(art_win))
# output
if not path.exists(args.output_dir):
mkdir(args.output_dir)
if args.output_format == "csv" or args.output_format == "tsv":
df.write.csv(args.output_dir, sep='\t', mode='overwrite',header=True,timestampFormat="yyyy-MM-dd HH:mm:ss")
# format == "parquet"
else:
df.write.parquet(args.output_dir, mode='overwrite')
# for writing to csv we need to urlencode

183
wikiq → wikiq_util.py Executable file → Normal file
View File

@@ -1,24 +1,25 @@
#!/usr/bin/env python3
# original wikiq headers are: title articleid revid date_time anon
# editor editor_id minor text_size text_entropy text_md5 reversion
# additions_size deletions_size
import argparse
import sys import sys
import os, os.path
import re import re
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from collections import deque from collections import deque
from hashlib import sha1 from hashlib import sha1
from deltas.tokenizers import wikitext_split
from mw.xml_dump import Iterator from mwxml import Dump
from mw.lib import persistence import mwpersistence
from mw.lib import reverts import mwreverts
from urllib.parse import quote from urllib.parse import quote
from urllib.parse import unquote
from deltas import SequenceMatcher
TO_ENCODE = ('title', 'editor') TO_ENCODE = ('title', 'editor')
PERSISTENCE_RADIUS=7 PERSISTENCE_RADIUS = 7
def try_unquote(obj):
if type(obj) is str:
obj = unquote(obj)
return obj.strip('\"')
else:
return
def calculate_persistence(tokens_added): def calculate_persistence(tokens_added):
return(sum([(len(x.revisions)-1) for x in tokens_added]), return(sum([(len(x.revisions)-1) for x in tokens_added]),
@@ -28,12 +29,16 @@ class WikiqIterator():
def __init__(self, fh, collapse_user=False): def __init__(self, fh, collapse_user=False):
self.fh = fh self.fh = fh
self.collapse_user = collapse_user self.collapse_user = collapse_user
self.mwiterator = Iterator.from_file(self.fh) self.mwiterator = Dump.from_file(self.fh)
self.namespace_map = { ns.id : ns.name for ns in
self.mwiterator.site_info.namespaces }
self.__pages = self.load_pages() self.__pages = self.load_pages()
def load_pages(self): def load_pages(self):
for page in self.mwiterator: for page in self.mwiterator:
yield WikiqPage(page, collapse_user=self.collapse_user) yield WikiqPage(page,
namespace_map = self.namespace_map,
collapse_user=self.collapse_user)
def __iter__(self): def __iter__(self):
return self.__pages return self.__pages
@@ -41,18 +46,20 @@ class WikiqIterator():
def __next__(self): def __next__(self):
return next(self._pages) return next(self._pages)
class WikiqPage(): class WikiqPage():
__slots__ = ('id', 'title', 'namespace', 'redirect', __slots__ = ('id', 'title', 'namespace', 'redirect',
'restrictions', 'mwpage', '__revisions', 'restrictions', 'mwpage', '__revisions',
'collapse_user') 'collapse_user')
def __init__(self, page, collapse_user=False): def __init__(self, page, namespace_map, collapse_user=False):
self.id = page.id self.id = page.id
self.title = page.title
self.namespace = page.namespace self.namespace = page.namespace
self.redirect = page.redirect if page.namespace != 0:
self.title = ':'.join([namespace_map[page.namespace], page.title])
else:
self.title = page.title
self.restrictions = page.restrictions self.restrictions = page.restrictions
self.collapse_user = collapse_user self.collapse_user = collapse_user
self.mwpage = page self.mwpage = page
self.__revisions = self.rev_list() self.__revisions = self.rev_list()
@@ -76,7 +83,14 @@ class WikiqPage():
else: else:
if self.collapse_user: if self.collapse_user:
# yield if this is the last edit in a seq by a user and reset # yield if this is the last edit in a seq by a user and reset
if not rev.contributor.user_text == prev_rev.contributor.user_text: # also yield if we do know who the user is
if rev.deleted.user or prev_rev.deleted.user:
yield prev_rev
collapsed_revs = 1
rev.collapsed_revs = collapsed_revs
elif not rev.user.text == prev_rev.user.text:
yield prev_rev yield prev_rev
collapsed_revs = 1 collapsed_revs = 1
rev.collapsed_revs = collapsed_revs rev.collapsed_revs = collapsed_revs
@@ -89,6 +103,7 @@ class WikiqPage():
yield prev_rev yield prev_rev
prev_rev = rev prev_rev = rev
# also yield the final time # also yield the final time
yield prev_rev yield prev_rev
@@ -100,13 +115,13 @@ class WikiqPage():
class WikiqParser(): class WikiqParser():
def __init__(self, input_file, output_file, collapse_user=False, persist=False, urlencode=False, persist_legacy=False):
def __init__(self, input_file, output_file, collapse_user=False, persist=False, urlencode=False):
self.input_file = input_file self.input_file = input_file
self.output_file = output_file self.output_file = output_file
self.collapse_user = collapse_user self.collapse_user = collapse_user
self.persist = persist self.persist = persist
self.persist_legacy = persist_legacy
self.printed_header = False self.printed_header = False
self.namespaces = [] self.namespaces = []
self.urlencode = urlencode self.urlencode = urlencode
@@ -137,17 +152,26 @@ class WikiqParser():
dump = WikiqIterator(self.input_file, collapse_user=self.collapse_user) dump = WikiqIterator(self.input_file, collapse_user=self.collapse_user)
# extract list of namspaces # extract list of namspaces
self.namespaces = {ns.name : ns.id for ns in dump.mwiterator.namespaces} self.namespaces = {ns.name : ns.id for ns in dump.mwiterator.site_info.namespaces}
page_count = 0 page_count = 0
rev_count = 0 rev_count = 0
# Iterate through pages # Iterate through pages
for page in dump: for page in dump:
if self.persist: rev_detector = mwreverts.Detector()
state = persistence.State()
if self.persist or self.persist_legacy:
window = deque(maxlen=PERSISTENCE_RADIUS) window = deque(maxlen=PERSISTENCE_RADIUS)
rev_detector = reverts.Detector() if not self.persist_legacy:
state = mwpersistence.DiffState(SequenceMatcher(tokenizer = wikitext_split),
revert_radius=PERSISTENCE_RADIUS)
else:
from mw.lib import persistence
state = persistence.State()
# Iterate through a page's revisions # Iterate through a page's revisions
for rev in page: for rev in page:
@@ -155,23 +179,28 @@ class WikiqParser():
rev_data = {'revid' : rev.id, rev_data = {'revid' : rev.id,
'date_time' : rev.timestamp.strftime('%Y-%m-%d %H:%M:%S'), 'date_time' : rev.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
'articleid' : page.id, 'articleid' : page.id,
'editor_id' : "" if rev.contributor.id == None else rev.contributor.id, 'editor_id' : "" if rev.deleted.user == True or rev.user.id is None else rev.user.id,
'title' : '"' + page.title + '"', 'title' : '"' + page.title + '"',
'namespace' : page.namespace if page.namespace else self.__get_namespace_from_title(page.title), 'namespace' : page.namespace if page.namespace is not None else self.__get_namespace_from_title(page.title),
'deleted' : "TRUE" if rev.text.deleted else "FALSE" } 'deleted' : "TRUE" if rev.deleted.text else "FALSE" }
# if revisions are deleted, /many/ things will be missing # if revisions are deleted, /many/ things will be missing
if rev.text.deleted: if rev.deleted.text:
rev_data['text_chars'] = "" rev_data['text_chars'] = ""
rev_data['sha1'] = "" rev_data['sha1'] = ""
rev_data['revert'] = "" rev_data['revert'] = ""
rev_data['reverteds'] = "" rev_data['reverteds'] = ""
else: else:
# rev.text can be None if the page has no text
if not rev.text:
rev.text = ""
# if text exists, we'll check for a sha1 and generate one otherwise # if text exists, we'll check for a sha1 and generate one otherwise
if rev.sha1: if rev.sha1:
text_sha1 = rev.sha1 text_sha1 = rev.sha1
else: else:
text_sha1 = sha1(bytes(rev.text, "utf8")).hexdigest() text_sha1 = sha1(bytes(rev.text, "utf8")).hexdigest()
rev_data['sha1'] = text_sha1 rev_data['sha1'] = text_sha1
@@ -181,6 +210,7 @@ class WikiqParser():
# generate revert data # generate revert data
revert = rev_detector.process(text_sha1, rev.id) revert = rev_detector.process(text_sha1, rev.id)
if revert: if revert:
rev_data['revert'] = "TRUE" rev_data['revert'] = "TRUE"
rev_data['reverteds'] = '"' + ",".join([str(x) for x in revert.reverteds]) + '"' rev_data['reverteds'] = '"' + ",".join([str(x) for x in revert.reverteds]) + '"'
@@ -191,10 +221,10 @@ class WikiqParser():
# if the fact that the edit was minor can be hidden, this might be an issue # if the fact that the edit was minor can be hidden, this might be an issue
rev_data['minor'] = "TRUE" if rev.minor else "FALSE" rev_data['minor'] = "TRUE" if rev.minor else "FALSE"
if rev.contributor.user_text: if not rev.deleted.user:
# wrap user-defined editors in quotes for fread # wrap user-defined editors in quotes for fread
rev_data['editor'] = '"' + rev.contributor.user_text + '"' rev_data['editor'] = '"' + rev.user.text + '"'
rev_data['anon'] = "TRUE" if rev.contributor.id == None else "FALSE" rev_data['anon'] = "TRUE" if rev.user.id == None else "FALSE"
else: else:
rev_data['anon'] = "" rev_data['anon'] = ""
@@ -211,12 +241,19 @@ class WikiqParser():
if self.collapse_user: if self.collapse_user:
rev_data['collapsed_revs'] = rev.collapsed_revs rev_data['collapsed_revs'] = rev.collapsed_revs
if self.persist: if self.persist or self.persist_legacy:
if rev.text.deleted: if rev.deleted.text:
for k in ["token_revs", "tokens_added", "tokens_removed", "tokens_window"]: for k in ["token_revs", "tokens_added", "tokens_removed", "tokens_window"]:
old_rev_data[k] = None old_rev_data[k] = None
else: else:
_, tokens_added, tokens_removed = state.process(rev.text, rev.id, text_sha1)
if not self.persist_legacy:
_, tokens_added, tokens_removed = state.update(rev.text, rev.id)
else:
_, tokens_added, tokens_removed = state.process(rev.text, rev.id, text_sha1)
window.append((rev.id, rev_data, tokens_added, tokens_removed)) window.append((rev.id, rev_data, tokens_added, tokens_removed))
if len(window) == PERSISTENCE_RADIUS: if len(window) == PERSISTENCE_RADIUS:
@@ -236,7 +273,7 @@ class WikiqParser():
rev_count += 1 rev_count += 1
if self.persist: if self.persist or self.persist_legacy:
# print out metadata for the last RADIUS revisions # print out metadata for the last RADIUS revisions
for i, item in enumerate(window): for i, item in enumerate(window):
# if the window was full, we've already printed item 0 # if the window was full, we've already printed item 0
@@ -277,7 +314,7 @@ def open_input_file(input_filename):
elif re.match(r'.*\.gz$', input_filename): elif re.match(r'.*\.gz$', input_filename):
cmd = ["zcat", input_filename] cmd = ["zcat", input_filename]
elif re.match(r'.*\.bz2$', input_filename): elif re.match(r'.*\.bz2$', input_filename):
cmd = ["zcat", input_filename] cmd = ["bzcat", "-dk", input_filename]
try: try:
input_file = Popen(cmd, stdout=PIPE).stdout input_file = Popen(cmd, stdout=PIPE).stdout
@@ -295,64 +332,22 @@ def open_output_file(input_filename):
return output_file return output_file
parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimitted data.')
# arguments for the input direction class IPCheck(object):
parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1, # IP address regexes taken from https://gist.github.com/mnordhoff/2213179
help="Directory for output files.") ipv4_address = re.compile('^(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$')
parser.add_argument('-s', '--stdout', dest="stdout", action="store_true", ipv6_address_or_addrz = re.compile('^(?:(?:[0-9A-Fa-f]{1,4}:){6}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9A-Fa-f]{1,4}:){5}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){4}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){3}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,2}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){2}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,3}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}:(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,4}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,5}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}|(?:(?:[0-9A-Fa-f]{1,4}:){,6}[0-9A-Fa-f]{1,4})?::)(?:%25(?:[A-Za-z0-9\\-._~]|%[0-9A-Fa-f]{2})+)?$')
help="Write output to standard out (do not create dump file)")
parser.add_argument('--collapse-user', dest="collapse_user", action="store_true", @staticmethod
help="Operate only on the final revision made by user a user within all sequences of consecutive edits made by a user. This can be useful for addressing issues with text persistence measures.") def is_ip(username):
if not type(username) is str:
return False
parser.add_argument('-p', '--persistence', dest="persist", action="store_true", '''Check if a username is an ip (v4 or v6) address. We use this as
help="Compute and report measures of content persistent: (1) persistent token revisions, (2) tokens added, and (3) number of revision used in computing the first measure.") a marker of whether the user is anonymous.'''
if IPCheck.ipv4_address.match(username) or IPCheck.ipv6_address_or_addrz.match(username):
parser.add_argument('-u', '--url-encode', dest="urlencode", action="store_true", return True
help="Output url encoded text strings. This works around some data issues like newlines in editor names. In the future it may be used to output other text data.")
args = parser.parse_args()
if len(args.dumpfiles) > 0:
for filename in args.dumpfiles:
input_file = open_input_file(filename)
# open directory for output
if args.output_dir:
output_dir = args.output_dir[0]
else: else:
output_dir = "." return False
print("Processing file: %s" % filename, file=sys.stderr)
if args.stdout:
output_file = sys.stdout
else:
filename = os.path.join(output_dir, os.path.basename(filename))
output_file = open_output_file(filename)
wikiq = WikiqParser(input_file, output_file,
collapse_user=args.collapse_user,
persist=args.persist,
urlencode=args.urlencode)
wikiq.process()
# close things
input_file.close()
output_file.close()
else:
wikiq = WikiqParser(sys.stdin, sys.stdout,
collapse_user=args.collapse_user,
persist=args.persist,
urlencode=args.urlencode)
wikiq.process()
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
# stop_words = stop_words.split(",")