Refactor and enable JSONL output.

This commit is contained in:
Nathan TeBlunthuis
2025-12-21 23:42:18 -08:00
parent 6988a281dc
commit 3f1a9ba862
7 changed files with 1429 additions and 1242 deletions

File diff suppressed because it is too large. Load diff.

View File

@@ -1,9 +1,9 @@
"""
Checkpoint and resume functionality for wikiq parquet output.
Checkpoint and resume functionality for wikiq output.
This module handles:
- Finding resume points in existing parquet output
- Merging resumed data with existing output (streaming, memory-efficient)
- Finding resume points in existing output (JSONL or Parquet)
- Merging resumed data with existing output (for Parquet, streaming, memory-efficient)
- Checkpoint file management for fast resume point lookup
"""
@@ -14,6 +14,63 @@ import sys
import pyarrow.parquet as pq
def get_checkpoint_path(output_file, partition_namespaces=False):
    """Return the checkpoint-file path associated with *output_file*.

    When namespace partitioning is enabled, the checkpoint lives one level
    above the partition directory so pyarrow never mistakes it for a parquet
    file; its name embeds the output filename so parallel jobs working on
    different inputs get distinct checkpoints.
    """
    if not partition_namespaces:
        return str(output_file) + ".checkpoint"
    partition_dir = os.path.dirname(output_file)
    base_name = os.path.basename(output_file)
    return os.path.join(os.path.dirname(partition_dir), base_name + ".checkpoint")
def read_checkpoint(checkpoint_path, partition_namespaces=False):
    """Load a resume point from *checkpoint_path*, if one exists.

    Checkpoint format:
        Single file:  {"pageid": 54, "revid": 325} or
                      {"pageid": 54, "revid": 325, "part": 2}
        Partitioned:  {"0": {"pageid": 54, "revid": 325, "part": 1}, ...}

    Returns:
        For single files: a tuple (pageid, revid) or (pageid, revid, part),
        or None if not found.
        For partitioned: a dict mapping namespace -> (pageid, revid, part),
        or None. Missing, empty, or unreadable checkpoints yield None.
    """
    if not os.path.exists(checkpoint_path):
        return None
    try:
        with open(checkpoint_path, 'r') as fh:
            payload = json.load(fh)
        if not payload:
            return None
        # Single-file checkpoints carry pageid/revid at the top level.
        if "pageid" in payload and "revid" in payload:
            part_number = payload.get("part", 0)
            if part_number > 0:
                return (payload["pageid"], payload["revid"], part_number)
            return (payload["pageid"], payload["revid"])
        # Otherwise assume the partitioned layout keyed by namespace string.
        per_namespace = {
            int(ns): (entry["pageid"], entry["revid"], entry.get("part", 0))
            for ns, entry in payload.items()
        }
        return per_namespace if per_namespace else None
    except (json.JSONDecodeError, IOError, KeyError, TypeError) as e:
        print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr)
        return None
