From 6b4f3939a53affbc9f9418474350ecdb1a256c81 Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Wed, 10 Dec 2025 21:07:52 -0800
Subject: [PATCH] more work on resuming.

---
 src/wikiq/resume.py      |  15 +-
 test/Wikiq_Unit_Test.py  | 548 ++-------------------------------------
 test/conftest.py         |   5 +
 test/test_resume.py      | 401 ++++++++++++++++++++++++++++
 test/wikiq_test_utils.py |  75 ++++++
 5 files changed, 512 insertions(+), 532 deletions(-)
 create mode 100644 test/conftest.py
 create mode 100644 test/test_resume.py
 create mode 100644 test/wikiq_test_utils.py

diff --git a/src/wikiq/resume.py b/src/wikiq/resume.py
index ae51555..2c93a86 100644
--- a/src/wikiq/resume.py
+++ b/src/wikiq/resume.py
@@ -76,14 +76,17 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
             # Temp file was invalid, just remove it
             os.remove(temp_output_file)
         elif merged == "temp_only":
-            # Original was corrupted, use temp as new base
-            os.remove(output_file)
+            # Original was corrupted or missing, use temp as new base
+            if os.path.exists(output_file):
+                os.remove(output_file)
             os.rename(temp_output_file, output_file)
-            print("Recovered from temp file (original was corrupted).", file=sys.stderr)
+            print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr)
         elif merged == "both_invalid":
-            # Both files corrupted, remove both and start fresh
-            os.remove(output_file)
-            os.remove(temp_output_file)
+            # Both files corrupted or missing, remove both and start fresh
+            if os.path.exists(output_file):
+                os.remove(output_file)
+            if os.path.exists(temp_output_file):
+                os.remove(temp_output_file)
             # Also remove stale checkpoint file
             checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
             if os.path.exists(checkpoint_path):
diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py
index 8258f87..1e272f0 100644
--- a/test/Wikiq_Unit_Test.py
+++ b/test/Wikiq_Unit_Test.py
@@ -1,112 +1,48 @@
 import os
-import shutil
 import subprocess
 import sys
 import tracemalloc
 from io import StringIO
-from typing import Final, Union
-import pytest
+
 import numpy as np
 import pandas as pd
 import pyarrow as pa
+import pytest
 from pandas import DataFrame
 from pandas.testing import assert_frame_equal, assert_series_equal

-# Make references to files and wikiq relative to this file, not to the current working directory.
-TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
-WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR,".."), "src/wikiq/__init__.py")
-TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
-BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
-
-IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
-SAILORMOON: Final[str] = "sailormoon"
-TWINPEAKS: Final[str] = "twinpeaks"
-REGEXTEST: Final[str] = "regextest"
+from wikiq_test_utils import (
+    BASELINE_DIR,
+    IKWIKI,
+    REGEXTEST,
+    SAILORMOON,
+    TEST_DIR,
+    TEST_OUTPUT_DIR,
+    TWINPEAKS,
+    WIKIQ,
+    WikiqTester,
+)


 def setup():
     tracemalloc.start()
-    # Perform directory check and reset here as this is a one-time setup step as opposed to per-test setup.
     if not os.path.exists(TEST_OUTPUT_DIR):
         os.mkdir(TEST_OUTPUT_DIR)


-# Always run setup, even if this is executed via "python -m unittest" rather
-# than as __main__.
 setup()


