# Tests no longer implicitly require that the caller be in a specific working directory.
# Signed-off-by: Will Beason <willbeason@gmail.com>
# (Source listing: 419 lines, 17 KiB, Python.)
import unittest
|
|
import os
|
|
import subprocess
|
|
from shutil import copyfile
|
|
import pandas as pd
|
|
from pandas.testing import assert_frame_equal
|
|
from io import StringIO
|
|
import tracemalloc
|
|
from typing import Final
|
|
|
|
|
|
# Make references to files and wikiq relative to this file, not to the current
# working directory, so the tests can be launched from anywhere.

# Directory containing this test module (symlinks resolved).
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
# The wikiq command lives in the parent directory of the test directory.
WIKIQ: Final[str] = os.path.join(os.path.dirname(TEST_DIR), "wikiq")
# Directory handed to wikiq via -o; test outputs are written here.
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
|
|
|
|
# Test coverage checklist:
#   with / without pwr                                        DONE
#   with / without url encode                                 DONE
#   with / without collapse user                              DONE
#   with output to stdout                                     DONE
#   (note that the persistence radius is 7 by default)
#   reading various file formats, including 7z, gz, bz2, xml  DONE
#   wikia and wikipedia data                                  DONE
#   malformed XML                                             DONE
|
|
|
|
class Test_Wikipedia(unittest.TestCase):
    """Run wikiq on the ikwiki bz2 dump and compare its output to stored baselines."""

    def setUp(self):
        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)

        self.base_call = WIKIQ + " {0} -o {1}"
        self.input_dir = os.path.join(TEST_DIR, "dumps")
        # input_dir is already absolute, so no further prefix is needed.
        self.input_file = os.path.join(self.input_dir, self.infile)
        self.baseline_output_dir = os.path.join(TEST_DIR, "baseline_output")

        # The output directory must exist regardless of how the tests are launched.
        os.makedirs(TEST_OUTPUT_DIR, exist_ok=True)

    def _run_wikiq(self, test_filename, extra_args=""):
        """Run wikiq with ``extra_args`` and return its output as a DataFrame.

        The file wikiq writes is copied to ``test_filename`` inside the test
        output directory before being read. The test fails (with wikiq's
        stderr as the message) if the command exits with a non-zero status.
        """
        test_file = os.path.join(TEST_OUTPUT_DIR, test_filename)
        # Remove leftovers from earlier runs so a stale file can never be
        # mistaken for fresh output.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR)
        if extra_args:
            call = call + " " + extra_args
        print(call)

        # subprocess.run drains both pipes while waiting; wait() on a Popen
        # with piped output can deadlock once a pipe buffer fills up.
        proc = subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        self.assertEqual(proc.returncode, 0, msg=proc.stderr.decode("utf8", errors="replace"))

        copyfile(self.call_output, test_file)
        return pd.read_table(test_file)

    def _assert_matches_baseline(self, test, test_filename):
        """Assert that ``test`` equals the baseline table stored under ``test_filename``."""
        baseline_file = os.path.join(self.baseline_output_dir, test_filename)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_WP_url_encode(self):
        test_filename = "url-encode_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "--url-encode")

        # as a test let's make sure that we get equal data frames
        self._assert_matches_baseline(test, test_filename)

    def test_WP_namespaces(self):
        test_filename = "namespaces_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "-n 0 -n 1")

        # Only the requested namespaces may appear in the output.
        num_wrong_ns = sum(~ test.namespace.isin({0, 1}))
        self.assertEqual(num_wrong_ns, 0)

        # as a test let's make sure that we get equal data frames
        self._assert_matches_baseline(test, test_filename)

    def test_WP_revert_radius(self):
        test_filename = "revert_radius_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "-n 0 -n 1 -rr 1")

        # Only the requested namespaces may appear in the output.
        num_wrong_ns = sum(~ test.namespace.isin({0, 1}))
        self.assertEqual(num_wrong_ns, 0)

        # as a test let's make sure that we get equal data frames
        self._assert_matches_baseline(test, test_filename)
