diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py
index e1ce26c..6f7c6d3 100755
--- a/src/wikiq/__init__.py
+++ b/src/wikiq/__init__.py
@@ -8,6 +8,7 @@ import gc
 import json
 import os.path
 import re
+import signal
 import sys
 from collections import deque
 from hashlib import sha1
@@ -28,6 +29,11 @@ import wikiq.tables as tables
 from wikiq.tables import RevisionTable
 from wikiq.wiki_diff_matcher import WikiDiffMatcher
 from wikiq.wikitext_parser import WikitextParser
+from wikiq.resume import (
+    get_resume_point,
+    setup_resume_temp_output,
+    finalize_resume_merge,
+)
 
 TO_ENCODE = ("title", "editor")
 PERSISTENCE_RADIUS = 7
@@ -244,7 +250,7 @@ class WikiqParser:
         output_parquet: bool = True,
         batch_size: int = 1024,
         partition_namespaces: bool = False,
-        resume_from_revid: int = None,
+        resume_point: Union[tuple, dict, None] = None,
         external_links: bool = False,
         citations: bool = False,
         wikilinks: bool = False,
@@ -254,7 +260,10 @@ class WikiqParser:
         """
         Parameters:
             persist : what persistence method to use. Takes a PersistMethod value
-            resume_from_revid : if set, skip all revisions up to and including this revid
+            resume_point : if set, either a (pageid, revid) tuple for single-file output,
+                or a dict mapping namespace -> (pageid, revid) for partitioned output.
+                In both cases, revisions up to and including the resume point are
+                skipped (per namespace, for partitioned output).
         """
 
         self.input_file = input_file
@@ -265,12 +274,13 @@ class WikiqParser:
         self.diff = diff
         self.text = text
         self.partition_namespaces = partition_namespaces
-        self.resume_from_revid = resume_from_revid
+        self.resume_point = resume_point
         self.external_links = external_links
         self.citations = citations
         self.wikilinks = wikilinks
         self.templates = templates
         self.headings = headings
+        self.shutdown_requested = False
         if namespaces is not None:
             self.namespace_filter = set(namespaces)
         else:
@@ -299,6 +309,27 @@ class WikiqParser:
         else:
             self.output_file = open(output_file, "wb")
 
+    def request_shutdown(self):
+        """Request graceful shutdown. The process() method will exit after completing the current batch."""
+        self.shutdown_requested = True
+
+    def _write_batch(self, row_buffer, schema, writer, pq_writers, ns_paths, sorting_cols, namespace=None):
+        """Write a batch of rows to the appropriate writer.
+
+        For partitioned output, creates the per-namespace writer lazily if needed.
+        Returns the writer used (for non-partitioned output, same as input).
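+
+        For example (hypothetical paths): with output "out/dump.parquet" and
+        namespace 0, the lazily created writer targets
+        "out/namespace=0/dump.parquet".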
+ """ + if self.partition_namespaces and namespace is not None: + if namespace not in pq_writers: + ns_path = ns_paths[namespace] + Path(ns_path).parent.mkdir(exist_ok=True, parents=True) + pq_writers[namespace] = pq.ParquetWriter( + ns_path, schema, flavor="spark", sorting_columns=sorting_cols + ) + writer = pq_writers[namespace] + writer.write(pa.record_batch(row_buffer, schema=schema)) + return writer + def make_matchmake_pairs(self, patterns, labels) -> list[RegexPair]: if (patterns is not None and labels is not None) and ( len(patterns) == len(labels) @@ -358,26 +389,22 @@ class WikiqParser: # input_filename) # Track whether we've passed the resume point - found_resume_point = self.resume_from_revid is None + # For partitioned output, this is a dict mapping namespace -> bool + if self.resume_point is None: + found_resume_point = True + elif self.partition_namespaces: + found_resume_point = {} + else: + found_resume_point = False # When resuming with parquet, write new data to temp file/directory and merge at the end original_output_file = None temp_output_file = None - if self.resume_from_revid is not None and self.output_parquet: - if isinstance(self.output_file, str) and os.path.exists(self.output_file): - original_output_file = self.output_file - # For partitioned namespaces, create a temp directory; for single files, create a temp file path - temp_output_file = self.output_file + ".resume_temp" - # Remove temp file/dir if it exists from a previous failed run - if os.path.exists(temp_output_file): - import shutil - if os.path.isdir(temp_output_file): - shutil.rmtree(temp_output_file) - else: - os.remove(temp_output_file) - # For partitioned namespaces, create the directory now; for single files it will be created by ParquetWriter - if self.partition_namespaces: - os.makedirs(temp_output_file, exist_ok=True) + original_partition_dir = None + if self.resume_point is not None and self.output_parquet: + original_output_file, temp_output_file, original_partition_dir = \ + setup_resume_temp_output(self.output_file, self.partition_namespaces) + if temp_output_file is not None: self.output_file = temp_output_file # Construct dump file iterator @@ -485,6 +512,8 @@ class WikiqParser: flavor="spark", sorting_columns=sorting_cols, ) + ns_paths = {} + pq_writers = {} else: output_path = Path(self.output_file) if self.namespace_filter is not None: @@ -495,14 +524,9 @@ class WikiqParser: ns: (output_path.parent / f"namespace={ns}") / output_path.name for ns in namespaces } - for path in ns_paths.values(): - Path(path).parent.mkdir(exist_ok=True, parents=True) - pq_writers = { - ns: pq.ParquetWriter( - path, schema, flavor="spark", sorting_columns=sorting_cols - ) - for ns, path in ns_paths.items() - } + # Writers are created lazily when first needed to avoid empty files on early exit + pq_writers = {} + writer = None # Not used for partitioned output else: writer = pacsv.CSVWriter( @@ -510,6 +534,9 @@ class WikiqParser: schema, write_options=pacsv.WriteOptions(delimiter="\t"), ) + ns_paths = {} + pq_writers = {} + sorting_cols = None regex_matches = {} @@ -522,6 +549,42 @@ class WikiqParser: if page.mwpage.namespace not in self.namespace_filter: continue + # Resume logic: skip pages that come before the resume point. + # For partitioned output, each namespace has its own resume point. 
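+            # Illustrative shapes (hypothetical values): a single-file resume
+            # point looks like (1234, 56789); a partitioned one looks like
+            # {0: (1234, 56789), 1: (1200, 55555)}.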
+ is_resume_page = False + page_resume_point = None + if self.resume_point is not None: + page_id = page.mwpage.id + page_ns = page.mwpage.namespace + + if self.partition_namespaces: + # Per-namespace resume: check if we've passed this namespace's resume point + if found_resume_point.get(page_ns, False): + pass # Already past resume point for this namespace + elif page_ns not in self.resume_point: + # No resume point for this namespace, process normally + found_resume_point[page_ns] = True + else: + resume_pageid, resume_revid = self.resume_point[page_ns] + if page_id < resume_pageid: + continue + elif page_id == resume_pageid: + is_resume_page = True + page_resume_point = (resume_pageid, resume_revid) + else: + found_resume_point[page_ns] = True + else: + # Single-file resume: global resume point + if not found_resume_point: + resume_pageid, resume_revid = self.resume_point + if page_id < resume_pageid: + continue + elif page_id == resume_pageid: + is_resume_page = True + page_resume_point = (resume_pageid, resume_revid) + else: + found_resume_point = True + # Disable detecting reverts if radius is 0. if self.revert_radius > 0: reverts_column.rev_detector = mwreverts.Detector( @@ -602,28 +665,6 @@ class WikiqParser: n_revs = 0 - # If we're resuming and haven't found the resume point yet, check this batch - skip_batch = False - if not found_resume_point and self.resume_from_revid is not None: - batch_has_resume_point = False - for revs in batch: - revs_list = list(revs) - for rev in revs_list: - if rev.id == self.resume_from_revid: - batch_has_resume_point = True - found_resume_point = True - print(f"Found resume point at revid {self.resume_from_revid}", file=sys.stderr) - break - if batch_has_resume_point: - break - - # If this batch doesn't contain the resume point, skip it entirely - if not batch_has_resume_point: - skip_batch = True - - if skip_batch: - continue - for revs in batch: # Revisions may or may not be grouped into lists of contiguous revisions by the # same user. We call these "edit sessions". Otherwise revs is a list containing @@ -650,6 +691,15 @@ class WikiqParser: regex_matches[k] = [] regex_matches[k].append(v) + # Check for shutdown after each revision + if self.shutdown_requested: + break + + # If shutdown requested, skip all remaining processing and close writers + if self.shutdown_requested: + print("Shutdown requested, closing writers...", file=sys.stderr) + break + # Collect the set of revisions currently buffered in the table so we can run multi-revision functions on them. 
batch_row_buffer = table.pop() if self.persist != PersistMethod.none: @@ -790,31 +840,39 @@ class WikiqParser: if not self.text and self.persist != PersistMethod.none: del row_buffer["text"] - # If we just found the resume point in this batch, filter to only write revisions after it - if self.resume_from_revid is not None: + # Filter for resume logic if on resume page + should_write = True + if is_resume_page: + _, resume_revid = page_resume_point revids = row_buffer["revid"] - # Find the index of the resume revid - resume_idx = None - for idx, revid in enumerate(revids): - if revid == self.resume_from_revid: - resume_idx = idx - break + resume_idx = next((i for i, r in enumerate(revids) if r == resume_revid), None) if resume_idx is not None: + # Mark resume point as found + if self.partition_namespaces: + found_resume_point[page.mwpage.namespace] = True + else: + found_resume_point = True + is_resume_page = False + # Only write revisions after the resume point if resume_idx + 1 < len(revids): row_buffer = {k: v[resume_idx + 1:] for k, v in row_buffer.items()} print(f"Resuming output starting at revid {row_buffer['revid'][0]}", file=sys.stderr) else: - # The resume point was the last revision in this batch, skip writing - continue + should_write = False + else: + should_write = False - # Only write if there are rows to write - if len(row_buffer.get("revid", [])) > 0: - if self.partition_namespaces is True: - writer = pq_writers[page.mwpage.namespace] - writer.write(pa.record_batch(row_buffer, schema=schema)) + # Write batch if there are rows + if should_write and len(row_buffer.get("revid", [])) > 0: + namespace = page.mwpage.namespace if self.partition_namespaces else None + self._write_batch(row_buffer, schema, writer, pq_writers, ns_paths, sorting_cols, namespace) gc.collect() + + # If shutdown was requested, break from page loop + if self.shutdown_requested: + break page_count += 1 print( @@ -829,79 +887,12 @@ class WikiqParser: # If we were resuming, merge the original file with the new temp file if original_output_file is not None and temp_output_file is not None: - print("Merging resumed data with existing output...", file=sys.stderr) - try: - # Check if we're merging partitioned namespaces or single files - if os.path.isdir(original_output_file): - # Merge partitioned namespace directories - self._merge_partitioned_namespaces(original_output_file, temp_output_file) - else: - # Merge single parquet files - merged_output_file = original_output_file + ".merged" - merge_parquet_files(original_output_file, temp_output_file, merged_output_file) - - # Replace the original file with the merged file - os.remove(original_output_file) - os.rename(merged_output_file, original_output_file) - - # Clean up the temp file/directory - if os.path.exists(temp_output_file): - if os.path.isdir(temp_output_file): - import shutil - shutil.rmtree(temp_output_file) - else: - os.remove(temp_output_file) - - print("Merge complete.", file=sys.stderr) - except Exception as e: - print(f"Error merging resume data: {e}", file=sys.stderr) - print(f"New data saved in: {temp_output_file}", file=sys.stderr) - raise - - def _merge_partitioned_namespaces(self, original_output_dir, temp_output_dir): - """ - Merge partitioned namespace directories. - For each namespace partition in the temp directory, merge its parquet files with the original. 
- """ - import shutil - - # Get all namespace directories from temp - temp_namespace_dirs = [d for d in os.listdir(temp_output_dir) if d.startswith('namespace=')] - - for ns_dir in temp_namespace_dirs: - temp_ns_path = os.path.join(temp_output_dir, ns_dir) - original_ns_path = os.path.join(original_output_dir, ns_dir) - - # Find parquet files in the temp namespace directory - temp_parquet_files = [f for f in os.listdir(temp_ns_path) if f.endswith('.parquet')] - - if not temp_parquet_files: - continue - - temp_parquet_path = os.path.join(temp_ns_path, temp_parquet_files[0]) - - # Check if the namespace partition exists in the original directory - if os.path.exists(original_ns_path): - # Namespace partition exists, merge the files - original_parquet_files = [f for f in os.listdir(original_ns_path) if f.endswith('.parquet')] - - if not original_parquet_files: - # No parquet file in original, just copy the temp file - shutil.copy(temp_parquet_path, os.path.join(original_ns_path, temp_parquet_files[0])) - else: - original_parquet_path = os.path.join(original_ns_path, original_parquet_files[0]) - merged_parquet_path = original_parquet_path + ".merged" - - # Merge the files - merge_parquet_files(original_parquet_path, temp_parquet_path, merged_parquet_path) - - # Replace the original file with the merged file - os.remove(original_parquet_path) - os.rename(merged_parquet_path, original_parquet_path) - else: - # Namespace partition doesn't exist in original, create it - shutil.copytree(temp_ns_path, original_ns_path) - + finalize_resume_merge( + original_output_file, + temp_output_file, + self.partition_namespaces, + original_partition_dir + ) def match_archive_suffix(input_filename): if re.match(r".*\.7z$", input_filename): @@ -942,111 +933,6 @@ def open_output_file(input_filename): return output_file -def merge_parquet_files(original_path, temp_path, merged_path): - """ - Merge two parquet files by copying all row groups from original and temp into merged. - """ - original_pq = pq.ParquetFile(original_path) - temp_pq = pq.ParquetFile(temp_path) - - merged_writer = None - - # Copy all row groups from the original file - for i in range(original_pq.num_row_groups): - row_group = original_pq.read_row_group(i) - if merged_writer is None: - merged_writer = pq.ParquetWriter( - merged_path, - row_group.schema, - flavor="spark" - ) - merged_writer.write_table(row_group) - - # Append all row groups from the temp file - for i in range(temp_pq.num_row_groups): - row_group = temp_pq.read_row_group(i) - merged_writer.write_table(row_group) - - # Close the writer - if merged_writer is not None: - merged_writer.close() - - -def get_last_revid_from_parquet(output_file): - """ - Read the last revid from a parquet file or partitioned namespace directory. - Returns None if the file doesn't exist or is empty. - Handles both single files and partitioned namespace structures (namespace=*/file.parquet). - For partitioned namespaces, finds the most recently modified partition and reads from it. 
- """ - try: - if not os.path.exists(output_file): - return None - - # Check if this is a partitioned namespace directory - if os.path.isdir(output_file): - # Find all namespace=* subdirectories - namespace_dirs = [d for d in os.listdir(output_file) if d.startswith('namespace=')] - - if not namespace_dirs: - return None - - # Find the most recently modified namespace partition - most_recent_ns = None - most_recent_mtime = -1 - - for ns_dir in namespace_dirs: - ns_path = os.path.join(output_file, ns_dir) - mtime = os.path.getmtime(ns_path) - if mtime > most_recent_mtime: - most_recent_mtime = mtime - most_recent_ns = ns_path - - if most_recent_ns is None: - return None - - # Find the parquet file in the most recent namespace directory - parquet_files = [f for f in os.listdir(most_recent_ns) if f.endswith('.parquet')] - - if not parquet_files: - return None - - parquet_path = os.path.join(most_recent_ns, parquet_files[0]) - parquet_file = pq.ParquetFile(parquet_path) - - if parquet_file.num_row_groups == 0: - return None - - # Read only the last row group, and only the revid column - last_row_group = parquet_file.read_row_group(parquet_file.num_row_groups - 1, columns=['revid']) - - if last_row_group.num_rows == 0: - return None - - # Get the last revid from this row group - last_revid = last_row_group.column('revid')[-1].as_py() - return last_revid - else: - # Single parquet file - parquet_file = pq.ParquetFile(output_file) - - if parquet_file.num_row_groups == 0: - return None - - # Read only the last row group, and only the revid column - last_row_group = parquet_file.read_row_group(parquet_file.num_row_groups - 1, columns=['revid']) - - if last_row_group.num_rows == 0: - return None - - # Get the last revid from this row group - last_revid = last_row_group.column('revid')[-1].as_py() - return last_revid - except Exception as e: - print(f"Error reading last revid from {output_file}: {e}", file=sys.stderr) - return None - - def main(): parser = argparse.ArgumentParser( description="Parse MediaWiki XML database dumps into tab delimited data." 
@@ -1291,16 +1177,29 @@ def main(): output_file = output # Handle resume functionality - resume_from_revid = None + resume_point = None if args.resume: if output_parquet and not args.stdout: - resume_from_revid = get_last_revid_from_parquet(output_file) - if resume_from_revid is not None: - print(f"Resuming from last written revid: {resume_from_revid}", file=sys.stderr) + resume_point = get_resume_point(output_file, args.partition_namespaces) + if resume_point is not None: + if args.partition_namespaces: + # Dict mapping namespace -> (pageid, revid) + ns_list = sorted(resume_point.keys()) + print(f"Resuming with per-namespace resume points for {len(ns_list)} namespaces", file=sys.stderr) + for ns in ns_list: + pageid, revid = resume_point[ns] + print(f" namespace={ns}: pageid={pageid}, revid={revid}", file=sys.stderr) + else: + pageid, revid = resume_point + print(f"Resuming from last written point: pageid={pageid}, revid={revid}", file=sys.stderr) else: - print("Resume requested but no existing output file found, starting from beginning", file=sys.stderr) + if args.partition_namespaces: + partition_dir = os.path.dirname(output_file) + sys.exit(f"Error: --resume specified but partitioned output not found in: {partition_dir}") + else: + sys.exit(f"Error: --resume specified but output file not found: {output_file}") else: - print("Warning: --resume only works with parquet output (not stdout or TSV)", file=sys.stderr) + sys.exit("Error: --resume only works with parquet output (not stdout or TSV)") wikiq = WikiqParser( input_file, @@ -1318,7 +1217,7 @@ def main(): output_parquet=output_parquet, partition_namespaces=args.partition_namespaces, batch_size=args.batch_size, - resume_from_revid=resume_from_revid, + resume_point=resume_point, external_links=args.external_links, citations=args.citations, wikilinks=args.wikilinks, @@ -1326,7 +1225,23 @@ def main(): headings=args.headings, ) - wikiq.process() + # Register signal handlers for graceful shutdown (CLI only) + def handle_shutdown(signum, frame): + sig_name = signal.Signals(signum).name + print(f"\nReceived {sig_name}, requesting graceful shutdown...", file=sys.stderr) + wikiq.request_shutdown() + + original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown) + original_sigint = signal.signal(signal.SIGINT, handle_shutdown) + original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown) + + try: + wikiq.process() + finally: + # Restore original signal handlers + signal.signal(signal.SIGTERM, original_sigterm) + signal.signal(signal.SIGINT, original_sigint) + signal.signal(signal.SIGUSR1, original_sigusr1) # close things input_file.close() @@ -1350,7 +1265,7 @@ def main(): diff=args.diff, text=args.text, batch_size=args.batch_size, - resume_from_revid=None, + resume_point=None, external_links=args.external_links, citations=args.citations, wikilinks=args.wikilinks, @@ -1358,7 +1273,23 @@ def main(): headings=args.headings, ) - wikiq.process() + # Register signal handlers for graceful shutdown (CLI only) + def handle_shutdown(signum, frame): + sig_name = signal.Signals(signum).name + print(f"\nReceived {sig_name}, requesting graceful shutdown...", file=sys.stderr) + wikiq.request_shutdown() + + original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown) + original_sigint = signal.signal(signal.SIGINT, handle_shutdown) + original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown) + + try: + wikiq.process() + finally: + # Restore original signal handlers + signal.signal(signal.SIGTERM, original_sigterm) + 
signal.signal(signal.SIGINT, original_sigint)
+        signal.signal(signal.SIGUSR1, original_sigusr1)
 
 
 # stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
 # stop_words = stop_words.split(",")
diff --git a/src/wikiq/resume.py b/src/wikiq/resume.py
new file mode 100644
index 0000000..50341b0
--- /dev/null
+++ b/src/wikiq/resume.py
@@ -0,0 +1,296 @@
+"""
+Checkpoint and resume functionality for wikiq parquet output.
+
+This module handles:
+- Finding resume points in existing parquet output
+- Merging resumed data with existing output (streaming, memory-efficient)
+"""
+
+import os
+import sys
+
+import pyarrow.parquet as pq
+import pyarrow.compute as pc
+
+
+def get_resume_point(output_file, partition_namespaces=False):
+    """
+    Find the resume point(s) from existing parquet output.
+
+    Args:
+        output_file: Path to the output file. For single files, this is the parquet file path.
+            For partitioned namespaces, this is the path like dir/dump.parquet where
+            namespace=* subdirectories are in the parent dir.
+        partition_namespaces: Whether the output uses namespace partitioning.
+
+    Returns:
+        For single files: A tuple (pageid, revid) for the row with the highest pageid,
+        or None if not found.
+        For partitioned: A dict mapping namespace -> (pageid, revid) for each partition,
+        or None if no partitions exist.
+    """
+    try:
+        if partition_namespaces:
+            return _get_resume_point_partitioned(output_file)
+        else:
+            return _get_resume_point_single_file(output_file)
+    except Exception as e:
+        print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr)
+        return None
+
+
+def _get_resume_point_partitioned(output_file):
+    """Find per-namespace resume points from partitioned output.
+
+    Returns a dict mapping namespace -> (max_pageid, max_revid) for each partition.
+    This allows resume to correctly handle cases where different namespaces have
+    different progress due to interleaved dump ordering.
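+
+    Example return value (hypothetical): {0: (1234, 56789), 1: (1200, 55555)},
+    i.e. namespace 0 last wrote revid 56789 of page 1234.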
+ """ + partition_dir = os.path.dirname(output_file) + if not os.path.exists(partition_dir) or not os.path.isdir(partition_dir): + return None + + namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] + if not namespace_dirs: + return None + + resume_points = {} + for ns_dir in namespace_dirs: + ns = int(ns_dir.split('=')[1]) + ns_path = os.path.join(partition_dir, ns_dir) + + # Find parquet files in this namespace directory + parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet')] + if not parquet_files: + continue + + # Read all parquet files in this namespace + for pq_file in parquet_files: + pq_path = os.path.join(ns_path, pq_file) + try: + pf = pq.ParquetFile(pq_path) + table = pf.read(columns=['articleid', 'revid']) + if table.num_rows == 0: + continue + + max_pageid = pc.max(table['articleid']).as_py() + mask = pc.equal(table['articleid'], max_pageid) + max_revid = pc.max(pc.filter(table['revid'], mask)).as_py() + + # Keep the highest pageid for this namespace + if ns not in resume_points or max_pageid > resume_points[ns][0]: + resume_points[ns] = (max_pageid, max_revid) + except Exception as e: + print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr) + continue + + return resume_points if resume_points else None + + +def _get_resume_point_single_file(output_file): + """Find resume point from a single parquet file.""" + if not os.path.exists(output_file): + return None + + if os.path.isdir(output_file): + return None + + # Find the row with the highest pageid + pf = pq.ParquetFile(output_file) + table = pf.read(columns=['articleid', 'revid']) + + if table.num_rows == 0: + return None + + max_pageid = pc.max(table['articleid']).as_py() + # Filter to row(s) with max pageid and get max revid + mask = pc.equal(table['articleid'], max_pageid) + max_revid = pc.max(pc.filter(table['revid'], mask)).as_py() + return (max_pageid, max_revid) + + +def merge_parquet_files(original_path, temp_path, merged_path): + """ + Merge two parquet files by streaming row groups from original and temp into merged. + + This is memory-efficient: only one row group is loaded at a time. + Returns True if merged file was created, False if both sources were empty. + """ + original_pq = pq.ParquetFile(original_path) + temp_pq = pq.ParquetFile(temp_path) + + merged_writer = None + + # Copy all row groups from the original file + for i in range(original_pq.num_row_groups): + row_group = original_pq.read_row_group(i) + if merged_writer is None: + merged_writer = pq.ParquetWriter( + merged_path, + row_group.schema, + flavor="spark" + ) + merged_writer.write_table(row_group) + + # Append all row groups from the temp file + for i in range(temp_pq.num_row_groups): + row_group = temp_pq.read_row_group(i) + if merged_writer is None: + merged_writer = pq.ParquetWriter( + merged_path, + row_group.schema, + flavor="spark" + ) + merged_writer.write_table(row_group) + + # Close the writer + if merged_writer is not None: + merged_writer.close() + return True + return False + + +def merge_partitioned_namespaces(partition_dir, temp_suffix): + """ + Merge partitioned namespace directories after resume. + + For partitioned namespaces, temp files are written alongside the original files + in each namespace directory with the temp suffix appended to the filename. 
+ E.g., original: namespace=0/file.parquet, temp: namespace=0/file.parquet.resume_temp + + Args: + partition_dir: The partition directory containing namespace=* subdirs + temp_suffix: The suffix appended to temp files (e.g., '.resume_temp') + """ + namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')] + + for ns_dir in namespace_dirs: + ns_path = os.path.join(partition_dir, ns_dir) + + # Find all files in this namespace directory + files = os.listdir(ns_path) + + # Find temp files (files ending with the temp suffix) + temp_files = [f for f in files if f.endswith(temp_suffix)] + + for temp_file in temp_files: + temp_path = os.path.join(ns_path, temp_file) + # Original file is the temp file without the suffix + original_file = temp_file[:-len(temp_suffix)] + original_path = os.path.join(ns_path, original_file) + + if os.path.exists(original_path): + # Merge the files + merged_path = original_path + ".merged" + merged = merge_parquet_files(original_path, temp_path, merged_path) + + if merged: + # Replace the original file with the merged file + os.remove(original_path) + os.rename(merged_path, original_path) + os.remove(temp_path) + else: + # Both files were empty, just remove them + os.remove(original_path) + os.remove(temp_path) + else: + # No original file, rename temp to original + os.rename(temp_path, original_path) + + +def finalize_resume_merge( + original_output_file, + temp_output_file, + partition_namespaces, + original_partition_dir +): + """ + Finalize the resume by merging temp output with original output. + + Args: + original_output_file: Path to the original output file + temp_output_file: Path to the temp output file written during resume + partition_namespaces: Whether using partitioned namespace output + original_partition_dir: The partition directory (for partitioned output) + + Raises: + Exception: If merge fails (temp file is preserved for recovery) + """ + import shutil + + print("Merging resumed data with existing output...", file=sys.stderr) + try: + if partition_namespaces and original_partition_dir is not None: + # For partitioned namespaces, temp files are written alongside originals + # with '.resume_temp' suffix in each namespace directory. + merge_partitioned_namespaces(original_partition_dir, ".resume_temp") + # Clean up the empty temp directory we created + if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file): + shutil.rmtree(temp_output_file) + else: + # Merge single parquet files + merged_output_file = original_output_file + ".merged" + merge_parquet_files(original_output_file, temp_output_file, merged_output_file) + + # Replace the original file with the merged file + os.remove(original_output_file) + os.rename(merged_output_file, original_output_file) + + # Clean up the temp file + if os.path.exists(temp_output_file): + os.remove(temp_output_file) + + print("Merge complete.", file=sys.stderr) + except Exception as e: + print(f"Error merging resume data: {e}", file=sys.stderr) + print(f"New data saved in: {temp_output_file}", file=sys.stderr) + raise + + +def setup_resume_temp_output(output_file, partition_namespaces): + """ + Set up temp output for resume mode. + + Args: + output_file: The original output file path + partition_namespaces: Whether using partitioned namespace output + + Returns: + Tuple of (original_output_file, temp_output_file, original_partition_dir) + or (None, None, None) if no existing output to resume from. 
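+
+        Example (hypothetical): for existing single-file output "out/dump.parquet",
+        this returns ("out/dump.parquet", "out/dump.parquet.resume_temp", None);
+        for partitioned output under "out/namespace=*", the third element is "out".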
+    """
+    import shutil
+
+    original_output_file = None
+    temp_output_file = None
+    original_partition_dir = None
+
+    # For partitioned namespaces, check if the partition directory exists
+    if partition_namespaces:
+        partition_dir = os.path.dirname(output_file)
+        output_exists = os.path.isdir(partition_dir) and any(
+            d.startswith('namespace=') for d in os.listdir(partition_dir)
+        )
+        if output_exists:
+            original_partition_dir = partition_dir
+    else:
+        output_exists = isinstance(output_file, str) and os.path.exists(output_file)
+
+    if output_exists:
+        original_output_file = output_file
+        temp_output_file = output_file + ".resume_temp"
+
+        # Remove temp file/dir if it exists from a previous failed run
+        if os.path.exists(temp_output_file):
+            if os.path.isdir(temp_output_file):
+                shutil.rmtree(temp_output_file)
+            else:
+                os.remove(temp_output_file)
+
+        # For partitioned namespaces, create an empty temp directory
+        # (actual temp files go in namespace=* dirs with .resume_temp suffix)
+        if partition_namespaces:
+            os.makedirs(temp_output_file, exist_ok=True)
+
+    return original_output_file, temp_output_file, original_partition_dir
diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py
index f37ef6c..ed47e64 100644
--- a/test/Wikiq_Unit_Test.py
+++ b/test/Wikiq_Unit_Test.py
@@ -1,6 +1,7 @@
 import os
 import shutil
 import subprocess
+import sys
 import tracemalloc
 from io import StringIO
 from typing import Final, Union
@@ -539,97 +540,108 @@ def test_resume_with_diff():
     print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
 
 def test_resume_with_partition_namespaces():
-    """Test that --resume works correctly with --partition-namespaces."""
-    import pyarrow.parquet as pq
+    """Test that --resume works correctly with --partition-namespaces.
+
-    # First, create a complete baseline output with partition-namespaces
-    tester_full = WikiqTester(SAILORMOON, "resume_partition_full", in_compression="7z", out_format="parquet")
+    Interrupts wikiq partway through processing, then resumes and verifies
+    the result matches an uninterrupted run. A small --batch-size means output
+    reaches disk frequently, so interrupting mid-run leaves partial output behind.
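+
+    The interruption mimics SLURM job termination: SIGUSR1 as an early
+    warning, then SIGTERM if the process has not exited in time.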
+ """ + import signal + import time + import pyarrow.dataset as ds + # Use separate subdirectories for full and partial runs to isolate them + full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full") + partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial") + input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z") + + # Clean up any existing output directories from previous runs + for output_dir in [full_dir, partial_dir]: + if os.path.exists(output_dir): + shutil.rmtree(output_dir) + os.makedirs(output_dir) + + # Paths within each isolated directory + full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet") + partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet") + + # Run wikiq fully to get baseline output + cmd_full = f"{WIKIQ} {input_file} -o {full_output} --batch-size 10 --partition-namespaces" try: - tester_full.call_wikiq("--partition-namespaces", "--fandom-2020") + subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True) except subprocess.CalledProcessError as exc: pytest.fail(exc.stderr.decode("utf8")) - # Read the full output from the partitioned directory - full_output_dir = tester_full.output - namespace_dirs = [d for d in os.listdir(full_output_dir) if d.startswith('namespace=')] + # Read full output + full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive") + full_df = full_dataset.to_table().to_pandas() + total_rows = len(full_df) + print(f"Full run produced {total_rows} rows") - if not namespace_dirs: - pytest.fail("No namespace directories found in output") + # Start wikiq for the interrupted run (use list args so SIGTERM goes to Python) + batch_size = 10 + cmd_partial = [ + sys.executable, WIKIQ, input_file, + "-o", partial_output, + "--batch-size", str(batch_size), + "--partition-namespaces" + ] + print(f"Starting: {' '.join(cmd_partial)}") - # Collect all revisions from all namespaces - full_revids = [] - for ns_dir in sorted(namespace_dirs): - parquet_files = [f for f in os.listdir(os.path.join(full_output_dir, ns_dir)) if f.endswith('.parquet')] - if parquet_files: - ns_parquet_path = os.path.join(full_output_dir, ns_dir, parquet_files[0]) - pf = pq.ParquetFile(ns_parquet_path) - table = pf.read(columns=['revid']) - revids = table.column('revid').to_pylist() - full_revids.extend(revids) + proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE) - full_revids_sorted = sorted(set(full_revids)) - total_revisions = len(full_revids_sorted) + # Wait a short time to allow some processing + interrupt_delay = 5 # seconds - enough for some pages but not all + time.sleep(interrupt_delay) - # Get a revid about 1/3 through to use as the resume point - resume_idx = total_revisions // 3 - resume_revid = full_revids_sorted[resume_idx] + if proc.poll() is not None: + pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt") - print(f"Total revisions: {total_revisions}, Resume point: {resume_idx}, Resume revid: {resume_revid}") + # Simulate SLURM job termination: send SIGUSR1 first (early warning), + # then wait for graceful shutdown, then SIGTERM if still running + print(f"Sending SIGUSR1 after {interrupt_delay}s") + proc.send_signal(signal.SIGUSR1) - # Create a partial output by manually creating the partitioned structure - tester_partial = WikiqTester(SAILORMOON, "resume_partition_partial", in_compression="7z", out_format="parquet") - partial_output_dir = tester_partial.output - - # Copy the full partitioned output to the partial directory - for ns_dir in namespace_dirs: - src_ns_path = 
os.path.join(full_output_dir, ns_dir) - dst_ns_path = os.path.join(partial_output_dir, ns_dir) - shutil.copytree(src_ns_path, dst_ns_path) - - # Now filter each namespace file to only include revisions up to resume_idx - revised_data_count = 0 - for ns_dir in namespace_dirs: - parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')] - if parquet_files: - ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0]) - pf = pq.ParquetFile(ns_parquet_path) - table = pf.read() - - # Filter to only rows up to the resume point - revids = table.column('revid').to_pylist() - mask = pa.array([revid <= resume_revid for revid in revids], type=pa.bool_()) - partial_table = table.filter(mask) - revised_data_count += len(partial_table) - - # Write back the filtered data - pq.write_table(partial_table, ns_parquet_path) - - print(f"Created partial output with {revised_data_count} revisions (up to revid {resume_revid})") - - # Now resume from the partial output + # Wait for graceful shutdown try: - tester_partial.call_wikiq("--partition-namespaces", "--fandom-2020", "--resume") + proc.wait(timeout=5) + print("Process exited gracefully after SIGUSR1") + except subprocess.TimeoutExpired: + # Process didn't exit, send SIGTERM + print("Sending SIGTERM after SIGUSR1 timeout") + proc.send_signal(signal.SIGTERM) + proc.wait(timeout=30) + + # Read interrupted output + interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive") + interrupted_rows = interrupted_dataset.count_rows() + print(f"Interrupted run wrote {interrupted_rows} rows") + + assert interrupted_rows < total_rows, \ + f"Process wrote all {interrupted_rows} rows before being killed" + + # Resume + cmd_resume = f"{WIKIQ} {input_file} -o {partial_output} --batch-size {batch_size} --partition-namespaces --resume" + try: + subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True) except subprocess.CalledProcessError as exc: pytest.fail(exc.stderr.decode("utf8")) - # Read the resumed output and collect revids - resumed_revids = [] - for ns_dir in namespace_dirs: - parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')] - if parquet_files: - ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0]) - pf = pq.ParquetFile(ns_parquet_path) - table = pf.read(columns=['revid']) - revids = table.column('revid').to_pylist() - resumed_revids.extend(revids) + # Read resumed output + resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive") + resumed_df = resumed_dataset.to_table().to_pandas() - resumed_revids_sorted = sorted(set(resumed_revids)) + # Check revid sets match (the important invariant) + full_revids = set(full_df['revid']) + resumed_revids = set(resumed_df['revid']) + missing_revids = full_revids - resumed_revids + extra_revids = resumed_revids - full_revids + assert missing_revids == set() and extra_revids == set(), \ + f"Revision ID mismatch: {len(missing_revids)} missing, {len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}" + assert len(resumed_df) == len(full_df), \ + f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}" - # Compare the revids - assert resumed_revids_sorted == full_revids_sorted, f"Resumed revids mismatch: {len(resumed_revids_sorted)} vs {len(full_revids_sorted)}" - - print(f"Resume with partition-namespaces test passed! 
Original: {len(full_revids_sorted)} revisions, Resumed: {len(resumed_revids_sorted)} revisions") + print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}") def test_external_links_only(): @@ -963,3 +975,71 @@ def test_headings(): assert actual_list == expected, f"Row {idx}: headings mismatch" print(f"Headings test passed! {len(test)} rows processed") + + +def test_resume_file_not_found(): + """Test that --resume exits with error when output file doesn't exist.""" + tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet") + + # Ensure the output file does not exist + expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet") + if os.path.exists(expected_output): + os.remove(expected_output) + + try: + tester.call_wikiq("--resume") + pytest.fail("Expected error when --resume is used but output file doesn't exist") + except subprocess.CalledProcessError as exc: + stderr = exc.stderr.decode("utf8") + assert "Error: --resume specified but output file not found" in stderr, \ + f"Expected error message about missing output file, got: {stderr}" + + print("Resume file not found test passed!") + + +def test_resume_simple(): + """Test that --resume works without --fandom-2020 and --partition-namespaces.""" + import pyarrow.parquet as pq + + # First, create a complete baseline output (no fandom-2020, no partition-namespaces) + tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet") + + try: + tester_full.call_wikiq() + except subprocess.CalledProcessError as exc: + pytest.fail(exc.stderr.decode("utf8")) + + # Read the full output + full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet") + full_table = pq.read_table(full_output_path) + + # Get a revid about 1/3 through to use as the resume point + resume_idx = len(full_table) // 3 + resume_revid = full_table.column("revid")[resume_idx].as_py() + + print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}") + + # Create a partial output by slicing the table + tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet") + partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet") + + partial_table = full_table.slice(0, resume_idx + 1) + pq.write_table(partial_table, partial_output_path) + + # Now resume from the partial output + try: + tester_partial.call_wikiq("--resume") + except subprocess.CalledProcessError as exc: + pytest.fail(exc.stderr.decode("utf8")) + + # Read the resumed output + resumed_table = pq.read_table(partial_output_path) + + # Convert to dataframes for comparison, sorting by revid + resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True) + full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True) + + # Compare the dataframes + assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False) + + print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")