add logic for resuming after a resume.

This commit is contained in:
Nathan TeBlunthuis 2025-12-10 19:26:54 -08:00
parent d1fc094c96
commit f427291fd8
3 changed files with 132 additions and 16 deletions

View File

@ -34,6 +34,7 @@ from wikiq.resume import (
setup_resume_temp_output, setup_resume_temp_output,
finalize_resume_merge, finalize_resume_merge,
get_checkpoint_path, get_checkpoint_path,
cleanup_interrupted_resume,
) )
TO_ENCODE = ("title", "editor") TO_ENCODE = ("title", "editor")
@ -1227,6 +1228,8 @@ def main():
resume_point = None resume_point = None
if args.resume: if args.resume:
if output_parquet and not args.stdout: if output_parquet and not args.stdout:
# First, merge any leftover temp files from a previous interrupted run
cleanup_interrupted_resume(output_file, args.partition_namespaces)
resume_point = get_resume_point(output_file, args.partition_namespaces) resume_point = get_resume_point(output_file, args.partition_namespaces)
if resume_point is not None: if resume_point is not None:
if args.partition_namespaces: if args.partition_namespaces:

View File

@ -14,6 +14,54 @@ import sys
import pyarrow.parquet as pq import pyarrow.parquet as pq
def cleanup_interrupted_resume(output_file, partition_namespaces):
"""
Merge any leftover .resume_temp files from a previous interrupted run.
This should be called BEFORE get_resume_point() so the resume point
is calculated from the merged data.
"""
import shutil
if partition_namespaces:
partition_dir = os.path.dirname(output_file)
output_filename = os.path.basename(output_file)
temp_suffix = ".resume_temp"
if not os.path.isdir(partition_dir):
return
has_old_temp_files = False
for ns_dir in os.listdir(partition_dir):
if ns_dir.startswith('namespace='):
temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix)
if os.path.exists(temp_path):
has_old_temp_files = True
break
if has_old_temp_files:
print("Found leftover temp files from previous interrupted partitioned run, merging first...", file=sys.stderr)
merge_partitioned_namespaces(partition_dir, temp_suffix)
print("Previous temp files merged successfully.", file=sys.stderr)
else:
temp_output_file = output_file + ".resume_temp"
if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file):
print("Found leftover temp file from previous interrupted run, merging first...", file=sys.stderr)
merged_path = output_file + ".merged"
merged = merge_parquet_files(output_file, temp_output_file, merged_path)
if merged is None:
# Temp file was invalid, just remove it
os.remove(temp_output_file)
elif merged:
os.remove(output_file)
os.rename(merged_path, output_file)
os.remove(temp_output_file)
print("Previous temp file merged successfully.", file=sys.stderr)
else:
# Both empty - unusual
os.remove(temp_output_file)
def get_checkpoint_path(output_file, partition_namespaces=False): def get_checkpoint_path(output_file, partition_namespaces=False):
"""Get the path to the checkpoint file for a given output file. """Get the path to the checkpoint file for a given output file.
@ -183,10 +231,19 @@ def merge_parquet_files(original_path, temp_path, merged_path):
Merge two parquet files by streaming row groups from original and temp into merged. Merge two parquet files by streaming row groups from original and temp into merged.
This is memory-efficient: only one row group is loaded at a time. This is memory-efficient: only one row group is loaded at a time.
Returns True if merged file was created, False if both sources were empty. Returns:
True if merged file was created
False if both sources were empty
None if temp file is invalid (caller should keep original unchanged)
""" """
original_pq = pq.ParquetFile(original_path) original_pq = pq.ParquetFile(original_path)
temp_pq = pq.ParquetFile(temp_path)
try:
temp_pq = pq.ParquetFile(temp_path)
except Exception:
# Temp file is invalid (empty or corrupted) - no new data was written
print(f"Note: No new data in temp file (namespace had no records after resume point)", file=sys.stderr)
return None
merged_writer = None merged_writer = None
@ -253,7 +310,10 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
merged_path = original_path + ".merged" merged_path = original_path + ".merged"
merged = merge_parquet_files(original_path, temp_path, merged_path) merged = merge_parquet_files(original_path, temp_path, merged_path)
if merged: if merged is None:
# Temp file was invalid (no new data), keep original unchanged
os.remove(temp_path)
elif merged:
# Replace the original file with the merged file # Replace the original file with the merged file
os.remove(original_path) os.remove(original_path)
os.rename(merged_path, original_path) os.rename(merged_path, original_path)
@ -263,8 +323,13 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
os.remove(original_path) os.remove(original_path)
os.remove(temp_path) os.remove(temp_path)
else: else:
# No original file, rename temp to original # No original file, rename temp to original only if valid
os.rename(temp_path, original_path) try:
pq.ParquetFile(temp_path)
os.rename(temp_path, original_path)
except Exception:
# Temp file invalid, just remove it
os.remove(temp_path)
def finalize_resume_merge( def finalize_resume_merge(
@ -299,15 +364,23 @@ def finalize_resume_merge(
else: else:
# Merge single parquet files # Merge single parquet files
merged_output_file = original_output_file + ".merged" merged_output_file = original_output_file + ".merged"
merge_parquet_files(original_output_file, temp_output_file, merged_output_file) merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
# Replace the original file with the merged file if merged is None:
os.remove(original_output_file) # Temp file was invalid (no new data), keep original unchanged
os.rename(merged_output_file, original_output_file) if os.path.exists(temp_output_file):
os.remove(temp_output_file)
# Clean up the temp file elif merged:
if os.path.exists(temp_output_file): # Replace the original file with the merged file
os.remove(temp_output_file) os.remove(original_output_file)
os.rename(merged_output_file, original_output_file)
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
else:
# Both files were empty - unusual, but clean up
os.remove(original_output_file)
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
print("Merge complete.", file=sys.stderr) print("Merge complete.", file=sys.stderr)
except Exception as e: except Exception as e:
@ -354,15 +427,15 @@ def setup_resume_temp_output(output_file, partition_namespaces):
original_output_file = output_file original_output_file = output_file
temp_output_file = output_file + ".resume_temp" temp_output_file = output_file + ".resume_temp"
# Remove temp file/dir if it exists from a previous failed run # Note: cleanup_interrupted_resume() should have been called before this
# to merge any leftover temp files from a previous interrupted run.
# Here we just clean up any remaining temp directory markers.
if os.path.exists(temp_output_file): if os.path.exists(temp_output_file):
if os.path.isdir(temp_output_file): if os.path.isdir(temp_output_file):
shutil.rmtree(temp_output_file) shutil.rmtree(temp_output_file)
else: else:
os.remove(temp_output_file) os.remove(temp_output_file)
# For partitioned namespaces, create an empty temp directory
# (actual temp files go in namespace=* dirs with .resume_temp suffix)
if partition_namespaces: if partition_namespaces:
os.makedirs(temp_output_file, exist_ok=True) os.makedirs(temp_output_file, exist_ok=True)

View File

@ -1043,3 +1043,43 @@ def test_resume_simple():
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False) assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows") print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_merge_with_invalid_temp_file():
"""Test that resume handles invalid/empty temp files gracefully.
This can happen when a namespace has no records after the resume point,
resulting in a temp file that was created but never written to.
"""
import pyarrow.parquet as pq
from wikiq.resume import merge_parquet_files, merge_partitioned_namespaces
import tempfile
# Create a valid parquet file
with tempfile.TemporaryDirectory() as tmpdir:
original_path = os.path.join(tmpdir, "original.parquet")
temp_path = os.path.join(tmpdir, "temp.parquet")
merged_path = os.path.join(tmpdir, "merged.parquet")
# Create a valid original file
import pyarrow as pa
table = pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]})
pq.write_table(table, original_path)
# Create an invalid temp file (empty file, not valid parquet)
with open(temp_path, 'w') as f:
f.write("")
# merge_parquet_files should return None for invalid temp file
result = merge_parquet_files(original_path, temp_path, merged_path)
assert result is None, "Expected None when temp file is invalid"
# Original file should still exist and be unchanged
assert os.path.exists(original_path), "Original file should still exist"
original_table = pq.read_table(original_path)
assert len(original_table) == 3, "Original file should be unchanged"
# Merged file should not have been created
assert not os.path.exists(merged_path), "Merged file should not be created"
print("Resume merge with invalid temp file test passed!")