diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py index 6f89f0d..22da27a 100755 --- a/src/wikiq/__init__.py +++ b/src/wikiq/__init__.py @@ -322,7 +322,7 @@ class WikiqParser: """Open checkpoint file for writing. Keeps file open for performance.""" if not self.output_parquet or output_file == sys.stdout.buffer: return - checkpoint_path = get_checkpoint_path(output_file) + checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces) Path(checkpoint_path).parent.mkdir(parents=True, exist_ok=True) self.checkpoint_file = open(checkpoint_path, 'w') print(f"Checkpoint file opened: {checkpoint_path}", file=sys.stderr) diff --git a/src/wikiq/resume.py b/src/wikiq/resume.py index 0484be3..38e4b09 100644 --- a/src/wikiq/resume.py +++ b/src/wikiq/resume.py @@ -14,12 +14,24 @@ import sys import pyarrow.parquet as pq -def get_checkpoint_path(output_file): - """Get the path to the checkpoint file for a given output file.""" +def get_checkpoint_path(output_file, partition_namespaces=False): + """Get the path to the checkpoint file for a given output file. + + For partitioned output, the checkpoint is placed outside the partition directory + to avoid pyarrow trying to read it as a parquet file. The filename includes + the output filename to keep it unique per input file (for parallel jobs). + """ + if partition_namespaces: + # output_file is like partition_dir/output.parquet + # checkpoint should be at parent level: parent/output.parquet.checkpoint + partition_dir = os.path.dirname(output_file) + output_filename = os.path.basename(output_file) + parent_dir = os.path.dirname(partition_dir) + return os.path.join(parent_dir, output_filename + ".checkpoint") return str(output_file) + ".checkpoint" -def read_checkpoint(output_file): +def read_checkpoint(output_file, partition_namespaces=False): """ Read resume point from checkpoint file if it exists. @@ -31,7 +43,7 @@ def read_checkpoint(output_file): For single files: A tuple (pageid, revid), or None if not found. For partitioned: A dict mapping namespace -> (pageid, revid), or None. """ - checkpoint_path = get_checkpoint_path(output_file) + checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) if not os.path.exists(checkpoint_path): return None @@ -78,7 +90,7 @@ def get_resume_point(output_file, partition_namespaces=False): or None if no partitions exist. """ # First try checkpoint file (fast) - checkpoint_result = read_checkpoint(output_file) + checkpoint_result = read_checkpoint(output_file, partition_namespaces) if checkpoint_result is not None: print(f"Resume point found in checkpoint file", file=sys.stderr) return checkpoint_result