mediawiki_dump_tools/test/Wikiq_Unit_Test.py
Will Beason 4d3900b541 Standardize calling for wikiq in tests
This way failures show the output of stderr/etc.

Also create path constant strings for use in tests to avoid repetition
and make changes easier.

Signed-off-by: Will Beason <willbeason@gmail.com>
2025-05-27 14:27:49 -05:00

416 lines
16 KiB
Python

import unittest
import os
import subprocess
from shutil import copyfile
import pandas as pd
from pandas.testing import assert_frame_equal
from io import StringIO
import tracemalloc
from typing import Final
# Make references to files and wikiq relative to this file, not to the current working directory.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
WIKIQ: Final[str] = os.path.join(os.path.dirname(TEST_DIR), "wikiq")
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
BASELINE_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
def setup():
    """Start tracemalloc and make sure the test output directory exists and is empty.

    This is one-time, module-level preparation rather than per-test setup.
    """
    tracemalloc.start()

    if os.path.exists(TEST_OUTPUT_DIR):
        # Clear out results from earlier runs. Otherwise a test could pass
        # erroneously when the program writes nothing but a previous run
        # left the expected output behind.
        for leftover in os.listdir(TEST_OUTPUT_DIR):
            os.remove(os.path.join(TEST_OUTPUT_DIR, leftover))
        return

    os.mkdir(TEST_OUTPUT_DIR)
# Run the one-time preparation as soon as this module is loaded, before any test class executes.
setup()
def call_wikiq(*args: str):
    """Run wikiq through the shell with the given command-line arguments.

    The arguments are joined with single spaces after the wikiq path, so one
    argument may itself hold several shell words (e.g. "-n 0"). The command is
    echoed to stdout before it runs.

    Raises:
        subprocess.CalledProcessError: if wikiq exits with a non-zero status;
            the captured stderr is available on the exception.
    """
    command = ' '.join((WIKIQ,) + args)
    print(command)
    subprocess.check_output(command, shell=True, stderr=subprocess.PIPE)
# Test coverage checklist:
#   with / without pwr (--persistence)                          DONE
#   with / without url encode (--url-encode)                    DONE
#   with / without collapse user (--collapse-user)              DONE
#   with output to stdout (--stdout)                            DONE
#   note that the persistence radius is 7 by default
#   reading various file formats, including 7z, gz, bz2, xml    DONE
#   wikia and wikipedia data                                    DONE
#   malformed XML                                               DONE
class Test_Wikipedia(unittest.TestCase):
    """Run wikiq on the ikwiki dump and compare the resulting TSV with stored baselines."""

    def setUp(self):
        wiki = 'ikwiki-20180301-pages-meta-history'
        infile = "{0}.xml.bz2".format(wiki)
        self.input_file = os.path.join(TEST_DIR, "dumps", infile)

        # The file the tests copy from after wikiq has run with "-o TEST_OUTPUT_DIR".
        self.wikiq_out_name = wiki + ".tsv"
        self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name)

    def _run_wikiq(self, test_file, *options):
        """Delete any stale test_file, run wikiq with options, and copy its output to test_file.

        A non-zero wikiq exit fails the test with wikiq's stderr as the message.
        """
        if os.path.exists(test_file):
            os.remove(test_file)

        try:
            call_wikiq(self.input_file, "-o", TEST_OUTPUT_DIR, *options)
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        copyfile(self.call_output, test_file)

    def test_WP_url_encode(self):
        test_filename = "url-encode_" + self.wikiq_out_name
        test_file = os.path.join(TEST_OUTPUT_DIR, test_filename)
        self._run_wikiq(test_file, "--url-encode")

        # The output and the baseline must load as equal data frames.
        produced = pd.read_table(test_file)
        expected = pd.read_table(os.path.join(BASELINE_OUTPUT_DIR, test_filename))
        assert_frame_equal(produced, expected, check_like=True)

    def test_WP_namespaces(self):
        print(os.path.abspath('.'))
        test_filename = "namespaces_" + self.wikiq_out_name
        test_file = os.path.join(TEST_OUTPUT_DIR, test_filename)
        self._run_wikiq(test_file, "-n 0", "-n 1")

        # Only the requested namespaces (0 and 1) may appear in the output.
        produced = pd.read_table(test_file)
        outside_requested = sum(~ produced.namespace.isin({0, 1}))
        self.assertEqual(outside_requested, 0)

        expected = pd.read_table(os.path.join(BASELINE_OUTPUT_DIR, test_filename))
        assert_frame_equal(produced, expected, check_like=True)

    def test_WP_revert_radius(self):
        print(os.path.abspath('.'))
        test_filename = "revert_radius_" + self.wikiq_out_name
        test_file = os.path.join(TEST_OUTPUT_DIR, test_filename)
        self._run_wikiq(test_file, "-n 0", "-n 1", "-rr 1")

        # Only the requested namespaces (0 and 1) may appear in the output.
        produced = pd.read_table(test_file)
        outside_requested = sum(~ produced.namespace.isin({0, 1}))
        self.assertEqual(outside_requested, 0)

        expected = pd.read_table(os.path.join(BASELINE_OUTPUT_DIR, test_filename))
        assert_frame_equal(produced, expected, check_like=True)
