diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py index 08fdfc4..eae8020 100644 --- a/test/Wikiq_Unit_Test.py +++ b/test/Wikiq_Unit_Test.py @@ -8,12 +8,12 @@ from io import StringIO import tracemalloc from typing import Final - # Make references to files and wikiq relative to this file, not to the current working directory. TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__)) WIKIQ: Final[str] = os.path.join(os.path.dirname(TEST_DIR), "wikiq") TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output") -BASELINE_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output") +BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output") + def setup(): tracemalloc.start() @@ -28,12 +28,36 @@ def setup(): for f in os.listdir(TEST_OUTPUT_DIR): os.remove(os.path.join(TEST_OUTPUT_DIR, f)) + +# Always run setup, even if this is executed via "python -m unittest" rather +# than as __main__. setup() -def call_wikiq(*args: str): - call = ' '.join([WIKIQ, *args]) + +def call_wikiq(input_file: str, *args: str, out: bool = True): + if out: + call = ' '.join([WIKIQ, input_file, "-o", TEST_OUTPUT_DIR, *args]) + else: + call = ' '.join([WIKIQ, input_file, *args]) + print(call) - subprocess.check_output(call, stderr=subprocess.PIPE, shell=True) + return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True) + + +def tmp_test_file(name: str) -> (str, str): + """ + Removes any existing test file with the given name and returns the paths to + the matching baseline file and to that test file. + :param name: The test case name. Should be unique to each test case. + :return: The path to the baseline file and the path to the test file. 
+ """ + baseline_file = os.path.join(BASELINE_DIR, name) + test_file = os.path.join(TEST_OUTPUT_DIR, name) + if os.path.exists(test_file): + os.remove(test_file) + + return baseline_file, test_file + # with / without pwr DONE # with / without url encode DONE @@ -48,27 +72,22 @@ def call_wikiq(*args: str): class Test_Wikipedia(unittest.TestCase): def setUp(self): wiki = 'ikwiki-20180301-pages-meta-history' - self.wikiq_out_name = wiki + ".tsv" + self.wikiq_out_name = "{0}.tsv".format(wiki) self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name) infile = "{0}.xml.bz2".format(wiki) - input_dir = os.path.join(TEST_DIR, "dumps") self.input_file = os.path.join(TEST_DIR, input_dir, infile) def test_WP_url_encode(self): - test_filename = "url-encode_" + self.wikiq_out_name - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + baseline_file, test_file = tmp_test_file("url-encode_" + self.wikiq_out_name) try: - call_wikiq(self.input_file, "-o", TEST_OUTPUT_DIR, "--url-encode") + call_wikiq(self.input_file, "--url-encode") except subprocess.CalledProcessError as exc: self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) # as a test let's make sure that we get equal data frames test = pd.read_table(test_file) @@ -76,20 +95,14 @@ class Test_Wikipedia(unittest.TestCase): assert_frame_equal(test, baseline, check_like=True) def test_WP_namespaces(self): - print(os.path.abspath('.')) - test_filename = "namespaces_" + self.wikiq_out_name - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + baseline_file, test_file = tmp_test_file("namespaces_" + self.wikiq_out_name) try: - call_wikiq(self.input_file, "-o", TEST_OUTPUT_DIR, - "-n 0", "-n 1") + call_wikiq(self.input_file, "-n 0", "-n 1") except subprocess.CalledProcessError as exc: 
self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) # as a test let's make sure that we get equal data frames test = pd.read_table(test_file) @@ -99,20 +112,14 @@ class Test_Wikipedia(unittest.TestCase): assert_frame_equal(test, baseline, check_like=True) def test_WP_revert_radius(self): - print(os.path.abspath('.')) - test_filename = "revert_radius_" + self.wikiq_out_name - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + baseline_file, test_file = tmp_test_file("revert_radius_" + self.wikiq_out_name) try: - call_wikiq(self.input_file, "-o", TEST_OUTPUT_DIR, - "-n 0", "-n 1", "-rr 1") + call_wikiq(self.input_file, "-n 0", "-n 1", "-rr 1") except subprocess.CalledProcessError as exc: self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) # as a test let's make sure that we get equal data frames test = pd.read_table(test_file) @@ -125,115 +132,80 @@ class Test_Wikipedia(unittest.TestCase): class Test_Basic(unittest.TestCase): def setUp(self): - self.wiki = 'sailormoon' - self.wikiq_out_name = self.wiki + ".tsv" + wiki = 'sailormoon' + self.wikiq_out_name = wiki + ".tsv" self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name) - self.infile = "{0}.xml.7z".format(self.wiki) - self.base_call = WIKIQ + " {0} -o {1}" - self.input_dir = os.path.join(TEST_DIR, "dumps") - self.input_file = os.path.join(TEST_DIR, self.input_dir, self.infile) + infile = "{0}.xml.7z".format(wiki) + input_dir = os.path.join(TEST_DIR, "dumps") + self.input_file = os.path.join(TEST_DIR, input_dir, infile) def test_noargs(self): + baseline_file, test_file = tmp_test_file("noargs_" + self.wikiq_out_name) - test_filename = "noargs_" + self.wikiq_out_name - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - 
os.remove(test_file) - - call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR) - print(call) - with subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertEqual(proc.returncode, 0) + try: + call_wikiq(self.input_file) + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) - test = pd.read_table(test_file) baseline = pd.read_table(baseline_file) assert_frame_equal(test, baseline, check_like=True) def test_collapse_user(self): - test_filename = "collapse-user_" + self.wikiq_out_name - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + baseline_file, test_file = tmp_test_file("collapse-user_" + self.wikiq_out_name) - call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR) - call = call + " --collapse-user" - print(call) - with subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertEqual(proc.returncode, 0) + try: + call_wikiq(self.input_file, "--collapse-user") + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) test = pd.read_table(test_file) baseline = pd.read_table(baseline_file) assert_frame_equal(test, baseline, check_like=True) def test_pwr_segment(self): - test_filename = "persistence_segment_" + self.wikiq_out_name - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + baseline_file, test_file = tmp_test_file("persistence_segment_" + self.wikiq_out_name) - call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR) - call = call + " --persistence segment" - print(call) - with subprocess.Popen(call, stdout=subprocess.PIPE, 
stderr=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertEqual(proc.returncode, 0) + try: + call_wikiq(self.input_file, "--persistence segment") + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) - test = pd.read_table(test_file) baseline = pd.read_table(baseline_file) assert_frame_equal(test, baseline, check_like=True) def test_pwr_legacy(self): - test_filename = "persistence_legacy_" + self.wikiq_out_name - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + baseline_file, test_file = tmp_test_file("persistence_legacy_" + self.wikiq_out_name) - call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR) - call = call + " --persistence legacy" - print(call) - with subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertEqual(proc.returncode, 0) + try: + call_wikiq(self.input_file, "--persistence legacy") + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) - test = pd.read_table(test_file) baseline = pd.read_table(baseline_file) assert_frame_equal(test, baseline, check_like=True) def test_pwr(self): - test_filename = "persistence_" + self.wikiq_out_name - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + baseline_file, test_file = tmp_test_file("persistence_" + self.wikiq_out_name) - call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR) - call = call + " --persistence" - print(call) - with subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertEqual(proc.returncode, 0) + try: + call_wikiq(self.input_file, 
"--persistence") + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) - test = pd.read_table(test_file) baseline = pd.read_table(baseline_file) @@ -241,20 +213,14 @@ class Test_Basic(unittest.TestCase): assert_frame_equal(test, baseline, check_like=True) def test_url_encode(self): - test_filename = "url-encode_" + self.wikiq_out_name + baseline_file, test_file = tmp_test_file("url-encode_" + self.wikiq_out_name) - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) - - call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR) - call = call + " --url-encode" - with subprocess.Popen(call, stdout=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertEqual(proc.returncode, 0) + try: + call_wikiq(self.input_file, "--url-encode") + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) test = pd.read_table(test_file) baseline = pd.read_table(baseline_file) @@ -264,46 +230,47 @@ class Test_Basic(unittest.TestCase): class Test_Malformed(unittest.TestCase): def setUp(self): - self.wiki = 'twinpeaks' - self.wikiq_out_name = self.wiki + ".tsv" - self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name) + wiki = 'twinpeaks' - self.infile = "{0}.xml.7z".format(self.wiki) - self.base_call = WIKIQ + " {0} -o {1}" - self.input_dir = os.path.join(TEST_DIR, "dumps") - self.input_file = os.path.join(TEST_DIR, self.input_dir, self.infile) + infile = "{0}.xml.7z".format(wiki) + input_dir = os.path.join(TEST_DIR, "dumps") + self.input_file = os.path.join(TEST_DIR, input_dir, infile) def test_malformed_noargs(self): - call = self.base_call.format(self.input_file, TEST_OUTPUT_DIR) - print(call) - with subprocess.Popen(call, 
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertNotEqual(proc.returncode, 0) - outs, errs = proc.communicate() - errlines = str(errs).split("\\n") - self.assertEqual(errlines[-2], 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0') + want_exception = 'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0' + + try: + call_wikiq(self.input_file) + except subprocess.CalledProcessError as exc: + errlines = exc.stderr.decode("utf8").splitlines() + self.assertEqual(errlines[-1], want_exception) + else: + self.fail("No exception raised, want: {}".format(want_exception)) class Test_Stdout(unittest.TestCase): def setUp(self): - self.wiki = 'sailormoon' - self.wikiq_out_name = self.wiki + ".tsv" + wiki = 'sailormoon' + self.wikiq_out_name = wiki + ".tsv" + self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name) - self.infile = "{0}.xml.7z".format(self.wiki) + infile = "{0}.xml.7z".format(wiki) self.base_call = WIKIQ + " {0} --stdout" - self.input_dir = os.path.join(TEST_DIR, "dumps") - self.input_file = os.path.join(TEST_DIR, self.input_dir, self.infile) + input_dir = os.path.join(TEST_DIR, "dumps") + self.input_file = os.path.join(TEST_DIR, input_dir, infile) def test_noargs(self): - call = self.base_call.format(self.input_file) - print(call) - proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True) - outs = proc.stdout.decode("utf8") + baseline_file, test_file = tmp_test_file("noargs_" + self.wikiq_out_name) + + outs = "" + try: + outs = call_wikiq(self.input_file, "--stdout", out=False).decode("utf8") + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) + + copyfile(self.call_output, test_file) - test_file = "noargs_" + self.wikiq_out_name - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_file) - print(baseline_file) test = pd.read_table(StringIO(outs)) baseline = pd.read_table(baseline_file) assert_frame_equal(test, 
baseline, check_like=True) @@ -311,17 +278,14 @@ class Test_Stdout(unittest.TestCase): class Test_Regex(unittest.TestCase): def setUp(self): - self.wiki = 'regextest' - self.wikiq_out_name = self.wiki + '.tsv' - self.infile = "{0}.xml.bz2".format(self.wiki) + wiki = 'regextest' + self.wikiq_out_name = wiki + '.tsv' + infile = "{0}.xml.bz2".format(wiki) - self.input_dir = os.path.join(TEST_DIR, "dumps") - self.input_file = os.path.join(TEST_DIR, self.input_dir, self.infile) + input_dir = os.path.join(TEST_DIR, "dumps") + self.input_file = os.path.join(TEST_DIR, input_dir, infile) self.call_output = os.path.join(TEST_OUTPUT_DIR, self.wikiq_out_name) - # we have two base calls, one for checking arguments and the other for checking outputs - self.base_call = WIKIQ + " {0}" - self.base_call_outs = WIKIQ + " {0} -o {1}" # sample arguments for checking that bad arguments get terminated / test_regex_arguments self.bad_arguments_list = [ @@ -351,37 +315,28 @@ class Test_Regex(unittest.TestCase): def test_regex_arguments(self): for arguments in self.bad_arguments_list: - call = self.base_call.format(self.input_file) - call = call + " --stdout " + arguments - print(call) - with subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as proc: - stdout, stderr = proc.communicate() + try: + call_wikiq(self.input_file, "--stdout", arguments, out=False) + except subprocess.CalledProcessError as exc: # we want to check that the bad arguments were caught and sys.exit is stopping the code - print(stderr.decode("utf-8")) - - self.assertNotEqual(proc.returncode, 0) + print(exc.stderr.decode("utf-8")) + else: + self.fail("No exception raised, want Exception") def test_basic_regex(self): for i, arguments in enumerate(self.good_arguments_list): - test_filename = "basic_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i)) - # print(test_filename) - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + 
baseline_file, test_file = tmp_test_file(test_filename) - call = self.base_call_outs.format(self.input_file, TEST_OUTPUT_DIR) - call = call + " " + arguments - print(call) - with subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertEqual(proc.returncode, 0) + try: + call_wikiq(self.input_file, arguments) + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) test = pd.read_table(test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) baseline = pd.read_table(baseline_file) assert_frame_equal(test, baseline, check_like=True) print(i) @@ -389,24 +344,17 @@ class Test_Regex(unittest.TestCase): def test_capturegroup_regex(self): for i, arguments in enumerate(self.cap_arguments_list): test_filename = "capturegroup_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i)) - print(test_filename) - test_file = os.path.join(TEST_OUTPUT_DIR, test_filename) - if os.path.exists(test_file): - os.remove(test_file) + baseline_file, test_file = tmp_test_file(test_filename) - call = self.base_call_outs.format(self.input_file, TEST_OUTPUT_DIR) - call = call + " " + arguments - print(call) - - with subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as proc: - proc.wait() - self.assertEqual(proc.returncode, 0) + try: + call_wikiq(self.input_file, arguments) + except subprocess.CalledProcessError as exc: + self.fail(exc.stderr.decode("utf8")) copyfile(self.call_output, test_file) test = pd.read_table(test_file) - baseline_file = os.path.join(BASELINE_OUTPUT_DIR, test_filename) baseline = pd.read_table(baseline_file) assert_frame_equal(test, baseline, check_like=True)