def cleanup_interrupted_resume(output_file, partition_namespaces):
"""
Merge any leftover .resume_temp files from a previous interrupted run.
@@ -47,7 +104,6 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr)
had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename)
# Check if any valid data remains after merge
has_valid_data = False
for ns_dir in os.listdir(partition_dir):
if ns_dir.startswith('namespace='):
@@ -58,7 +114,6 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
break
if had_corruption and not has_valid_data:
# All data was corrupted, remove checkpoint and start fresh
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
if os.path.exists(checkpoint_path):
os.remove(checkpoint_path)
@@ -73,21 +128,17 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
merged_path = output_file + ".merged"
merged = merge_parquet_files(output_file, temp_output_file, merged_path)
if merged == "original_only":
# Temp file was invalid, just remove it
os.remove(temp_output_file)
elif merged == "temp_only":
# Original was corrupted or missing, use temp as new base
if os.path.exists(output_file):
os.remove(output_file)
os.rename(temp_output_file, output_file)
print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr)
elif merged == "both_invalid":
# Both files corrupted or missing, remove both and start fresh
if os.path.exists(output_file):
os.remove(output_file)
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
# Also remove stale checkpoint file
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
if os.path.exists(checkpoint_path):
os.remove(checkpoint_path)
@@ -99,95 +150,34 @@ def cleanup_interrupted_resume(output_file, partition_namespaces):
os.remove(temp_output_file)
print("Previous temp file merged successfully.", file=sys.stderr)
else:
# Both empty - unusual
os.remove(temp_output_file)
def get_checkpoint_path(output_file, partition_namespaces=False):
    """Build the checkpoint file path for *output_file*.

    Partitioned output stores the checkpoint beside (not inside) the
    partition directory so pyarrow does not try to read it as a parquet
    file; the checkpoint name is derived from the output filename so that
    concurrent jobs on different input files never collide.
    """
    if partition_namespaces:
        # output_file looks like partition_dir/output.parquet; the checkpoint
        # goes to parent/output.parquet.checkpoint, one level above.
        parent = os.path.dirname(os.path.dirname(output_file))
        return os.path.join(parent, os.path.basename(output_file) + ".checkpoint")
    return str(output_file) + ".checkpoint"
def read_checkpoint(output_file, partition_namespaces=False):
    """Read the resume point recorded for *output_file*, if any.

    Checkpoint format:
        Single file:  {"pageid": 54, "revid": 325, "part": 2}
        Partitioned:  {"0": {"pageid": 54, "revid": 325, "part": 1}, ...}

    Returns:
        For single files: a tuple (pageid, revid, part), or None if not found.
        For partitioned: a dict mapping namespace -> (pageid, revid, part),
        or None.

    Note: part defaults to 0 for checkpoints without part numbers
    (backwards compat).
    """
    checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
    if not os.path.exists(checkpoint_path):
        return None
    try:
        with open(checkpoint_path, 'r') as fh:
            contents = json.load(fh)
        if not contents:
            return None
        # Single-file checkpoint: pageid/revid live at the top level.
        if "pageid" in contents and "revid" in contents:
            return (contents["pageid"], contents["revid"], contents.get("part", 0))
        # Partitioned checkpoint: one entry per namespace key.
        resume_points = {
            int(ns): (entry["pageid"], entry["revid"], entry.get("part", 0))
            for ns, entry in contents.items()
        }
        return resume_points if resume_points else None
    except (json.JSONDecodeError, IOError, KeyError, TypeError) as e:
        print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr)
        return None
def get_resume_point(output_file, partition_namespaces=False):
"""
Find the resume point(s) from existing parquet output.
Find the resume point(s) from existing output.
First checks for a checkpoint file (fast), then falls back to scanning
the parquet output (slow, for backwards compatibility).
Args:
output_file: Path to the output file. For single files, this is the parquet file path.
For partitioned namespaces, this is the path like dir/dump.parquet where
namespace=* subdirectories are in the parent dir.
output_file: Path to the output file.
partition_namespaces: Whether the output uses namespace partitioning.
Returns:
For single files: A tuple (pageid, revid, part) or None if not found.
For single files: A tuple (pageid, revid) or (pageid, revid, part), or None.
For partitioned: A dict mapping namespace -> (pageid, revid, part), or None.
When falling back to parquet scanning, part defaults to 0.
"""
# First try checkpoint file (fast)
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
checkpoint_result = read_checkpoint(output_file, partition_namespaces)
checkpoint_result = read_checkpoint(checkpoint_path, partition_namespaces)
if checkpoint_result is not None:
print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr)
return checkpoint_result
# For JSONL, only checkpoint-based resume is supported
if output_file.endswith('.jsonl'):
return None
# Fall back to scanning parquet (slow, for backwards compatibility)
print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr)
try:
@@ -201,12 +191,7 @@ def get_resume_point(output_file, partition_namespaces=False):
def _get_last_row_resume_point(pq_path):
"""Get resume point by reading only the last row group of a parquet file.
Since data is written in page/revision order, the last row group contains
the highest pageid/revid, and the last row in that group is the resume point.
Returns (pageid, revid, part) with part=0 (scanning can't determine part).
"""
"""Get resume point by reading only the last row group of a parquet file."""
pf = pq.ParquetFile(pq_path)
if pf.metadata.num_row_groups == 0:
return None
@@ -222,16 +207,7 @@ def _get_last_row_resume_point(pq_path):
def _get_resume_point_partitioned(output_file):
"""Find per-namespace resume points from partitioned output.
Only looks for the specific output file in each namespace directory.
Returns a dict mapping namespace -> (max_pageid, max_revid, part=0) for each
partition where the output file exists.
Args:
output_file: Path like 'dir/output.parquet' where namespace=* subdirectories
contain files named 'output.parquet'.
"""
"""Find per-namespace resume points from partitioned output."""
partition_dir = os.path.dirname(output_file)
output_filename = os.path.basename(output_file)
@@ -274,14 +250,13 @@ def _get_resume_point_single_file(output_file):
def merge_parquet_files(original_path, temp_path, merged_path):
"""
Merge two parquet files by streaming row groups from original and temp into merged.
Merge two parquet files by streaming row groups.
This is memory-efficient: only one row group is loaded at a time.
Returns:
"merged" - merged file was created from both sources
"original_only" - temp was invalid, keep original unchanged
"temp_only" - original was corrupted but temp is valid, use temp
"both_invalid" - both files invalid, delete both and start fresh
"temp_only" - original was corrupted but temp is valid
"both_invalid" - both files invalid
False - both files were valid but empty
"""
original_valid = False
@@ -297,12 +272,12 @@ def merge_parquet_files(original_path, temp_path, merged_path):
try:
if not os.path.exists(temp_path):
print(f"Note: Temp file {temp_path} does not exist (namespace had no records after resume point)", file=sys.stderr)
print(f"Note: Temp file {temp_path} does not exist", file=sys.stderr)
else:
temp_pq = pq.ParquetFile(temp_path)
temp_valid = True
except Exception:
print(f"Note: No new data in temp file {temp_path} (file exists but is invalid)", file=sys.stderr)
print(f"Note: No new data in temp file {temp_path}", file=sys.stderr)
if not original_valid and not temp_valid:
print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr)
@@ -317,7 +292,6 @@ def merge_parquet_files(original_path, temp_path, merged_path):
merged_writer = None
# Copy all row groups from the original file
for i in range(original_pq.num_row_groups):
row_group = original_pq.read_row_group(i)
if merged_writer is None:
@@ -328,7 +302,6 @@ def merge_parquet_files(original_path, temp_path, merged_path):
)
merged_writer.write_table(row_group)
# Append all row groups from the temp file
for i in range(temp_pq.num_row_groups):
row_group = temp_pq.read_row_group(i)
if merged_writer is None:
@@ -339,7 +312,6 @@ def merge_parquet_files(original_path, temp_path, merged_path):
)
merged_writer.write_table(row_group)
# Close the writer
if merged_writer is not None:
merged_writer.close()
return "merged"
@@ -350,16 +322,6 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter):
"""
Merge partitioned namespace directories after resume.
For partitioned namespaces, temp files are written alongside the original files
in each namespace directory with the temp suffix appended to the filename.
E.g., original: namespace=0/file.parquet, temp: namespace=0/file.parquet.resume_temp
Args:
partition_dir: The partition directory containing namespace=* subdirs
temp_suffix: The suffix appended to temp files (e.g., '.resume_temp')
file_filter: Only process temp files matching this base name
(e.g., 'enwiki-20250123-pages-meta-history24-p53238682p53445302.parquet')
Returns:
True if at least one namespace has valid data after merge
False if all namespaces ended up with corrupted/deleted data
@@ -375,49 +337,40 @@ def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter):
if not os.path.exists(temp_path):
continue
# Original file is the temp file without the suffix
original_file = file_filter
original_path = os.path.join(ns_path, original_file)
if os.path.exists(original_path):
# Merge the files
merged_path = original_path + ".merged"
merged = merge_parquet_files(original_path, temp_path, merged_path)
if merged == "original_only":
# Temp file was invalid (no new data), keep original unchanged
if os.path.exists(temp_path):
os.remove(temp_path)
elif merged == "temp_only":
# Original was corrupted, use temp as new base
os.remove(original_path)
os.rename(temp_path, original_path)
elif merged == "both_invalid":
# Both files corrupted, remove both
if os.path.exists(original_path):
os.remove(original_path)
if os.path.exists(temp_path):
os.remove(temp_path)
had_corruption = True
elif merged == "merged":
# Replace the original file with the merged file
os.remove(original_path)
os.rename(merged_path, original_path)
if os.path.exists(temp_path):
os.remove(temp_path)
else:
# Both files were empty (False), just remove them
if os.path.exists(original_path):
os.remove(original_path)
if os.path.exists(temp_path):
os.remove(temp_path)
else:
# No original file, rename temp to original only if valid
try:
pq.ParquetFile(temp_path)
os.rename(temp_path, original_path)
except Exception:
# Temp file invalid or missing, just remove it if it exists
if os.path.exists(temp_path):
os.remove(temp_path)
had_corruption = True
@@ -433,55 +386,36 @@ def finalize_resume_merge(
):
"""
Finalize the resume by merging temp output with original output.
Args:
original_output_file: Path to the original output file
temp_output_file: Path to the temp output file written during resume
partition_namespaces: Whether using partitioned namespace output
original_partition_dir: The partition directory (for partitioned output)
Raises:
Exception: If merge fails (temp file is preserved for recovery)
"""
import shutil
print("Merging resumed data with existing output...", file=sys.stderr)
try:
if partition_namespaces and original_partition_dir is not None:
# For partitioned namespaces, temp files are written alongside originals
# with '.resume_temp' suffix in each namespace directory.
# Only merge temp files for the current dump file, not other concurrent jobs.
file_filter = os.path.basename(original_output_file)
merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter)
# Clean up the empty temp directory we created
if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file):
shutil.rmtree(temp_output_file)
else:
# Merge single parquet files
merged_output_file = original_output_file + ".merged"
merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
if merged == "original_only":
# Temp file was invalid (no new data), keep original unchanged
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
elif merged == "temp_only":
# Original was corrupted, use temp as new base
os.remove(original_output_file)
os.rename(temp_output_file, original_output_file)
elif merged == "both_invalid":
# Both files corrupted, remove both
os.remove(original_output_file)
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
elif merged == "merged":
# Replace the original file with the merged file
os.remove(original_output_file)
os.rename(merged_output_file, original_output_file)
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
else:
# Both files were empty (False) - unusual, but clean up
os.remove(original_output_file)
if os.path.exists(temp_output_file):
os.remove(temp_output_file)
@@ -495,11 +429,7 @@ def finalize_resume_merge(
def setup_resume_temp_output(output_file, partition_namespaces):
"""
Set up temp output for resume mode.
Args:
output_file: The original output file path
partition_namespaces: Whether using partitioned namespace output
Set up temp output for resume mode (Parquet only).
Returns:
Tuple of (original_output_file, temp_output_file, original_partition_dir)
@@ -511,7 +441,6 @@ def setup_resume_temp_output(output_file, partition_namespaces):
temp_output_file = None
original_partition_dir = None
# For partitioned namespaces, check if the specific output file exists in any namespace
if partition_namespaces:
partition_dir = os.path.dirname(output_file)
output_filename = os.path.basename(output_file)
@@ -531,9 +460,6 @@ def setup_resume_temp_output(output_file, partition_namespaces):
original_output_file = output_file
temp_output_file = output_file + ".resume_temp"
# Note: cleanup_interrupted_resume() should have been called before this
# to merge any leftover temp files from a previous interrupted run.
# Here we just clean up any remaining temp directory markers.
if os.path.exists(temp_output_file):
if os.path.isdir(temp_output_file):
shutil.rmtree(temp_output_file)

View File

@@ -17,9 +17,6 @@ T = TypeVar('T')
class RevisionField(ABC, Generic[T]):
def __init__(self):
self.data: list[T] = []
"""
Abstract type which represents a field in a table of page revisions.
"""
@@ -43,14 +40,6 @@ class RevisionField(ABC, Generic[T]):
"""
pass
def add(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> None:
    """Extract this field's value for the page/revisions and buffer it."""
    value = self.extract(page, revisions)
    self.data.append(value)
def pop(self) -> list[T]:
    """Return all buffered values and reset the buffer to empty."""
    buffered, self.data = self.data, []
    return buffered
class RevisionTable:
columns: list[RevisionField]
@@ -58,19 +47,15 @@ class RevisionTable:
def __init__(self, columns: list[RevisionField]):
    """Store the ordered field extractors that make up the table."""
    self.columns = columns
def add(self, page: mwtypes.Page, revisions: list[mwxml.Revision]):
    """Buffer one row by letting every column extract its own value."""
    for field_column in self.columns:
        field_column.add(page=page, revisions=revisions)
def schema(self) -> pa.Schema:
    """Assemble the pyarrow schema from each column's field definition."""
    fields = [column.field for column in self.columns]
    return pa.schema(fields)
def pop(self) -> dict:
    """Drain every column's buffer into a {field name: values} dict."""
    return {column.field.name: column.pop() for column in self.columns}
def extract_row(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> dict:
    """Extract a single row dict for the given page and revisions."""
    row = {}
    for column in self.columns:
        row[column.field.name] = column.extract(page, revisions)
    return row
class RevisionId(RevisionField[int]):