""" Checkpoint and resume functionality for wikiq output. This module handles: - Finding resume points in existing output (JSONL or Parquet) - Merging resumed data with existing output (for Parquet, streaming, memory-efficient) - Checkpoint file management for fast resume point lookup """ import json import os import sys from collections import deque import pyarrow.parquet as pq def get_checkpoint_path(output_file, partition_namespaces=False): """Get the path to the checkpoint file for a given output file. For partitioned output, the checkpoint is placed outside the partition directory to avoid pyarrow trying to read it as a parquet file. The filename includes the output filename to keep it unique per input file (for parallel jobs). """ if partition_namespaces: partition_dir = os.path.dirname(output_file) output_filename = os.path.basename(output_file) parent_dir = os.path.dirname(partition_dir) return os.path.join(parent_dir, output_filename + ".checkpoint") return str(output_file) + ".checkpoint" def read_checkpoint(checkpoint_path, partition_namespaces=False): """ Read resume point from checkpoint file if it exists. Checkpoint format: Single file: {"pageid": 54, "revid": 325} or {"pageid": 54, "revid": 325, "part": 2} Partitioned: {"0": {"pageid": 54, "revid": 325, "part": 1}, ...} Returns: For single files: A tuple (pageid, revid) or (pageid, revid, part), or None if not found. For partitioned: A dict mapping namespace -> (pageid, revid, part), or None. """ if not os.path.exists(checkpoint_path): return None try: with open(checkpoint_path, 'r') as f: data = json.load(f) if not data: return None # Single-file format: {"pageid": ..., "revid": ..., "part": ...} if "pageid" in data and "revid" in data: part = data.get("part", 0) if part > 0: return (data["pageid"], data["revid"], part) return (data["pageid"], data["revid"]) # Partitioned format: {"0": {"pageid": ..., "revid": ..., "part": ...}, ...} result = {} for key, value in data.items(): part = value.get("part", 0) result[int(key)] = (value["pageid"], value["revid"], part) return result if result else None except (json.JSONDecodeError, IOError, KeyError, TypeError) as e: print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr) return None def cleanup_interrupted_resume(output_file, partition_namespaces): """ Merge any leftover .resume_temp files from a previous interrupted run. This should be called BEFORE get_resume_point() so the resume point is calculated from the merged data. Returns: None - no temp files found or normal merge completed "start_fresh" - both original and temp were corrupted and deleted """ import shutil if partition_namespaces: partition_dir = os.path.dirname(output_file) output_filename = os.path.basename(output_file) temp_suffix = ".resume_temp" if not os.path.isdir(partition_dir): return has_old_temp_files = False for ns_dir in os.listdir(partition_dir): if ns_dir.startswith('namespace='): temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix) if os.path.exists(temp_path): has_old_temp_files = True break if has_old_temp_files: print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr) had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename) has_valid_data = False for ns_dir in os.listdir(partition_dir): if ns_dir.startswith('namespace='): ns_path = os.path.join(partition_dir, ns_dir) parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet') and not f.endswith('.resume_temp')] if parquet_files: has_valid_data = True break if had_corruption and not has_valid_data: checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) if os.path.exists(checkpoint_path): os.remove(checkpoint_path) print("All partitioned files were corrupted, will start fresh.", file=sys.stderr) return "start_fresh" print("Previous temp files merged successfully.", file=sys.stderr) else: temp_output_file = output_file + ".resume_temp" if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file): print(f"Found leftover temp file {temp_output_file} from previous interrupted run, merging first...", file=sys.stderr) merged_path = output_file + ".merged" merged = merge_parquet_files(output_file, temp_output_file, merged_path) if merged == "original_only": os.remove(temp_output_file) elif merged == "temp_only": if os.path.exists(output_file): os.remove(output_file) os.rename(temp_output_file, output_file) print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr) elif merged == "both_invalid": if os.path.exists(output_file): os.remove(output_file) if os.path.exists(temp_output_file): os.remove(temp_output_file) checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) if os.path.exists(checkpoint_path): os.remove(checkpoint_path) print("Both files were corrupted, will start fresh.", file=sys.stderr) return "start_fresh" elif merged == "merged": os.remove(output_file) os.rename(merged_path, output_file) os.remove(temp_output_file) print("Previous temp file merged successfully.", file=sys.stderr) else: os.remove(temp_output_file) def get_jsonl_resume_point(output_file, input_file=None): """Get resume point from last complete line of JSONL file. For .jsonl.d directories, derives the file path from input_file using get_output_filename. """ # Handle .jsonl.d directory output if output_file.endswith('.jsonl.d'): if input_file is None: return None if os.path.isdir(output_file): # Import here to avoid circular import from wikiq import get_output_filename jsonl_filename = os.path.basename(get_output_filename(input_file, 'jsonl')) output_file = os.path.join(output_file, jsonl_filename) else: return None if not os.path.exists(output_file): return None try: with open(output_file) as f: # Stream through file, keeping only last 2 lines in memory for line in reversed(deque(f, maxlen=2)): try: record = json.loads(line) return (record['articleid'], record['revid']) except (json.JSONDecodeError, KeyError): continue return None except IOError as e: print(f"Warning: Could not read {output_file}: {e}", file=sys.stderr) return None def get_resume_point(output_file, partition_namespaces=False, input_file=None): """ Find the resume point(s) from existing output. For JSONL: reads last line of file (no checkpoint needed). For Parquet: checks checkpoint file, falls back to scanning parquet. Args: output_file: Path to the output file. partition_namespaces: Whether the output uses namespace partitioning. input_file: Path to input file (needed for .jsonl.d directory output). Returns: For single files: A tuple (pageid, revid) or (pageid, revid, part), or None. For partitioned: A dict mapping namespace -> (pageid, revid, part), or None. """ # For JSONL, read resume point directly from last line (no checkpoint needed) if output_file.endswith('.jsonl') or output_file.endswith('.jsonl.d'): result = get_jsonl_resume_point(output_file, input_file) if result: print(f"Resume point found from JSONL: pageid={result[0]}, revid={result[1]}", file=sys.stderr) return result # For Parquet, use checkpoint file (fast) checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) checkpoint_result = read_checkpoint(checkpoint_path, partition_namespaces) if checkpoint_result is not None: print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr) return checkpoint_result # Fall back to scanning parquet (slow, for backwards compatibility) print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr) try: if partition_namespaces: return _get_resume_point_partitioned(output_file) else: return _get_resume_point_single_file(output_file) except Exception as e: print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr) return None def _get_last_row_resume_point(pq_path): """Get resume point by reading only the last row group of a parquet file.""" pf = pq.ParquetFile(pq_path) if pf.metadata.num_row_groups == 0: return None last_rg_idx = pf.metadata.num_row_groups - 1 table = pf.read_row_group(last_rg_idx, columns=['articleid', 'revid']) if table.num_rows == 0: return None max_pageid = table['articleid'][-1].as_py() max_revid = table['revid'][-1].as_py() return (max_pageid, max_revid, 0) def _get_resume_point_partitioned(output_file): """Find per-namespace resume points from partitioned output.""" partition_dir = os.path.dirname(output_file) output_filename = os.path.basename(output_file) if not os.path.exists(partition_dir) or not os.path.isdir(partition_dir): return None namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] if not namespace_dirs: return None resume_points = {} for ns_dir in namespace_dirs: ns = int(ns_dir.split('=')[1]) pq_path = os.path.join(partition_dir, ns_dir, output_filename) if not os.path.exists(pq_path): continue try: result = _get_last_row_resume_point(pq_path) if result is not None: resume_points[ns] = result except Exception as e: print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr) continue return resume_points if resume_points else None def _get_resume_point_single_file(output_file): """Find resume point from a single parquet file.""" if not os.path.exists(output_file): return None if os.path.isdir(output_file): return None return _get_last_row_resume_point(output_file) def merge_parquet_files(original_path, temp_path, merged_path): """ Merge two parquet files by streaming row groups. Returns: "merged" - merged file was created from both sources "original_only" - temp was invalid, keep original unchanged "temp_only" - original was corrupted but temp is valid "both_invalid" - both files invalid False - both files were valid but empty """ original_valid = False temp_valid = False original_pq = None temp_pq = None try: original_pq = pq.ParquetFile(original_path) original_valid = True except Exception as e: print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr) try: if not os.path.exists(temp_path): print(f"Note: Temp file {temp_path} does not exist", file=sys.stderr) else: temp_pq = pq.ParquetFile(temp_path) temp_valid = True except Exception: print(f"Note: No new data in temp file {temp_path}", file=sys.stderr) if not original_valid and not temp_valid: print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr) return "both_invalid" if not original_valid and temp_valid: print(f"Original file corrupted but temp file is valid, recovering from temp", file=sys.stderr) return "temp_only" if original_valid and not temp_valid: return "original_only" merged_writer = None for i in range(original_pq.num_row_groups): row_group = original_pq.read_row_group(i) if merged_writer is None: merged_writer = pq.ParquetWriter( merged_path, row_group.schema, flavor="spark" ) merged_writer.write_table(row_group) for i in range(temp_pq.num_row_groups): row_group = temp_pq.read_row_group(i) if merged_writer is None: merged_writer = pq.ParquetWriter( merged_path, row_group.schema, flavor="spark" ) merged_writer.write_table(row_group) if merged_writer is not None: merged_writer.close() return "merged" return False def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter): """ Merge partitioned namespace directories after resume. Returns: True if at least one namespace has valid data after merge False if all namespaces ended up with corrupted/deleted data """ namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] had_corruption = False expected_temp = file_filter + temp_suffix for ns_dir in namespace_dirs: ns_path = os.path.join(partition_dir, ns_dir) temp_path = os.path.join(ns_path, expected_temp) if not os.path.exists(temp_path): continue original_file = file_filter original_path = os.path.join(ns_path, original_file) if os.path.exists(original_path): merged_path = original_path + ".merged" merged = merge_parquet_files(original_path, temp_path, merged_path) if merged == "original_only": if os.path.exists(temp_path): os.remove(temp_path) elif merged == "temp_only": os.remove(original_path) os.rename(temp_path, original_path) elif merged == "both_invalid": if os.path.exists(original_path): os.remove(original_path) if os.path.exists(temp_path): os.remove(temp_path) had_corruption = True elif merged == "merged": os.remove(original_path) os.rename(merged_path, original_path) if os.path.exists(temp_path): os.remove(temp_path) else: if os.path.exists(original_path): os.remove(original_path) if os.path.exists(temp_path): os.remove(temp_path) else: try: pq.ParquetFile(temp_path) os.rename(temp_path, original_path) except Exception: if os.path.exists(temp_path): os.remove(temp_path) had_corruption = True return had_corruption def finalize_resume_merge( original_output_file, temp_output_file, partition_namespaces, original_partition_dir ): """ Finalize the resume by merging temp output with original output. """ import shutil print("Merging resumed data with existing output...", file=sys.stderr) try: if partition_namespaces and original_partition_dir is not None: file_filter = os.path.basename(original_output_file) merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter) if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file): shutil.rmtree(temp_output_file) else: merged_output_file = original_output_file + ".merged" merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file) if merged == "original_only": if os.path.exists(temp_output_file): os.remove(temp_output_file) elif merged == "temp_only": os.remove(original_output_file) os.rename(temp_output_file, original_output_file) elif merged == "both_invalid": os.remove(original_output_file) if os.path.exists(temp_output_file): os.remove(temp_output_file) elif merged == "merged": os.remove(original_output_file) os.rename(merged_output_file, original_output_file) if os.path.exists(temp_output_file): os.remove(temp_output_file) else: os.remove(original_output_file) if os.path.exists(temp_output_file): os.remove(temp_output_file) print("Merge complete.", file=sys.stderr) except Exception as e: print(f"Error merging resume data for {original_output_file}: {e}", file=sys.stderr) print(f"New data saved in: {temp_output_file}", file=sys.stderr) raise def setup_resume_temp_output(output_file, partition_namespaces): """ Set up temp output for resume mode (Parquet only). Returns: Tuple of (original_output_file, temp_output_file, original_partition_dir) or (None, None, None) if no existing output to resume from. """ import shutil original_output_file = None temp_output_file = None original_partition_dir = None if partition_namespaces: partition_dir = os.path.dirname(output_file) output_filename = os.path.basename(output_file) output_exists = False if os.path.isdir(partition_dir): for d in os.listdir(partition_dir): if d.startswith('namespace='): if os.path.exists(os.path.join(partition_dir, d, output_filename)): output_exists = True break if output_exists: original_partition_dir = partition_dir else: output_exists = isinstance(output_file, str) and os.path.exists(output_file) if output_exists: original_output_file = output_file temp_output_file = output_file + ".resume_temp" if os.path.exists(temp_output_file): if os.path.isdir(temp_output_file): shutil.rmtree(temp_output_file) else: os.remove(temp_output_file) if partition_namespaces: os.makedirs(temp_output_file, exist_ok=True) return original_output_file, temp_output_file, original_partition_dir