-class WikiqTester:
-    def __init__(
-        self,
-        wiki: str,
-        case_name: str,
-        suffix: Union[str, None] = None,
-        in_compression: str = "bz2",
-        baseline_format: str = "tsv",
-        out_format: str = "tsv",
-    ):
-        self.input_file = os.path.join(
-            TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
-        )
-
-        basename = "{0}_{1}".format(case_name, wiki)
-        if suffix:
-            basename = "{0}_{1}".format(basename, suffix)
-
-        self.output = os.path.join(
-            TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
-        )
-
-        if os.path.exists(self.output):
-            if os.path.isfile(self.output):
-                os.remove(self.output)
-            else:
-                shutil.rmtree(self.output)
-
-        if out_format == "parquet":
-            os.makedirs(self.output, exist_ok=True)
-
-        if suffix is None:
-            self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
-            self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
-        else:
-            self.wikiq_baseline_name = "{0}_{1}.{2}".format(
-                wiki, suffix, baseline_format
-            )
-            self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
-
-        # If case_name is unset, there are no relevant baseline or test files.
-        if case_name is not None:
-            self.baseline_file = os.path.join(
-                BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
-            )
-
-    def call_wikiq(self, *args: str, out: bool = True):
-        """
-        Calls wikiq with the passed arguments on the input file relevant to the test.
-        :param args: The command line arguments to pass to wikiq.
-        :param out: Whether to pass an output argument to wikiq.
-        :return: The output of the wikiq call.
-        """
-        if out:
-            call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
-        else:
-            call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
-
-        print(call)
-        return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
-
-    # with / without pwr DONE
-    # with / without url encode DONE
-    # with / without collapse user DONE
-    # with output to stdout DONE
-    # note that the persistence radius is 7 by default
-    # reading various file formats including
-    # 7z, gz, bz2, xml DONE
-    # wikia and wikipedia data DONE
-    # malformed xmls DONE
+# with / without pwr DONE
+# with / without url encode DONE
+# with / without collapse user DONE
+# with output to stdout DONE
+# note that the persistence radius is 7 by default
+# reading various file formats including
+# 7z, gz, bz2, xml DONE
+# wikia and wikipedia data DONE
+# malformed xmls DONE


 def test_WP_noargs():
     tester = WikiqTester(IKWIKI, "noargs")
@@ -442,207 +378,6 @@ def test_parquet():

     # assert_frame_equal(test, baseline, check_like=True, check_dtype=False)