class Test_Basic(unittest.TestCase):
    """Run wikiq on the sailormoon dump with various flags and compare against baselines."""

    def setUp(self):
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        # The file the tests copy from after wikiq has run with "-o TEST_OUTPUT_DIR".
        self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.input_dir = os.path.join(TEST_DIR, "dumps")
        self.input_file = os.path.join(self.input_dir, self.infile)

    def _check_against_baseline(self, prefix, *wikiq_args):
        """Run wikiq with wikiq_args and compare its output to the prefixed baseline.

        Args:
            prefix: Filename prefix that identifies the test, e.g. "noargs_".
                The output copy and the baseline are both named
                prefix + self.wikiq_out_name.
            *wikiq_args: Extra command-line arguments appended after
                "<input file> -o <output dir>".
        """
        test_filename = prefix + self.wikiq_out_name
        test_file = os.path.join(TEST_OUTPUT_DIR, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        # call_wikiq drains the child's pipes while it runs. Calling
        # Popen.wait() with stdout/stderr set to PIPE can deadlock once the
        # child fills a pipe buffer, and it also discards stderr on failure.
        try:
            call_wikiq(self.input_file, "-o", TEST_OUTPUT_DIR, *wikiq_args)
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        copyfile(self.call_output, test_file)

        test = pd.read_table(test_file)
        baseline = pd.read_table(os.path.join(BASELINE_OUTPUT_DIR, test_filename))
        # check_like=True already ignores column order, so no explicit
        # re-sorting of the columns is needed before comparing.
        assert_frame_equal(test, baseline, check_like=True)

    def test_noargs(self):
        self._check_against_baseline("noargs_")

    def test_collapse_user(self):
        self._check_against_baseline("collapse-user_", "--collapse-user")

    def test_pwr_segment(self):
        self._check_against_baseline("persistence_segment_", "--persistence", "segment")

    def test_pwr_legacy(self):
        self._check_against_baseline("persistence_legacy_", "--persistence", "legacy")

    def test_pwr(self):
        self._check_against_baseline("persistence_", "--persistence")

    def test_url_encode(self):
        self._check_against_baseline("url-encode_", "--url-encode")
class Test_Malformed(unittest.TestCase):
    """Check that wikiq fails with a parse error on the truncated twinpeaks dump."""

    def setUp(self):
        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = WIKIQ + " {0} -o {1}"
        self.input_dir = os.path.join(TEST_DIR, "dumps")
        self.input_file = os.path.join(self.input_dir, self.infile)

    def test_malformed_noargs(self):
        call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR)
        print(call)

        # subprocess.run reads both pipes while the child runs. Calling
        # Popen.wait() before communicate() can deadlock if the child fills
        # a pipe buffer.
        proc = subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        self.assertNotEqual(proc.returncode, 0)

        # Decode stderr instead of splitting the repr of the bytes object, and
        # check the final line, which is where the exception message is expected.
        stderr_lines = proc.stderr.decode("utf8", errors="replace").splitlines()
        self.assertTrue(stderr_lines, "wikiq failed without writing anything to stderr")
        self.assertEqual(stderr_lines[-1],
                         'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
class Test_Stdout(unittest.TestCase):
    """Check that wikiq's --stdout output equals the stored no-argument baseline."""

    def setUp(self):
        self.wiki = 'sailormoon'
        self.infile = "{0}.xml.7z".format(self.wiki)
        self.wikiq_out_name = self.wiki + ".tsv"

        self.input_dir = os.path.join(TEST_DIR, "dumps")
        self.input_file = os.path.join(TEST_DIR, self.input_dir, self.infile)

        self.base_call = WIKIQ + " {0} --stdout"

    def test_noargs(self):
        command = self.base_call.format(self.input_file)
        print(command)

        completed = subprocess.run(command, shell=True, stdout=subprocess.PIPE)
        captured = completed.stdout.decode("utf8")

        # The stdout table is compared with the same baseline the file-output
        # "noargs" test uses.
        baseline_path = os.path.join(BASELINE_OUTPUT_DIR, "noargs_" + self.wikiq_out_name)
        print(baseline_path)

        produced = pd.read_table(StringIO(captured))
        expected = pd.read_table(baseline_path)
        assert_frame_equal(produced, expected, check_like=True)
class Test_Regex(unittest.TestCase):
    """Exercise wikiq's regex options (-RP/-RPl and -CP/-CPl) on the regextest dump."""

    def setUp(self):
        self.wiki = 'regextest'
        self.wikiq_out_name = self.wiki + '.tsv'
        self.infile = "{0}.xml.bz2".format(self.wiki)

        self.input_dir = os.path.join(TEST_DIR, "dumps")
        self.input_file = os.path.join(self.input_dir, self.infile)

        # The file the tests copy from after wikiq has run with "-o TEST_OUTPUT_DIR".
        self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name)

        # sample arguments for checking that bad arguments get terminated / test_regex_arguments
        self.bad_arguments_list = [
            # label is missing
            "-RP '\\b\\d+\\b'",
            # number of reg and number of labels do not match
            "-RP 'NPO V' -RP THE -RPl testlabel",
            # cp but rp label
            "-CP '(Tamil|Li)' -RPl testlabel",
            # regex is missing
            "-CPl testlabel",
            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"
        ]

        # sample arguments for checking the outcomes of good arguments / test_basic_regex
        self.good_arguments_list = [
            "-RP '\\b\\d{3}\\b' -RPl threedigits",
            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
            "-CP 'WP:EVADE' -CPl wp_evade"
        ]

        # sample arguments with named capture groups / test_capturegroup_regex
        self.cap_arguments_list = [
            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"
        ]

    def _check_against_baseline(self, test_filename, arguments):
        """Run wikiq with arguments and compare its output to the named baseline.

        Args:
            test_filename: Name used both for the copy of wikiq's output in
                TEST_OUTPUT_DIR and for the baseline in BASELINE_OUTPUT_DIR.
            arguments: Shell-quoted regex arguments appended to the wikiq call.
        """
        test_file = os.path.join(TEST_OUTPUT_DIR, test_filename)
        if os.path.exists(test_file):
            os.remove(test_file)

        # call_wikiq drains the child's pipes while it runs. Calling
        # Popen.wait() with stdout/stderr set to PIPE can deadlock once the
        # child fills a pipe buffer, and it also discards stderr on failure.
        try:
            call_wikiq(self.input_file, "-o", TEST_OUTPUT_DIR, arguments)
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

        copyfile(self.call_output, test_file)

        test = pd.read_table(test_file)
        baseline = pd.read_table(os.path.join(BASELINE_OUTPUT_DIR, test_filename))
        assert_frame_equal(test, baseline, check_like=True)

    def test_regex_arguments(self):
        for arguments in self.bad_arguments_list:
            # subTest reports every failing argument set, not only the first.
            with self.subTest(arguments=arguments):
                # we want to check that the bad arguments were caught and sys.exit is stopping the code
                with self.assertRaises(subprocess.CalledProcessError,
                                       msg="wikiq accepted invalid regex arguments") as caught:
                    call_wikiq(self.input_file, "--stdout", arguments)
                print(caught.exception.stderr.decode("utf-8"))

    def test_basic_regex(self):
        for i, arguments in enumerate(self.good_arguments_list):
            with self.subTest(i=i, arguments=arguments):
                test_filename = f"basic_{self.wiki}_{i}.tsv"
                self._check_against_baseline(test_filename, arguments)

    def test_capturegroup_regex(self):
        for i, arguments in enumerate(self.cap_arguments_list):
            with self.subTest(i=i, arguments=arguments):
                test_filename = f"capturegroup_{self.wiki}_{i}.tsv"
                self._check_against_baseline(test_filename, arguments)
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()