Also add test to ensure functionality works. Signed-off-by: Will Beason <willbeason@gmail.com>
406 lines
15 KiB
Python
406 lines
15 KiB
Python
import unittest
|
|
import os
|
|
import subprocess
|
|
from shutil import copyfile
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from pandas import DataFrame
|
|
from pandas.testing import assert_frame_equal, assert_series_equal
|
|
from io import StringIO
|
|
import tracemalloc
|
|
from typing import Final
|
|
|
|
# All paths are anchored at the directory holding this file, so the tests
# behave the same regardless of the current working directory.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))

# The wikiq executable sits one directory above the tests.
WIKIQ: Final[str] = os.path.join(os.path.dirname(TEST_DIR), "wikiq")

# Where test runs write their output, and where the expected results live.
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
PARQUET_OUTPUT_DIR: Final[str] = TEST_OUTPUT_DIR + ".parquet"
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")

# Base names of the sample dumps used by the tests.
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"
|
|
|
|
|
|
def setup() -> None:
    """Perform the one-time, module-wide preparation for the test run.

    Starts tracemalloc and makes sure both output directories exist. This is
    done once here rather than in per-test setup.

    :raises FileExistsError: if an output path exists but is not a directory.
    """
    tracemalloc.start()

    # exist_ok=True creates the directory without a separate existence check,
    # so two runners starting at the same time cannot race each other between
    # the check and the creation.
    for directory in (TEST_OUTPUT_DIR, PARQUET_OUTPUT_DIR):
        os.makedirs(directory, exist_ok=True)


