don't try to remove files that don't exist.
This commit is contained in:
parent
70a10db228
commit
5d1a246898
@ -292,10 +292,13 @@ def merge_parquet_files(original_path, temp_path, merged_path):
|
|||||||
print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr)
|
print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
if not os.path.exists(temp_path):
|
||||||
|
print(f"Note: Temp file {temp_path} does not exist (namespace had no records after resume point)", file=sys.stderr)
|
||||||
|
else:
|
||||||
temp_pq = pq.ParquetFile(temp_path)
|
temp_pq = pq.ParquetFile(temp_path)
|
||||||
temp_valid = True
|
temp_valid = True
|
||||||
except Exception:
|
except Exception:
|
||||||
print(f"Note: No new data in temp file {temp_path} (namespace had no records after resume point)", file=sys.stderr)
|
print(f"Note: No new data in temp file {temp_path} (file exists but is invalid)", file=sys.stderr)
|
||||||
|
|
||||||
if not original_valid and not temp_valid:
|
if not original_valid and not temp_valid:
|
||||||
print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr)
|
print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr)
|
||||||
@ -380,6 +383,7 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
|
|||||||
|
|
||||||
if merged == "original_only":
|
if merged == "original_only":
|
||||||
# Temp file was invalid (no new data), keep original unchanged
|
# Temp file was invalid (no new data), keep original unchanged
|
||||||
|
if os.path.exists(temp_path):
|
||||||
os.remove(temp_path)
|
os.remove(temp_path)
|
||||||
elif merged == "temp_only":
|
elif merged == "temp_only":
|
||||||
# Original was corrupted, use temp as new base
|
# Original was corrupted, use temp as new base
|
||||||
@ -387,17 +391,22 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
|
|||||||
os.rename(temp_path, original_path)
|
os.rename(temp_path, original_path)
|
||||||
elif merged == "both_invalid":
|
elif merged == "both_invalid":
|
||||||
# Both files corrupted, remove both
|
# Both files corrupted, remove both
|
||||||
|
if os.path.exists(original_path):
|
||||||
os.remove(original_path)
|
os.remove(original_path)
|
||||||
|
if os.path.exists(temp_path):
|
||||||
os.remove(temp_path)
|
os.remove(temp_path)
|
||||||
had_corruption = True
|
had_corruption = True
|
||||||
elif merged == "merged":
|
elif merged == "merged":
|
||||||
# Replace the original file with the merged file
|
# Replace the original file with the merged file
|
||||||
os.remove(original_path)
|
os.remove(original_path)
|
||||||
os.rename(merged_path, original_path)
|
os.rename(merged_path, original_path)
|
||||||
|
if os.path.exists(temp_path):
|
||||||
os.remove(temp_path)
|
os.remove(temp_path)
|
||||||
else:
|
else:
|
||||||
# Both files were empty (False), just remove them
|
# Both files were empty (False), just remove them
|
||||||
|
if os.path.exists(original_path):
|
||||||
os.remove(original_path)
|
os.remove(original_path)
|
||||||
|
if os.path.exists(temp_path):
|
||||||
os.remove(temp_path)
|
os.remove(temp_path)
|
||||||
else:
|
else:
|
||||||
# No original file, rename temp to original only if valid
|
# No original file, rename temp to original only if valid
|
||||||
@ -405,7 +414,8 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix):
|
|||||||
pq.ParquetFile(temp_path)
|
pq.ParquetFile(temp_path)
|
||||||
os.rename(temp_path, original_path)
|
os.rename(temp_path, original_path)
|
||||||
except Exception:
|
except Exception:
|
||||||
# Temp file invalid, just remove it
|
# Temp file invalid or missing, just remove it if it exists
|
||||||
|
if os.path.exists(temp_path):
|
||||||
os.remove(temp_path)
|
os.remove(temp_path)
|
||||||
had_corruption = True
|
had_corruption = True
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user