From 5ebdb26d8207c8a0e000198b61510b0d79edddd2 Mon Sep 17 00:00:00 2001 From: Nathan TeBlunthuis Date: Tue, 23 Dec 2025 09:09:51 -0800 Subject: [PATCH] make resume with jsonl output fault tolerant. --- src/wikiq/__init__.py | 10 +++++--- src/wikiq/resume.py | 55 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py index 33dcca8..169ec71 100755 --- a/src/wikiq/__init__.py +++ b/src/wikiq/__init__.py @@ -624,8 +624,11 @@ class WikiqParser: return path.parent / f"{path.stem}.part{part_num}{path.suffix}" def _open_checkpoint(self, output_file): - """Enable checkpointing for the given output file.""" - if (not self.output_jsonl and not self.output_parquet) or output_file == sys.stdout.buffer: + """Enable checkpointing for Parquet output only. + + JSONL doesn't need checkpoint files - resume point is derived from last line. + """ + if not self.output_parquet or output_file == sys.stdout.buffer: return self.checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces) Path(self.checkpoint_path).parent.mkdir(parents=True, exist_ok=True) @@ -1481,7 +1484,8 @@ def main(): else: resume_point = get_resume_point(output_file, partition_namespaces) else: - resume_point = read_checkpoint(get_checkpoint_path(output_file)) + # JSONL: get resume point from last line of file (no checkpoint) + resume_point = get_resume_point(output_file, input_file=filename) if resume_point is not None: if isinstance(resume_point, dict): print(f"Resuming from checkpoint for {len(resume_point)} namespaces", file=sys.stderr) diff --git a/src/wikiq/resume.py b/src/wikiq/resume.py index 27db8ac..79cc212 100644 --- a/src/wikiq/resume.py +++ b/src/wikiq/resume.py @@ -10,6 +10,7 @@ This module handles: import json import os import sys +from collections import deque import pyarrow.parquet as pq @@ -153,31 +154,71 @@ def cleanup_interrupted_resume(output_file, partition_namespaces): os.remove(temp_output_file) -def get_resume_point(output_file, partition_namespaces=False): +def get_jsonl_resume_point(output_file, input_file=None): + """Get resume point from last complete line of JSONL file. + + For .jsonl.d directories, derives the file path from input_file using get_output_filename. + """ + # Handle .jsonl.d directory output + if output_file.endswith('.jsonl.d'): + if input_file is None: + return None + if os.path.isdir(output_file): + # Import here to avoid circular import + from wikiq import get_output_filename + jsonl_filename = os.path.basename(get_output_filename(input_file, 'jsonl')) + output_file = os.path.join(output_file, jsonl_filename) + else: + return None + + if not os.path.exists(output_file): + return None + + try: + with open(output_file) as f: + # Stream through file, keeping only last 2 lines in memory + for line in reversed(deque(f, maxlen=2)): + try: + record = json.loads(line) + return (record['articleid'], record['revid']) + except (json.JSONDecodeError, KeyError): + continue + return None + except IOError as e: + print(f"Warning: Could not read {output_file}: {e}", file=sys.stderr) + return None + + +def get_resume_point(output_file, partition_namespaces=False, input_file=None): """ Find the resume point(s) from existing output. - First checks for a checkpoint file (fast), then falls back to scanning - the parquet output (slow, for backwards compatibility). + For JSONL: reads last line of file (no checkpoint needed). + For Parquet: checks checkpoint file, falls back to scanning parquet. Args: output_file: Path to the output file. partition_namespaces: Whether the output uses namespace partitioning. + input_file: Path to input file (needed for .jsonl.d directory output). Returns: For single files: A tuple (pageid, revid) or (pageid, revid, part), or None. For partitioned: A dict mapping namespace -> (pageid, revid, part), or None. """ + # For JSONL, read resume point directly from last line (no checkpoint needed) + if output_file.endswith('.jsonl') or output_file.endswith('.jsonl.d'): + result = get_jsonl_resume_point(output_file, input_file) + if result: + print(f"Resume point found from JSONL: pageid={result[0]}, revid={result[1]}", file=sys.stderr) + return result + + # For Parquet, use checkpoint file (fast) checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) checkpoint_result = read_checkpoint(checkpoint_path, partition_namespaces) if checkpoint_result is not None: print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr) return checkpoint_result - # For JSONL, only checkpoint-based resume is supported - if output_file.endswith('.jsonl'): - return None - # Fall back to scanning parquet (slow, for backwards compatibility) print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr) try: