more work on resuming.

This commit is contained in:
Nathan TeBlunthuis 2025-12-10 21:07:52 -08:00
parent c3d31b4ab5
commit 6b4f3939a5
5 changed files with 512 additions and 532 deletions

View File

@ -76,14 +76,17 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
# Temp file was invalid, just remove it
os.remove(temp_output_file)
elif merged == "temp_only":
# Original was corrupted, use temp as new base
os.remove(output_file)
# Original was corrupted or missing, use temp as new base
if os.path.exists(output_file):
os.remove(output_file)
os.rename(temp_output_file, output_file)
print("Recovered from temp file (original was corrupted).", file=sys.stderr)
print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr)
elif merged == "both_invalid":
# Both files corrupted, remove both and start fresh
os.remove(output_file)
os.remove(temp_output_file)
# Both files corrupted or missing, remove both and start fresh
if os.path.exists(output_file):
os.remove(output_file)
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
# Also remove stale checkpoint file
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
if os.path.exists(checkpoint_path):

View File

@ -1,112 +1,48 @@
import os
import shutil
import subprocess
import sys
import tracemalloc
from io import StringIO
from typing import Final, Union
import pytest
import numpy as np
import pandas as pd
import pyarrow as pa
import pytest
from pandas import DataFrame
from pandas.testing import assert_frame_equal, assert_series_equal
# Make references to files and wikiq relative to this file, not to the current working directory.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR,".."), "src/wikiq/__init__.py")
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"
from wikiq_test_utils import (
BASELINE_DIR,
IKWIKI,
REGEXTEST,
SAILORMOON,
TEST_DIR,
TEST_OUTPUT_DIR,
TWINPEAKS,
WIKIQ,
WikiqTester,
)
def setup():
tracemalloc.start()
# Perform directory check and reset here as this is a one-time setup step as opposed to per-test setup.
if not os.path.exists(TEST_OUTPUT_DIR):
os.mkdir(TEST_OUTPUT_DIR)
# Always run setup, even if this is executed via "python -m unittest" rather
# than as __main__.
setup()
class WikiqTester:
def __init__(
self,
wiki: str,
case_name: str,
suffix: Union[str, None] = None,
in_compression: str = "bz2",
baseline_format: str = "tsv",
out_format: str = "tsv",
):
self.input_file = os.path.join(
TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
)
basename = "{0}_{1}".format(case_name, wiki)
if suffix:
basename = "{0}_{1}".format(basename, suffix)
self.output = os.path.join(
TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
)
if os.path.exists(self.output):
if os.path.isfile(self.output):
os.remove(self.output)
else:
shutil.rmtree(self.output)
if out_format == "parquet":
os.makedirs(self.output, exist_ok=True)
if suffix is None:
self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
else:
self.wikiq_baseline_name = "{0}_{1}.{2}".format(
wiki, suffix, baseline_format
)
self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
# If case_name is unset, there are no relevant baseline or test files.
if case_name is not None:
self.baseline_file = os.path.join(
BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
)
def call_wikiq(self, *args: str, out: bool = True):
"""
Calls wikiq with the passed arguments on the input file relevant to the test.
:param args: The command line arguments to pass to wikiq.
:param out: Whether to pass an output argument to wikiq.
:return: The output of the wikiq call.
"""
if out:
call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
else:
call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
print(call)
return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
def test_WP_noargs():
tester = WikiqTester(IKWIKI, "noargs")
@ -442,207 +378,6 @@ def test_parquet():
# assert_frame_equal(test, baseline, check_like=True, check_dtype=False)
def test_resume():
"""Test that --resume properly resumes processing from the last written revid."""
import pyarrow.parquet as pq
# First, create a complete baseline output
tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq("--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the full output
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
full_table = pq.read_table(full_output_path)
# Get the middle revid to use as the resume point
middle_idx = len(full_table) // 2
resume_revid = full_table.column("revid")[middle_idx].as_py()
print(f"Total revisions: {len(full_table)}, Resume point: {middle_idx}, Resume revid: {resume_revid}")
# Create a partial output by copying row groups to preserve the exact schema
tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="parquet")
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
# Create partial output by filtering the table and writing with the same schema
partial_table = full_table.slice(0, middle_idx + 1)
pq.write_table(partial_table, partial_output_path)
# Now resume from the partial output
try:
tester_partial.call_wikiq("--fandom-2020", "--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the resumed output
resumed_table = pq.read_table(partial_output_path)
# The resumed output should match the full output
# Convert to dataframes for comparison, sorting by revid
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
# Compare the dataframes
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_with_diff():
"""Test that --resume works correctly with diff computation."""
import pyarrow.parquet as pq
# First, create a complete baseline output with diff
tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq("--diff", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the full output
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
full_table = pq.read_table(full_output_path)
# Get a revid about 1/3 through to use as the resume point
resume_idx = len(full_table) // 3
resume_revid = full_table.column("revid")[resume_idx].as_py()
print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
# Create a partial output by filtering the table to preserve the exact schema
tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="parquet")
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
# Create partial output by slicing the table
partial_table = full_table.slice(0, resume_idx + 1)
pq.write_table(partial_table, partial_output_path)
# Now resume from the partial output
try:
tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the resumed output
resumed_table = pq.read_table(partial_output_path)
# Convert to dataframes for comparison, sorting by revid
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
# Compare the dataframes
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_with_partition_namespaces():
"""Test that --resume works correctly with --partition-namespaces.
Interrupts wikiq partway through processing, then resumes and verifies
the result matches an uninterrupted run. Uses --flush-per-batch to ensure
data is written to disk after each batch, making interruption deterministic.
"""
import signal
import time
import pyarrow.dataset as ds
# Use separate subdirectories for full and partial runs to isolate them
full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full")
partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial")
input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
# Clean up any existing output directories from previous runs
for output_dir in [full_dir, partial_dir]:
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
os.makedirs(output_dir)
# Paths within each isolated directory
full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet")
partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet")
# Run wikiq fully to get baseline output
cmd_full = f"{WIKIQ} {input_file} -o {full_output} --batch-size 10 --partition-namespaces"
try:
subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read full output
full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive")
full_df = full_dataset.to_table().to_pandas()
total_rows = len(full_df)
print(f"Full run produced {total_rows} rows")
# Start wikiq for the interrupted run (use list args so SIGTERM goes to Python)
batch_size = 10
cmd_partial = [
sys.executable, WIKIQ, input_file,
"-o", partial_output,
"--batch-size", str(batch_size),
"--partition-namespaces"
]
print(f"Starting: {' '.join(cmd_partial)}")
proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)
# Wait a short time to allow some processing
interrupt_delay = 5 # seconds - enough for some pages but not all
time.sleep(interrupt_delay)
if proc.poll() is not None:
pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt")
# Simulate SLURM job termination: send SIGUSR1 first (early warning),
# then wait for graceful shutdown, then SIGTERM if still running
print(f"Sending SIGUSR1 after {interrupt_delay}s")
proc.send_signal(signal.SIGUSR1)
# Wait for graceful shutdown
try:
proc.wait(timeout=5)
print("Process exited gracefully after SIGUSR1")
except subprocess.TimeoutExpired:
# Process didn't exit, send SIGTERM
print("Sending SIGTERM after SIGUSR1 timeout")
proc.send_signal(signal.SIGTERM)
proc.wait(timeout=30)
# Read interrupted output
interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
interrupted_rows = interrupted_dataset.count_rows()
print(f"Interrupted run wrote {interrupted_rows} rows")
assert interrupted_rows < total_rows, \
f"Process wrote all {interrupted_rows} rows before being killed"
# Resume
cmd_resume = f"{WIKIQ} {input_file} -o {partial_output} --batch-size {batch_size} --partition-namespaces --resume"
try:
subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read resumed output
resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
resumed_df = resumed_dataset.to_table().to_pandas()
# Check revid sets match (the important invariant)
full_revids = set(full_df['revid'])
resumed_revids = set(resumed_df['revid'])
missing_revids = full_revids - resumed_revids
extra_revids = resumed_revids - full_revids
assert missing_revids == set() and extra_revids == set(), \
f"Revision ID mismatch: {len(missing_revids)} missing, {len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}"
assert len(resumed_df) == len(full_df), \
f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}"
print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}")
def test_external_links_only():
"""Test that --external-links extracts external links correctly."""
@ -977,242 +712,3 @@ def test_headings():
print(f"Headings test passed! {len(test)} rows processed")
def test_resume_file_not_found():
"""Test that --resume exits with error when output file doesn't exist."""
tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet")
# Ensure the output file does not exist
expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet")
if os.path.exists(expected_output):
os.remove(expected_output)
try:
tester.call_wikiq("--resume")
pytest.fail("Expected error when --resume is used but output file doesn't exist")
except subprocess.CalledProcessError as exc:
stderr = exc.stderr.decode("utf8")
assert "Error: --resume specified but output file not found" in stderr, \
f"Expected error message about missing output file, got: {stderr}"
print("Resume file not found test passed!")
def test_resume_simple():
"""Test that --resume works without --fandom-2020 and --partition-namespaces."""
import pyarrow.parquet as pq
# First, create a complete baseline output (no fandom-2020, no partition-namespaces)
tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq()
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the full output
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
full_table = pq.read_table(full_output_path)
# Get a revid about 1/3 through to use as the resume point
resume_idx = len(full_table) // 3
resume_revid = full_table.column("revid")[resume_idx].as_py()
print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
# Create a partial output by slicing the table
tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet")
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
partial_table = full_table.slice(0, resume_idx + 1)
pq.write_table(partial_table, partial_output_path)
# Now resume from the partial output
try:
tester_partial.call_wikiq("--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the resumed output
resumed_table = pq.read_table(partial_output_path)
# Convert to dataframes for comparison, sorting by revid
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
# Compare the dataframes
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_merge_with_invalid_temp_file():
"""Test that resume handles invalid/empty temp files gracefully.
This can happen when a namespace has no records after the resume point,
resulting in a temp file that was created but never written to.
"""
import pyarrow.parquet as pq
from wikiq.resume import merge_parquet_files, merge_partitioned_namespaces
import tempfile
# Create a valid parquet file
with tempfile.TemporaryDirectory() as tmpdir:
original_path = os.path.join(tmpdir, "original.parquet")
temp_path = os.path.join(tmpdir, "temp.parquet")
merged_path = os.path.join(tmpdir, "merged.parquet")
# Create a valid original file
import pyarrow as pa
table = pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]})
pq.write_table(table, original_path)
# Create an invalid temp file (empty file, not valid parquet)
with open(temp_path, 'w') as f:
f.write("")
# merge_parquet_files should return "original_only" for invalid temp file
result = merge_parquet_files(original_path, temp_path, merged_path)
assert result == "original_only", f"Expected 'original_only' when temp file is invalid, got {result}"
# Original file should still exist and be unchanged
assert os.path.exists(original_path), "Original file should still exist"
original_table = pq.read_table(original_path)
assert len(original_table) == 3, "Original file should be unchanged"
# Merged file should not have been created
assert not os.path.exists(merged_path), "Merged file should not be created"
print("Resume merge with invalid temp file test passed!")
def test_resume_merge_with_corrupted_original():
"""Test that resume recovers from a corrupted original file if temp is valid.
This can happen if the original file was being written when the process
was killed, leaving it in a corrupted state.
"""
import pyarrow.parquet as pq
from wikiq.resume import merge_parquet_files
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
original_path = os.path.join(tmpdir, "original.parquet")
temp_path = os.path.join(tmpdir, "temp.parquet")
merged_path = os.path.join(tmpdir, "merged.parquet")
# Create a corrupted original file (not valid parquet)
with open(original_path, 'w') as f:
f.write("corrupted data")
# Create a valid temp file
import pyarrow as pa
table = pa.table({"articleid": [4, 5, 6], "revid": [40, 50, 60]})
pq.write_table(table, temp_path)
# merge_parquet_files should return "temp_only" for corrupted original
result = merge_parquet_files(original_path, temp_path, merged_path)
assert result == "temp_only", f"Expected 'temp_only' when original is corrupted, got {result}"
# Merged file should not have been created (caller handles renaming temp)
assert not os.path.exists(merged_path), "Merged file should not be created for temp_only case"
print("Resume merge with corrupted original test passed!")
def test_resume_merge_both_invalid():
"""Test that resume handles both files being invalid."""
from wikiq.resume import merge_parquet_files
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
original_path = os.path.join(tmpdir, "original.parquet")
temp_path = os.path.join(tmpdir, "temp.parquet")
merged_path = os.path.join(tmpdir, "merged.parquet")
# Create corrupted original file
with open(original_path, 'w') as f:
f.write("corrupted original")
# Create corrupted temp file
with open(temp_path, 'w') as f:
f.write("corrupted temp")
# merge_parquet_files should return "both_invalid"
result = merge_parquet_files(original_path, temp_path, merged_path)
assert result == "both_invalid", f"Expected 'both_invalid' when both files corrupted, got {result}"
print("Resume merge with both invalid test passed!")
def test_cleanup_interrupted_resume_both_corrupted():
"""Test that cleanup_interrupted_resume returns 'start_fresh' when both files are corrupted."""
from wikiq.resume import cleanup_interrupted_resume, get_checkpoint_path
import tempfile
import json
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "output.parquet")
temp_file = output_file + ".resume_temp"
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)
# Create corrupted original file
with open(output_file, 'w') as f:
f.write("corrupted original")
# Create corrupted temp file
with open(temp_file, 'w') as f:
f.write("corrupted temp")
# Create a checkpoint file (should be deleted)
with open(checkpoint_path, 'w') as f:
json.dump({"pageid": 100, "revid": 200}, f)
# cleanup_interrupted_resume should return "start_fresh"
result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
assert result == "start_fresh", f"Expected 'start_fresh', got {result}"
# All files should be deleted
assert not os.path.exists(output_file), "Corrupted original should be deleted"
assert not os.path.exists(temp_file), "Corrupted temp should be deleted"
assert not os.path.exists(checkpoint_path), "Stale checkpoint should be deleted"
print("Cleanup interrupted resume with both corrupted test passed!")
def test_cleanup_interrupted_resume_original_corrupted_temp_valid():
"""Test that cleanup recovers from temp when original is corrupted."""
from wikiq.resume import cleanup_interrupted_resume, get_resume_point
import pyarrow as pa
import pyarrow.parquet as pq
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "output.parquet")
temp_file = output_file + ".resume_temp"
# Create corrupted original file
with open(output_file, 'w') as f:
f.write("corrupted original")
# Create valid temp file with some data
table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]})
pq.write_table(table, temp_file)
# cleanup_interrupted_resume should recover from temp (not return "start_fresh")
result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
assert result is None, f"Expected None (normal recovery), got {result}"
# Original should now contain the temp file's data
assert os.path.exists(output_file), "Output file should exist after recovery"
assert not os.path.exists(temp_file), "Temp file should be renamed to output"
# Verify the recovered data
recovered_table = pq.read_table(output_file)
assert len(recovered_table) == 3, "Recovered file should have 3 rows"
# get_resume_point should find the resume point from recovered file
resume_point = get_resume_point(output_file, partition_namespaces=False)
assert resume_point is not None, "Should find resume point from recovered file"
assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
print("Cleanup with original corrupted, temp valid test passed!")

