don't put checkpoint files inside namespace directories.

This commit is contained in:
Nathan TeBlunthuis 2025-12-07 06:24:04 -08:00
parent 783f5fd8bc
commit d1fc094c96
2 changed files with 18 additions and 6 deletions

View File

@ -322,7 +322,7 @@ class WikiqParser:
"""Open checkpoint file for writing. Keeps file open for performance."""
if not self.output_parquet or output_file == sys.stdout.buffer:
return
checkpoint_path = get_checkpoint_path(output_file)
checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces)
Path(checkpoint_path).parent.mkdir(parents=True, exist_ok=True)
self.checkpoint_file = open(checkpoint_path, 'w')
print(f"Checkpoint file opened: {checkpoint_path}", file=sys.stderr)

View File

@ -14,12 +14,24 @@ import sys
import pyarrow.parquet as pq
def get_checkpoint_path(output_file):
"""Get the path to the checkpoint file for a given output file."""
def get_checkpoint_path(output_file, partition_namespaces=False):
    """Return the path of the checkpoint file for a given output file.

    For non-partitioned output the checkpoint sits next to the output file,
    named ``<output_file>.checkpoint``.

    For partitioned output the checkpoint is placed one level above the
    partition directory, to avoid pyarrow trying to read it as a parquet
    file. The output filename is kept in the checkpoint name so it stays
    unique per input file (for parallel jobs).

    Args:
        output_file: Path of the output file (``str`` or ``os.PathLike``).
        partition_namespaces: True when ``output_file`` has the form
            ``partition_dir/output.parquet`` and the checkpoint must live
            outside ``partition_dir``.

    Returns:
        The checkpoint path as a ``str``.
    """
    if not partition_namespaces:
        return str(output_file) + ".checkpoint"

    # output_file is like partition_dir/output.parquet
    # checkpoint should be at parent level: parent/output.parquet.checkpoint
    partition_dir, output_filename = os.path.split(output_file)
    if partition_dir:
        parent_dir = os.path.dirname(partition_dir)
    else:
        # A bare filename means the partition directory is the current
        # directory. dirname("") is still "", which would leave the
        # checkpoint inside the partition directory, so step up explicitly.
        parent_dir = os.pardir
    return os.path.join(parent_dir, output_filename + ".checkpoint")
def read_checkpoint(output_file):
def read_checkpoint(output_file, partition_namespaces=False):
"""
Read resume point from checkpoint file if it exists.
@ -31,7 +43,7 @@ def read_checkpoint(output_file):
For single files: A tuple (pageid, revid), or None if not found.
For partitioned: A dict mapping namespace -> (pageid, revid), or None.
"""
checkpoint_path = get_checkpoint_path(output_file)
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
if not os.path.exists(checkpoint_path):
return None
@ -78,7 +90,7 @@ def get_resume_point(output_file, partition_namespaces=False):
or None if no partitions exist.
"""
# First try checkpoint file (fast)
checkpoint_result = read_checkpoint(output_file)
checkpoint_result = read_checkpoint(output_file, partition_namespaces)
if checkpoint_result is not None:
print(f"Resume point found in checkpoint file", file=sys.stderr)
return checkpoint_result