|
|
|
|
|
|
class Test_Basic(unittest.TestCase):
    """Run wikiq on the sailormoon 7z dump with various options and compare to baselines."""

    def setUp(self):
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = WIKIQ + " {0} -o {1}"
        self.input_dir = os.path.join(TEST_DIR, "dumps")
        # input_dir is already absolute, so no further prefix is needed.
        self.input_file = os.path.join(self.input_dir, self.infile)
        self.baseline_output_dir = os.path.join(TEST_DIR, "baseline_output")

        # The output directory must exist regardless of how the tests are launched.
        os.makedirs(TEST_OUTPUT_DIR, exist_ok=True)

    def _run_wikiq(self, test_filename, extra_args=""):
        """Run wikiq with ``extra_args`` and return its output as a DataFrame.

        The file wikiq writes is copied to ``test_filename`` inside the test
        output directory before being read. The test fails (with wikiq's
        stderr as the message) if the command exits with a non-zero status.
        """
        test_file = os.path.join(TEST_OUTPUT_DIR, test_filename)
        # Remove leftovers from earlier runs so a stale file can never be
        # mistaken for fresh output.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR)
        if extra_args:
            call = call + " " + extra_args
        print(call)

        # subprocess.run drains both pipes while waiting; wait() on a Popen
        # with piped output can deadlock once a pipe buffer fills up.
        proc = subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        self.assertEqual(proc.returncode, 0, msg=proc.stderr.decode("utf8", errors="replace"))

        copyfile(self.call_output, test_file)
        return pd.read_table(test_file)

    def _assert_matches_baseline(self, test, test_filename):
        """Assert that ``test`` equals the baseline table stored under ``test_filename``."""
        baseline_file = os.path.join(self.baseline_output_dir, test_filename)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_noargs(self):
        test_filename = "noargs_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename)
        self._assert_matches_baseline(test, test_filename)

    def test_collapse_user(self):
        test_filename = "collapse-user_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "--collapse-user")
        self._assert_matches_baseline(test, test_filename)

    def test_pwr_segment(self):
        test_filename = "persistence_segment_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "--persistence segment")
        self._assert_matches_baseline(test, test_filename)

    def test_pwr_legacy(self):
        test_filename = "persistence_legacy_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "--persistence legacy")
        self._assert_matches_baseline(test, test_filename)

    def test_pwr(self):
        test_filename = "persistence_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "--persistence")

        # Kept from the original test: put the columns in sorted order before comparing.
        test = test.reindex(columns=sorted(test.columns))
        self._assert_matches_baseline(test, test_filename)

    def test_url_encode(self):
        test_filename = "url-encode_" + self.wikiq_out_name
        test = self._run_wikiq(test_filename, "--url-encode")

        # Kept from the original test: put the columns in sorted order before comparing.
        test = test.reindex(columns=sorted(test.columns))
        self._assert_matches_baseline(test, test_filename)
|
|
|
|
|
|
class Test_Malformed(unittest.TestCase):
    """Check that wikiq fails with a parse error on the truncated twinpeaks dump."""

    def setUp(self):
        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = WIKIQ + " {0} -o {1}"
        self.input_dir = os.path.join(TEST_DIR, "dumps")
        # input_dir is already absolute, so no further prefix is needed.
        self.input_file = os.path.join(self.input_dir, self.infile)

        # Make sure a missing output directory is not the reason wikiq fails.
        os.makedirs(TEST_OUTPUT_DIR, exist_ok=True)

    def test_malformed_noargs(self):
        call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR)
        print(call)

        # subprocess.run drains both pipes while waiting; wait() on a Popen
        # with piped output can deadlock once a pipe buffer fills up.
        proc = subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        self.assertNotEqual(proc.returncode, 0)

        # Decode stderr properly instead of splitting the repr() of the bytes
        # object; the final non-blank line must be the XML parse error.
        errlines = proc.stderr.decode("utf8", errors="replace").strip().splitlines()
        self.assertTrue(errlines, "wikiq failed without writing anything to stderr")
        self.assertEqual(errlines[-1], 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
|
|
|
|
|
|
class Test_Stdout(unittest.TestCase):
    """Check that wikiq --stdout emits the same table as the stored no-argument baseline."""

    def setUp(self):
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = WIKIQ + " {0} --stdout"
        self.input_dir = os.path.join(TEST_DIR, "dumps")
        # input_dir is already absolute, so no further prefix is needed.
        self.input_file = os.path.join(self.input_dir, self.infile)
        self.baseline_output_dir = os.path.join(TEST_DIR, "baseline_output")

    def test_noargs(self):
        call = self.base_call.format(self.input_file)
        print(call)
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        # Fail clearly on a non-zero exit instead of on an unparsable empty table.
        self.assertEqual(proc.returncode, 0)
        outs = proc.stdout.decode("utf8")

        baseline_filename = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(self.baseline_output_dir, baseline_filename)
        print(baseline_file)

        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline, check_like=True)