-
-def test_resume():
-    """Test that --resume properly resumes processing from the last written revid."""
-    import pyarrow.parquet as pq
-
-    # First, create a complete baseline output
-    tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="parquet")
-
-    try:
-        tester_full.call_wikiq("--fandom-2020")
-    except subprocess.CalledProcessError as exc:
-        pytest.fail(exc.stderr.decode("utf8"))
-
-    # Read the full output
-    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
-    full_table = pq.read_table(full_output_path)
-
-    # Get the middle revid to use as the resume point
-    middle_idx = len(full_table) // 2
-    resume_revid = full_table.column("revid")[middle_idx].as_py()
-
-    print(f"Total revisions: {len(full_table)}, Resume point: {middle_idx}, Resume revid: {resume_revid}")
-
-    # Create a partial output by copying row groups to preserve the exact schema
-    tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="parquet")
-    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
-
-    # Create partial output by filtering the table and writing with the same schema
-    partial_table = full_table.slice(0, middle_idx + 1)
-    pq.write_table(partial_table, partial_output_path)
-
-    # Now resume from the partial output
-    try:
-        tester_partial.call_wikiq("--fandom-2020", "--resume")
-    except subprocess.CalledProcessError as exc:
-        pytest.fail(exc.stderr.decode("utf8"))
-
-    # Read the resumed output
-    resumed_table = pq.read_table(partial_output_path)
-
-    # The resumed output should match the full output
-    # Convert to dataframes for comparison, sorting by revid
-    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
-    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
-
-    # Compare the dataframes
-    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
-
-    print(f"Resume test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
-
-def test_resume_with_diff():
-    """Test that --resume works correctly with diff computation."""
-    import pyarrow.parquet as pq
-
-    # First, create a complete baseline output with diff
-    tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="parquet")
-
-    try:
-        tester_full.call_wikiq("--diff", "--fandom-2020")
-    except subprocess.CalledProcessError as exc:
-        pytest.fail(exc.stderr.decode("utf8"))
-
-    # Read the full output
-    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
-    full_table = pq.read_table(full_output_path)
-
-    # Get a revid about 1/3 through to use as the resume point
-    resume_idx = len(full_table) // 3
-    resume_revid = full_table.column("revid")[resume_idx].as_py()
-
-    print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
-
-    # Create a partial output by filtering the table to preserve the exact schema
-    tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="parquet")
-    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
-
-    # Create partial output by slicing the table
-    partial_table = full_table.slice(0, resume_idx + 1)
-    pq.write_table(partial_table, partial_output_path)
-
-    # Now resume from the partial output
-    try:
-        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
-    except subprocess.CalledProcessError as exc:
-        pytest.fail(exc.stderr.decode("utf8"))
-
-    # Read the resumed output
-    resumed_table = pq.read_table(partial_output_path)
-
-    # Convert to dataframes for comparison, sorting by revid
-    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
-    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
-
-    # Compare the dataframes
-    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
-
-    print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
-
-def test_resume_with_partition_namespaces():
-    """Test that --resume works correctly with --partition-namespaces.
-
-    Interrupts wikiq partway through processing, then resumes and verifies
-    the result matches an uninterrupted run. Uses --flush-per-batch to ensure
-    data is written to disk after each batch, making interruption deterministic.
- """ - import signal - import time - import pyarrow.dataset as ds - - # Use separate subdirectories for full and partial runs to isolate them - full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full") - partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial") - input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z") - - # Clean up any existing output directories from previous runs - for output_dir in [full_dir, partial_dir]: - if os.path.exists(output_dir): - shutil.rmtree(output_dir) - os.makedirs(output_dir) - - # Paths within each isolated directory - full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet") - partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet") - - # Run wikiq fully to get baseline output - cmd_full = f"{WIKIQ} {input_file} -o {full_output} --batch-size 10 --partition-namespaces" - try: - subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True) - except subprocess.CalledProcessError as exc: - pytest.fail(exc.stderr.decode("utf8")) - - # Read full output - full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive") - full_df = full_dataset.to_table().to_pandas() - total_rows = len(full_df) - print(f"Full run produced {total_rows} rows") - - # Start wikiq for the interrupted run (use list args so SIGTERM goes to Python) - batch_size = 10 - cmd_partial = [ - sys.executable, WIKIQ, input_file, - "-o", partial_output, - "--batch-size", str(batch_size), - "--partition-namespaces" - ] - print(f"Starting: {' '.join(cmd_partial)}") - - proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE) - - # Wait a short time to allow some processing - interrupt_delay = 5 # seconds - enough for some pages but not all - time.sleep(interrupt_delay) - - if proc.poll() is not None: - pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt") - - # Simulate SLURM job termination: send SIGUSR1 first (early warning), - # then wait for graceful shutdown, then SIGTERM if still running - print(f"Sending SIGUSR1 after {interrupt_delay}s") - proc.send_signal(signal.SIGUSR1) - - # Wait for graceful shutdown - try: - proc.wait(timeout=5) - print("Process exited gracefully after SIGUSR1") - except subprocess.TimeoutExpired: - # Process didn't exit, send SIGTERM - print("Sending SIGTERM after SIGUSR1 timeout") - proc.send_signal(signal.SIGTERM) - proc.wait(timeout=30) - - # Read interrupted output - interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive") - interrupted_rows = interrupted_dataset.count_rows() - print(f"Interrupted run wrote {interrupted_rows} rows") - - assert interrupted_rows < total_rows, \ - f"Process wrote all {interrupted_rows} rows before being killed" - - # Resume - cmd_resume = f"{WIKIQ} {input_file} -o {partial_output} --batch-size {batch_size} --partition-namespaces --resume" - try: - subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True) - except subprocess.CalledProcessError as exc: - pytest.fail(exc.stderr.decode("utf8")) - - # Read resumed output - resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive") - resumed_df = resumed_dataset.to_table().to_pandas() - - # Check revid sets match (the important invariant) - full_revids = set(full_df['revid']) - resumed_revids = set(resumed_df['revid']) - missing_revids = full_revids - resumed_revids - extra_revids = resumed_revids - full_revids - assert missing_revids == set() and extra_revids == set(), \ - f"Revision ID mismatch: {len(missing_revids)} missing, 
{len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}" - assert len(resumed_df) == len(full_df), \ - f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}" - - print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}") - def test_external_links_only(): """Test that --external-links extracts external links correctly.""" @@ -977,242 +712,3 @@ def test_headings(): print(f"Headings test passed! {len(test)} rows processed") -def test_resume_file_not_found(): - """Test that --resume exits with error when output file doesn't exist.""" - tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet") - - # Ensure the output file does not exist - expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet") - if os.path.exists(expected_output): - os.remove(expected_output) - - try: - tester.call_wikiq("--resume") - pytest.fail("Expected error when --resume is used but output file doesn't exist") - except subprocess.CalledProcessError as exc: - stderr = exc.stderr.decode("utf8") - assert "Error: --resume specified but output file not found" in stderr, \ - f"Expected error message about missing output file, got: {stderr}" - - print("Resume file not found test passed!") - - -def test_resume_simple(): - """Test that --resume works without --fandom-2020 and --partition-namespaces.""" - import pyarrow.parquet as pq - - # First, create a complete baseline output (no fandom-2020, no partition-namespaces) - tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet") - - try: - tester_full.call_wikiq() - except subprocess.CalledProcessError as exc: - pytest.fail(exc.stderr.decode("utf8")) - - # Read the full output - full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet") - full_table = pq.read_table(full_output_path) - - # Get a revid about 1/3 through to use as the resume point - resume_idx = len(full_table) // 3 - resume_revid = full_table.column("revid")[resume_idx].as_py() - - print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}") - - # Create a partial output by slicing the table - tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet") - partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet") - - partial_table = full_table.slice(0, resume_idx + 1) - pq.write_table(partial_table, partial_output_path) - - # Now resume from the partial output - try: - tester_partial.call_wikiq("--resume") - except subprocess.CalledProcessError as exc: - pytest.fail(exc.stderr.decode("utf8")) - - # Read the resumed output - resumed_table = pq.read_table(partial_output_path) - - # Convert to dataframes for comparison, sorting by revid - resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True) - full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True) - - # Compare the dataframes - assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False) - - print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows") - - -def test_resume_merge_with_invalid_temp_file(): - """Test that resume handles invalid/empty temp files gracefully. - - This can happen when a namespace has no records after the resume point, - resulting in a temp file that was created but never written to. 
- """ - import pyarrow.parquet as pq - from wikiq.resume import merge_parquet_files, merge_partitioned_namespaces - import tempfile - - # Create a valid parquet file - with tempfile.TemporaryDirectory() as tmpdir: - original_path = os.path.join(tmpdir, "original.parquet") - temp_path = os.path.join(tmpdir, "temp.parquet") - merged_path = os.path.join(tmpdir, "merged.parquet") - - # Create a valid original file - import pyarrow as pa - table = pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]}) - pq.write_table(table, original_path) - - # Create an invalid temp file (empty file, not valid parquet) - with open(temp_path, 'w') as f: - f.write("") - - # merge_parquet_files should return "original_only" for invalid temp file - result = merge_parquet_files(original_path, temp_path, merged_path) - assert result == "original_only", f"Expected 'original_only' when temp file is invalid, got {result}" - - # Original file should still exist and be unchanged - assert os.path.exists(original_path), "Original file should still exist" - original_table = pq.read_table(original_path) - assert len(original_table) == 3, "Original file should be unchanged" - - # Merged file should not have been created - assert not os.path.exists(merged_path), "Merged file should not be created" - - print("Resume merge with invalid temp file test passed!") - - -def test_resume_merge_with_corrupted_original(): - """Test that resume recovers from a corrupted original file if temp is valid. - - This can happen if the original file was being written when the process - was killed, leaving it in a corrupted state. - """ - import pyarrow.parquet as pq - from wikiq.resume import merge_parquet_files - import tempfile - - with tempfile.TemporaryDirectory() as tmpdir: - original_path = os.path.join(tmpdir, "original.parquet") - temp_path = os.path.join(tmpdir, "temp.parquet") - merged_path = os.path.join(tmpdir, "merged.parquet") - - # Create a corrupted original file (not valid parquet) - with open(original_path, 'w') as f: - f.write("corrupted data") - - # Create a valid temp file - import pyarrow as pa - table = pa.table({"articleid": [4, 5, 6], "revid": [40, 50, 60]}) - pq.write_table(table, temp_path) - - # merge_parquet_files should return "temp_only" for corrupted original - result = merge_parquet_files(original_path, temp_path, merged_path) - assert result == "temp_only", f"Expected 'temp_only' when original is corrupted, got {result}" - - # Merged file should not have been created (caller handles renaming temp) - assert not os.path.exists(merged_path), "Merged file should not be created for temp_only case" - - print("Resume merge with corrupted original test passed!") - - -def test_resume_merge_both_invalid(): - """Test that resume handles both files being invalid.""" - from wikiq.resume import merge_parquet_files - import tempfile - - with tempfile.TemporaryDirectory() as tmpdir: - original_path = os.path.join(tmpdir, "original.parquet") - temp_path = os.path.join(tmpdir, "temp.parquet") - merged_path = os.path.join(tmpdir, "merged.parquet") - - # Create corrupted original file - with open(original_path, 'w') as f: - f.write("corrupted original") - - # Create corrupted temp file - with open(temp_path, 'w') as f: - f.write("corrupted temp") - - # merge_parquet_files should return "both_invalid" - result = merge_parquet_files(original_path, temp_path, merged_path) - assert result == "both_invalid", f"Expected 'both_invalid' when both files corrupted, got {result}" - - print("Resume merge with both invalid test passed!") - - 
-def test_cleanup_interrupted_resume_both_corrupted(): - """Test that cleanup_interrupted_resume returns 'start_fresh' when both files are corrupted.""" - from wikiq.resume import cleanup_interrupted_resume, get_checkpoint_path - import tempfile - import json - - with tempfile.TemporaryDirectory() as tmpdir: - output_file = os.path.join(tmpdir, "output.parquet") - temp_file = output_file + ".resume_temp" - checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False) - - # Create corrupted original file - with open(output_file, 'w') as f: - f.write("corrupted original") - - # Create corrupted temp file - with open(temp_file, 'w') as f: - f.write("corrupted temp") - - # Create a checkpoint file (should be deleted) - with open(checkpoint_path, 'w') as f: - json.dump({"pageid": 100, "revid": 200}, f) - - # cleanup_interrupted_resume should return "start_fresh" - result = cleanup_interrupted_resume(output_file, partition_namespaces=False) - assert result == "start_fresh", f"Expected 'start_fresh', got {result}" - - # All files should be deleted - assert not os.path.exists(output_file), "Corrupted original should be deleted" - assert not os.path.exists(temp_file), "Corrupted temp should be deleted" - assert not os.path.exists(checkpoint_path), "Stale checkpoint should be deleted" - - print("Cleanup interrupted resume with both corrupted test passed!") - - -def test_cleanup_interrupted_resume_original_corrupted_temp_valid(): - """Test that cleanup recovers from temp when original is corrupted.""" - from wikiq.resume import cleanup_interrupted_resume, get_resume_point - import pyarrow as pa - import pyarrow.parquet as pq - import tempfile - - with tempfile.TemporaryDirectory() as tmpdir: - output_file = os.path.join(tmpdir, "output.parquet") - temp_file = output_file + ".resume_temp" - - # Create corrupted original file - with open(output_file, 'w') as f: - f.write("corrupted original") - - # Create valid temp file with some data - table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]}) - pq.write_table(table, temp_file) - - # cleanup_interrupted_resume should recover from temp (not return "start_fresh") - result = cleanup_interrupted_resume(output_file, partition_namespaces=False) - assert result is None, f"Expected None (normal recovery), got {result}" - - # Original should now contain the temp file's data - assert os.path.exists(output_file), "Output file should exist after recovery" - assert not os.path.exists(temp_file), "Temp file should be renamed to output" - - # Verify the recovered data - recovered_table = pq.read_table(output_file) - assert len(recovered_table) == 3, "Recovered file should have 3 rows" - - # get_resume_point should find the resume point from recovered file - resume_point = get_resume_point(output_file, partition_namespaces=False) - assert resume_point is not None, "Should find resume point from recovered file" - assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}" - - print("Cleanup with original corrupted, temp valid test passed!") diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..60db483 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,5 @@ +import os +import sys + +# Add the test directory to Python path so test utilities can be imported +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) diff --git a/test/test_resume.py b/test/test_resume.py new file mode 100644 index 0000000..6bc96c7 --- /dev/null +++ b/test/test_resume.py @@ -0,0 +1,401 @@ +import json +import os 
+import shutil
+import signal
+import subprocess
+import sys
+import tempfile
+import time
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+import pyarrow.parquet as pq
+import pytest
+from pandas.testing import assert_frame_equal
+
+from wikiq.resume import (
+    cleanup_interrupted_resume,
+    get_checkpoint_path,
+    get_resume_point,
+    merge_parquet_files,
+)
+from wikiq_test_utils import (
+    SAILORMOON,
+    TEST_DIR,
+    TEST_OUTPUT_DIR,
+    WIKIQ,
+    WikiqTester,
+)
+
+
+def test_resume():
+    """Test that --resume properly resumes processing from the last written revid."""
+    tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="parquet")
+
+    try:
+        tester_full.call_wikiq("--fandom-2020")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
+    full_table = pq.read_table(full_output_path)
+
+    middle_idx = len(full_table) // 2
+    resume_revid = full_table.column("revid")[middle_idx].as_py()
+
+    print(f"Total revisions: {len(full_table)}, Resume point: {middle_idx}, Resume revid: {resume_revid}")
+
+    tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="parquet")
+    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
+
+    partial_table = full_table.slice(0, middle_idx + 1)
+    pq.write_table(partial_table, partial_output_path)
+
+    try:
+        tester_partial.call_wikiq("--fandom-2020", "--resume")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    resumed_table = pq.read_table(partial_output_path)
+
+    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
+    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
+
+    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
+
+    print(f"Resume test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
+
+
+def test_resume_with_diff():
+    """Test that --resume works correctly with diff computation."""
+    tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="parquet")
+
+    try:
+        tester_full.call_wikiq("--diff", "--fandom-2020")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
+    full_table = pq.read_table(full_output_path)
+
+    resume_idx = len(full_table) // 3
+    resume_revid = full_table.column("revid")[resume_idx].as_py()
+
+    print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
+
+    tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="parquet")
+    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
+
+    partial_table = full_table.slice(0, resume_idx + 1)
+    pq.write_table(partial_table, partial_output_path)
+
+    try:
+        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    resumed_table = pq.read_table(partial_output_path)
+
+    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
+    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
+
+    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
+
+    print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
+
+
+def test_resume_with_partition_namespaces():
+    """Test that --resume works correctly with --partition-namespaces.
+
+    Interrupts wikiq partway through processing, then resumes and verifies
+    the result matches an uninterrupted run. Uses a small --batch-size so
+    output reaches disk frequently and the interrupt is likely to land
+    partway through the run.
+    """
+    full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full")
+    partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial")
+    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
+
+    for output_dir in [full_dir, partial_dir]:
+        if os.path.exists(output_dir):
+            shutil.rmtree(output_dir)
+        os.makedirs(output_dir)
+
+    full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet")
+    partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet")
+
+    cmd_full = f"{WIKIQ} {input_file} -o {full_output} --batch-size 10 --partition-namespaces"
+    try:
+        subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive")
+    full_df = full_dataset.to_table().to_pandas()
+    total_rows = len(full_df)
+    print(f"Full run produced {total_rows} rows")
+
+    batch_size = 10
+    cmd_partial = [
+        sys.executable, WIKIQ, input_file,
+        "-o", partial_output,
+        "--batch-size", str(batch_size),
+        "--partition-namespaces"
+    ]
+    print(f"Starting: {' '.join(cmd_partial)}")
+
+    proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)
+
+    interrupt_delay = 5
+    time.sleep(interrupt_delay)
+
+    if proc.poll() is not None:
+        pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt")
+
+    print(f"Sending SIGUSR1 after {interrupt_delay}s")
+    proc.send_signal(signal.SIGUSR1)
+
+    try:
+        proc.wait(timeout=5)
+        print("Process exited gracefully after SIGUSR1")
+    except subprocess.TimeoutExpired:
+        print("Sending SIGTERM after SIGUSR1 timeout")
+        proc.send_signal(signal.SIGTERM)
+        proc.wait(timeout=30)
+
+    interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
+    interrupted_rows = interrupted_dataset.count_rows()
+    print(f"Interrupted run wrote {interrupted_rows} rows")
+
+    assert interrupted_rows < total_rows, \
+        f"Process wrote all {interrupted_rows} rows before being killed"
+
+    cmd_resume = f"{WIKIQ} {input_file} -o {partial_output} --batch-size {batch_size} --partition-namespaces --resume"
+    try:
+        subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
+    resumed_df = resumed_dataset.to_table().to_pandas()
+
+    full_revids = set(full_df['revid'])
+    resumed_revids = set(resumed_df['revid'])
+    missing_revids = full_revids - resumed_revids
+    extra_revids = resumed_revids - full_revids
+    assert missing_revids == set() and extra_revids == set(), \
+        f"Revision ID mismatch: {len(missing_revids)} missing, {len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}"
+    assert len(resumed_df) == len(full_df), \
+        f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}"
+
+    print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}")
+
+
+def test_resume_file_not_found():
+    """Test that --resume exits with error when output file doesn't exist."""
+    tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet")
+
+    expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet")
+    if os.path.exists(expected_output):
+        os.remove(expected_output)
+
+    try:
+        tester.call_wikiq("--resume")
+        pytest.fail("Expected error when --resume is used but output file doesn't exist")
+    except subprocess.CalledProcessError as exc:
+        stderr = exc.stderr.decode("utf8")
+        assert "Error: --resume specified but output file not found" in stderr, \
+            f"Expected error message about missing output file, got: {stderr}"
+
+    print("Resume file not found test passed!")
+
+
+def test_resume_simple():
+    """Test that --resume works without --fandom-2020 and --partition-namespaces."""
+    tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet")
+
+    try:
+        tester_full.call_wikiq()
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
+    full_table = pq.read_table(full_output_path)
+
+    resume_idx = len(full_table) // 3
+    resume_revid = full_table.column("revid")[resume_idx].as_py()
+
+    print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
+
+    tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet")
+    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
+
+    partial_table = full_table.slice(0, resume_idx + 1)
+    pq.write_table(partial_table, partial_output_path)
+
+    try:
+        tester_partial.call_wikiq("--resume")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    resumed_table = pq.read_table(partial_output_path)
+
+    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
+    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
+
+    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
+
+    print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
+
+
+def test_resume_merge_with_invalid_temp_file():
+    """Test that resume handles invalid/empty temp files gracefully.
+
+    This can happen when a namespace has no records after the resume point,
+    resulting in a temp file that was created but never written to.
+ """ + with tempfile.TemporaryDirectory() as tmpdir: + original_path = os.path.join(tmpdir, "original.parquet") + temp_path = os.path.join(tmpdir, "temp.parquet") + merged_path = os.path.join(tmpdir, "merged.parquet") + + table = pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]}) + pq.write_table(table, original_path) + + with open(temp_path, 'w') as f: + f.write("") + + result = merge_parquet_files(original_path, temp_path, merged_path) + assert result == "original_only", f"Expected 'original_only' when temp file is invalid, got {result}" + + assert os.path.exists(original_path), "Original file should still exist" + original_table = pq.read_table(original_path) + assert len(original_table) == 3, "Original file should be unchanged" + + assert not os.path.exists(merged_path), "Merged file should not be created" + + print("Resume merge with invalid temp file test passed!") + + +def test_resume_merge_with_corrupted_original(): + """Test that resume recovers from a corrupted original file if temp is valid. + + This can happen if the original file was being written when the process + was killed, leaving it in a corrupted state. + """ + with tempfile.TemporaryDirectory() as tmpdir: + original_path = os.path.join(tmpdir, "original.parquet") + temp_path = os.path.join(tmpdir, "temp.parquet") + merged_path = os.path.join(tmpdir, "merged.parquet") + + with open(original_path, 'w') as f: + f.write("corrupted data") + + table = pa.table({"articleid": [4, 5, 6], "revid": [40, 50, 60]}) + pq.write_table(table, temp_path) + + result = merge_parquet_files(original_path, temp_path, merged_path) + assert result == "temp_only", f"Expected 'temp_only' when original is corrupted, got {result}" + + assert not os.path.exists(merged_path), "Merged file should not be created for temp_only case" + + print("Resume merge with corrupted original test passed!") + + +def test_resume_merge_both_invalid(): + """Test that resume handles both files being invalid.""" + with tempfile.TemporaryDirectory() as tmpdir: + original_path = os.path.join(tmpdir, "original.parquet") + temp_path = os.path.join(tmpdir, "temp.parquet") + merged_path = os.path.join(tmpdir, "merged.parquet") + + with open(original_path, 'w') as f: + f.write("corrupted original") + + with open(temp_path, 'w') as f: + f.write("corrupted temp") + + result = merge_parquet_files(original_path, temp_path, merged_path) + assert result == "both_invalid", f"Expected 'both_invalid' when both files corrupted, got {result}" + + print("Resume merge with both invalid test passed!") + + +def test_cleanup_interrupted_resume_both_corrupted(): + """Test that cleanup_interrupted_resume returns 'start_fresh' when both files are corrupted.""" + with tempfile.TemporaryDirectory() as tmpdir: + output_file = os.path.join(tmpdir, "output.parquet") + temp_file = output_file + ".resume_temp" + checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False) + + with open(output_file, 'w') as f: + f.write("corrupted original") + + with open(temp_file, 'w') as f: + f.write("corrupted temp") + + with open(checkpoint_path, 'w') as f: + json.dump({"pageid": 100, "revid": 200}, f) + + result = cleanup_interrupted_resume(output_file, partition_namespaces=False) + assert result == "start_fresh", f"Expected 'start_fresh', got {result}" + + assert not os.path.exists(output_file), "Corrupted original should be deleted" + assert not os.path.exists(temp_file), "Corrupted temp should be deleted" + assert not os.path.exists(checkpoint_path), "Stale checkpoint should be 
deleted" + + print("Cleanup interrupted resume with both corrupted test passed!") + + +def test_cleanup_interrupted_resume_original_corrupted_temp_valid(): + """Test that cleanup recovers from temp when original is corrupted.""" + with tempfile.TemporaryDirectory() as tmpdir: + output_file = os.path.join(tmpdir, "output.parquet") + temp_file = output_file + ".resume_temp" + + with open(output_file, 'w') as f: + f.write("corrupted original") + + table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]}) + pq.write_table(table, temp_file) + + result = cleanup_interrupted_resume(output_file, partition_namespaces=False) + assert result is None, f"Expected None (normal recovery), got {result}" + + assert os.path.exists(output_file), "Output file should exist after recovery" + assert not os.path.exists(temp_file), "Temp file should be renamed to output" + + recovered_table = pq.read_table(output_file) + assert len(recovered_table) == 3, "Recovered file should have 3 rows" + + resume_point = get_resume_point(output_file, partition_namespaces=False) + assert resume_point is not None, "Should find resume point from recovered file" + assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}" + + print("Cleanup with original corrupted, temp valid test passed!") + + +def test_cleanup_original_missing_temp_valid_no_checkpoint(): + """Test recovery when original is missing, temp is valid, and no checkpoint exists.""" + with tempfile.TemporaryDirectory() as tmpdir: + output_file = os.path.join(tmpdir, "output.parquet") + temp_file = output_file + ".resume_temp" + checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False) + + assert not os.path.exists(output_file) + + table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]}) + pq.write_table(table, temp_file) + + assert not os.path.exists(checkpoint_path) + + result = cleanup_interrupted_resume(output_file, partition_namespaces=False) + assert result is None, f"Expected None (normal recovery), got {result}" + + assert os.path.exists(output_file), "Output file should exist after recovery" + assert not os.path.exists(temp_file), "Temp file should be renamed to output" + + resume_point = get_resume_point(output_file, partition_namespaces=False) + assert resume_point is not None, "Should find resume point from recovered file" + assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}" + + print("Original missing, temp valid, no checkpoint test passed!") diff --git a/test/wikiq_test_utils.py b/test/wikiq_test_utils.py new file mode 100644 index 0000000..ca08707 --- /dev/null +++ b/test/wikiq_test_utils.py @@ -0,0 +1,75 @@ +import os +import shutil +import subprocess +from typing import Final, Union + +TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__)) +WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR, ".."), "src/wikiq/__init__.py") +TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output") +BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output") + +IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history" +SAILORMOON: Final[str] = "sailormoon" +TWINPEAKS: Final[str] = "twinpeaks" +REGEXTEST: Final[str] = "regextest" + + +class WikiqTester: + def __init__( + self, + wiki: str, + case_name: str, + suffix: Union[str, None] = None, + in_compression: str = "bz2", + baseline_format: str = "tsv", + out_format: str = "tsv", + ): + self.input_file = os.path.join( + TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression) + ) + + basename 
= "{0}_{1}".format(case_name, wiki) + if suffix: + basename = "{0}_{1}".format(basename, suffix) + + self.output = os.path.join( + TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format) + ) + + if os.path.exists(self.output): + if os.path.isfile(self.output): + os.remove(self.output) + else: + shutil.rmtree(self.output) + + if out_format == "parquet": + os.makedirs(self.output, exist_ok=True) + + if suffix is None: + self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format) + self.wikiq_out_name = "{0}.{1}".format(wiki, out_format) + else: + self.wikiq_baseline_name = "{0}_{1}.{2}".format( + wiki, suffix, baseline_format + ) + self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format) + + if case_name is not None: + self.baseline_file = os.path.join( + BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name) + ) + + def call_wikiq(self, *args: str, out: bool = True): + """ + Calls wikiq with the passed arguments on the input file relevant to the test. + :param args: The command line arguments to pass to wikiq. + :param out: Whether to pass an output argument to wikiq. + :return: The output of the wikiq call. + """ + if out: + call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args]) + else: + call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args]) + + print(call) + return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)