# Always run setup, even if this is executed via "python -m unittest" rather
# than as __main__.
setup()
|
|
|
|
|
|
class WikiqTester:
|
|
def __init__(self,
|
|
wiki: str,
|
|
case_name: str | None = None,
|
|
suffix: str | None = None,
|
|
in_compression: str = "bz2",
|
|
baseline_format: str = "tsv",
|
|
out_format: str = "tsv",
|
|
):
|
|
self.input_file = os.path.join(TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression))
|
|
|
|
if out_format == "tsv":
|
|
self.output_dir = TEST_OUTPUT_DIR
|
|
else:
|
|
self.output_dir = "{0}.parquet".format(TEST_OUTPUT_DIR)
|
|
|
|
if suffix is None:
|
|
self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
|
|
self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
|
|
else:
|
|
self.wikiq_baseline_name = "{0}_{1}.{2}".format(wiki, suffix, baseline_format)
|
|
self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
|
|
self.call_output = os.path.join(self.output_dir, "{0}.{1}".format(wiki, out_format))
|
|
|
|
# If case_name is unset, there are no relevant baseline or test files.
|
|
if case_name is not None:
|
|
self.baseline_file = os.path.join(BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name))
|
|
self.test_file = os.path.join(self.output_dir, "{0}_{1}".format(case_name, self.wikiq_out_name))
|
|
if os.path.exists(self.test_file):
|
|
os.remove(self.test_file)
|
|
|
|
def call_wikiq(self, *args: str, out: bool = True):
|
|
"""
|
|
Calls wikiq with the passed arguments on the input file relevant to the test.
|
|
:param args: The command line arguments to pass to wikiq.
|
|
:param out: Whether to pass an output argument to wikiq.
|
|
:return: The output of the wikiq call.
|
|
"""
|
|
if out:
|
|
call = ' '.join([WIKIQ, self.input_file, "-o", self.output_dir, *args])
|
|
else:
|
|
call = ' '.join([WIKIQ, self.input_file, *args])
|
|
|
|
print(call)
|
|
return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
|
|
|
|
|
|
# with / without pwr DONE
|
|
# with / without url encode DONE
|
|
# with / without collapse user DONE
|
|
# with output to stdout DONE
|
|
# note that the persistence radius is 7 by default
|
|
# reading various file formats including
|
|
# 7z, gz, bz2, xml DONE
|
|
# wikia and wikipedia data DONE
|
|
# malformed xmls DONE
|
|
|
|
class WikiqTestCase(unittest.TestCase):
    """End-to-end tests: run wikiq on sample dumps and compare with stored baselines."""

    # ------------------------------------------------------------------ helpers

    def _run_wikiq(self, tester: "WikiqTester", *args: str, out: bool = True) -> bytes:
        """Run wikiq; if it exits non-zero, fail the test with its stderr."""
        try:
            return tester.call_wikiq(*args, out=out)
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

    def _run_and_read(self, tester: "WikiqTester", *args: str) -> DataFrame:
        """Run wikiq, copy its output to the case's test file and load it as a table."""
        self._run_wikiq(tester, *args)
        copyfile(tester.call_output, tester.test_file)
        return pd.read_table(tester.test_file)

    def _assert_matches_baseline(self, test: DataFrame, tester: "WikiqTester") -> None:
        """Assert that ``test`` equals the case's baseline, ignoring row/column order."""
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def _assert_only_namespaces(self, test: DataFrame, allowed: set[int]) -> None:
        """Assert that every row's namespace is one of ``allowed``."""
        num_wrong_ns = sum(~ test.namespace.isin(allowed))
        self.assertEqual(num_wrong_ns, 0)

    def _assert_regex_cases(self, case_name: str, arguments_list: list[str]) -> None:
        """Run wikiq once per argument string and compare each run with its baseline.

        Each run is its own sub-test, so one failing case does not hide the others.
        """
        for i, arguments in enumerate(arguments_list):
            with self.subTest(case=i, arguments=arguments):
                tester = WikiqTester(wiki=REGEXTEST, case_name=case_name, suffix=str(i))
                test = self._run_and_read(tester, arguments)
                self._assert_matches_baseline(test, tester)

    # -------------------------------------------------------------------- tests

    def test_WP_noargs(self):
        """The ikwiki dump with no extra arguments matches its baseline."""
        tester = WikiqTester(IKWIKI, "noargs")
        test = self._run_and_read(tester)
        self._assert_matches_baseline(test, tester)

    def test_WP_url_encode(self):
        """The ikwiki dump with --url-encode matches its baseline."""
        tester = WikiqTester(IKWIKI, "url-encode")
        test = self._run_and_read(tester, "--url-encode")
        self._assert_matches_baseline(test, tester)

    def test_WP_namespaces(self):
        """With -n 0 -n 1 only namespaces 0 and 1 appear, and the output matches its baseline."""
        tester = WikiqTester(IKWIKI, "namespaces")
        test = self._run_and_read(tester, "-n 0", "-n 1")
        self._assert_only_namespaces(test, {0, 1})
        self._assert_matches_baseline(test, tester)

    def test_WP_revert_radius(self):
        """With -n 0 -n 1 -rr 1 only namespaces 0 and 1 appear, and the output matches its baseline."""
        tester = WikiqTester(IKWIKI, "revert_radius")
        test = self._run_and_read(tester, "-n 0", "-n 1", "-rr 1")
        self._assert_only_namespaces(test, {0, 1})
        self._assert_matches_baseline(test, tester)

    def test_WP_no_revert_radius(self):
        """The ikwiki dump with -rr 0 matches its baseline."""
        tester = WikiqTester(IKWIKI, "no_revert_radius")
        test = self._run_and_read(tester, "-rr 0")

        # NOTE(review): values read by pd.read_table are presumably never the
        # None object (missing cells load as NaN), which would make this check
        # always pass. Confirm the intended condition before tightening it.
        num_reverted = sum(i is None for i in test.revert)
        self.assertEqual(num_reverted, 0)

        self._assert_matches_baseline(test, tester)

    def test_noargs(self):
        """The sailormoon 7z dump with no extra arguments matches its baseline."""
        tester = WikiqTester(SAILORMOON, "noargs", in_compression="7z")
        test = self._run_and_read(tester)
        self._assert_matches_baseline(test, tester)

    def test_collapse_user(self):
        """The sailormoon dump with --collapse-user matches its baseline."""
        tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z")
        test = self._run_and_read(tester, "--collapse-user", "--fandom-2020")
        self._assert_matches_baseline(test, tester)

    def test_pwr_segment(self):
        """The sailormoon dump with --persistence segment matches its baseline."""
        tester = WikiqTester(SAILORMOON, "persistence_segment", in_compression="7z")
        test = self._run_and_read(tester, "--persistence segment", "--fandom-2020")
        self._assert_matches_baseline(test, tester)

    def test_pwr_legacy(self):
        """The sailormoon dump with --persistence legacy matches its baseline."""
        tester = WikiqTester(SAILORMOON, "persistence_legacy", in_compression="7z")
        test = self._run_and_read(tester, "--persistence legacy", "--fandom-2020")
        self._assert_matches_baseline(test, tester)

    def test_pwr(self):
        """The sailormoon dump with --persistence matches its baseline."""
        tester = WikiqTester(SAILORMOON, "persistence", in_compression="7z")
        test = self._run_and_read(tester, "--persistence", "--fandom-2020")
        test = test.reindex(columns=sorted(test.columns))
        self._assert_matches_baseline(test, tester)

    def test_url_encode(self):
        """The sailormoon dump with --url-encode matches its baseline."""
        tester = WikiqTester(SAILORMOON, "url-encode", in_compression="7z")
        test = self._run_and_read(tester, "--url-encode", "--fandom-2020")
        test = test.reindex(columns=sorted(test.columns))
        self._assert_matches_baseline(test, tester)

    def test_malformed_noargs(self):
        """wikiq fails on the truncated twinpeaks dump and reports the XML parse error last."""
        tester = WikiqTester(wiki=TWINPEAKS, in_compression="7z")
        want_exception = 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0'

        with self.assertRaises(subprocess.CalledProcessError,
                               msg="No exception raised, want: {}".format(want_exception)) as caught:
            tester.call_wikiq()

        errlines = caught.exception.stderr.decode("utf8").splitlines()
        self.assertEqual(errlines[-1], want_exception)

    def test_stdout_noargs(self):
        """Output written to stdout matches the sailormoon "noargs" baseline."""
        tester = WikiqTester(wiki=SAILORMOON, case_name="noargs", in_compression="7z")
        outs = self._run_wikiq(tester, "--stdout", "--fandom-2020", out=False).decode("utf8")

        # This run passes no output directory, so tester.call_output is not
        # produced by it. Copying that path made the test depend on another
        # test having left the file behind. Save what was captured instead.
        with open(tester.test_file, "w", encoding="utf8", newline="") as handle:
            handle.write(outs)

        test = pd.read_table(StringIO(outs))
        self._assert_matches_baseline(test, tester)

    def test_bad_regex(self):
        """Every inconsistent regex/label combination makes wikiq exit with an error."""
        tester = WikiqTester(wiki=REGEXTEST)

        # sample arguments for checking that bad arguments get terminated / test_regex_arguments
        bad_arguments_list = [
            # label is missing
            "-RP '\\b\\d+\\b'",
            # number of reg and number of labels do not match
            "-RP 'NPO V' -RP THE -RPl testlabel",
            # cp but rp label
            "-CP '(Tamil|Li)' -RPl testlabel",
            # regex is missing
            "-CPl testlabel",
            # -CP pattern given without a -CPl label
            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'",
        ]

        for arguments in bad_arguments_list:
            with self.subTest(arguments=arguments):
                with self.assertRaises(subprocess.CalledProcessError,
                                       msg="No exception raised, want Exception") as caught:
                    tester.call_wikiq("--stdout", arguments, out=False)

                # Show wikiq's complaint so the log documents why it stopped.
                print(caught.exception.stderr.decode("utf-8"))

    def test_good_regex(self):
        """Valid regex/label combinations produce output matching the "basic" baselines."""
        # sample arguments for checking the outcomes of good arguments / test_basic_regex
        good_arguments_list = [
            "-RP '\\b\\d{3}\\b' -RPl threedigits",
            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
            "-CP 'WP:EVADE' -CPl wp_evade",
        ]

        self._assert_regex_cases("basic", good_arguments_list)

    def test_capturegroup_regex(self):
        """Regexes with named capture groups produce output matching the "capturegroup" baselines."""
        cap_arguments_list = [
            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov",
        ]

        self._assert_regex_cases("capturegroup", cap_arguments_list)

    def test_parquet(self):
        """Parquet output for the ikwiki dump agrees, column by column, with the TSV baseline."""
        tester = WikiqTester(IKWIKI, "noargs", out_format="parquet")
        self._run_wikiq(tester)
        copyfile(tester.call_output, tester.test_file)

        test: DataFrame = pd.read_parquet(tester.test_file)
        baseline: DataFrame = pd.read_table(tester.baseline_file)

        # Pandas does not read timestamps as the desired datetime type.
        baseline['date_time'] = pd.to_datetime(baseline['date_time'])

        # Split strings to the arrays of reverted IDs so they can be compared.
        # pd.isna is used instead of an identity check against np.nan, which
        # only matches that one specific NaN object.
        baseline['reverteds'] = [
            None if pd.isna(i) else [int(j) for j in str(i).split(",")]
            for i in baseline['reverteds']
        ]

        # Turn the NaN that read_table uses for empty cells into None in
        # these columns before comparing with the parquet data.
        for col in ('revert', 'sha1', 'editor', 'anon'):
            baseline[col] = baseline[col].replace(np.nan, None)

        # Diagnostic only: report rows where exactly one side has no editor id.
        for index, row in baseline.iterrows():
            if row['editorid'] is None or test['editorid'][index] is None:
                if row['editorid'] != test['editorid'][index]:
                    print(row['revid'], ":", row['editorid'], "!=", test['editorid'][index])

        # Compare column by column so that a failure names the column.
        for col in baseline.columns:
            with self.subTest(column=col):
                try:
                    assert_series_equal(test[col], baseline[col], check_like=True, check_dtype=False)
                except ValueError as exc:
                    print(f"Error comparing column {col}")
                    self.fail(exc)
|
|
|
|
|
|
# Allow running this file directly as a script in addition to
# "python -m unittest".
if __name__ == "__main__":
    unittest.main()
|