diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py index 6172fa4..33dcca8 100755 --- a/src/wikiq/__init__.py +++ b/src/wikiq/__init__.py @@ -587,8 +587,8 @@ class WikiqParser: else: self.output_file = open(output_file, "wb") - # Checkpoint file for tracking resume point - self.checkpoint_file = None + # Checkpoint for tracking resume point (path only, no open file handle for NFS safety) + self.checkpoint_path = None self.checkpoint_state = {} # namespace -> (pageid, revid) or None -> (pageid, revid) def request_shutdown(self): @@ -624,39 +624,40 @@ class WikiqParser: return path.parent / f"{path.stem}.part{part_num}{path.suffix}" def _open_checkpoint(self, output_file): - """Open checkpoint file for writing. Keeps file open for performance.""" + """Enable checkpointing for the given output file.""" if (not self.output_jsonl and not self.output_parquet) or output_file == sys.stdout.buffer: return - checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces) - Path(checkpoint_path).parent.mkdir(parents=True, exist_ok=True) - self.checkpoint_file = open(checkpoint_path, 'w') - print(f"Checkpoint file opened: {checkpoint_path}", file=sys.stderr) + self.checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces) + Path(self.checkpoint_path).parent.mkdir(parents=True, exist_ok=True) + print(f"Checkpoint enabled: {self.checkpoint_path}", file=sys.stderr) def _update_checkpoint(self, pageid, revid, namespace=None, part=0): - """Update checkpoint state and write to file.""" - if self.checkpoint_file is None: + """Update checkpoint state and write atomically (NFS-safe).""" + if self.checkpoint_path is None: return if self.partition_namespaces: self.checkpoint_state[namespace] = {"pageid": pageid, "revid": revid, "part": part} else: self.checkpoint_state = {"pageid": pageid, "revid": revid, "part": part} - self.checkpoint_file.seek(0) - self.checkpoint_file.truncate() - json.dump(self.checkpoint_state, self.checkpoint_file) - self.checkpoint_file.flush() + # Atomic write: write to temp file, then rename + temp_path = self.checkpoint_path + ".tmp" + with open(temp_path, 'w') as f: + json.dump(self.checkpoint_state, f) + os.replace(temp_path, self.checkpoint_path) def _close_checkpoint(self, delete=False): - """Close checkpoint file, optionally deleting it.""" - if self.checkpoint_file is None: + """Clean up checkpoint, optionally deleting it.""" + if self.checkpoint_path is None: return - checkpoint_path = self.checkpoint_file.name - self.checkpoint_file.close() - self.checkpoint_file = None - if delete and os.path.exists(checkpoint_path): - os.remove(checkpoint_path) - print(f"Checkpoint file deleted (processing complete): {checkpoint_path}", file=sys.stderr) - else: - print(f"Checkpoint file preserved for resume: {checkpoint_path}", file=sys.stderr) + if delete and os.path.exists(self.checkpoint_path): + os.remove(self.checkpoint_path) + print(f"Checkpoint deleted: {self.checkpoint_path}", file=sys.stderr) + elif os.path.exists(self.checkpoint_path): + print(f"Checkpoint preserved for resume: {self.checkpoint_path}", file=sys.stderr) + # Clean up any leftover temp file + temp_path = self.checkpoint_path + ".tmp" + if os.path.exists(temp_path): + os.remove(temp_path) def _write_batch(self, row_buffer, schema, writer, pq_writers, ns_base_paths, sorting_cols, namespace=None, part_numbers=None): """Write a batch of rows to the appropriate writer.