|
|
|
|
|
|
class Test_Regex(unittest.TestCase):
    """Exercise wikiq's regex options (-RP/-RPl/-CP/-CPl) on the regextest dump."""

    def setUp(self):
        self.wiki = 'regextest'
        self.wikiq_out_name = self.wiki + '.tsv'
        self.infile = "{0}.xml.bz2".format(self.wiki)

        self.input_dir = os.path.join(TEST_DIR, "dumps")
        # input_dir is already absolute, so no further prefix is needed.
        self.input_file = os.path.join(self.input_dir, self.infile)

        self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name)
        # we have two base calls, one for checking arguments and the other for checking outputs
        self.base_call = WIKIQ + " {0}"
        self.base_call_outs = WIKIQ + " {0} -o {1}"

        self.baseline_output_dir = os.path.join(TEST_DIR, "baseline_output")

        # sample arguments for checking that bad arguments get terminated / test_regex_arguments
        self.bad_arguments_list = [
            # label is missing
            "-RP '\\b\\d+\\b'",
            # number of reg and number of labels do not match
            "-RP 'NPO V' -RP THE -RPl testlabel",
            # cp but rp label
            "-CP '(Tamil|Li)' -RPl testlabel",
            # regex is missing
            "-CPl testlabel",
            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"
        ]

        # sample arguments for checking the outcomes of good arguments / test_basic_regex
        self.good_arguments_list = [
            "-RP '\\b\\d{3}\\b' -RPl threedigits",
            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
            "-CP 'WP:EVADE' -CPl wp_evade"
        ]

        # sample arguments with named capture groups / test_capturegroup_regex
        self.cap_arguments_list = [
            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"
        ]

        # The output directory must exist regardless of how the tests are launched.
        os.makedirs(TEST_OUTPUT_DIR, exist_ok=True)

    def _check_against_baseline(self, test_filename, arguments):
        """Run wikiq with ``arguments`` and compare its output to the stored baseline.

        The file wikiq writes is copied to ``test_filename`` inside the test
        output directory, read back, and compared with the baseline table of
        the same name. The test fails (with wikiq's stderr as the message) if
        the command exits with a non-zero status.
        """
        test_file = os.path.join(TEST_OUTPUT_DIR, test_filename)
        # Remove leftovers from earlier runs so a stale file can never be
        # mistaken for fresh output.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call_outs.format(self.input_file, TEST_OUTPUT_DIR)
        call = call + " " + arguments
        print(call)

        # subprocess.run drains both pipes while waiting; wait() on a Popen
        # with piped output can deadlock once a pipe buffer fills up.
        proc = subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        self.assertEqual(proc.returncode, 0, msg=proc.stderr.decode("utf-8", errors="replace"))

        copyfile(self.call_output, test_file)
        test = pd.read_table(test_file)

        baseline_file = os.path.join(self.baseline_output_dir, test_filename)
        baseline = pd.read_table(baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_regex_arguments(self):
        for arguments in self.bad_arguments_list:
            # subTest reports each argument set separately instead of stopping at the first failure.
            with self.subTest(arguments=arguments):
                call = self.base_call.format(self.input_file)
                call = call + " --stdout " + arguments
                print(call)
                proc = subprocess.run(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

                # we want to check that the bad arguments were caught and sys.exit is stopping the code
                print(proc.stderr.decode("utf-8", errors="replace"))
                self.assertNotEqual(proc.returncode, 0)

    def test_basic_regex(self):
        for i, arguments in enumerate(self.good_arguments_list):
            with self.subTest(i=i, arguments=arguments):
                test_filename = "basic_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))
                self._check_against_baseline(test_filename, arguments)

    def test_capturegroup_regex(self):
        for i, arguments in enumerate(self.cap_arguments_list):
            with self.subTest(i=i, arguments=arguments):
                test_filename = "capturegroup_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))
                print(test_filename)
                self._check_against_baseline(test_filename, arguments)
|
|
|
|
|
|
def _reset_test_output_dir():
    """Create the test output directory, or empty it of files if it already exists.

    Clearing it keeps separate runs from interfering with each other:
    otherwise a test may erroneously pass if the program has no output but a
    previous run output what was expected.
    """
    os.makedirs(TEST_OUTPUT_DIR, exist_ok=True)
    for name in os.listdir(TEST_OUTPUT_DIR):
        path = os.path.join(TEST_OUTPUT_DIR, name)
        # Only plain files are removed; sub-directories are left untouched.
        if os.path.isfile(path):
            os.remove(path)


def setUpModule():
    """One-time setup for this module, as opposed to per-test setup.

    unittest calls this once before any test in the module runs, so the
    output directory is reset whether the file is executed as a script or
    loaded by a test runner.
    """
    _reset_test_output_dir()


if __name__ == '__main__':
    tracemalloc.start()

    unittest.main()
|