The big challenges here (and remaining) are as follows:

1. Deltas requires changes to be given at the token level, whereas wikidiff2 reports changes at the byte level. Sequences of text must therefore be tokenized to convert byte offsets into the desired token indices. As-is this is done inefficiently, often re-tokenizing previously-tokenized sequences; a better implementation would tokenize incrementally, or look up the referenced sequences directly (see the first sketch below).

2. Deltas only allows Equal/Insert/Delete operations, while wikidiff2 also detects paragraph moves. These paragraph moves are NOT equivalent to Equal: the moved paragraphs are only guaranteed to be very similar, not identical. Since wikidiff2 does not report changes within moved paragraphs, preserving token persistence would require running a difference algorithm on the before/after sequences. A stopgap (currently implemented) turns these moves into strict deletions/insertions (see the second sketch below).

3. Memory consumption is high, and sometimes results in memory overflow. I am unsure whether this is a memory leak or whether re-tokenization alone produces more memory throughput than my machine can handle.

4. Deltas expects all tokens in the before/after text to be covered by Equal/Insert/Delete segment ranges, but wikidiff2 does not appear to ever emit Equal ranges, instead skipping them. These ranges must be computed and inserted in sequence (see the third sketch below). As-is the code does not correctly handle unchanged text at the end of pages.

Signed-off-by: Will Beason <willbeason@gmail.com>
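For (1), a minimal sketch of how the byte-to-token conversion could avoid repeated re-tokenization: tokenize each revision once, record every token's starting byte offset, and binary-search those offsets as wikidiff2 reports byte ranges. The tokenizer regex and function names here are hypothetical illustrations, not what the code currently does.

```python
import bisect
import re

# Hypothetical tokenizer: alternating word/whitespace tokens over the raw bytes.
TOKEN_RE = re.compile(rb"\S+|\s+")

def tokenize_with_offsets(text: bytes):
    """Tokenize once, recording the starting byte offset of each token."""
    tokens, starts = [], []
    for m in TOKEN_RE.finditer(text):
        tokens.append(m.group())
        starts.append(m.start())
    return tokens, starts

def byte_range_to_token_range(starts, begin, end):
    """Map a wikidiff2 byte range [begin, end) onto token indices.

    Each lookup is O(log n), so previously-tokenized text never has to be
    re-tokenized just to translate another reported change.
    """
    first = max(bisect.bisect_right(starts, begin) - 1, 0)
    last = bisect.bisect_left(starts, end)
    return first, last  # half-open token range [first, last)
```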
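For (2), a sketch of the currently-implemented stopgap for paragraph moves. The operation shapes are simplified stand-ins for deltas' segment operations (a1/a2 index the "before" tokens, b1/b2 the "after" tokens); the real types live in the deltas package.

```python
from collections import namedtuple

# Simplified stand-ins for deltas' operations: a1/a2 are token indices in
# the "before" text, b1/b2 in the "after" text.
Insert = namedtuple("Insert", "a1 a2 b1 b2")
Delete = namedtuple("Delete", "a1 a2 b1 b2")

def move_as_delete_insert(a1, a2, b1, b2):
    """Stopgap: treat a wikidiff2 paragraph move as a strict delete+insert.

    Because wikidiff2 does not report edits inside a moved paragraph, the
    moved text cannot safely be marked Equal; sacrificing persistence across
    the move is the conservative choice until a real diff of the moved
    before/after paragraphs is implemented.
    """
    return [
        Delete(a1, a2, b1, b1),  # old location: tokens removed, nothing added
        Insert(a2, a2, b1, b2),  # new location: tokens added, nothing removed
    ]
```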
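For (4), a sketch of the Equal-gap computation, including the trailing-unchanged-text case the current code gets wrong. Again the operation shape is a simplified stand-in for deltas' types, and `ops` is assumed to be the in-order list of Insert/Delete segments shaped like the namedtuples in the previous sketch.

```python
from collections import namedtuple

Equal = namedtuple("Equal", "a1 a2 b1 b2")

def fill_equal_gaps(ops, a_len, b_len):
    """Insert Equal segments wherever wikidiff2 skipped unchanged text.

    a_len/b_len are the token counts of the before and after texts. Deltas
    requires every token in both sequences to be covered by some segment.
    """
    covered = []
    a, b = 0, 0
    for op in ops:
        if op.a1 > a or op.b1 > b:
            covered.append(Equal(a, op.a1, b, op.b1))  # unchanged run before op
        covered.append(op)
        a, b = op.a2, op.b2
    if a < a_len or b < b_len:
        # Unchanged text at the end of the page: the case the current
        # implementation misses.
        covered.append(Equal(a, a_len, b, b_len))
    return covered
```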
import os
import shutil
import subprocess
import tracemalloc
import unittest
from io import StringIO
from typing import Final, Union

import numpy as np
import pandas as pd
from pandas import DataFrame
from pandas.testing import assert_frame_equal, assert_series_equal

# Make references to files and wikiq relative to this file, not to the current working directory.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
WIKIQ: Final[str] = os.path.join(os.path.dirname(TEST_DIR), "wikiq")
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")

IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"


def setup():
    tracemalloc.start()

    # Create the output directory here since this is a one-time setup step, as opposed to per-test setup.
    if not os.path.exists(TEST_OUTPUT_DIR):
        os.mkdir(TEST_OUTPUT_DIR)


# Always run setup, even if this is executed via "python -m unittest" rather
# than as __main__.
setup()


class WikiqTester:
    def __init__(self,
                 wiki: str,
                 case_name: Union[str, None] = None,
                 suffix: Union[str, None] = None,
                 in_compression: str = "bz2",
                 baseline_format: str = "tsv",
                 out_format: str = "tsv",
                 ):
        self.input_file = os.path.join(TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression))

        basename = "{0}_{1}".format(case_name, wiki)
        if suffix:
            basename = "{0}_{1}".format(basename, suffix)

        self.output = os.path.join(TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format))

        # Remove any output left over from a previous run.
        if os.path.exists(self.output):
            if os.path.isfile(self.output):
                os.remove(self.output)
            else:
                shutil.rmtree(self.output)

        if out_format == "parquet":
            os.makedirs(self.output, exist_ok=True)

        if suffix is None:
            self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
            self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
        else:
            self.wikiq_baseline_name = "{0}_{1}.{2}".format(wiki, suffix, baseline_format)
            self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)

        # If case_name is unset, there are no relevant baseline or test files.
        if case_name is not None:
            self.baseline_file = os.path.join(BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name))

    def call_wikiq(self, *args: str, out: bool = True):
        """
        Calls wikiq with the passed arguments on the input file relevant to the test.

        :param args: The command line arguments to pass to wikiq.
        :param out: Whether to pass an output argument to wikiq.
        :return: The output of the wikiq call.
        """
        if out:
            call = ' '.join([WIKIQ, self.input_file, "-o", self.output, *args])
        else:
            call = ' '.join([WIKIQ, self.input_file, *args])

        print(call)
        return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)


# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
#   7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE

class WikiqTestCase(unittest.TestCase):
    def test_WP_noargs(self):
        tester = WikiqTester(IKWIKI, "noargs")

        try:
            # tester.call_wikiq()
            tester.call_wikiq("--wikidiff-url=http://localhost:8000")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        test = pd.read_table(tester.output)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_WP_namespaces(self):
        tester = WikiqTester(IKWIKI, "namespaces")

        try:
            tester.call_wikiq("-n 0", "-n 1")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        # Check that only the requested namespaces appear, and that the data frames are equal.
        test = pd.read_table(tester.output)
        num_wrong_ns = sum(~ test.namespace.isin({0, 1}))
        self.assertEqual(num_wrong_ns, 0)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_WP_revert_radius(self):
        tester = WikiqTester(IKWIKI, "revert_radius")

        try:
            tester.call_wikiq("-n 0", "-n 1", "-rr 1")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        # Check that only the requested namespaces appear, and that the data frames are equal.
        test = pd.read_table(tester.output)
        num_wrong_ns = sum(~ test.namespace.isin({0, 1}))
        self.assertEqual(num_wrong_ns, 0)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_WP_no_revert_radius(self):
        tester = WikiqTester(IKWIKI, "no_revert_radius")

        try:
            tester.call_wikiq("-rr 0")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        # With revert detection disabled, no revision should be marked as a revert.
        test = pd.read_table(tester.output)
        num_reverted = sum(i is None for i in test.revert)
        self.assertEqual(num_reverted, 0)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_WP_collapse_user(self):
        tester = WikiqTester(IKWIKI, "collapse_user")

        try:
            tester.call_wikiq("--collapse-user")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        test = pd.read_table(tester.output)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_noargs(self):
        tester = WikiqTester(SAILORMOON, "noargs", in_compression="7z")

        try:
            tester.call_wikiq()
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        test = pd.read_table(tester.output)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_collapse_user(self):
        tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z")

        try:
            tester.call_wikiq("--collapse-user", "--fandom-2020")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        test = pd.read_table(tester.output)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_pwr_segment(self):
        tester = WikiqTester(SAILORMOON, "persistence_segment", in_compression="7z")

        try:
            tester.call_wikiq("--persistence segment", "--fandom-2020")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        test = pd.read_table(tester.output)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_pwr_legacy(self):
        tester = WikiqTester(SAILORMOON, "persistence_legacy", in_compression="7z")

        try:
            tester.call_wikiq("--persistence legacy", "--fandom-2020")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        test = pd.read_table(tester.output)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_pwr(self):
        tester = WikiqTester(SAILORMOON, "persistence", in_compression="7z")

        try:
            tester.call_wikiq("--persistence", "--fandom-2020")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        test = pd.read_table(tester.output)
        baseline = pd.read_table(tester.baseline_file)

        test = test.reindex(columns=sorted(test.columns))
        assert_frame_equal(test, baseline, check_like=True)

    def test_malformed_noargs(self):
        tester = WikiqTester(wiki=TWINPEAKS, case_name="noargs", in_compression="7z")
        want_exception = 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0'

        try:
            tester.call_wikiq()
        except subprocess.CalledProcessError as exc:
            errlines = exc.stderr.decode("utf8").splitlines()
            self.assertEqual(errlines[-1], want_exception)
        else:
            self.fail("No exception raised, want: {}".format(want_exception))

    def test_stdout_noargs(self):
        tester = WikiqTester(wiki=SAILORMOON, case_name="noargs", in_compression="7z")

        try:
            outs = tester.call_wikiq("--stdout", "--fandom-2020", out=False).decode("utf8")
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_bad_regex(self):
        tester = WikiqTester(wiki=REGEXTEST, case_name="bad_regex")

        # Sample arguments for checking that bad regex arguments are rejected.
        bad_arguments_list = [
            # label is missing
            "-RP '\\b\\d+\\b'",
            # number of regexes and number of labels do not match
            "-RP 'NPO V' -RP THE -RPl testlabel",
            # CP regex but RP label
            "-CP '(Tamil|Li)' -RPl testlabel",
            # regex is missing
            "-CPl testlabel",
            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"
        ]

        for arguments in bad_arguments_list:
            try:
                tester.call_wikiq("--stdout", arguments, out=False)
            except subprocess.CalledProcessError as exc:
                # We want to check that the bad arguments were caught and sys.exit stopped the code.
                print(exc.stderr.decode("utf-8"))
            else:
                self.fail("No exception raised, want Exception")

    def test_good_regex(self):
        # Sample arguments for checking the outcomes of good regex arguments.
        good_arguments_list = [
            "-RP '\\b\\d{3}\\b' -RPl threedigits",
            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
            "-CP 'WP:EVADE' -CPl wp_evade"
        ]

        for i, arguments in enumerate(good_arguments_list):
            tester = WikiqTester(wiki=REGEXTEST, case_name="basic", suffix=str(i))

            try:
                tester.call_wikiq(arguments)
            except subprocess.CalledProcessError as exc:
                self.fail(exc.stderr.decode("utf8"))

            test = pd.read_table(tester.output)
            baseline = pd.read_table(tester.baseline_file)
            assert_frame_equal(test, baseline, check_like=True)
            print(i)

    def test_capturegroup_regex(self):
        cap_arguments_list = [
            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"
        ]

        for i, arguments in enumerate(cap_arguments_list):
            tester = WikiqTester(wiki=REGEXTEST, case_name="capturegroup", suffix=str(i))

            try:
                tester.call_wikiq(arguments)
            except subprocess.CalledProcessError as exc:
                self.fail(exc.stderr.decode("utf8"))

            test = pd.read_table(tester.output)
            baseline = pd.read_table(tester.baseline_file)
            assert_frame_equal(test, baseline, check_like=True)

    def test_parquet(self):
        tester = WikiqTester(IKWIKI, "noargs", out_format="parquet")

        try:
            tester.call_wikiq()
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        # As a test, let's make sure that we get equal data frames.
        test: DataFrame = pd.read_parquet(tester.output)
        # test = test.drop(['reverteds'], axis=1)

        baseline: DataFrame = pd.read_table(tester.baseline_file)

        # Pandas does not read timestamps as the desired datetime type.
        baseline['date_time'] = pd.to_datetime(baseline['date_time'])
        # Replace NaN with None so missing values match the parquet output.
        baseline['revert'] = baseline['revert'].replace(np.nan, None)
        baseline['reverteds'] = baseline['reverteds'].replace(np.nan, None)
        # baseline['reverteds'] = [None if i is np.nan else [int(j) for j in str(i).split(",")] for i in baseline['reverteds']]
        baseline['sha1'] = baseline['sha1'].replace(np.nan, None)
        baseline['editor'] = baseline['editor'].replace(np.nan, None)
        baseline['anon'] = baseline['anon'].replace(np.nan, None)

        for index, row in baseline.iterrows():
            if row['revert'] != test['revert'][index]:
                print(row['revid'], ":", row['revert'], "!=", test['revert'][index])

        # Compare column-by-column so a mismatch reports which column failed.
        for col in baseline.columns:
            try:
                assert_series_equal(test[col], baseline[col], check_like=True, check_dtype=False)
            except ValueError as exc:
                print(f"Error comparing column {col}")
                self.fail(exc)

        # assert_frame_equal(test, baseline, check_like=True, check_dtype=False)


if __name__ == '__main__':
    unittest.main()