""" Checkpoint and resume functionality for wikiq parquet output. This module handles: - Finding resume points in existing parquet output - Merging resumed data with existing output (streaming, memory-efficient) - Checkpoint file management for fast resume point lookup """ import json import os import sys import pyarrow.parquet as pq def cleanup_interrupted_resume(output_file, partition_namespaces): """ Merge any leftover .resume_temp files from a previous interrupted run. This should be called BEFORE get_resume_point() so the resume point is calculated from the merged data. Returns: None - no temp files found or normal merge completed "start_fresh" - both original and temp were corrupted and deleted """ import shutil if partition_namespaces: partition_dir = os.path.dirname(output_file) output_filename = os.path.basename(output_file) temp_suffix = ".resume_temp" if not os.path.isdir(partition_dir): return has_old_temp_files = False for ns_dir in os.listdir(partition_dir): if ns_dir.startswith('namespace='): temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix) if os.path.exists(temp_path): has_old_temp_files = True break if has_old_temp_files: print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr) had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix) # Check if any valid data remains after merge has_valid_data = False for ns_dir in os.listdir(partition_dir): if ns_dir.startswith('namespace='): ns_path = os.path.join(partition_dir, ns_dir) parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet') and not f.endswith('.resume_temp')] if parquet_files: has_valid_data = True break if had_corruption and not has_valid_data: # All data was corrupted, remove checkpoint and start fresh checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) if os.path.exists(checkpoint_path): os.remove(checkpoint_path) print("All partitioned files were corrupted, will start fresh.", file=sys.stderr) return "start_fresh" print("Previous temp files merged successfully.", file=sys.stderr) else: temp_output_file = output_file + ".resume_temp" if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file): print(f"Found leftover temp file {temp_output_file} from previous interrupted run, merging first...", file=sys.stderr) merged_path = output_file + ".merged" merged = merge_parquet_files(output_file, temp_output_file, merged_path) if merged == "original_only": # Temp file was invalid, just remove it os.remove(temp_output_file) elif merged == "temp_only": # Original was corrupted, use temp as new base os.remove(output_file) os.rename(temp_output_file, output_file) print("Recovered from temp file (original was corrupted).", file=sys.stderr) elif merged == "both_invalid": # Both files corrupted, remove both and start fresh os.remove(output_file) os.remove(temp_output_file) # Also remove stale checkpoint file checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) if os.path.exists(checkpoint_path): os.remove(checkpoint_path) print("Both files were corrupted, will start fresh.", file=sys.stderr) return "start_fresh" elif merged == "merged": os.remove(output_file) os.rename(merged_path, output_file) os.remove(temp_output_file) print("Previous temp file merged successfully.", file=sys.stderr) else: # Both empty - unusual os.remove(temp_output_file) def get_checkpoint_path(output_file, partition_namespaces=False): """Get the path to the checkpoint file for a given output file. For partitioned output, the checkpoint is placed outside the partition directory to avoid pyarrow trying to read it as a parquet file. The filename includes the output filename to keep it unique per input file (for parallel jobs). """ if partition_namespaces: # output_file is like partition_dir/output.parquet # checkpoint should be at parent level: parent/output.parquet.checkpoint partition_dir = os.path.dirname(output_file) output_filename = os.path.basename(output_file) parent_dir = os.path.dirname(partition_dir) return os.path.join(parent_dir, output_filename + ".checkpoint") return str(output_file) + ".checkpoint" def read_checkpoint(output_file, partition_namespaces=False): """ Read resume point from checkpoint file if it exists. Checkpoint format: Single file: {"pageid": 54, "revid": 325} Partitioned: {"0": {"pageid": 54, "revid": 325}, "1": {"pageid": 123, "revid": 456}} Returns: For single files: A tuple (pageid, revid), or None if not found. For partitioned: A dict mapping namespace -> (pageid, revid), or None. """ checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) if not os.path.exists(checkpoint_path): return None try: with open(checkpoint_path, 'r') as f: data = json.load(f) if not data: return None # Single-file format: {"pageid": ..., "revid": ...} if "pageid" in data and "revid" in data: return (data["pageid"], data["revid"]) # Partitioned format: {"0": {"pageid": ..., "revid": ...}, ...} result = {} for key, value in data.items(): result[int(key)] = (value["pageid"], value["revid"]) return result if result else None except (json.JSONDecodeError, IOError, KeyError, TypeError) as e: print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr) return None def get_resume_point(output_file, partition_namespaces=False): """ Find the resume point(s) from existing parquet output. First checks for a checkpoint file (fast), then falls back to scanning the parquet output (slow, for backwards compatibility). Args: output_file: Path to the output file. For single files, this is the parquet file path. For partitioned namespaces, this is the path like dir/dump.parquet where namespace=* subdirectories are in the parent dir. partition_namespaces: Whether the output uses namespace partitioning. Returns: For single files: A tuple (pageid, revid) for the row with the highest pageid, or None if not found. For partitioned: A dict mapping namespace -> (pageid, revid) for each partition, or None if no partitions exist. """ # First try checkpoint file (fast) checkpoint_path = get_checkpoint_path(output_file, partition_namespaces) checkpoint_result = read_checkpoint(output_file, partition_namespaces) if checkpoint_result is not None: print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr) return checkpoint_result # Fall back to scanning parquet (slow, for backwards compatibility) print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr) try: if partition_namespaces: return _get_resume_point_partitioned(output_file) else: return _get_resume_point_single_file(output_file) except Exception as e: print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr) return None def _get_last_row_resume_point(pq_path): """Get resume point by reading only the last row group of a parquet file. Since data is written in page/revision order, the last row group contains the highest pageid/revid, and the last row in that group is the resume point. """ pf = pq.ParquetFile(pq_path) if pf.metadata.num_row_groups == 0: return None last_rg_idx = pf.metadata.num_row_groups - 1 table = pf.read_row_group(last_rg_idx, columns=['articleid', 'revid']) if table.num_rows == 0: return None max_pageid = table['articleid'][-1].as_py() max_revid = table['revid'][-1].as_py() return (max_pageid, max_revid) def _get_resume_point_partitioned(output_file): """Find per-namespace resume points from partitioned output. Only looks for the specific output file in each namespace directory. Returns a dict mapping namespace -> (max_pageid, max_revid) for each partition where the output file exists. Args: output_file: Path like 'dir/output.parquet' where namespace=* subdirectories contain files named 'output.parquet'. """ partition_dir = os.path.dirname(output_file) output_filename = os.path.basename(output_file) if not os.path.exists(partition_dir) or not os.path.isdir(partition_dir): return None namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] if not namespace_dirs: return None resume_points = {} for ns_dir in namespace_dirs: ns = int(ns_dir.split('=')[1]) pq_path = os.path.join(partition_dir, ns_dir, output_filename) if not os.path.exists(pq_path): continue try: result = _get_last_row_resume_point(pq_path) if result is not None: resume_points[ns] = result except Exception as e: print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr) continue return resume_points if resume_points else None def _get_resume_point_single_file(output_file): """Find resume point from a single parquet file.""" if not os.path.exists(output_file): return None if os.path.isdir(output_file): return None return _get_last_row_resume_point(output_file) def merge_parquet_files(original_path, temp_path, merged_path): """ Merge two parquet files by streaming row groups from original and temp into merged. This is memory-efficient: only one row group is loaded at a time. Returns: "merged" - merged file was created from both sources "original_only" - temp was invalid, keep original unchanged "temp_only" - original was corrupted but temp is valid, use temp "both_invalid" - both files invalid, delete both and start fresh False - both files were valid but empty """ original_valid = False temp_valid = False original_pq = None temp_pq = None try: original_pq = pq.ParquetFile(original_path) original_valid = True except Exception as e: print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr) try: temp_pq = pq.ParquetFile(temp_path) temp_valid = True except Exception: print(f"Note: No new data in temp file {temp_path} (namespace had no records after resume point)", file=sys.stderr) if not original_valid and not temp_valid: print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr) return "both_invalid" if not original_valid and temp_valid: print(f"Original file corrupted but temp file is valid, recovering from temp", file=sys.stderr) return "temp_only" if original_valid and not temp_valid: return "original_only" merged_writer = None # Copy all row groups from the original file for i in range(original_pq.num_row_groups): row_group = original_pq.read_row_group(i) if merged_writer is None: merged_writer = pq.ParquetWriter( merged_path, row_group.schema, flavor="spark" ) merged_writer.write_table(row_group) # Append all row groups from the temp file for i in range(temp_pq.num_row_groups): row_group = temp_pq.read_row_group(i) if merged_writer is None: merged_writer = pq.ParquetWriter( merged_path, row_group.schema, flavor="spark" ) merged_writer.write_table(row_group) # Close the writer if merged_writer is not None: merged_writer.close() return "merged" return False def merge_partitioned_namespaces(partition_dir, temp_suffix): """ Merge partitioned namespace directories after resume. For partitioned namespaces, temp files are written alongside the original files in each namespace directory with the temp suffix appended to the filename. E.g., original: namespace=0/file.parquet, temp: namespace=0/file.parquet.resume_temp Args: partition_dir: The partition directory containing namespace=* subdirs temp_suffix: The suffix appended to temp files (e.g., '.resume_temp') Returns: True if at least one namespace has valid data after merge False if all namespaces ended up with corrupted/deleted data """ namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] had_corruption = False for ns_dir in namespace_dirs: ns_path = os.path.join(partition_dir, ns_dir) # Find all files in this namespace directory files = os.listdir(ns_path) # Find temp files (files ending with the temp suffix) temp_files = [f for f in files if f.endswith(temp_suffix)] for temp_file in temp_files: temp_path = os.path.join(ns_path, temp_file) # Original file is the temp file without the suffix original_file = temp_file[:-len(temp_suffix)] original_path = os.path.join(ns_path, original_file) if os.path.exists(original_path): # Merge the files merged_path = original_path + ".merged" merged = merge_parquet_files(original_path, temp_path, merged_path) if merged == "original_only": # Temp file was invalid (no new data), keep original unchanged os.remove(temp_path) elif merged == "temp_only": # Original was corrupted, use temp as new base os.remove(original_path) os.rename(temp_path, original_path) elif merged == "both_invalid": # Both files corrupted, remove both os.remove(original_path) os.remove(temp_path) had_corruption = True elif merged == "merged": # Replace the original file with the merged file os.remove(original_path) os.rename(merged_path, original_path) os.remove(temp_path) else: # Both files were empty (False), just remove them os.remove(original_path) os.remove(temp_path) else: # No original file, rename temp to original only if valid try: pq.ParquetFile(temp_path) os.rename(temp_path, original_path) except Exception: # Temp file invalid, just remove it os.remove(temp_path) had_corruption = True return had_corruption def finalize_resume_merge( original_output_file, temp_output_file, partition_namespaces, original_partition_dir ): """ Finalize the resume by merging temp output with original output. Args: original_output_file: Path to the original output file temp_output_file: Path to the temp output file written during resume partition_namespaces: Whether using partitioned namespace output original_partition_dir: The partition directory (for partitioned output) Raises: Exception: If merge fails (temp file is preserved for recovery) """ import shutil print("Merging resumed data with existing output...", file=sys.stderr) try: if partition_namespaces and original_partition_dir is not None: # For partitioned namespaces, temp files are written alongside originals # with '.resume_temp' suffix in each namespace directory. merge_partitioned_namespaces(original_partition_dir, ".resume_temp") # Clean up the empty temp directory we created if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file): shutil.rmtree(temp_output_file) else: # Merge single parquet files merged_output_file = original_output_file + ".merged" merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file) if merged == "original_only": # Temp file was invalid (no new data), keep original unchanged if os.path.exists(temp_output_file): os.remove(temp_output_file) elif merged == "temp_only": # Original was corrupted, use temp as new base os.remove(original_output_file) os.rename(temp_output_file, original_output_file) elif merged == "both_invalid": # Both files corrupted, remove both os.remove(original_output_file) if os.path.exists(temp_output_file): os.remove(temp_output_file) elif merged == "merged": # Replace the original file with the merged file os.remove(original_output_file) os.rename(merged_output_file, original_output_file) if os.path.exists(temp_output_file): os.remove(temp_output_file) else: # Both files were empty (False) - unusual, but clean up os.remove(original_output_file) if os.path.exists(temp_output_file): os.remove(temp_output_file) print("Merge complete.", file=sys.stderr) except Exception as e: print(f"Error merging resume data for {original_output_file}: {e}", file=sys.stderr) print(f"New data saved in: {temp_output_file}", file=sys.stderr) raise def setup_resume_temp_output(output_file, partition_namespaces): """ Set up temp output for resume mode. Args: output_file: The original output file path partition_namespaces: Whether using partitioned namespace output Returns: Tuple of (original_output_file, temp_output_file, original_partition_dir) or (None, None, None) if no existing output to resume from. """ import shutil original_output_file = None temp_output_file = None original_partition_dir = None # For partitioned namespaces, check if the specific output file exists in any namespace if partition_namespaces: partition_dir = os.path.dirname(output_file) output_filename = os.path.basename(output_file) output_exists = False if os.path.isdir(partition_dir): for d in os.listdir(partition_dir): if d.startswith('namespace='): if os.path.exists(os.path.join(partition_dir, d, output_filename)): output_exists = True break if output_exists: original_partition_dir = partition_dir else: output_exists = isinstance(output_file, str) and os.path.exists(output_file) if output_exists: original_output_file = output_file temp_output_file = output_file + ".resume_temp" # Note: cleanup_interrupted_resume() should have been called before this # to merge any leftover temp files from a previous interrupted run. # Here we just clean up any remaining temp directory markers. if os.path.exists(temp_output_file): if os.path.isdir(temp_output_file): shutil.rmtree(temp_output_file) else: os.remove(temp_output_file) if partition_namespaces: os.makedirs(temp_output_file, exist_ok=True) return original_output_file, temp_output_file, original_partition_dir