make updating the checkpoint files atomic.

This commit is contained in:
Nathan TeBlunthuis
2025-12-23 08:41:38 -08:00
parent d822085698
commit 9e6b0fb64c

View File

@@ -587,8 +587,8 @@ class WikiqParser:
else: else:
self.output_file = open(output_file, "wb") self.output_file = open(output_file, "wb")
# Checkpoint file for tracking resume point # Checkpoint for tracking resume point (path only, no open file handle for NFS safety)
self.checkpoint_file = None self.checkpoint_path = None
self.checkpoint_state = {} # namespace -> (pageid, revid) or None -> (pageid, revid) self.checkpoint_state = {} # namespace -> (pageid, revid) or None -> (pageid, revid)
def request_shutdown(self): def request_shutdown(self):
@@ -624,39 +624,40 @@ class WikiqParser:
return path.parent / f"{path.stem}.part{part_num}{path.suffix}" return path.parent / f"{path.stem}.part{part_num}{path.suffix}"
def _open_checkpoint(self, output_file): def _open_checkpoint(self, output_file):
"""Open checkpoint file for writing. Keeps file open for performance.""" """Enable checkpointing for the given output file."""
if (not self.output_jsonl and not self.output_parquet) or output_file == sys.stdout.buffer: if (not self.output_jsonl and not self.output_parquet) or output_file == sys.stdout.buffer:
return return
checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces) self.checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces)
Path(checkpoint_path).parent.mkdir(parents=True, exist_ok=True) Path(self.checkpoint_path).parent.mkdir(parents=True, exist_ok=True)
self.checkpoint_file = open(checkpoint_path, 'w') print(f"Checkpoint enabled: {self.checkpoint_path}", file=sys.stderr)
print(f"Checkpoint file opened: {checkpoint_path}", file=sys.stderr)
def _update_checkpoint(self, pageid, revid, namespace=None, part=0): def _update_checkpoint(self, pageid, revid, namespace=None, part=0):
"""Update checkpoint state and write to file.""" """Update checkpoint state and write atomically (NFS-safe)."""
if self.checkpoint_file is None: if self.checkpoint_path is None:
return return
if self.partition_namespaces: if self.partition_namespaces:
self.checkpoint_state[namespace] = {"pageid": pageid, "revid": revid, "part": part} self.checkpoint_state[namespace] = {"pageid": pageid, "revid": revid, "part": part}
else: else:
self.checkpoint_state = {"pageid": pageid, "revid": revid, "part": part} self.checkpoint_state = {"pageid": pageid, "revid": revid, "part": part}
self.checkpoint_file.seek(0) # Atomic write: write to temp file, then rename
self.checkpoint_file.truncate() temp_path = self.checkpoint_path + ".tmp"
json.dump(self.checkpoint_state, self.checkpoint_file) with open(temp_path, 'w') as f:
self.checkpoint_file.flush() json.dump(self.checkpoint_state, f)
os.replace(temp_path, self.checkpoint_path)
def _close_checkpoint(self, delete=False): def _close_checkpoint(self, delete=False):
"""Close checkpoint file, optionally deleting it.""" """Clean up checkpoint, optionally deleting it."""
if self.checkpoint_file is None: if self.checkpoint_path is None:
return return
checkpoint_path = self.checkpoint_file.name if delete and os.path.exists(self.checkpoint_path):
self.checkpoint_file.close() os.remove(self.checkpoint_path)
self.checkpoint_file = None print(f"Checkpoint deleted: {self.checkpoint_path}", file=sys.stderr)
if delete and os.path.exists(checkpoint_path): elif os.path.exists(self.checkpoint_path):
os.remove(checkpoint_path) print(f"Checkpoint preserved for resume: {self.checkpoint_path}", file=sys.stderr)
print(f"Checkpoint file deleted (processing complete): {checkpoint_path}", file=sys.stderr) # Clean up any leftover temp file
else: temp_path = self.checkpoint_path + ".tmp"
print(f"Checkpoint file preserved for resume: {checkpoint_path}", file=sys.stderr) if os.path.exists(temp_path):
os.remove(temp_path)
def _write_batch(self, row_buffer, schema, writer, pq_writers, ns_base_paths, sorting_cols, namespace=None, part_numbers=None): def _write_batch(self, row_buffer, schema, writer, pq_writers, ns_base_paths, sorting_cols, namespace=None, part_numbers=None):
"""Write a batch of rows to the appropriate writer. """Write a batch of rows to the appropriate writer.