5
test/conftest.py Normal file
View File

@ -0,0 +1,5 @@
import os
import sys
# Add the test directory to Python path so test utilities can be imported
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

401
test/test_resume.py Normal file
View File

@ -0,0 +1,401 @@
import json
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest
from pandas.testing import assert_frame_equal
from wikiq.resume import (
cleanup_interrupted_resume,
get_checkpoint_path,
get_resume_point,
merge_parquet_files,
)
from wikiq_test_utils import (
SAILORMOON,
TEST_DIR,
TEST_OUTPUT_DIR,
WIKIQ,
WikiqTester,
)
def test_resume():
"""Test that --resume properly resumes processing from the last written revid."""
tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq("--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
full_table = pq.read_table(full_output_path)
middle_idx = len(full_table) // 2
resume_revid = full_table.column("revid")[middle_idx].as_py()
print(f"Total revisions: {len(full_table)}, Resume point: {middle_idx}, Resume revid: {resume_revid}")
tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="parquet")
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
partial_table = full_table.slice(0, middle_idx + 1)
pq.write_table(partial_table, partial_output_path)
try:
tester_partial.call_wikiq("--fandom-2020", "--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
resumed_table = pq.read_table(partial_output_path)
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_with_diff():
"""Test that --resume works correctly with diff computation."""
tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq("--diff", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
full_table = pq.read_table(full_output_path)
resume_idx = len(full_table) // 3
resume_revid = full_table.column("revid")[resume_idx].as_py()
print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="parquet")
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
partial_table = full_table.slice(0, resume_idx + 1)
pq.write_table(partial_table, partial_output_path)
try:
tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
resumed_table = pq.read_table(partial_output_path)
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_with_partition_namespaces():
"""Test that --resume works correctly with --partition-namespaces.
Interrupts wikiq partway through processing, then resumes and verifies
the result matches an uninterrupted run. Uses --flush-per-batch to ensure
data is written to disk after each batch, making interruption deterministic.
"""
full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full")
partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial")
input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
for output_dir in [full_dir, partial_dir]:
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
os.makedirs(output_dir)
full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet")
partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet")
cmd_full = f"{WIKIQ} {input_file} -o {full_output} --batch-size 10 --partition-namespaces"
try:
subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive")
full_df = full_dataset.to_table().to_pandas()
total_rows = len(full_df)
print(f"Full run produced {total_rows} rows")
batch_size = 10
cmd_partial = [
sys.executable, WIKIQ, input_file,
"-o", partial_output,
"--batch-size", str(batch_size),
"--partition-namespaces"
]
print(f"Starting: {' '.join(cmd_partial)}")
proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)
interrupt_delay = 5
time.sleep(interrupt_delay)
if proc.poll() is not None:
pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt")
print(f"Sending SIGUSR1 after {interrupt_delay}s")
proc.send_signal(signal.SIGUSR1)
try:
proc.wait(timeout=5)
print("Process exited gracefully after SIGUSR1")
except subprocess.TimeoutExpired:
print("Sending SIGTERM after SIGUSR1 timeout")
proc.send_signal(signal.SIGTERM)
proc.wait(timeout=30)
interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
interrupted_rows = interrupted_dataset.count_rows()
print(f"Interrupted run wrote {interrupted_rows} rows")
assert interrupted_rows < total_rows, \
f"Process wrote all {interrupted_rows} rows before being killed"
cmd_resume = f"{WIKIQ} {input_file} -o {partial_output} --batch-size {batch_size} --partition-namespaces --resume"
try:
subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
resumed_df = resumed_dataset.to_table().to_pandas()
full_revids = set(full_df['revid'])
resumed_revids = set(resumed_df['revid'])
missing_revids = full_revids - resumed_revids
extra_revids = resumed_revids - full_revids
assert missing_revids == set() and extra_revids == set(), \
f"Revision ID mismatch: {len(missing_revids)} missing, {len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}"
assert len(resumed_df) == len(full_df), \
f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}"
print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}")
def test_resume_file_not_found():
"""Test that --resume exits with error when output file doesn't exist."""
tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet")
expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet")
if os.path.exists(expected_output):
os.remove(expected_output)
try:
tester.call_wikiq("--resume")
pytest.fail("Expected error when --resume is used but output file doesn't exist")
except subprocess.CalledProcessError as exc:
stderr = exc.stderr.decode("utf8")
assert "Error: --resume specified but output file not found" in stderr, \
f"Expected error message about missing output file, got: {stderr}"
print("Resume file not found test passed!")
def test_resume_simple():
"""Test that --resume works without --fandom-2020 and --partition-namespaces."""
tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq()
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
full_table = pq.read_table(full_output_path)
resume_idx = len(full_table) // 3
resume_revid = full_table.column("revid")[resume_idx].as_py()
print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet")
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
partial_table = full_table.slice(0, resume_idx + 1)
pq.write_table(partial_table, partial_output_path)
try:
tester_partial.call_wikiq("--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
resumed_table = pq.read_table(partial_output_path)
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_merge_with_invalid_temp_file():
"""Test that resume handles invalid/empty temp files gracefully.
This can happen when a namespace has no records after the resume point,
resulting in a temp file that was created but never written to.
"""
with tempfile.TemporaryDirectory() as tmpdir:
original_path = os.path.join(tmpdir, "original.parquet")
temp_path = os.path.join(tmpdir, "temp.parquet")
merged_path = os.path.join(tmpdir, "merged.parquet")
table = pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]})
pq.write_table(table, original_path)
with open(temp_path, 'w') as f:
f.write("")
result = merge_parquet_files(original_path, temp_path, merged_path)
assert result == "original_only", f"Expected 'original_only' when temp file is invalid, got {result}"
assert os.path.exists(original_path), "Original file should still exist"
original_table = pq.read_table(original_path)
assert len(original_table) == 3, "Original file should be unchanged"
assert not os.path.exists(merged_path), "Merged file should not be created"
print("Resume merge with invalid temp file test passed!")
def test_resume_merge_with_corrupted_original():
"""Test that resume recovers from a corrupted original file if temp is valid.
This can happen if the original file was being written when the process
was killed, leaving it in a corrupted state.
"""
with tempfile.TemporaryDirectory() as tmpdir:
original_path = os.path.join(tmpdir, "original.parquet")
temp_path = os.path.join(tmpdir, "temp.parquet")
merged_path = os.path.join(tmpdir, "merged.parquet")
with open(original_path, 'w') as f:
f.write("corrupted data")
table = pa.table({"articleid": [4, 5, 6], "revid": [40, 50, 60]})
pq.write_table(table, temp_path)
result = merge_parquet_files(original_path, temp_path, merged_path)
assert result == "temp_only", f"Expected 'temp_only' when original is corrupted, got {result}"
assert not os.path.exists(merged_path), "Merged file should not be created for temp_only case"
print("Resume merge with corrupted original test passed!")
def test_resume_merge_both_invalid():
"""Test that resume handles both files being invalid."""
with tempfile.TemporaryDirectory() as tmpdir:
original_path = os.path.join(tmpdir, "original.parquet")
temp_path = os.path.join(tmpdir, "temp.parquet")
merged_path = os.path.join(tmpdir, "merged.parquet")
with open(original_path, 'w') as f:
f.write("corrupted original")
with open(temp_path, 'w') as f:
f.write("corrupted temp")
result = merge_parquet_files(original_path, temp_path, merged_path)
assert result == "both_invalid", f"Expected 'both_invalid' when both files corrupted, got {result}"
print("Resume merge with both invalid test passed!")
def test_cleanup_interrupted_resume_both_corrupted():
"""Test that cleanup_interrupted_resume returns 'start_fresh' when both files are corrupted."""
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "output.parquet")
temp_file = output_file + ".resume_temp"
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)
with open(output_file, 'w') as f:
f.write("corrupted original")
with open(temp_file, 'w') as f:
f.write("corrupted temp")
with open(checkpoint_path, 'w') as f:
json.dump({"pageid": 100, "revid": 200}, f)
result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
assert result == "start_fresh", f"Expected 'start_fresh', got {result}"
assert not os.path.exists(output_file), "Corrupted original should be deleted"
assert not os.path.exists(temp_file), "Corrupted temp should be deleted"
assert not os.path.exists(checkpoint_path), "Stale checkpoint should be deleted"
print("Cleanup interrupted resume with both corrupted test passed!")
def test_cleanup_interrupted_resume_original_corrupted_temp_valid():
"""Test that cleanup recovers from temp when original is corrupted."""
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "output.parquet")
temp_file = output_file + ".resume_temp"
with open(output_file, 'w') as f:
f.write("corrupted original")
table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]})
pq.write_table(table, temp_file)
result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
assert result is None, f"Expected None (normal recovery), got {result}"
assert os.path.exists(output_file), "Output file should exist after recovery"
assert not os.path.exists(temp_file), "Temp file should be renamed to output"
recovered_table = pq.read_table(output_file)
assert len(recovered_table) == 3, "Recovered file should have 3 rows"
resume_point = get_resume_point(output_file, partition_namespaces=False)
assert resume_point is not None, "Should find resume point from recovered file"
assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
print("Cleanup with original corrupted, temp valid test passed!")
def test_cleanup_original_missing_temp_valid_no_checkpoint():
"""Test recovery when original is missing, temp is valid, and no checkpoint exists."""
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "output.parquet")
temp_file = output_file + ".resume_temp"
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)
assert not os.path.exists(output_file)
table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]})
pq.write_table(table, temp_file)
assert not os.path.exists(checkpoint_path)
result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
assert result is None, f"Expected None (normal recovery), got {result}"
assert os.path.exists(output_file), "Output file should exist after recovery"
assert not os.path.exists(temp_file), "Temp file should be renamed to output"
resume_point = get_resume_point(output_file, partition_namespaces=False)
assert resume_point is not None, "Should find resume point from recovered file"
assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
print("Original missing, temp valid, no checkpoint test passed!")

