"""
Checkpoint and resume functionality for wikiq output.

This module handles:

- Finding resume points in existing output (JSONL or Parquet)
- Merging resumed data with existing output (for Parquet, streaming, memory-efficient)
- Checkpoint file management for fast resume point lookup
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from collections import deque
|
|
|
|
import pyarrow.parquet as pq
|
|
|
|
|
|
def get_checkpoint_path(output_file, partition_namespaces=False):
    """Return the checkpoint file path associated with an output file.

    Non-partitioned output simply gets a ".checkpoint" sibling. Partitioned
    output places the checkpoint one level above the partition directory so
    pyarrow never tries to read it as a parquet file; embedding the output
    filename keeps it unique per input file when jobs run in parallel.
    """
    if not partition_namespaces:
        return str(output_file) + ".checkpoint"

    partition_dir = os.path.dirname(output_file)
    checkpoint_name = os.path.basename(output_file) + ".checkpoint"
    return os.path.join(os.path.dirname(partition_dir), checkpoint_name)
|
|
|
|
|
|
def read_checkpoint(checkpoint_path, partition_namespaces=False):
    """Load the resume point stored in a checkpoint file, if any.

    Checkpoint format:
    Single file: {"pageid": 54, "revid": 325} or {"pageid": 54, "revid": 325, "part": 2}
    Partitioned: {"0": {"pageid": 54, "revid": 325, "part": 1}, ...}

    Returns:
        For single files: A tuple (pageid, revid) or (pageid, revid, part), or None if not found.
        For partitioned: A dict mapping namespace -> (pageid, revid, part), or None.
    """
    if not os.path.exists(checkpoint_path):
        return None

    try:
        with open(checkpoint_path, 'r') as fh:
            data = json.load(fh)

        if not data:
            return None

        # Single-file checkpoints carry pageid/revid at the top level.
        if "pageid" in data and "revid" in data:
            point = (data["pageid"], data["revid"])
            part = data.get("part", 0)
            return point + (part,) if part > 0 else point

        # Otherwise treat it as a partitioned checkpoint: one entry per namespace.
        points = {
            int(ns): (entry["pageid"], entry["revid"], entry.get("part", 0))
            for ns, entry in data.items()
        }
        return points if points else None

    except (json.JSONDecodeError, IOError, KeyError, TypeError) as e:
        print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def cleanup_interrupted_resume(output_file, partition_namespaces):
    """
    Merge any leftover .resume_temp files from a previous interrupted run.

    This should be called BEFORE get_resume_point() so the resume point
    is calculated from the merged data.

    Args:
        output_file: Path to the output file (for partitioned output, the
            logical per-namespace filename inside the partition directory).
        partition_namespaces: Whether the output uses namespace partitioning.

    Returns:
        None - no temp files found or normal merge completed
        "start_fresh" - both original and temp were corrupted and deleted
    """
    # NOTE: an unused `import shutil` was removed from this function.
    if partition_namespaces:
        partition_dir = os.path.dirname(output_file)
        output_filename = os.path.basename(output_file)
        temp_suffix = ".resume_temp"

        if not os.path.isdir(partition_dir):
            return

        # Detect any leftover per-namespace temp file from an interrupted run.
        has_old_temp_files = False
        for ns_dir in os.listdir(partition_dir):
            if ns_dir.startswith('namespace='):
                temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix)
                if os.path.exists(temp_path):
                    has_old_temp_files = True
                    break

        if has_old_temp_files:
            print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr)
            had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename)

            # After merging, check whether any namespace still holds usable data.
            # (A name ending in '.parquet' can never also end in '.resume_temp',
            # so the suffix test alone is sufficient.)
            has_valid_data = False
            for ns_dir in os.listdir(partition_dir):
                if ns_dir.startswith('namespace='):
                    ns_path = os.path.join(partition_dir, ns_dir)
                    if any(f.endswith('.parquet') for f in os.listdir(ns_path)):
                        has_valid_data = True
                        break

            if had_corruption and not has_valid_data:
                # Everything was corrupt: drop the checkpoint so the caller restarts.
                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
                if os.path.exists(checkpoint_path):
                    os.remove(checkpoint_path)
                print("All partitioned files were corrupted, will start fresh.", file=sys.stderr)
                return "start_fresh"

            print("Previous temp files merged successfully.", file=sys.stderr)
    else:
        temp_output_file = output_file + ".resume_temp"
        if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file):
            print(f"Found leftover temp file {temp_output_file} from previous interrupted run, merging first...", file=sys.stderr)
            merged_path = output_file + ".merged"
            merged = merge_parquet_files(output_file, temp_output_file, merged_path)
            if merged == "original_only":
                # Temp held nothing useful; keep the original untouched.
                os.remove(temp_output_file)
            elif merged == "temp_only":
                # Original was corrupt or missing; promote the temp file.
                if os.path.exists(output_file):
                    os.remove(output_file)
                os.rename(temp_output_file, output_file)
                print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr)
            elif merged == "both_invalid":
                if os.path.exists(output_file):
                    os.remove(output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
                if os.path.exists(checkpoint_path):
                    os.remove(checkpoint_path)
                print("Both files were corrupted, will start fresh.", file=sys.stderr)
                return "start_fresh"
            elif merged == "merged":
                os.remove(output_file)
                os.rename(merged_path, output_file)
                os.remove(temp_output_file)
                print("Previous temp file merged successfully.", file=sys.stderr)
            else:
                # Both inputs were valid but empty: nothing to keep from the temp.
                os.remove(temp_output_file)
|
|
|
|
|
|
def get_jsonl_resume_point(output_file, input_file=None):
    """Get resume point from last complete line of JSONL file.

    For .jsonl.d directories, derives the file path from input_file using get_output_filename.
    """
    # Resolve .jsonl.d directory output to the concrete per-input file.
    if output_file.endswith('.jsonl.d'):
        if input_file is None or not os.path.isdir(output_file):
            return None
        # Import here to avoid circular import
        from wikiq import get_output_filename
        jsonl_filename = os.path.basename(get_output_filename(input_file, 'jsonl'))
        output_file = os.path.join(output_file, jsonl_filename)
        print(f"Looking for resume point in: {output_file}", file=sys.stderr)

    if not os.path.exists(output_file):
        return None

    try:
        # Remember the end offsets and records of the last two parseable lines.
        tail = deque(maxlen=2)
        with open(output_file, 'rb') as fh:
            for raw in iter(fh.readline, b''):
                try:
                    tail.append((fh.tell(), json.loads(raw.decode('utf-8'))))
                except (json.JSONDecodeError, KeyError, UnicodeDecodeError):
                    continue

        if not tail:
            return None

        end_offset, record = tail[-1]

        # Drop any garbage after the last complete line (e.g. a partial write).
        total_bytes = os.path.getsize(output_file)
        if end_offset < total_bytes:
            print(f"Truncating corrupted data from {output_file} ({total_bytes - end_offset} bytes)", file=sys.stderr)
            with open(output_file, 'r+b') as fh:
                fh.truncate(end_offset)

        return (record['articleid'], record['revid'])
    except IOError as e:
        print(f"Warning: Could not read {output_file}: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def get_resume_point(output_file, partition_namespaces=False, input_file=None):
    """
    Find the resume point(s) from existing output.

    For JSONL: reads last line of file (no checkpoint needed).
    For Parquet: checks checkpoint file, falls back to scanning parquet.

    Args:
        output_file: Path to the output file.
        partition_namespaces: Whether the output uses namespace partitioning.
        input_file: Path to input file (needed for .jsonl.d directory output).

    Returns:
        For single files: A tuple (pageid, revid) or (pageid, revid, part), or None.
        For partitioned: A dict mapping namespace -> (pageid, revid, part), or None.
    """
    # JSONL output: the last complete line is the resume point; no checkpoint file.
    if output_file.endswith(('.jsonl', '.jsonl.d')):
        point = get_jsonl_resume_point(output_file, input_file)
        if point:
            print(f"Resume point found from JSONL: pageid={point[0]}, revid={point[1]}", file=sys.stderr)
        return point

    # Parquet output: prefer the checkpoint file, which is cheap to read.
    checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
    point = read_checkpoint(checkpoint_path, partition_namespaces)
    if point is not None:
        print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr)
        return point

    # No checkpoint: scan the parquet output itself (slow, backwards-compatible path).
    print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr)
    try:
        scan = _get_resume_point_partitioned if partition_namespaces else _get_resume_point_single_file
        return scan(output_file)
    except Exception as e:
        print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def _get_last_row_resume_point(pq_path):
    """Get resume point by reading only the last row group of a parquet file."""
    reader = pq.ParquetFile(pq_path)
    num_groups = reader.metadata.num_row_groups
    if num_groups == 0:
        return None

    # Only the final row group is needed; output is ordered, so its last row
    # carries the highest (pageid, revid) written so far.
    tail = reader.read_row_group(num_groups - 1, columns=['articleid', 'revid'])
    if tail.num_rows == 0:
        return None

    return (tail['articleid'][-1].as_py(), tail['revid'][-1].as_py(), 0)
|
|
|
|
|
|
def _get_resume_point_partitioned(output_file):
    """Find per-namespace resume points from partitioned output."""
    partition_dir = os.path.dirname(output_file)
    filename = os.path.basename(output_file)

    if not os.path.isdir(partition_dir):
        return None

    points = {}
    for entry in os.listdir(partition_dir):
        if not entry.startswith('namespace='):
            continue
        namespace = int(entry.split('=')[1])
        pq_path = os.path.join(partition_dir, entry, filename)
        if not os.path.exists(pq_path):
            continue

        try:
            point = _get_last_row_resume_point(pq_path)
        except Exception as e:
            # A single unreadable partition shouldn't sink the whole scan.
            print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr)
            continue
        if point is not None:
            points[namespace] = point

    return points if points else None
|
|
|
|
|
|
def _get_resume_point_single_file(output_file):
    """Find resume point from a single parquet file."""
    # A missing path or a directory cannot be read as a single parquet file.
    if not os.path.exists(output_file) or os.path.isdir(output_file):
        return None
    return _get_last_row_resume_point(output_file)
|
|
|
|
|
|
def merge_parquet_files(original_path, temp_path, merged_path):
    """
    Merge two parquet files by streaming row groups (memory-efficient:
    at most one row group is resident at a time).

    Args:
        original_path: Path to the pre-existing output file.
        temp_path: Path to the temp file with newly resumed data.
        merged_path: Destination path for the merged output.

    Returns:
        "merged" - merged file was created from both sources
        "original_only" - temp was invalid, keep original unchanged
        "temp_only" - original was corrupted but temp is valid
        "both_invalid" - both files invalid
        False - both files were valid but empty
    """
    original_pq = None
    temp_pq = None

    try:
        original_pq = pq.ParquetFile(original_path)
    except Exception as e:
        print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr)

    try:
        if not os.path.exists(temp_path):
            print(f"Note: Temp file {temp_path} does not exist", file=sys.stderr)
        else:
            temp_pq = pq.ParquetFile(temp_path)
    except Exception:
        print(f"Note: No new data in temp file {temp_path}", file=sys.stderr)

    original_valid = original_pq is not None
    temp_valid = temp_pq is not None

    if not original_valid and not temp_valid:
        print("Both original and temp files are invalid, will start fresh", file=sys.stderr)
        return "both_invalid"

    if not original_valid:
        print("Original file corrupted but temp file is valid, recovering from temp", file=sys.stderr)
        return "temp_only"

    if not temp_valid:
        return "original_only"

    # Stream row groups from original first, then temp, preserving order.
    # The writer is created lazily from the first row group's schema so that
    # two empty inputs produce no merged file at all (returns False below).
    merged_writer = None
    for source in (original_pq, temp_pq):
        for i in range(source.num_row_groups):
            row_group = source.read_row_group(i)
            if merged_writer is None:
                merged_writer = pq.ParquetWriter(
                    merged_path,
                    row_group.schema,
                    flavor="spark"
                )
            merged_writer.write_table(row_group)

    if merged_writer is None:
        return False
    merged_writer.close()
    return "merged"
|
|
|
|
|
|
def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter):
    """
    Merge leftover per-namespace temp files into each namespace's output file.

    For every `namespace=*` subdirectory of partition_dir that contains a temp
    file (file_filter + temp_suffix), merge it into the namespace's existing
    output file, or promote the temp file if no original exists.

    Args:
        partition_dir: Directory containing the `namespace=*` subdirectories.
        temp_suffix: Suffix of temp files to merge (e.g. ".resume_temp").
        file_filter: Base output filename within each namespace directory.

    Returns:
        True if any namespace's data turned out corrupted and was deleted
        during the merge; False if every namespace merged cleanly.
    """
    namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
    had_corruption = False
    expected_temp = file_filter + temp_suffix

    for ns_dir in namespace_dirs:
        ns_path = os.path.join(partition_dir, ns_dir)
        temp_path = os.path.join(ns_path, expected_temp)

        # Nothing to merge for this namespace.
        if not os.path.exists(temp_path):
            continue

        original_file = file_filter
        original_path = os.path.join(ns_path, original_file)

        if os.path.exists(original_path):
            merged_path = original_path + ".merged"
            merged = merge_parquet_files(original_path, temp_path, merged_path)

            if merged == "original_only":
                # Temp was invalid; keep the original and drop the temp.
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            elif merged == "temp_only":
                # Original was corrupted; promote the temp file in its place.
                os.remove(original_path)
                os.rename(temp_path, original_path)
            elif merged == "both_invalid":
                # Neither file readable; delete both and record the corruption.
                if os.path.exists(original_path):
                    os.remove(original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                had_corruption = True
            elif merged == "merged":
                # Replace the original with the merged file, then drop the temp.
                os.remove(original_path)
                os.rename(merged_path, original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            else:
                # Both valid but empty (merge returned False): nothing to keep.
                if os.path.exists(original_path):
                    os.remove(original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        else:
            # No original output: promote the temp file if it parses as parquet,
            # otherwise discard it and record the corruption.
            try:
                pq.ParquetFile(temp_path)
                os.rename(temp_path, original_path)
            except Exception:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                had_corruption = True

    return had_corruption
|
|
|
|
|
|
def finalize_resume_merge(
    original_output_file,
    temp_output_file,
    partition_namespaces,
    original_partition_dir
):
    """
    Finalize the resume by merging temp output with original output.

    Args:
        original_output_file: Path to the pre-existing output file.
        temp_output_file: Path to the temp output produced after resuming.
        partition_namespaces: Whether the output uses namespace partitioning.
        original_partition_dir: Partition directory for partitioned output
            (None for single-file output).

    Raises:
        Re-raises any merge error after reporting where the new data lives.
    """
    import shutil

    print("Merging resumed data with existing output...", file=sys.stderr)
    try:
        if partition_namespaces and original_partition_dir is not None:
            file_filter = os.path.basename(original_output_file)
            merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter)
            # The temp directory is no longer needed once the per-namespace
            # files have been merged in place.
            if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file):
                shutil.rmtree(temp_output_file)
        else:
            merged_output_file = original_output_file + ".merged"
            merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)

            if merged == "original_only":
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            elif merged == "temp_only":
                # Original may be corrupted *or missing*; guard the removal so a
                # missing original does not raise FileNotFoundError (matches
                # cleanup_interrupted_resume's handling of the same outcome).
                if os.path.exists(original_output_file):
                    os.remove(original_output_file)
                os.rename(temp_output_file, original_output_file)
            elif merged == "both_invalid":
                if os.path.exists(original_output_file):
                    os.remove(original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            elif merged == "merged":
                os.remove(original_output_file)
                os.rename(merged_output_file, original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            else:
                # Both inputs were valid but empty (merge returned False).
                os.remove(original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)

        print("Merge complete.", file=sys.stderr)
    except Exception as e:
        print(f"Error merging resume data for {original_output_file}: {e}", file=sys.stderr)
        print(f"New data saved in: {temp_output_file}", file=sys.stderr)
        raise
|
|
|
|
|
|
def setup_resume_temp_output(output_file, partition_namespaces):
    """
    Set up temp output for resume mode (Parquet only).

    Returns:
        Tuple of (original_output_file, temp_output_file, original_partition_dir)
        or (None, None, None) if no existing output to resume from.
    """
    import shutil

    original_partition_dir = None

    if partition_namespaces:
        partition_dir = os.path.dirname(output_file)
        basename = os.path.basename(output_file)
        # Resume is possible if any namespace directory already holds output.
        output_exists = False
        if os.path.isdir(partition_dir):
            output_exists = any(
                entry.startswith('namespace=')
                and os.path.exists(os.path.join(partition_dir, entry, basename))
                for entry in os.listdir(partition_dir)
            )
        if output_exists:
            original_partition_dir = partition_dir
    else:
        output_exists = isinstance(output_file, str) and os.path.exists(output_file)

    if not output_exists:
        return None, None, None

    temp_output_file = output_file + ".resume_temp"

    # Clear any stale temp output left over from an earlier run.
    if os.path.isdir(temp_output_file):
        shutil.rmtree(temp_output_file)
    elif os.path.exists(temp_output_file):
        os.remove(temp_output_file)

    # Partitioned output writes per-namespace files under a temp directory.
    if partition_namespaces:
        os.makedirs(temp_output_file, exist_ok=True)

    return output_file, temp_output_file, original_partition_dir
|