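"""Tests for wikiq's --resume feature: resuming from the last written revid,
merging partial parquet outputs after an interruption, and recovering from
corrupted or missing files left behind by an interrupted run."""
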
import json
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest
from pandas.testing import assert_frame_equal

from wikiq.resume import (
    cleanup_interrupted_resume,
    get_checkpoint_path,
    get_resume_point,
    merge_parquet_files,
)
from wikiq_test_utils import (
    SAILORMOON,
    TEST_DIR,
    TEST_OUTPUT_DIR,
    WIKIQ,
    WikiqTester,
)


def test_resume():
    """Test that --resume properly resumes processing from the last written revid."""
    tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="parquet")

    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
    full_table = pq.read_table(full_output_path)

    # Pick the middle revision as the simulated interruption point.
    middle_idx = len(full_table) // 2
    resume_revid = full_table.column("revid")[middle_idx].as_py()

    print(f"Total revisions: {len(full_table)}, Resume point: {middle_idx}, Resume revid: {resume_revid}")

    tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="parquet")
    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")

    # Write only the first half of the output, as if a run had been interrupted.
    partial_table = full_table.slice(0, middle_idx + 1)
    pq.write_table(partial_table, partial_output_path)

    try:
        tester_partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_table = pq.read_table(partial_output_path)

    # The resumed output should match the uninterrupted run row-for-row.
    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)

    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)

    print(f"Resume test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")


def test_resume_with_diff():
    """Test that --resume works correctly with diff computation."""
    tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="parquet")

    try:
        tester_full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
    full_table = pq.read_table(full_output_path)

    # Simulate an interruption one third of the way through.
    resume_idx = len(full_table) // 3
    resume_revid = full_table.column("revid")[resume_idx].as_py()

    print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")

    tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="parquet")
    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")

    partial_table = full_table.slice(0, resume_idx + 1)
    pq.write_table(partial_table, partial_output_path)

    try:
        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_table = pq.read_table(partial_output_path)

    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)

    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)

    print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")


def test_resume_with_partition_namespaces():
    """Test that --resume works correctly with --partition-namespaces.

    Interrupts wikiq partway through processing, then resumes and verifies
    the result matches an uninterrupted run. Uses a small --batch-size so
    output is flushed to disk frequently, giving the interrupted run a
    usable partial output.
    """
    full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full")
    partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")

    for output_dir in [full_dir, partial_dir]:
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)
        os.makedirs(output_dir)

    full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet")
    partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet")

    # Reference run: process the whole dump without interruption.
    cmd_full = f"{WIKIQ} {input_file} -o {full_output} --batch-size 10 --partition-namespaces"
    try:
        subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive")
    full_df = full_dataset.to_table().to_pandas()
    total_rows = len(full_df)
    print(f"Full run produced {total_rows} rows")

    batch_size = 10
    cmd_partial = [
        sys.executable, WIKIQ, input_file,
        "-o", partial_output,
        "--batch-size", str(batch_size),
        "--partition-namespaces",
    ]
    print(f"Starting: {' '.join(cmd_partial)}")

    proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)

    # Give wikiq time to write some batches, then interrupt it mid-run.
    interrupt_delay = 5
    time.sleep(interrupt_delay)

    if proc.poll() is not None:
        pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt")

    print(f"Sending SIGUSR1 after {interrupt_delay}s")
    proc.send_signal(signal.SIGUSR1)

    # Prefer a graceful shutdown; fall back to SIGTERM if wikiq doesn't exit.
    try:
        proc.wait(timeout=5)
        print("Process exited gracefully after SIGUSR1")
    except subprocess.TimeoutExpired:
        print("Sending SIGTERM after SIGUSR1 timeout")
        proc.send_signal(signal.SIGTERM)
        proc.wait(timeout=30)

    interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
    interrupted_rows = interrupted_dataset.count_rows()
    print(f"Interrupted run wrote {interrupted_rows} rows")

    assert interrupted_rows < total_rows, \
        f"Process wrote all {interrupted_rows} rows before being killed"

    # Resume the interrupted run and check it converges to the reference output.
    cmd_resume = f"{WIKIQ} {input_file} -o {partial_output} --batch-size {batch_size} --partition-namespaces --resume"
    try:
        subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
    resumed_df = resumed_dataset.to_table().to_pandas()

    # Compare by revid set first for a clearer failure message, then by count.
    full_revids = set(full_df["revid"])
    resumed_revids = set(resumed_df["revid"])
    missing_revids = full_revids - resumed_revids
    extra_revids = resumed_revids - full_revids
    assert missing_revids == set() and extra_revids == set(), \
        f"Revision ID mismatch: {len(missing_revids)} missing, {len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}"
    assert len(resumed_df) == len(full_df), \
        f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}"

    print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}")


def test_resume_file_not_found():
    """Test that --resume exits with an error when the output file doesn't exist."""
    tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet")

    expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet")
    if os.path.exists(expected_output):
        os.remove(expected_output)

    try:
        tester.call_wikiq("--resume")
        pytest.fail("Expected error when --resume is used but output file doesn't exist")
    except subprocess.CalledProcessError as exc:
        stderr = exc.stderr.decode("utf8")
        assert "Error: --resume specified but output file not found" in stderr, \
            f"Expected error message about missing output file, got: {stderr}"

    print("Resume file not found test passed!")


def test_resume_simple():
    """Test that --resume works without --fandom-2020 or --partition-namespaces."""
    tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet")

    try:
        tester_full.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
    full_table = pq.read_table(full_output_path)

    resume_idx = len(full_table) // 3
    resume_revid = full_table.column("revid")[resume_idx].as_py()

    print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")

    tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet")
    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")

    partial_table = full_table.slice(0, resume_idx + 1)
    pq.write_table(partial_table, partial_output_path)

    try:
        tester_partial.call_wikiq("--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_table = pq.read_table(partial_output_path)

    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)

    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)

    print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")


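# The tests below exercise merge_parquet_files(original, temp, merged) directly.
# As exercised here, it reports how the two inputs were resolved: "original_only"
# when the temp file is invalid, "temp_only" when the original is corrupted, and
# "both_invalid" when neither file is readable.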
def test_resume_merge_with_invalid_temp_file():
    """Test that resume handles invalid/empty temp files gracefully.

    This can happen when a namespace has no records after the resume point,
    resulting in a temp file that was created but never written to.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        original_path = os.path.join(tmpdir, "original.parquet")
        temp_path = os.path.join(tmpdir, "temp.parquet")
        merged_path = os.path.join(tmpdir, "merged.parquet")

        table = pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]})
        pq.write_table(table, original_path)

        with open(temp_path, "w") as f:
            f.write("")

        result = merge_parquet_files(original_path, temp_path, merged_path)
        assert result == "original_only", f"Expected 'original_only' when temp file is invalid, got {result}"

        assert os.path.exists(original_path), "Original file should still exist"
        original_table = pq.read_table(original_path)
        assert len(original_table) == 3, "Original file should be unchanged"

        assert not os.path.exists(merged_path), "Merged file should not be created"

    print("Resume merge with invalid temp file test passed!")


def test_resume_merge_with_corrupted_original():
    """Test that resume recovers from a corrupted original file if temp is valid.

    This can happen if the original file was being written when the process
    was killed, leaving it in a corrupted state.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        original_path = os.path.join(tmpdir, "original.parquet")
        temp_path = os.path.join(tmpdir, "temp.parquet")
        merged_path = os.path.join(tmpdir, "merged.parquet")

        with open(original_path, "w") as f:
            f.write("corrupted data")

        table = pa.table({"articleid": [4, 5, 6], "revid": [40, 50, 60]})
        pq.write_table(table, temp_path)

        result = merge_parquet_files(original_path, temp_path, merged_path)
        assert result == "temp_only", f"Expected 'temp_only' when original is corrupted, got {result}"

        assert not os.path.exists(merged_path), "Merged file should not be created for temp_only case"

    print("Resume merge with corrupted original test passed!")


def test_resume_merge_both_invalid():
    """Test that resume handles both files being invalid."""
    with tempfile.TemporaryDirectory() as tmpdir:
        original_path = os.path.join(tmpdir, "original.parquet")
        temp_path = os.path.join(tmpdir, "temp.parquet")
        merged_path = os.path.join(tmpdir, "merged.parquet")

        with open(original_path, "w") as f:
            f.write("corrupted original")

        with open(temp_path, "w") as f:
            f.write("corrupted temp")

        result = merge_parquet_files(original_path, temp_path, merged_path)
        assert result == "both_invalid", f"Expected 'both_invalid' when both files corrupted, got {result}"

    print("Resume merge with both invalid test passed!")


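# The tests below exercise cleanup_interrupted_resume() directly. As exercised
# here, it returns "start_fresh" when nothing is recoverable and None when it
# restored a usable output file (renaming the temp file over the output).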
def test_cleanup_interrupted_resume_both_corrupted():
    """Test that cleanup_interrupted_resume returns 'start_fresh' when both files are corrupted."""
    with tempfile.TemporaryDirectory() as tmpdir:
        output_file = os.path.join(tmpdir, "output.parquet")
        temp_file = output_file + ".resume_temp"
        checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)

        with open(output_file, "w") as f:
            f.write("corrupted original")

        with open(temp_file, "w") as f:
            f.write("corrupted temp")

        with open(checkpoint_path, "w") as f:
            json.dump({"pageid": 100, "revid": 200}, f)

        result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
        assert result == "start_fresh", f"Expected 'start_fresh', got {result}"

        assert not os.path.exists(output_file), "Corrupted original should be deleted"
        assert not os.path.exists(temp_file), "Corrupted temp should be deleted"
        assert not os.path.exists(checkpoint_path), "Stale checkpoint should be deleted"

    print("Cleanup interrupted resume with both corrupted test passed!")


def test_cleanup_interrupted_resume_original_corrupted_temp_valid():
    """Test that cleanup recovers from temp when original is corrupted."""
    with tempfile.TemporaryDirectory() as tmpdir:
        output_file = os.path.join(tmpdir, "output.parquet")
        temp_file = output_file + ".resume_temp"

        with open(output_file, "w") as f:
            f.write("corrupted original")

        table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]})
        pq.write_table(table, temp_file)

        result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
        assert result is None, f"Expected None (normal recovery), got {result}"

        assert os.path.exists(output_file), "Output file should exist after recovery"
        assert not os.path.exists(temp_file), "Temp file should be renamed to output"

        recovered_table = pq.read_table(output_file)
        assert len(recovered_table) == 3, "Recovered file should have 3 rows"

        # Resume point is the (articleid, revid) of the last recovered row.
        resume_point = get_resume_point(output_file, partition_namespaces=False)
        assert resume_point is not None, "Should find resume point from recovered file"
        assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"

    print("Cleanup with original corrupted, temp valid test passed!")


def test_cleanup_original_missing_temp_valid_no_checkpoint():
    """Test recovery when original is missing, temp is valid, and no checkpoint exists."""
    with tempfile.TemporaryDirectory() as tmpdir:
        output_file = os.path.join(tmpdir, "output.parquet")
        temp_file = output_file + ".resume_temp"
        checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)

        assert not os.path.exists(output_file)

        table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]})
        pq.write_table(table, temp_file)

        assert not os.path.exists(checkpoint_path)

        result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
        assert result is None, f"Expected None (normal recovery), got {result}"

        assert os.path.exists(output_file), "Output file should exist after recovery"
        assert not os.path.exists(temp_file), "Temp file should be renamed to output"

        # Resume point comes from the recovered file itself, not a checkpoint.
        resume_point = get_resume_point(output_file, partition_namespaces=False)
        assert resume_point is not None, "Should find resume point from recovered file"
        assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"

    print("Original missing, temp valid, no checkpoint test passed!")