NOTE(review): line breaks, blank context lines and indentation in this patch
were reconstructed from a whitespace-collapsed copy. Confirm context-line
whitespace against the tree before applying.

diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py
index 22da27a..37bcb24 100755
--- a/src/wikiq/__init__.py
+++ b/src/wikiq/__init__.py
@@ -34,6 +34,7 @@ from wikiq.resume import (
     setup_resume_temp_output,
     finalize_resume_merge,
     get_checkpoint_path,
+    cleanup_interrupted_resume,
 )
 
 TO_ENCODE = ("title", "editor")
@@ -1227,6 +1228,8 @@ def main():
             resume_point = None
             if args.resume:
                 if output_parquet and not args.stdout:
+                    # First, merge any leftover temp files from a previous interrupted run
+                    cleanup_interrupted_resume(output_file, args.partition_namespaces)
                     resume_point = get_resume_point(output_file, args.partition_namespaces)
                     if resume_point is not None:
                         if args.partition_namespaces:
diff --git a/src/wikiq/resume.py b/src/wikiq/resume.py
index 38e4b09..a1fa37c 100644
--- a/src/wikiq/resume.py
+++ b/src/wikiq/resume.py
@@ -14,6 +14,58 @@ import sys
 import pyarrow.parquet as pq
 
 
+def cleanup_interrupted_resume(output_file, partition_namespaces):
+    """
+    Merge any leftover .resume_temp files from a previous interrupted run.
+
+    This should be called BEFORE get_resume_point() so the resume point
+    is calculated from the merged data.
+
+    Args:
+        output_file: Path of the parquet output being resumed.
+        partition_namespaces: If true, look for temp files inside the
+            namespace=* directories beside output_file instead.
+    """
+    if partition_namespaces:
+        partition_dir = os.path.dirname(output_file)
+        output_filename = os.path.basename(output_file)
+        temp_suffix = ".resume_temp"
+
+        if not os.path.isdir(partition_dir):
+            return
+
+        has_old_temp_files = False
+        for ns_dir in os.listdir(partition_dir):
+            if ns_dir.startswith('namespace='):
+                temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix)
+                if os.path.exists(temp_path):
+                    has_old_temp_files = True
+                    break
+
+        if has_old_temp_files:
+            print("Found leftover temp files from previous interrupted partitioned run, merging first...", file=sys.stderr)
+            merge_partitioned_namespaces(partition_dir, temp_suffix)
+            print("Previous temp files merged successfully.", file=sys.stderr)
+    else:
+        temp_output_file = output_file + ".resume_temp"
+        if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file):
+            print("Found leftover temp file from previous interrupted run, merging first...", file=sys.stderr)
+            merged_path = output_file + ".merged"
+            merged = merge_parquet_files(output_file, temp_output_file, merged_path)
+            if merged is None:
+                # Temp file was invalid, just remove it
+                os.remove(temp_output_file)
+            elif merged:
+                # os.replace swaps the file in a single step (atomic on POSIX),
+                # so another interruption here cannot leave the output missing
+                os.replace(merged_path, output_file)
+                os.remove(temp_output_file)
+                print("Previous temp file merged successfully.", file=sys.stderr)
+            else:
+                # Both empty - unusual
+                os.remove(temp_output_file)
+
+
 def get_checkpoint_path(output_file, partition_namespaces=False):
     """Get the path to the checkpoint file for a given output file.
 
@@ -183,10 +235,19 @@ def merge_parquet_files(original_path, temp_path, merged_path):
     Merge two parquet files by streaming row groups from original and temp into merged.
 
     This is memory-efficient: only one row group is loaded at a time.
-    Returns True if merged file was created, False if both sources were empty.
+    Returns:
+        True if merged file was created
+        False if both sources were empty
+        None if temp file is invalid (caller should keep original unchanged)
     """
     original_pq = pq.ParquetFile(original_path)
-    temp_pq = pq.ParquetFile(temp_path)
+
+    try:
+        temp_pq = pq.ParquetFile(temp_path)
+    except Exception:
+        # Temp file is invalid (empty or corrupted) - no new data was written
+        print("Note: No new data in temp file (namespace had no records after resume point)", file=sys.stderr)
+        return None
 
     merged_writer = None
 
@@ -253,7 +314,10 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
                 merged_path = original_path + ".merged"
                 merged = merge_parquet_files(original_path, temp_path, merged_path)
 
