diff --git a/src/wikiq/resume.py b/src/wikiq/resume.py index 5af9e3c..67adeb9 100644 --- a/src/wikiq/resume.py +++ b/src/wikiq/resume.py @@ -45,7 +45,7 @@ def cleanup_interrupted_resume(output_file, partition_namespaces): if has_old_temp_files: print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr) - had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix) + had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename) # Check if any valid data remains after merge has_valid_data = False @@ -342,7 +342,7 @@ def merge_parquet_files(original_path, temp_path, merged_path): return False -def merge_partitioned_namespaces(partition_dir, temp_suffix): +def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter): """ Merge partitioned namespace directories after resume. @@ -353,6 +353,8 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix): Args: partition_dir: The partition directory containing namespace=* subdirs temp_suffix: The suffix appended to temp files (e.g., '.resume_temp') + file_filter: Only process temp files matching this base name + (e.g., 'enwiki-20250123-pages-meta-history24-p53238682p53445302.parquet') Returns: True if at least one namespace has valid data after merge @@ -360,64 +362,61 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix): """ namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] had_corruption = False + expected_temp = file_filter + temp_suffix for ns_dir in namespace_dirs: ns_path = os.path.join(partition_dir, ns_dir) + temp_path = os.path.join(ns_path, expected_temp) - # Find all files in this namespace directory - files = os.listdir(ns_path) + if not os.path.exists(temp_path): + continue - # Find temp files (files ending with the temp suffix) - temp_files = [f for f in files if f.endswith(temp_suffix)] + # Original file is the temp file without the suffix + original_file = file_filter + original_path = os.path.join(ns_path, original_file) - for temp_file in temp_files: - temp_path = os.path.join(ns_path, temp_file) - # Original file is the temp file without the suffix - original_file = temp_file[:-len(temp_suffix)] - original_path = os.path.join(ns_path, original_file) + if os.path.exists(original_path): + # Merge the files + merged_path = original_path + ".merged" + merged = merge_parquet_files(original_path, temp_path, merged_path) - if os.path.exists(original_path): - # Merge the files - merged_path = original_path + ".merged" - merged = merge_parquet_files(original_path, temp_path, merged_path) - - if merged == "original_only": - # Temp file was invalid (no new data), keep original unchanged - if os.path.exists(temp_path): - os.remove(temp_path) - elif merged == "temp_only": - # Original was corrupted, use temp as new base + if merged == "original_only": + # Temp file was invalid (no new data), keep original unchanged + if os.path.exists(temp_path): + os.remove(temp_path) + elif merged == "temp_only": + # Original was corrupted, use temp as new base + os.remove(original_path) + os.rename(temp_path, original_path) + elif merged == "both_invalid": + # Both files corrupted, remove both + if os.path.exists(original_path): os.remove(original_path) - os.rename(temp_path, original_path) - elif merged == "both_invalid": - # Both files corrupted, remove both - if os.path.exists(original_path): - os.remove(original_path) - if os.path.exists(temp_path): - os.remove(temp_path) - had_corruption = True - elif merged == "merged": - # Replace the original file with the merged file - os.remove(original_path) - os.rename(merged_path, original_path) - if os.path.exists(temp_path): - os.remove(temp_path) - else: - # Both files were empty (False), just remove them - if os.path.exists(original_path): - os.remove(original_path) - if os.path.exists(temp_path): - os.remove(temp_path) + if os.path.exists(temp_path): + os.remove(temp_path) + had_corruption = True + elif merged == "merged": + # Replace the original file with the merged file + os.remove(original_path) + os.rename(merged_path, original_path) + if os.path.exists(temp_path): + os.remove(temp_path) else: - # No original file, rename temp to original only if valid - try: - pq.ParquetFile(temp_path) - os.rename(temp_path, original_path) - except Exception: - # Temp file invalid or missing, just remove it if it exists - if os.path.exists(temp_path): - os.remove(temp_path) - had_corruption = True + # Both files were empty (False), just remove them + if os.path.exists(original_path): + os.remove(original_path) + if os.path.exists(temp_path): + os.remove(temp_path) + else: + # No original file, rename temp to original only if valid + try: + pq.ParquetFile(temp_path) + os.rename(temp_path, original_path) + except Exception: + # Temp file invalid or missing, just remove it if it exists + if os.path.exists(temp_path): + os.remove(temp_path) + had_corruption = True return had_corruption @@ -447,7 +446,9 @@ def finalize_resume_merge( if partition_namespaces and original_partition_dir is not None: # For partitioned namespaces, temp files are written alongside originals # with '.resume_temp' suffix in each namespace directory. - merge_partitioned_namespaces(original_partition_dir, ".resume_temp") + # Only merge temp files for the current dump file, not other concurrent jobs. + file_filter = os.path.basename(original_output_file) + merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter) # Clean up the empty temp directory we created if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file): shutil.rmtree(temp_output_file)