75
test/wikiq_test_utils.py Normal file
View File

@ -0,0 +1,75 @@
import os
import shutil
import subprocess
from typing import Final, Union
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR, ".."), "src/wikiq/__init__.py")
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"
class WikiqTester:
def __init__(
self,
wiki: str,
case_name: str,
suffix: Union[str, None] = None,
in_compression: str = "bz2",
baseline_format: str = "tsv",
out_format: str = "tsv",
):
self.input_file = os.path.join(
TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
)
basename = "{0}_{1}".format(case_name, wiki)
if suffix:
basename = "{0}_{1}".format(basename, suffix)
self.output = os.path.join(
TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
)
if os.path.exists(self.output):
if os.path.isfile(self.output):
os.remove(self.output)
else:
shutil.rmtree(self.output)
if out_format == "parquet":
os.makedirs(self.output, exist_ok=True)
if suffix is None:
self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
else:
self.wikiq_baseline_name = "{0}_{1}.{2}".format(
wiki, suffix, baseline_format
)
self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
if case_name is not None:
self.baseline_file = os.path.join(
BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
)
def call_wikiq(self, *args: str, out: bool = True):
"""
Calls wikiq with the passed arguments on the input file relevant to the test.
:param args: The command line arguments to pass to wikiq.
:param out: Whether to pass an output argument to wikiq.
:return: The output of the wikiq call.
"""
if out:
call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
else:
call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
print(call)
return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)