From c3d31b4ab5b31639b96fc8bd3cdc6a58415b218a Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Wed, 10 Dec 2025 20:33:04 -0800
Subject: [PATCH] handle the case where we have a valid resume file but a
 corrupted original.

---
 src/wikiq/__init__.py   |  38 ++++++-----
 src/wikiq/resume.py     | 130 ++++++++++++++++++++++++++++++--------
 test/Wikiq_Unit_Test.py | 137 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 262 insertions(+), 43 deletions(-)

diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py
index 37bcb24..153f2fa 100755
--- a/src/wikiq/__init__.py
+++ b/src/wikiq/__init__.py
@@ -1226,27 +1226,33 @@ def main():
 
     # Handle resume functionality before opening input file
     resume_point = None
+    start_fresh = False
     if args.resume:
         if output_parquet and not args.stdout:
             # First, merge any leftover temp files from a previous interrupted run
-            cleanup_interrupted_resume(output_file, args.partition_namespaces)
-            resume_point = get_resume_point(output_file, args.partition_namespaces)
-            if resume_point is not None:
-                if args.partition_namespaces:
-                    ns_list = sorted(resume_point.keys())
-                    print(f"Resuming with per-namespace resume points for {len(ns_list)} namespaces", file=sys.stderr)
-                    for ns in ns_list:
-                        pageid, revid = resume_point[ns]
-                        print(f"  namespace={ns}: pageid={pageid}, revid={revid}", file=sys.stderr)
-                else:
-                    pageid, revid = resume_point
-                    print(f"Resuming from last written point: pageid={pageid}, revid={revid}", file=sys.stderr)
+            cleanup_result = cleanup_interrupted_resume(output_file, args.partition_namespaces)
+            if cleanup_result == "start_fresh":
+                # All data was corrupted, start from beginning
+                start_fresh = True
+                print("Starting fresh due to data corruption.", file=sys.stderr)
             else:
-                if args.partition_namespaces:
-                    partition_dir = os.path.dirname(output_file)
-                    sys.exit(f"Error: --resume specified but partitioned output not found in: {partition_dir}")
+                resume_point = get_resume_point(output_file, args.partition_namespaces)
+                if resume_point is not None:
+                    if args.partition_namespaces:
+                        ns_list = sorted(resume_point.keys())
+                        print(f"Resuming with per-namespace resume points for {len(ns_list)} namespaces", file=sys.stderr)
+                        for ns in ns_list:
+                            pageid, revid = resume_point[ns]
+                            print(f"  namespace={ns}: pageid={pageid}, revid={revid}", file=sys.stderr)
+                    else:
+                        pageid, revid = resume_point
+                        print(f"Resuming from last written point: pageid={pageid}, revid={revid}", file=sys.stderr)
                 else:
-                    sys.exit(f"Error: --resume specified but output file not found: {output_file}")
+                    if args.partition_namespaces:
+                        partition_dir = os.path.dirname(output_file)
+                        sys.exit(f"Error: --resume specified but partitioned output not found in: {partition_dir}")
+                    else:
+                        sys.exit(f"Error: --resume specified but output file not found: {output_file}")
         else:
             sys.exit("Error: --resume only works with parquet output (not stdout or TSV)")
 
diff --git a/src/wikiq/resume.py b/src/wikiq/resume.py
index 9554014..ae51555 100644
--- a/src/wikiq/resume.py
+++ b/src/wikiq/resume.py
@@ -20,6 +20,10 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
 
     This should be called BEFORE get_resume_point() so the resume point
     is calculated from the merged data.
+
+    Returns:
+        None - no temp files found or normal merge completed
+        "start_fresh" - both original and temp were corrupted and deleted
     """
     import shutil
 
@@ -40,19 +44,53 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
                 break
 
         if has_old_temp_files:
-            print("Found leftover temp files from previous interrupted partitioned run, merging first...", file=sys.stderr)
-            merge_partitioned_namespaces(partition_dir, temp_suffix)
+            print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr)
+            had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix)
+
+            # Check if any valid data remains after merge
+            has_valid_data = False
+            for ns_dir in os.listdir(partition_dir):
+                if ns_dir.startswith('namespace='):
+                    ns_path = os.path.join(partition_dir, ns_dir)
+                    parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet') and not f.endswith('.resume_temp')]
+                    if parquet_files:
+                        has_valid_data = True
+                        break
+
+            if had_corruption and not has_valid_data:
+                # All data was corrupted, remove checkpoint and start fresh
+                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
+                if os.path.exists(checkpoint_path):
+                    os.remove(checkpoint_path)
+                print("All partitioned files were corrupted, will start fresh.", file=sys.stderr)
+                return "start_fresh"
+
             print("Previous temp files merged successfully.", file=sys.stderr)
     else:
         temp_output_file = output_file + ".resume_temp"
         if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file):
-            print("Found leftover temp file from previous interrupted run, merging first...", file=sys.stderr)
+            print(f"Found leftover temp file {temp_output_file} from previous interrupted run, merging first...", file=sys.stderr)
             merged_path = output_file + ".merged"
             merged = merge_parquet_files(output_file, temp_output_file, merged_path)
-            if merged is None:
+            if merged == "original_only":
                 # Temp file was invalid, just remove it
                 os.remove(temp_output_file)
-            elif merged:
+            elif merged == "temp_only":
+                # Original was corrupted, use temp as new base
+                os.remove(output_file)
+                os.rename(temp_output_file, output_file)
+                print("Recovered from temp file (original was corrupted).", file=sys.stderr)
+            elif merged == "both_invalid":
+                # Both files corrupted, remove both and start fresh
+                os.remove(output_file)
+                os.remove(temp_output_file)
+                # Also remove stale checkpoint file
+                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
+                if os.path.exists(checkpoint_path):
+                    os.remove(checkpoint_path)
+                print("Both files were corrupted, will start fresh.", file=sys.stderr)
+                return "start_fresh"
+            elif merged == "merged":
                 os.remove(output_file)
                 os.rename(merged_path, output_file)
                 os.remove(temp_output_file)
 
@@ -138,13 +176,14 @@ def get_resume_point(output_file, partition_namespaces=False):
         or None if no partitions exist.
""" # First try checkpoint file (fast) + checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) checkpoint_result = read_checkpoint(output_file, partition_namespaces) if checkpoint_result is not None: - print(f"Resume point found in checkpoint file", file=sys.stderr) + print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr) return checkpoint_result # Fall back to scanning parquet (slow, for backwards compatibility) - print(f"No checkpoint file found, scanning parquet output...", file=sys.stderr) + print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr) try: if partition_namespaces: return _get_resume_point_partitioned(output_file) @@ -232,24 +271,39 @@ def merge_parquet_files(original_path, temp_path, merged_path): This is memory-efficient: only one row group is loaded at a time. Returns: - True if merged file was created - False if both sources were empty - None if temp file is invalid (caller should keep original unchanged) + "merged" - merged file was created from both sources + "original_only" - temp was invalid, keep original unchanged + "temp_only" - original was corrupted but temp is valid, use temp + "both_invalid" - both files invalid, delete both and start fresh + False - both files were valid but empty """ + original_valid = False + temp_valid = False + original_pq = None + temp_pq = None try: original_pq = pq.ParquetFile(original_path) - except Exception: - # Original file is invalid (empty or corrupted) - print(f"Note: No data in original file {original_path} (namespace had no records or file was not properly written)", file=sys.stderr) - return None - + original_valid = True + except Exception as e: + print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr) + try: temp_pq = pq.ParquetFile(temp_path) + temp_valid = True except Exception: - # Temp file is invalid (empty or corrupted) - no new data was written print(f"Note: No new data in temp file {temp_path} (namespace had no records after resume point)", file=sys.stderr) - return None + + if not original_valid and not temp_valid: + print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr) + return "both_invalid" + + if not original_valid and temp_valid: + print(f"Original file corrupted but temp file is valid, recovering from temp", file=sys.stderr) + return "temp_only" + + if original_valid and not temp_valid: + return "original_only" merged_writer = None @@ -278,7 +332,7 @@ def merge_parquet_files(original_path, temp_path, merged_path): # Close the writer if merged_writer is not None: merged_writer.close() - return True + return "merged" return False @@ -293,8 +347,13 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix): Args: partition_dir: The partition directory containing namespace=* subdirs temp_suffix: The suffix appended to temp files (e.g., '.resume_temp') + + Returns: + True if at least one namespace has valid data after merge + False if all namespaces ended up with corrupted/deleted data """ namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] + had_corruption = False for ns_dir in namespace_dirs: ns_path = os.path.join(partition_dir, ns_dir) @@ -316,16 +375,25 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix): merged_path = original_path + ".merged" merged = merge_parquet_files(original_path, temp_path, merged_path) - if merged is None: + if merged == "original_only": # Temp file was invalid (no new 
                 os.remove(temp_path)
-            elif merged:
+            elif merged == "temp_only":
+                # Original was corrupted, use temp as new base
+                os.remove(original_path)
+                os.rename(temp_path, original_path)
+            elif merged == "both_invalid":
+                # Both files corrupted, remove both
+                os.remove(original_path)
+                os.remove(temp_path)
+                had_corruption = True
+            elif merged == "merged":
                 # Replace the original file with the merged file
                 os.remove(original_path)
                 os.rename(merged_path, original_path)
                 os.remove(temp_path)
             else:
-                # Both files were empty, just remove them
+                # Both files were empty (False), just remove them
                 os.remove(original_path)
                 os.remove(temp_path)
         else:
@@ -336,6 +404,9 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
             except Exception:
                 # Temp file invalid, just remove it
                 os.remove(temp_path)
+                had_corruption = True
+
+    return had_corruption
 
 
 def finalize_resume_merge(
@@ -372,25 +443,34 @@ def finalize_resume_merge(
         merged_output_file = original_output_file + ".merged"
         merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
 
-        if merged is None:
+        if merged == "original_only":
             # Temp file was invalid (no new data), keep original unchanged
             if os.path.exists(temp_output_file):
                 os.remove(temp_output_file)
-        elif merged:
+        elif merged == "temp_only":
+            # Original was corrupted, use temp as new base
+            os.remove(original_output_file)
+            os.rename(temp_output_file, original_output_file)
+        elif merged == "both_invalid":
+            # Both files corrupted, remove both
+            os.remove(original_output_file)
+            if os.path.exists(temp_output_file):
+                os.remove(temp_output_file)
+        elif merged == "merged":
             # Replace the original file with the merged file
             os.remove(original_output_file)
             os.rename(merged_output_file, original_output_file)
             if os.path.exists(temp_output_file):
                 os.remove(temp_output_file)
         else:
-            # Both files were empty - unusual, but clean up
+            # Both files were empty (False) - unusual, but clean up
             os.remove(original_output_file)
             if os.path.exists(temp_output_file):
                 os.remove(temp_output_file)
 
         print("Merge complete.", file=sys.stderr)
 
     except Exception as e:
-        print(f"Error merging resume data: {e}", file=sys.stderr)
+        print(f"Error merging resume data for {original_output_file}: {e}", file=sys.stderr)
         print(f"New data saved in: {temp_output_file}", file=sys.stderr)
         raise
diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py
index 36c55bf..8258f87 100644
--- a/test/Wikiq_Unit_Test.py
+++ b/test/Wikiq_Unit_Test.py
@@ -1070,9 +1070,9 @@ def test_resume_merge_with_invalid_temp_file():
         with open(temp_path, 'w') as f:
             f.write("")
 
-        # merge_parquet_files should return None for invalid temp file
+        # merge_parquet_files should return "original_only" for invalid temp file
         result = merge_parquet_files(original_path, temp_path, merged_path)
-        assert result is None, "Expected None when temp file is invalid"
+        assert result == "original_only", f"Expected 'original_only' when temp file is invalid, got {result}"
 
         # Original file should still exist and be unchanged
         assert os.path.exists(original_path), "Original file should still exist"
@@ -1083,3 +1083,136 @@
         assert not os.path.exists(merged_path), "Merged file should not be created"
 
         print("Resume merge with invalid temp file test passed!")
+
+
+def test_resume_merge_with_corrupted_original():
+    """Test that resume recovers from a corrupted original file if temp is valid.
+
+    This can happen if the original file was being written when the process
+    was killed, leaving it in a corrupted state.
+    """
+    import pyarrow.parquet as pq
+    from wikiq.resume import merge_parquet_files
+    import tempfile
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        original_path = os.path.join(tmpdir, "original.parquet")
+        temp_path = os.path.join(tmpdir, "temp.parquet")
+        merged_path = os.path.join(tmpdir, "merged.parquet")
+
+        # Create a corrupted original file (not valid parquet)
+        with open(original_path, 'w') as f:
+            f.write("corrupted data")
+
+        # Create a valid temp file
+        import pyarrow as pa
+        table = pa.table({"articleid": [4, 5, 6], "revid": [40, 50, 60]})
+        pq.write_table(table, temp_path)
+
+        # merge_parquet_files should return "temp_only" for corrupted original
+        result = merge_parquet_files(original_path, temp_path, merged_path)
+        assert result == "temp_only", f"Expected 'temp_only' when original is corrupted, got {result}"
+
+        # Merged file should not have been created (caller handles renaming temp)
+        assert not os.path.exists(merged_path), "Merged file should not be created for temp_only case"
+
+        print("Resume merge with corrupted original test passed!")
+
+
+def test_resume_merge_both_invalid():
+    """Test that resume handles both files being invalid."""
+    from wikiq.resume import merge_parquet_files
+    import tempfile
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        original_path = os.path.join(tmpdir, "original.parquet")
+        temp_path = os.path.join(tmpdir, "temp.parquet")
+        merged_path = os.path.join(tmpdir, "merged.parquet")
+
+        # Create corrupted original file
+        with open(original_path, 'w') as f:
+            f.write("corrupted original")
+
+        # Create corrupted temp file
+        with open(temp_path, 'w') as f:
+            f.write("corrupted temp")
+
+        # merge_parquet_files should return "both_invalid"
+        result = merge_parquet_files(original_path, temp_path, merged_path)
+        assert result == "both_invalid", f"Expected 'both_invalid' when both files corrupted, got {result}"
+
+        print("Resume merge with both invalid test passed!")
+
+
+def test_cleanup_interrupted_resume_both_corrupted():
+    """Test that cleanup_interrupted_resume returns 'start_fresh' when both files are corrupted."""
+    from wikiq.resume import cleanup_interrupted_resume, get_checkpoint_path
+    import tempfile
+    import json
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        output_file = os.path.join(tmpdir, "output.parquet")
+        temp_file = output_file + ".resume_temp"
+        checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)
+
+        # Create corrupted original file
+        with open(output_file, 'w') as f:
+            f.write("corrupted original")
+
+        # Create corrupted temp file
+        with open(temp_file, 'w') as f:
+            f.write("corrupted temp")
+
+        # Create a checkpoint file (should be deleted)
+        with open(checkpoint_path, 'w') as f:
+            json.dump({"pageid": 100, "revid": 200}, f)
+
+        # cleanup_interrupted_resume should return "start_fresh"
+        result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
+        assert result == "start_fresh", f"Expected 'start_fresh', got {result}"
+
+        # All files should be deleted
+        assert not os.path.exists(output_file), "Corrupted original should be deleted"
+        assert not os.path.exists(temp_file), "Corrupted temp should be deleted"
+        assert not os.path.exists(checkpoint_path), "Stale checkpoint should be deleted"
+
+        print("Cleanup interrupted resume with both corrupted test passed!")
+
+
+def test_cleanup_interrupted_resume_original_corrupted_temp_valid():
+    """Test that cleanup recovers from temp when original is corrupted."""
+    from wikiq.resume import cleanup_interrupted_resume, get_resume_point
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+    import tempfile
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        output_file = os.path.join(tmpdir, "output.parquet")
+        temp_file = output_file + ".resume_temp"
+
+        # Create corrupted original file
+        with open(output_file, 'w') as f:
+            f.write("corrupted original")
+
+        # Create valid temp file with some data
+        table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]})
+        pq.write_table(table, temp_file)
+
+        # cleanup_interrupted_resume should recover from temp (not return "start_fresh")
+        result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
+        assert result is None, f"Expected None (normal recovery), got {result}"
+
+        # Original should now contain the temp file's data
+        assert os.path.exists(output_file), "Output file should exist after recovery"
+        assert not os.path.exists(temp_file), "Temp file should be renamed to output"
+
+        # Verify the recovered data
+        recovered_table = pq.read_table(output_file)
+        assert len(recovered_table) == 3, "Recovered file should have 3 rows"
+
+        # get_resume_point should find the resume point from recovered file
+        resume_point = get_resume_point(output_file, partition_namespaces=False)
+        assert resume_point is not None, "Should find resume point from recovered file"
+        assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
+
+        print("Cleanup with original corrupted, temp valid test passed!")
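
Note (not part of the patch): a minimal caller-side sketch of the string-based
contract that merge_parquet_files() now returns. The dispatch below mirrors the
branches the patch adds in cleanup_interrupted_resume(); the helper name
apply_merge_result() is hypothetical, for illustration only.

    import os

    from wikiq.resume import merge_parquet_files


    def apply_merge_result(original_path, temp_path):
        """Reconcile original/temp files per the return contract; return the verdict."""
        merged_path = original_path + ".merged"
        verdict = merge_parquet_files(original_path, temp_path, merged_path)
        if verdict == "original_only":
            os.remove(temp_path)               # temp held no new data
        elif verdict == "temp_only":
            os.remove(original_path)           # original corrupted; promote temp
            os.rename(temp_path, original_path)
        elif verdict == "both_invalid":
            os.remove(original_path)           # nothing recoverable; caller starts fresh
            os.remove(temp_path)
        elif verdict == "merged":
            os.remove(original_path)           # swap the merged file into place
            os.rename(merged_path, original_path)
            os.remove(temp_path)
        else:                                  # False: both valid but empty
            os.remove(original_path)
            os.remove(temp_path)
        return verdict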
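
Note (not part of the patch): the end-to-end --resume decision the patch wires
into main(), condensed into a sketch. cleanup_interrupted_resume() and
get_resume_point() are the patch's functions; resolve_resume() is a
hypothetical wrapper, and the partitioned-output error branch is elided.

    import sys

    from wikiq.resume import cleanup_interrupted_resume, get_resume_point


    def resolve_resume(output_file, partition_namespaces):
        """Return (resume_point, start_fresh) for a --resume run with parquet output."""
        if cleanup_interrupted_resume(output_file, partition_namespaces) == "start_fresh":
            # Both the original output and the temp file were corrupted; the stale
            # checkpoint was removed, so reprocess the dump from the beginning.
            return None, True
        resume_point = get_resume_point(output_file, partition_namespaces)
        if resume_point is None:
            sys.exit(f"Error: --resume specified but output file not found: {output_file}")
        return resume_point, False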