-                if merged:
+                if merged is None:
+                    # Temp file was invalid (no new data), keep original unchanged
+                    os.remove(temp_path)
+                elif merged:
                     # Replace the original file with the merged file
                     os.remove(original_path)
                     os.rename(merged_path, original_path)
@@ -263,8 +327,15 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
                     os.remove(original_path)
                     os.remove(temp_path)
             else:
-                # No original file, rename temp to original
-                os.rename(temp_path, original_path)
+                # No original file, rename temp to original only if valid
+                try:
+                    pq.ParquetFile(temp_path)
+                except Exception:
+                    # Temp file invalid, just remove it
+                    os.remove(temp_path)
+                else:
+                    # Kept out of the try so a failed rename never deletes valid data
+                    os.rename(temp_path, original_path)
 
 
 def finalize_resume_merge(
@@ -299,15 +370,22 @@ def finalize_resume_merge(
         else:
             # Merge single parquet files
             merged_output_file = original_output_file + ".merged"
-            merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
+            merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
 
-            # Replace the original file with the merged file
-            os.remove(original_output_file)
-            os.rename(merged_output_file, original_output_file)
-
-            # Clean up the temp file
-            if os.path.exists(temp_output_file):
-                os.remove(temp_output_file)
+            if merged is None:
+                # Temp file was invalid (no new data), keep original unchanged
+                if os.path.exists(temp_output_file):
+                    os.remove(temp_output_file)
+            elif merged:
+                # Replace the original file with the merged file in one step
+                os.replace(merged_output_file, original_output_file)
+                if os.path.exists(temp_output_file):
+                    os.remove(temp_output_file)
+            else:
+                # Both files were empty - unusual, but clean up
+                os.remove(original_output_file)
+                if os.path.exists(temp_output_file):
+                    os.remove(temp_output_file)
 
         print("Merge complete.", file=sys.stderr)
     except Exception as e:
@@ -354,15 +432,15 @@ def setup_resume_temp_output(output_file, partition_namespaces):
     original_output_file = output_file
     temp_output_file = output_file + ".resume_temp"
 
-    # Remove temp file/dir if it exists from a previous failed run
+    # Note: cleanup_interrupted_resume() should have been called before this
+    # to merge any leftover temp files from a previous interrupted run.
+    # Here we just clean up any remaining temp directory markers.
     if os.path.exists(temp_output_file):
         if os.path.isdir(temp_output_file):
             shutil.rmtree(temp_output_file)
         else:
             os.remove(temp_output_file)
 
-    # For partitioned namespaces, create an empty temp directory
-    # (actual temp files go in namespace=* dirs with .resume_temp suffix)
     if partition_namespaces:
         os.makedirs(temp_output_file, exist_ok=True)
 
diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py
index ed47e64..36c55bf 100644
--- a/test/Wikiq_Unit_Test.py
+++ b/test/Wikiq_Unit_Test.py
@@ -1043,3 +1043,43 @@ def test_resume_simple():
     assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
 
     print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
+
+
+def test_resume_merge_with_invalid_temp_file():
+    """Test that resume handles invalid/empty temp files gracefully.
+
+    This can happen when a namespace has no records after the resume point,
+    resulting in a temp file that was created but never written to.
+    """
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+    from wikiq.resume import merge_parquet_files
+    import tempfile
+
+    # Work inside a scratch directory that is removed when the test ends
+    with tempfile.TemporaryDirectory() as tmpdir:
+        original_path = os.path.join(tmpdir, "original.parquet")
+        temp_path = os.path.join(tmpdir, "temp.parquet")
+        merged_path = os.path.join(tmpdir, "merged.parquet")
+
+        # Create a valid original file
+        table = pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]})
+        pq.write_table(table, original_path)
+
+        # Create an invalid temp file (empty file, not valid parquet)
+        with open(temp_path, 'w') as f:
+            f.write("")
+
+        # merge_parquet_files should return None for invalid temp file
+        result = merge_parquet_files(original_path, temp_path, merged_path)
+        assert result is None, "Expected None when temp file is invalid"
+
+        # Original file should still exist and be unchanged
+        assert os.path.exists(original_path), "Original file should still exist"
+        original_table = pq.read_table(original_path)
+        assert len(original_table) == 3, "Original file should be unchanged"
+
+        # Merged file should not have been created
+        assert not os.path.exists(merged_path), "Merged file should not be created"
+
+        print("Resume merge with invalid temp file test passed!")