only merge the correct partitioned files.

This commit is contained in:
Nathan TeBlunthuis
2025-12-19 11:47:18 -08:00
parent 006feb795c
commit 38dabd0547

View File

@@ -45,7 +45,7 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
if has_old_temp_files: if has_old_temp_files:
print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr) print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr)
had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix) had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename)
# Check if any valid data remains after merge # Check if any valid data remains after merge
has_valid_data = False has_valid_data = False
@@ -342,7 +342,7 @@ def merge_parquet_files(original_path, temp_path, merged_path):
return False return False
def merge_partitioned_namespaces(partition_dir, temp_suffix): def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter):
""" """
Merge partitioned namespace directories after resume. Merge partitioned namespace directories after resume.
@@ -353,6 +353,8 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
Args: Args:
partition_dir: The partition directory containing namespace=* subdirs partition_dir: The partition directory containing namespace=* subdirs
temp_suffix: The suffix appended to temp files (e.g., '.resume_temp') temp_suffix: The suffix appended to temp files (e.g., '.resume_temp')
file_filter: Only process temp files matching this base name
(e.g., 'enwiki-20250123-pages-meta-history24-p53238682p53445302.parquet')
Returns: Returns:
True if at least one namespace has valid data after merge True if at least one namespace has valid data after merge
@@ -360,64 +362,61 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
""" """
namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
had_corruption = False had_corruption = False
expected_temp = file_filter + temp_suffix
for ns_dir in namespace_dirs: for ns_dir in namespace_dirs:
ns_path = os.path.join(partition_dir, ns_dir) ns_path = os.path.join(partition_dir, ns_dir)
temp_path = os.path.join(ns_path, expected_temp)
# Find all files in this namespace directory if not os.path.exists(temp_path):
files = os.listdir(ns_path) continue
# Find temp files (files ending with the temp suffix) # Original file is the temp file without the suffix
temp_files = [f for f in files if f.endswith(temp_suffix)] original_file = file_filter
original_path = os.path.join(ns_path, original_file)
for temp_file in temp_files: if os.path.exists(original_path):
temp_path = os.path.join(ns_path, temp_file) # Merge the files
# Original file is the temp file without the suffix merged_path = original_path + ".merged"
original_file = temp_file[:-len(temp_suffix)] merged = merge_parquet_files(original_path, temp_path, merged_path)
original_path = os.path.join(ns_path, original_file)
if os.path.exists(original_path): if merged == "original_only":
# Merge the files # Temp file was invalid (no new data), keep original unchanged
merged_path = original_path + ".merged" if os.path.exists(temp_path):
merged = merge_parquet_files(original_path, temp_path, merged_path) os.remove(temp_path)
elif merged == "temp_only":
if merged == "original_only": # Original was corrupted, use temp as new base
# Temp file was invalid (no new data), keep original unchanged os.remove(original_path)
if os.path.exists(temp_path): os.rename(temp_path, original_path)
os.remove(temp_path) elif merged == "both_invalid":
elif merged == "temp_only": # Both files corrupted, remove both
# Original was corrupted, use temp as new base if os.path.exists(original_path):
os.remove(original_path) os.remove(original_path)
os.rename(temp_path, original_path) if os.path.exists(temp_path):
elif merged == "both_invalid": os.remove(temp_path)
# Both files corrupted, remove both had_corruption = True
if os.path.exists(original_path): elif merged == "merged":
os.remove(original_path) # Replace the original file with the merged file
if os.path.exists(temp_path): os.remove(original_path)
os.remove(temp_path) os.rename(merged_path, original_path)
had_corruption = True if os.path.exists(temp_path):
elif merged == "merged": os.remove(temp_path)
# Replace the original file with the merged file
os.remove(original_path)
os.rename(merged_path, original_path)
if os.path.exists(temp_path):
os.remove(temp_path)
else:
# Both files were empty (False), just remove them
if os.path.exists(original_path):
os.remove(original_path)
if os.path.exists(temp_path):
os.remove(temp_path)
else: else:
# No original file, rename temp to original only if valid # Both files were empty (False), just remove them
try: if os.path.exists(original_path):
pq.ParquetFile(temp_path) os.remove(original_path)
os.rename(temp_path, original_path) if os.path.exists(temp_path):
except Exception: os.remove(temp_path)
# Temp file invalid or missing, just remove it if it exists else:
if os.path.exists(temp_path): # No original file, rename temp to original only if valid
os.remove(temp_path) try:
had_corruption = True pq.ParquetFile(temp_path)
os.rename(temp_path, original_path)
except Exception:
# Temp file invalid or missing, just remove it if it exists
if os.path.exists(temp_path):
os.remove(temp_path)
had_corruption = True
return had_corruption return had_corruption
@@ -447,7 +446,9 @@ def finalize_resume_merge(
if partition_namespaces and original_partition_dir is not None: if partition_namespaces and original_partition_dir is not None:
# For partitioned namespaces, temp files are written alongside originals # For partitioned namespaces, temp files are written alongside originals
# with '.resume_temp' suffix in each namespace directory. # with '.resume_temp' suffix in each namespace directory.
merge_partitioned_namespaces(original_partition_dir, ".resume_temp") # Only merge temp files for the current dump file, not other concurrent jobs.
file_filter = os.path.basename(original_output_file)
merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter)
# Clean up the empty temp directory we created # Clean up the empty temp directory we created
if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file): if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file):
shutil.rmtree(temp_output_file) shutil.rmtree(temp_output_file)