import shutil
import unittest
import os
import subprocess
from shutil import copyfile

import numpy as np
import pandas as pd
from pandas import DataFrame
from pandas.testing import assert_frame_equal, assert_series_equal
from io import StringIO
import tracemalloc
from typing import Final

# Make references to files and wikiq relative to this file, not to the current working directory.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
WIKIQ: Final[str] = os.path.join(os.path.dirname(TEST_DIR), "wikiq")
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")

# Names of the wiki dumps used as test inputs.
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"


def setup():
    """Perform the one-time, module-level test setup.

    Starts ``tracemalloc`` and makes sure ``TEST_OUTPUT_DIR`` exists. This is
    deliberately a one-time step rather than per-test setup.
    """
    # NOTE(review): presumably enabled so warnings can report allocation
    # tracebacks -- confirm the intent.
    tracemalloc.start()

    # exist_ok=True makes this idempotent and avoids the check-then-create
    # race when several test processes start at once. Unlike the previous
    # os.path.exists() guard, it raises if the path exists but is not a
    # directory, which surfaces a broken checkout immediately.
    os.makedirs(TEST_OUTPUT_DIR, exist_ok=True)


# Always run setup, even if this is executed via "python -m unittest" rather
# than as __main__.
setup()


class WikiqTester:
    """Resolve the input, output and baseline paths for one wikiq test case.

    Attributes set by ``__init__``:
        input_file: path of the dump under ``TEST_DIR/dumps``.
        output: path under ``TEST_OUTPUT_DIR`` that wikiq should write to.
        wikiq_baseline_name / wikiq_out_name: file names derived from the
            wiki name, optional suffix and the respective format.
        baseline_file: path of the expected output under ``BASELINE_DIR``
            (only set when ``case_name`` is not ``None``).
    """

    def __init__(self,
                 wiki: str,
                 case_name: str,
                 suffix: str | None = None,
                 in_compression: str = "bz2",
                 baseline_format: str = "tsv",
                 out_format: str = "tsv",
                 ):
        """
        :param wiki: Base name of the dump file (without ``.xml.<compression>``).
        :param case_name: Name of the test case; prefixes output and baseline names.
        :param suffix: Optional extra component appended to the file names.
        :param in_compression: Extension of the compressed input dump.
        :param baseline_format: Extension of the baseline file.
        :param out_format: Extension of the output; ``"parquet"`` output is a directory.
        """
        self.input_file = os.path.join(TEST_DIR, "dumps", f"{wiki}.xml.{in_compression}")

        basename = f"{case_name}_{wiki}"
        if suffix:
            basename = f"{basename}_{suffix}"
        self.output = os.path.join(TEST_OUTPUT_DIR, f"{basename}.{out_format}")

        # Remove stale output from a previous run so each test starts clean.
        if os.path.exists(self.output):
            if os.path.isfile(self.output):
                os.remove(self.output)
            else:
                shutil.rmtree(self.output)

        # Parquet output is written into a directory, so create it up front.
        if out_format == "parquet":
            os.makedirs(self.output, exist_ok=True)

        if suffix is None:
            self.wikiq_baseline_name = f"{wiki}.{baseline_format}"
            self.wikiq_out_name = f"{wiki}.{out_format}"
        else:
            self.wikiq_baseline_name = f"{wiki}_{suffix}.{baseline_format}"
            self.wikiq_out_name = f"{wiki}_{suffix}.{out_format}"

        # If case_name is unset, there are no relevant baseline or test files.
        if case_name is not None:
            self.baseline_file = os.path.join(
                BASELINE_DIR, f"{case_name}_{self.wikiq_baseline_name}")

    def call_wikiq(self, *args: str, out: bool = True):
        """
        Calls wikiq with the passed arguments on the input file relevant to the test.

        :param args: The command line arguments to pass to wikiq. They are
            inserted into a shell command verbatim, so they may (and several
            tests do) carry their own shell quoting.
        :param out: Whether to pass an output argument to wikiq.
        :return: The output of the wikiq call.
        :raises subprocess.CalledProcessError: if wikiq exits with a non-zero status.
        """
        # Imported locally so this block does not depend on the file header.
        import shlex

        # Quote the paths we construct ourselves so that a checkout located in
        # a directory containing spaces or shell metacharacters still works.
        # shlex.quote() leaves ordinary paths untouched.
        parts = [shlex.quote(WIKIQ), shlex.quote(self.input_file)]
        if out:
            parts += ["-o", shlex.quote(self.output)]
        parts.extend(args)

        call = ' '.join(parts)
        print(call)
        # shell=True is required because the test arguments rely on shell quoting.
        return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)


# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
#        7z, gz, bz2, xml  DONE
# wikia and wikipedia data DONE
# malformed xmls DONE

class WikiqTestCase(unittest.TestCase):
    """Runs wikiq on the test dumps and compares its output with stored baselines."""

    def _call_wikiq(self, tester: "WikiqTester", *args: str, out: bool = True) -> bytes:
        """Run wikiq via ``tester``; fail the test with wikiq's stderr on a non-zero exit."""
        try:
            return tester.call_wikiq(*args, out=out)
        except subprocess.CalledProcessError as exc:
            self.fail(exc.stderr.decode("utf8"))

    def _assert_matches_baseline(self, tester: "WikiqTester", test: DataFrame | None = None):
        """Assert that ``test`` (default: the TSV at ``tester.output``) equals the baseline."""
        if test is None:
            test = pd.read_table(tester.output)
        baseline = pd.read_table(tester.baseline_file)
        assert_frame_equal(test, baseline, check_like=True)

    def test_WP_noargs(self):
        tester = WikiqTester(IKWIKI, "noargs")
        self._call_wikiq(tester)
        self._assert_matches_baseline(tester)

    def test_WP_namespaces(self):
        tester = WikiqTester(IKWIKI, "namespaces")
        self._call_wikiq(tester, "-n 0", "-n 1")

        # Only the requested namespaces may appear in the output.
        test = pd.read_table(tester.output)
        num_wrong_ns = sum(~ test.namespace.isin({0, 1}))
        self.assertEqual(num_wrong_ns, 0)

        self._assert_matches_baseline(tester, test)

    def test_WP_revert_radius(self):
        tester = WikiqTester(IKWIKI, "revert_radius")
        self._call_wikiq(tester, "-n 0", "-n 1", "-rr 1")

        # Only the requested namespaces may appear in the output.
        test = pd.read_table(tester.output)
        num_wrong_ns = sum(~ test.namespace.isin({0, 1}))
        self.assertEqual(num_wrong_ns, 0)

        self._assert_matches_baseline(tester, test)

    def test_WP_no_revert_radius(self):
        tester = WikiqTester(IKWIKI, "no_revert_radius")
        self._call_wikiq(tester, "-rr 0")

        test = pd.read_table(tester.output)
        # NOTE(review): pd.read_table represents empty fields as NaN, not None,
        # so this count is probably always 0 and the check may be vacuous --
        # confirm what was meant to be asserted about the `revert` column.
        num_reverted = sum(i is None for i in test.revert)
        self.assertEqual(num_reverted, 0)

        self._assert_matches_baseline(tester, test)

    def test_WP_collapse_user(self):
        tester = WikiqTester(IKWIKI, "collapse_user")
        self._call_wikiq(tester, "--collapse-user")
        self._assert_matches_baseline(tester)

    def test_noargs(self):
        tester = WikiqTester(SAILORMOON, "noargs", in_compression="7z")
        self._call_wikiq(tester)
        self._assert_matches_baseline(tester)

    def test_collapse_user(self):
        tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z")
        self._call_wikiq(tester, "--collapse-user", "--fandom-2020")
        self._assert_matches_baseline(tester)

    def test_pwr_segment(self):
        tester = WikiqTester(SAILORMOON, "persistence_segment", in_compression="7z")
        self._call_wikiq(tester, "--persistence segment", "--fandom-2020")
        self._assert_matches_baseline(tester)

    def test_pwr_legacy(self):
        tester = WikiqTester(SAILORMOON, "persistence_legacy", in_compression="7z")
        self._call_wikiq(tester, "--persistence legacy", "--fandom-2020")
        self._assert_matches_baseline(tester)

    def test_pwr(self):
        tester = WikiqTester(SAILORMOON, "persistence", in_compression="7z")
        self._call_wikiq(tester, "--persistence", "--fandom-2020")

        test = pd.read_table(tester.output)
        test = test.reindex(columns=sorted(test.columns))
        self._assert_matches_baseline(tester, test)

    def test_malformed_noargs(self):
        tester = WikiqTester(wiki=TWINPEAKS, case_name="noargs", in_compression="7z")
        want_exception = 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0'

        # Here a failing wikiq call is the expected outcome, so handle the
        # error directly instead of going through _call_wikiq().
        try:
            tester.call_wikiq()
        except subprocess.CalledProcessError as exc:
            errlines = exc.stderr.decode("utf8").splitlines()
            self.assertEqual(errlines[-1], want_exception)
        else:
            self.fail("No exception raised, want: {}".format(want_exception))

    def test_stdout_noargs(self):
        tester = WikiqTester(wiki=SAILORMOON, case_name="noargs", in_compression="7z")
        outs = self._call_wikiq(tester, "--stdout", "--fandom-2020", out=False).decode("utf8")
        self._assert_matches_baseline(tester, pd.read_table(StringIO(outs)))

    def test_bad_regex(self):
        tester = WikiqTester(wiki=REGEXTEST, case_name="bad_regex")

        # sample arguments for checking that bad arguments get terminated / test_regex_arguments
        bad_arguments_list = [
            # label is missing
            "-RP '\\b\\d+\\b'",
            # number of reg and number of labels do not match
            "-RP 'NPO V' -RP THE -RPl testlabel",
            # cp but rp label
            "-CP '(Tamil|Li)' -RPl testlabel",
            # regex is missing
            "-CPl testlabel",
            "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'",
        ]

        for arguments in bad_arguments_list:
            try:
                tester.call_wikiq("--stdout", arguments, out=False)
            except subprocess.CalledProcessError as exc:
                # we want to check that the bad arguments were caught and sys.exit is stopping the code
                print(exc.stderr.decode("utf-8"))
            else:
                self.fail("No exception raised, want Exception")

    def test_good_regex(self):
        # sample arguments for checking the outcomes of good arguments / test_basic_regex
        good_arguments_list = [
            "-RP '\\b\\d{3}\\b' -RPl threedigits",
            "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
            "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
            "-CP 'WP:EVADE' -CPl wp_evade",
        ]

        for i, arguments in enumerate(good_arguments_list):
            tester = WikiqTester(wiki=REGEXTEST, case_name="basic", suffix=str(i))
            self._call_wikiq(tester, arguments)
            self._assert_matches_baseline(tester)
            print(i)

    def test_capturegroup_regex(self):
        # Named groups must be spelled `(?P<name>...)`; a bare `(?P` followed
        # by the pattern is not valid regular-expression syntax.
        # NOTE(review): confirm these group names match the column names in
        # the capturegroup baseline files.
        cap_arguments_list = [
            "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
            "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov",
        ]

        for i, arguments in enumerate(cap_arguments_list):
            tester = WikiqTester(wiki=REGEXTEST, case_name="capturegroup", suffix=str(i))
            self._call_wikiq(tester, arguments)
            self._assert_matches_baseline(tester)

    def test_parquet(self):
        tester = WikiqTester(IKWIKI, "noargs", out_format="parquet")
        self._call_wikiq(tester)

        # as a test let's make sure that we get equal data frames
        test: DataFrame = pd.read_parquet(tester.output)
        # test = test.drop(['reverteds'], axis=1)

        baseline: DataFrame = pd.read_table(tester.baseline_file)

        # Pandas does not read timestamps as the desired datetime type.
        baseline['date_time'] = pd.to_datetime(baseline['date_time'])

        # The TSV baseline holds NaN for missing values; swap in None before
        # comparing against the parquet output.
        # baseline['reverteds'] = [None if i is np.nan else [int(j) for j in str(i).split(",")] for i in baseline['reverteds']]
        for col in ('revert', 'reverteds', 'sha1', 'editor', 'anon'):
            baseline[col] = baseline[col].replace(np.nan, None)

        # Diagnostic output only: report rows where exactly one side has a
        # missing editorid.
        for index, row in baseline.iterrows():
            if row['editorid'] is None or test['editorid'][index] is None:
                if row['editorid'] != test['editorid'][index]:
                    print(row['revid'], ":", row['editorid'], "!=", test['editorid'][index])

        # Compare column by column so that a failure names the offending column.
        for col in baseline.columns:
            try:
                assert_series_equal(test[col], baseline[col], check_like=True, check_dtype=False)
            except ValueError as exc:
                print(f"Error comparing column {col}")
                self.fail(exc)

        # assert_frame_equal(test, baseline, check_like=True, check_dtype=False)


if __name__ == '__main__':
    unittest.main()