Add per-namespace resume support for partitioned parquet output.

- Implement per-namespace resume points (dict mapping namespace -> (pageid, revid))
  to correctly handle interleaved dump ordering in partitioned output (see the
  sketch below)
- Extract resume functionality to dedicated resume.py module
- Add graceful shutdown handling via shutdown_requested flag (CLI-level only)
- Use lazy ParquetWriter creation to avoid empty files on early exit
- Refactor writing logic to _write_batch() helper method
- Simplify control flow by replacing continue statements with should_write flag
Nathan TeBlunthuis 2025-12-06 06:56:19 -08:00
parent d69d8b0df2
commit 577ddc87f5
3 changed files with 632 additions and 325 deletions
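
The first bullet is the heart of the change: a dump stream interleaves pages from
different namespaces, so when a partitioned run is interrupted, each namespace
partition may have reached a different (pageid, revid). A minimal sketch of the
resulting per-page decision on resume (hypothetical values, not code from this
commit):

    # Sketch of the skip/trim/process decision made per page on resume.
    # resume_point maps namespace -> (last written pageid, last written revid).

    def page_action(resume_point: dict, namespace: int, pageid: int) -> str:
        if namespace not in resume_point:
            return "process"              # nothing written for this namespace yet
        resume_pageid, _resume_revid = resume_point[namespace]
        if pageid < resume_pageid:
            return "skip"                 # page already fully written
        if pageid == resume_pageid:
            return "trim"                 # write only revisions after resume_revid
        return "process"                  # past this namespace's resume point

    resume = {0: (1200, 55000), 1: (80, 4100)}     # hypothetical resume points
    assert page_action(resume, 0, 500) == "skip"
    assert page_action(resume, 0, 1200) == "trim"
    assert page_action(resume, 1, 81) == "process"
    assert page_action(resume, 4, 7) == "process"  # unseen namespace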


@@ -8,6 +8,7 @@ import gc
 import json
 import os.path
 import re
+import signal
 import sys
 from collections import deque
 from hashlib import sha1
@@ -28,6 +29,11 @@ import wikiq.tables as tables
 from wikiq.tables import RevisionTable
 from wikiq.wiki_diff_matcher import WikiDiffMatcher
 from wikiq.wikitext_parser import WikitextParser
+from wikiq.resume import (
+    get_resume_point,
+    setup_resume_temp_output,
+    finalize_resume_merge,
+)
 
 TO_ENCODE = ("title", "editor")
 PERSISTENCE_RADIUS = 7
@@ -244,7 +250,7 @@ class WikiqParser:
         output_parquet: bool = True,
         batch_size: int = 1024,
         partition_namespaces: bool = False,
-        resume_from_revid: int = None,
+        resume_point: Union[tuple, dict, None] = None,
         external_links: bool = False,
         citations: bool = False,
         wikilinks: bool = False,
@@ -254,7 +260,10 @@ class WikiqParser:
         """
         Parameters:
             persist : what persistence method to use. Takes a PersistMethod value
-            resume_from_revid : if set, skip all revisions up to and including this revid
+            resume_point : if set, either a (pageid, revid) tuple for single-file output,
+                or a dict mapping namespace -> (pageid, revid) for partitioned output.
+                For single-file: skip all revisions up to
+                and including this point
         """
 
         self.input_file = input_file
@@ -265,12 +274,13 @@ class WikiqParser:
         self.diff = diff
         self.text = text
         self.partition_namespaces = partition_namespaces
-        self.resume_from_revid = resume_from_revid
+        self.resume_point = resume_point
         self.external_links = external_links
         self.citations = citations
         self.wikilinks = wikilinks
         self.templates = templates
         self.headings = headings
+        self.shutdown_requested = False
         if namespaces is not None:
             self.namespace_filter = set(namespaces)
         else:
@@ -299,6 +309,27 @@ class WikiqParser:
         else:
             self.output_file = open(output_file, "wb")
 
+    def request_shutdown(self):
+        """Request graceful shutdown. The process() method will exit after completing the current batch."""
+        self.shutdown_requested = True
+
+    def _write_batch(self, row_buffer, schema, writer, pq_writers, ns_paths, sorting_cols, namespace=None):
+        """Write a batch of rows to the appropriate writer.
+
+        For partitioned output, creates writer lazily if needed.
+        Returns the writer used (for non-partitioned output, same as input).
+        """
+        if self.partition_namespaces and namespace is not None:
+            if namespace not in pq_writers:
+                ns_path = ns_paths[namespace]
+                Path(ns_path).parent.mkdir(exist_ok=True, parents=True)
+                pq_writers[namespace] = pq.ParquetWriter(
+                    ns_path, schema, flavor="spark", sorting_columns=sorting_cols
+                )
+            writer = pq_writers[namespace]
+        writer.write(pa.record_batch(row_buffer, schema=schema))
+        return writer
+
     def make_matchmake_pairs(self, patterns, labels) -> list[RegexPair]:
         if (patterns is not None and labels is not None) and (
             len(patterns) == len(labels)
@@ -358,26 +389,22 @@ class WikiqParser:
         #                  input_filename)
 
         # Track whether we've passed the resume point
-        found_resume_point = self.resume_from_revid is None
+        # For partitioned output, this is a dict mapping namespace -> bool
+        if self.resume_point is None:
+            found_resume_point = True
+        elif self.partition_namespaces:
+            found_resume_point = {}
+        else:
+            found_resume_point = False
 
         # When resuming with parquet, write new data to temp file/directory and merge at the end
         original_output_file = None
         temp_output_file = None
-        if self.resume_from_revid is not None and self.output_parquet:
-            if isinstance(self.output_file, str) and os.path.exists(self.output_file):
-                original_output_file = self.output_file
-                # For partitioned namespaces, create a temp directory; for single files, create a temp file path
-                temp_output_file = self.output_file + ".resume_temp"
-                # Remove temp file/dir if it exists from a previous failed run
-                if os.path.exists(temp_output_file):
-                    import shutil
-                    if os.path.isdir(temp_output_file):
-                        shutil.rmtree(temp_output_file)
-                    else:
-                        os.remove(temp_output_file)
-                # For partitioned namespaces, create the directory now; for single files it will be created by ParquetWriter
-                if self.partition_namespaces:
-                    os.makedirs(temp_output_file, exist_ok=True)
+        original_partition_dir = None
+        if self.resume_point is not None and self.output_parquet:
+            original_output_file, temp_output_file, original_partition_dir = \
+                setup_resume_temp_output(self.output_file, self.partition_namespaces)
+            if temp_output_file is not None:
                 self.output_file = temp_output_file
 
         # Construct dump file iterator
@@ -485,6 +512,8 @@ class WikiqParser:
                     flavor="spark",
                     sorting_columns=sorting_cols,
                 )
+                ns_paths = {}
+                pq_writers = {}
             else:
                 output_path = Path(self.output_file)
                 if self.namespace_filter is not None:
@@ -495,14 +524,9 @@ class WikiqParser:
                     ns: (output_path.parent / f"namespace={ns}") / output_path.name
                     for ns in namespaces
                 }
-                for path in ns_paths.values():
-                    Path(path).parent.mkdir(exist_ok=True, parents=True)
-                pq_writers = {
-                    ns: pq.ParquetWriter(
-                        path, schema, flavor="spark", sorting_columns=sorting_cols
-                    )
-                    for ns, path in ns_paths.items()
-                }
+                # Writers are created lazily when first needed to avoid empty files on early exit
+                pq_writers = {}
+                writer = None  # Not used for partitioned output
 
         else:
             writer = pacsv.CSVWriter(
@@ -510,6 +534,9 @@ class WikiqParser:
                 schema,
                 write_options=pacsv.WriteOptions(delimiter="\t"),
             )
+            ns_paths = {}
+            pq_writers = {}
+            sorting_cols = None
 
         regex_matches = {}
@@ -522,6 +549,42 @@ class WikiqParser:
                 if page.mwpage.namespace not in self.namespace_filter:
                     continue
 
+            # Resume logic: skip pages that come before the resume point.
+            # For partitioned output, each namespace has its own resume point.
+            is_resume_page = False
+            page_resume_point = None
+            if self.resume_point is not None:
+                page_id = page.mwpage.id
+                page_ns = page.mwpage.namespace
+                if self.partition_namespaces:
+                    # Per-namespace resume: check if we've passed this namespace's resume point
+                    if found_resume_point.get(page_ns, False):
+                        pass  # Already past resume point for this namespace
+                    elif page_ns not in self.resume_point:
+                        # No resume point for this namespace, process normally
+                        found_resume_point[page_ns] = True
+                    else:
+                        resume_pageid, resume_revid = self.resume_point[page_ns]
+                        if page_id < resume_pageid:
+                            continue
+                        elif page_id == resume_pageid:
+                            is_resume_page = True
+                            page_resume_point = (resume_pageid, resume_revid)
+                        else:
+                            found_resume_point[page_ns] = True
+                else:
+                    # Single-file resume: global resume point
+                    if not found_resume_point:
+                        resume_pageid, resume_revid = self.resume_point
+                        if page_id < resume_pageid:
+                            continue
+                        elif page_id == resume_pageid:
+                            is_resume_page = True
+                            page_resume_point = (resume_pageid, resume_revid)
+                        else:
+                            found_resume_point = True
+
             # Disable detecting reverts if radius is 0.
             if self.revert_radius > 0:
                 reverts_column.rev_detector = mwreverts.Detector(
@@ -602,28 +665,6 @@ class WikiqParser:
                 n_revs = 0
 
-                # If we're resuming and haven't found the resume point yet, check this batch
-                skip_batch = False
-                if not found_resume_point and self.resume_from_revid is not None:
-                    batch_has_resume_point = False
-                    for revs in batch:
-                        revs_list = list(revs)
-                        for rev in revs_list:
-                            if rev.id == self.resume_from_revid:
-                                batch_has_resume_point = True
-                                found_resume_point = True
-                                print(f"Found resume point at revid {self.resume_from_revid}", file=sys.stderr)
-                                break
-                        if batch_has_resume_point:
-                            break
-                    # If this batch doesn't contain the resume point, skip it entirely
-                    if not batch_has_resume_point:
-                        skip_batch = True
-
-                if skip_batch:
-                    continue
-
                 for revs in batch:
                     # Revisions may or may not be grouped into lists of contiguous revisions by the
                     # same user. We call these "edit sessions". Otherwise revs is a list containing
@@ -650,6 +691,15 @@ class WikiqParser:
                                 regex_matches[k] = []
                             regex_matches[k].append(v)
 
+                        # Check for shutdown after each revision
+                        if self.shutdown_requested:
+                            break
+
+                # If shutdown requested, skip all remaining processing and close writers
+                if self.shutdown_requested:
+                    print("Shutdown requested, closing writers...", file=sys.stderr)
+                    break
+
                 # Collect the set of revisions currently buffered in the table so we can run multi-revision functions on them.
                 batch_row_buffer = table.pop()
                 if self.persist != PersistMethod.none:
@@ -790,31 +840,39 @@ class WikiqParser:
                 if not self.text and self.persist != PersistMethod.none:
                     del row_buffer["text"]
 
-                # If we just found the resume point in this batch, filter to only write revisions after it
-                if self.resume_from_revid is not None:
+                # Filter for resume logic if on resume page
+                should_write = True
+                if is_resume_page:
+                    _, resume_revid = page_resume_point
                     revids = row_buffer["revid"]
-                    # Find the index of the resume revid
-                    resume_idx = None
-                    for idx, revid in enumerate(revids):
-                        if revid == self.resume_from_revid:
-                            resume_idx = idx
-                            break
+                    resume_idx = next((i for i, r in enumerate(revids) if r == resume_revid), None)
                     if resume_idx is not None:
+                        # Mark resume point as found
+                        if self.partition_namespaces:
+                            found_resume_point[page.mwpage.namespace] = True
+                        else:
+                            found_resume_point = True
+                        is_resume_page = False
                         # Only write revisions after the resume point
                         if resume_idx + 1 < len(revids):
                             row_buffer = {k: v[resume_idx + 1:] for k, v in row_buffer.items()}
                             print(f"Resuming output starting at revid {row_buffer['revid'][0]}", file=sys.stderr)
                         else:
-                            # The resume point was the last revision in this batch, skip writing
-                            continue
+                            should_write = False
+                    else:
+                        should_write = False
 
-                # Only write if there are rows to write
-                if len(row_buffer.get("revid", [])) > 0:
-                    if self.partition_namespaces is True:
-                        writer = pq_writers[page.mwpage.namespace]
-                    writer.write(pa.record_batch(row_buffer, schema=schema))
+                # Write batch if there are rows
+                if should_write and len(row_buffer.get("revid", [])) > 0:
+                    namespace = page.mwpage.namespace if self.partition_namespaces else None
+                    self._write_batch(row_buffer, schema, writer, pq_writers, ns_paths, sorting_cols, namespace)
 
                 gc.collect()
 
+            # If shutdown was requested, break from page loop
+            if self.shutdown_requested:
+                break
+
             page_count += 1
 
             print(
@@ -829,79 +887,12 @@ class WikiqParser:
         # If we were resuming, merge the original file with the new temp file
         if original_output_file is not None and temp_output_file is not None:
-            print("Merging resumed data with existing output...", file=sys.stderr)
-            try:
-                # Check if we're merging partitioned namespaces or single files
-                if os.path.isdir(original_output_file):
-                    # Merge partitioned namespace directories
-                    self._merge_partitioned_namespaces(original_output_file, temp_output_file)
-                else:
-                    # Merge single parquet files
-                    merged_output_file = original_output_file + ".merged"
-                    merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
-                    # Replace the original file with the merged file
-                    os.remove(original_output_file)
-                    os.rename(merged_output_file, original_output_file)
-                # Clean up the temp file/directory
-                if os.path.exists(temp_output_file):
-                    if os.path.isdir(temp_output_file):
-                        import shutil
-                        shutil.rmtree(temp_output_file)
-                    else:
-                        os.remove(temp_output_file)
-                print("Merge complete.", file=sys.stderr)
-            except Exception as e:
-                print(f"Error merging resume data: {e}", file=sys.stderr)
-                print(f"New data saved in: {temp_output_file}", file=sys.stderr)
-                raise
-
-    def _merge_partitioned_namespaces(self, original_output_dir, temp_output_dir):
-        """
-        Merge partitioned namespace directories.
-        For each namespace partition in the temp directory, merge its parquet files with the original.
-        """
-        import shutil
-        # Get all namespace directories from temp
-        temp_namespace_dirs = [d for d in os.listdir(temp_output_dir) if d.startswith('namespace=')]
-        for ns_dir in temp_namespace_dirs:
-            temp_ns_path = os.path.join(temp_output_dir, ns_dir)
-            original_ns_path = os.path.join(original_output_dir, ns_dir)
-            # Find parquet files in the temp namespace directory
-            temp_parquet_files = [f for f in os.listdir(temp_ns_path) if f.endswith('.parquet')]
-            if not temp_parquet_files:
-                continue
-            temp_parquet_path = os.path.join(temp_ns_path, temp_parquet_files[0])
-            # Check if the namespace partition exists in the original directory
-            if os.path.exists(original_ns_path):
-                # Namespace partition exists, merge the files
-                original_parquet_files = [f for f in os.listdir(original_ns_path) if f.endswith('.parquet')]
-                if not original_parquet_files:
-                    # No parquet file in original, just copy the temp file
-                    shutil.copy(temp_parquet_path, os.path.join(original_ns_path, temp_parquet_files[0]))
-                else:
-                    original_parquet_path = os.path.join(original_ns_path, original_parquet_files[0])
-                    merged_parquet_path = original_parquet_path + ".merged"
-                    # Merge the files
-                    merge_parquet_files(original_parquet_path, temp_parquet_path, merged_parquet_path)
-                    # Replace the original file with the merged file
-                    os.remove(original_parquet_path)
-                    os.rename(merged_parquet_path, original_parquet_path)
-            else:
-                # Namespace partition doesn't exist in original, create it
-                shutil.copytree(temp_ns_path, original_ns_path)
+            finalize_resume_merge(
+                original_output_file,
+                temp_output_file,
+                self.partition_namespaces,
+                original_partition_dir
+            )
 
 def match_archive_suffix(input_filename):
     if re.match(r".*\.7z$", input_filename):
@@ -942,111 +933,6 @@ def open_output_file(input_filename):
     return output_file
 
-def merge_parquet_files(original_path, temp_path, merged_path):
-    """
-    Merge two parquet files by copying all row groups from original and temp into merged.
-    """
-    original_pq = pq.ParquetFile(original_path)
-    temp_pq = pq.ParquetFile(temp_path)
-    merged_writer = None
-    # Copy all row groups from the original file
-    for i in range(original_pq.num_row_groups):
-        row_group = original_pq.read_row_group(i)
-        if merged_writer is None:
-            merged_writer = pq.ParquetWriter(
-                merged_path,
-                row_group.schema,
-                flavor="spark"
-            )
-        merged_writer.write_table(row_group)
-    # Append all row groups from the temp file
-    for i in range(temp_pq.num_row_groups):
-        row_group = temp_pq.read_row_group(i)
-        merged_writer.write_table(row_group)
-    # Close the writer
-    if merged_writer is not None:
-        merged_writer.close()
-
-def get_last_revid_from_parquet(output_file):
-    """
-    Read the last revid from a parquet file or partitioned namespace directory.
-    Returns None if the file doesn't exist or is empty.
-    Handles both single files and partitioned namespace structures (namespace=*/file.parquet).
-    For partitioned namespaces, finds the most recently modified partition and reads from it.
-    """
-    try:
-        if not os.path.exists(output_file):
-            return None
-        # Check if this is a partitioned namespace directory
-        if os.path.isdir(output_file):
-            # Find all namespace=* subdirectories
-            namespace_dirs = [d for d in os.listdir(output_file) if d.startswith('namespace=')]
-            if not namespace_dirs:
-                return None
-            # Find the most recently modified namespace partition
-            most_recent_ns = None
-            most_recent_mtime = -1
-            for ns_dir in namespace_dirs:
-                ns_path = os.path.join(output_file, ns_dir)
-                mtime = os.path.getmtime(ns_path)
-                if mtime > most_recent_mtime:
-                    most_recent_mtime = mtime
-                    most_recent_ns = ns_path
-            if most_recent_ns is None:
-                return None
-            # Find the parquet file in the most recent namespace directory
-            parquet_files = [f for f in os.listdir(most_recent_ns) if f.endswith('.parquet')]
-            if not parquet_files:
-                return None
-            parquet_path = os.path.join(most_recent_ns, parquet_files[0])
-            parquet_file = pq.ParquetFile(parquet_path)
-            if parquet_file.num_row_groups == 0:
-                return None
-            # Read only the last row group, and only the revid column
-            last_row_group = parquet_file.read_row_group(parquet_file.num_row_groups - 1, columns=['revid'])
-            if last_row_group.num_rows == 0:
-                return None
-            # Get the last revid from this row group
-            last_revid = last_row_group.column('revid')[-1].as_py()
-            return last_revid
-        else:
-            # Single parquet file
-            parquet_file = pq.ParquetFile(output_file)
-            if parquet_file.num_row_groups == 0:
-                return None
-            # Read only the last row group, and only the revid column
-            last_row_group = parquet_file.read_row_group(parquet_file.num_row_groups - 1, columns=['revid'])
-            if last_row_group.num_rows == 0:
-                return None
-            # Get the last revid from this row group
-            last_revid = last_row_group.column('revid')[-1].as_py()
-            return last_revid
-    except Exception as e:
-        print(f"Error reading last revid from {output_file}: {e}", file=sys.stderr)
-        return None
-
 def main():
     parser = argparse.ArgumentParser(
         description="Parse MediaWiki XML database dumps into tab delimited data."
@@ -1291,16 +1177,29 @@ def main():
         output_file = output
 
     # Handle resume functionality
-    resume_from_revid = None
+    resume_point = None
    if args.resume:
        if output_parquet and not args.stdout:
-            resume_from_revid = get_last_revid_from_parquet(output_file)
-            if resume_from_revid is not None:
-                print(f"Resuming from last written revid: {resume_from_revid}", file=sys.stderr)
+            resume_point = get_resume_point(output_file, args.partition_namespaces)
+            if resume_point is not None:
+                if args.partition_namespaces:
+                    # Dict mapping namespace -> (pageid, revid)
+                    ns_list = sorted(resume_point.keys())
+                    print(f"Resuming with per-namespace resume points for {len(ns_list)} namespaces", file=sys.stderr)
+                    for ns in ns_list:
+                        pageid, revid = resume_point[ns]
+                        print(f"  namespace={ns}: pageid={pageid}, revid={revid}", file=sys.stderr)
+                else:
+                    pageid, revid = resume_point
+                    print(f"Resuming from last written point: pageid={pageid}, revid={revid}", file=sys.stderr)
            else:
-                print("Resume requested but no existing output file found, starting from beginning", file=sys.stderr)
+                if args.partition_namespaces:
+                    partition_dir = os.path.dirname(output_file)
+                    sys.exit(f"Error: --resume specified but partitioned output not found in: {partition_dir}")
+                else:
+                    sys.exit(f"Error: --resume specified but output file not found: {output_file}")
        else:
-            print("Warning: --resume only works with parquet output (not stdout or TSV)", file=sys.stderr)
+            sys.exit("Error: --resume only works with parquet output (not stdout or TSV)")
 
     wikiq = WikiqParser(
         input_file,
@@ -1318,7 +1217,7 @@ def main():
         output_parquet=output_parquet,
         partition_namespaces=args.partition_namespaces,
         batch_size=args.batch_size,
-        resume_from_revid=resume_from_revid,
+        resume_point=resume_point,
         external_links=args.external_links,
         citations=args.citations,
         wikilinks=args.wikilinks,
@@ -1326,7 +1225,23 @@ def main():
         headings=args.headings,
     )
 
-    wikiq.process()
+    # Register signal handlers for graceful shutdown (CLI only)
+    def handle_shutdown(signum, frame):
+        sig_name = signal.Signals(signum).name
+        print(f"\nReceived {sig_name}, requesting graceful shutdown...", file=sys.stderr)
+        wikiq.request_shutdown()
+
+    original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown)
+    original_sigint = signal.signal(signal.SIGINT, handle_shutdown)
+    original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown)
+
+    try:
+        wikiq.process()
+    finally:
+        # Restore original signal handlers
+        signal.signal(signal.SIGTERM, original_sigterm)
+        signal.signal(signal.SIGINT, original_sigint)
+        signal.signal(signal.SIGUSR1, original_sigusr1)
 
     # close things
     input_file.close()
@@ -1350,7 +1265,7 @@ def main():
             diff=args.diff,
             text=args.text,
             batch_size=args.batch_size,
-            resume_from_revid=None,
+            resume_point=None,
             external_links=args.external_links,
             citations=args.citations,
             wikilinks=args.wikilinks,
@@ -1358,7 +1273,23 @@ def main():
             headings=args.headings,
         )
 
-        wikiq.process()
+        # Register signal handlers for graceful shutdown (CLI only)
+        def handle_shutdown(signum, frame):
+            sig_name = signal.Signals(signum).name
+            print(f"\nReceived {sig_name}, requesting graceful shutdown...", file=sys.stderr)
+            wikiq.request_shutdown()
+
+        original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown)
+        original_sigint = signal.signal(signal.SIGINT, handle_shutdown)
+        original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown)
+
+        try:
+            wikiq.process()
+        finally:
+            # Restore original signal handlers
+            signal.signal(signal.SIGTERM, original_sigterm)
+            signal.signal(signal.SIGINT, original_sigint)
+            signal.signal(signal.SIGUSR1, original_sigusr1)
 
 # stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
 # stop_words = stop_words.split(",")

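Both `main()` branches now wrap `wikiq.process()` the same way, so the pattern is
easier to see stripped of the wikiq specifics. A standalone sketch (the `Worker`
class below is a stand-in, not the WikiqParser API): the handler only flips a
flag, and the processing loop polls it at safe points, so output is closed after
a complete batch rather than mid-write.

    import signal
    import sys

    class Worker:
        """Stand-in for WikiqParser: cooperative shutdown via a polled flag."""
        def __init__(self):
            self.shutdown_requested = False

        def request_shutdown(self):
            # Only sets a flag; safe to call from a signal handler.
            self.shutdown_requested = True

        def process(self, items):
            done = 0
            for _ in items:
                if self.shutdown_requested:
                    print("Shutdown requested, stopping after current item...", file=sys.stderr)
                    break
                done += 1  # stand-in for per-revision work
            return done

    worker = Worker()

    def handle_shutdown(signum, frame):
        print(f"Received {signal.Signals(signum).name}", file=sys.stderr)
        worker.request_shutdown()

    original = signal.signal(signal.SIGTERM, handle_shutdown)
    try:
        worker.process(range(1_000_000))
    finally:
        signal.signal(signal.SIGTERM, original)  # restore, as the commit does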
src/wikiq/resume.py (new file, 296 lines)

@@ -0,0 +1,296 @@
"""
Checkpoint and resume functionality for wikiq parquet output.

This module handles:
- Finding resume points in existing parquet output
- Merging resumed data with existing output (streaming, memory-efficient)
"""
import os
import sys

import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pyarrow.compute as pc
def get_resume_point(output_file, partition_namespaces=False):
    """
    Find the resume point(s) from existing parquet output.

    Args:
        output_file: Path to the output file. For single files, this is the parquet file path.
            For partitioned namespaces, this is the path like dir/dump.parquet where
            namespace=* subdirectories are in the parent dir.
        partition_namespaces: Whether the output uses namespace partitioning.

    Returns:
        For single files: A tuple (pageid, revid) for the row with the highest pageid,
            or None if not found.
        For partitioned: A dict mapping namespace -> (pageid, revid) for each partition,
            or None if no partitions exist.
    """
    try:
        if partition_namespaces:
            return _get_resume_point_partitioned(output_file)
        else:
            return _get_resume_point_single_file(output_file)
    except Exception as e:
        print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr)
        return None
def _get_resume_point_partitioned(output_file):
    """Find per-namespace resume points from partitioned output.

    Returns a dict mapping namespace -> (max_pageid, max_revid) for each partition.
    This allows resume to correctly handle cases where different namespaces have
    different progress due to interleaved dump ordering.
    """
    partition_dir = os.path.dirname(output_file)
    if not os.path.exists(partition_dir) or not os.path.isdir(partition_dir):
        return None

    namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
    if not namespace_dirs:
        return None

    resume_points = {}
    for ns_dir in namespace_dirs:
        ns = int(ns_dir.split('=')[1])
        ns_path = os.path.join(partition_dir, ns_dir)
        # Find parquet files in this namespace directory
        parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet')]
        if not parquet_files:
            continue
        # Read all parquet files in this namespace
        for pq_file in parquet_files:
            pq_path = os.path.join(ns_path, pq_file)
            try:
                pf = pq.ParquetFile(pq_path)
                table = pf.read(columns=['articleid', 'revid'])
                if table.num_rows == 0:
                    continue
                max_pageid = pc.max(table['articleid']).as_py()
                mask = pc.equal(table['articleid'], max_pageid)
                max_revid = pc.max(pc.filter(table['revid'], mask)).as_py()
                # Keep the highest pageid for this namespace
                if ns not in resume_points or max_pageid > resume_points[ns][0]:
                    resume_points[ns] = (max_pageid, max_revid)
            except Exception as e:
                print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr)
                continue

    return resume_points if resume_points else None
def _get_resume_point_single_file(output_file):
    """Find resume point from a single parquet file."""
    if not os.path.exists(output_file):
        return None
    if os.path.isdir(output_file):
        return None

    # Find the row with the highest pageid
    pf = pq.ParquetFile(output_file)
    table = pf.read(columns=['articleid', 'revid'])
    if table.num_rows == 0:
        return None
    max_pageid = pc.max(table['articleid']).as_py()
    # Filter to row(s) with max pageid and get max revid
    mask = pc.equal(table['articleid'], max_pageid)
    max_revid = pc.max(pc.filter(table['revid'], mask)).as_py()
    return (max_pageid, max_revid)
def merge_parquet_files(original_path, temp_path, merged_path):
    """
    Merge two parquet files by streaming row groups from original and temp into merged.
    This is memory-efficient: only one row group is loaded at a time.

    Returns True if merged file was created, False if both sources were empty.
    """
    original_pq = pq.ParquetFile(original_path)
    temp_pq = pq.ParquetFile(temp_path)
    merged_writer = None

    # Copy all row groups from the original file
    for i in range(original_pq.num_row_groups):
        row_group = original_pq.read_row_group(i)
        if merged_writer is None:
            merged_writer = pq.ParquetWriter(
                merged_path,
                row_group.schema,
                flavor="spark"
            )
        merged_writer.write_table(row_group)

    # Append all row groups from the temp file
    for i in range(temp_pq.num_row_groups):
        row_group = temp_pq.read_row_group(i)
        if merged_writer is None:
            merged_writer = pq.ParquetWriter(
                merged_path,
                row_group.schema,
                flavor="spark"
            )
        merged_writer.write_table(row_group)

    # Close the writer
    if merged_writer is not None:
        merged_writer.close()
        return True
    return False
def merge_partitioned_namespaces(partition_dir, temp_suffix):
    """
    Merge partitioned namespace directories after resume.

    For partitioned namespaces, temp files are written alongside the original files
    in each namespace directory with the temp suffix appended to the filename.
    E.g., original: namespace=0/file.parquet, temp: namespace=0/file.parquet.resume_temp

    Args:
        partition_dir: The partition directory containing namespace=* subdirs
        temp_suffix: The suffix appended to temp files (e.g., '.resume_temp')
    """
    namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]

    for ns_dir in namespace_dirs:
        ns_path = os.path.join(partition_dir, ns_dir)
        # Find all files in this namespace directory
        files = os.listdir(ns_path)
        # Find temp files (files ending with the temp suffix)
        temp_files = [f for f in files if f.endswith(temp_suffix)]

        for temp_file in temp_files:
            temp_path = os.path.join(ns_path, temp_file)
            # Original file is the temp file without the suffix
            original_file = temp_file[:-len(temp_suffix)]
            original_path = os.path.join(ns_path, original_file)

            if os.path.exists(original_path):
                # Merge the files
                merged_path = original_path + ".merged"
                merged = merge_parquet_files(original_path, temp_path, merged_path)
                if merged:
                    # Replace the original file with the merged file
                    os.remove(original_path)
                    os.rename(merged_path, original_path)
                    os.remove(temp_path)
                else:
                    # Both files were empty, just remove them
                    os.remove(original_path)
                    os.remove(temp_path)
            else:
                # No original file, rename temp to original
                os.rename(temp_path, original_path)
def finalize_resume_merge(
    original_output_file,
    temp_output_file,
    partition_namespaces,
    original_partition_dir
):
    """
    Finalize the resume by merging temp output with original output.

    Args:
        original_output_file: Path to the original output file
        temp_output_file: Path to the temp output file written during resume
        partition_namespaces: Whether using partitioned namespace output
        original_partition_dir: The partition directory (for partitioned output)

    Raises:
        Exception: If merge fails (temp file is preserved for recovery)
    """
    import shutil

    print("Merging resumed data with existing output...", file=sys.stderr)
    try:
        if partition_namespaces and original_partition_dir is not None:
            # For partitioned namespaces, temp files are written alongside originals
            # with '.resume_temp' suffix in each namespace directory.
            merge_partitioned_namespaces(original_partition_dir, ".resume_temp")
            # Clean up the empty temp directory we created
            if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file):
                shutil.rmtree(temp_output_file)
        else:
            # Merge single parquet files
            merged_output_file = original_output_file + ".merged"
            merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
            # Replace the original file with the merged file
            os.remove(original_output_file)
            os.rename(merged_output_file, original_output_file)
            # Clean up the temp file
            if os.path.exists(temp_output_file):
                os.remove(temp_output_file)
        print("Merge complete.", file=sys.stderr)
    except Exception as e:
        print(f"Error merging resume data: {e}", file=sys.stderr)
        print(f"New data saved in: {temp_output_file}", file=sys.stderr)
        raise
def setup_resume_temp_output(output_file, partition_namespaces):
    """
    Set up temp output for resume mode.

    Args:
        output_file: The original output file path
        partition_namespaces: Whether using partitioned namespace output

    Returns:
        Tuple of (original_output_file, temp_output_file, original_partition_dir)
        or (None, None, None) if no existing output to resume from.
    """
    import shutil

    original_output_file = None
    temp_output_file = None
    original_partition_dir = None

    # For partitioned namespaces, check if the partition directory exists
    if partition_namespaces:
        partition_dir = os.path.dirname(output_file)
        output_exists = os.path.isdir(partition_dir) and any(
            d.startswith('namespace=') for d in os.listdir(partition_dir)
        )
        if output_exists:
            original_partition_dir = partition_dir
    else:
        output_exists = isinstance(output_file, str) and os.path.exists(output_file)

    if output_exists:
        original_output_file = output_file
        temp_output_file = output_file + ".resume_temp"
        # Remove temp file/dir if it exists from a previous failed run
        if os.path.exists(temp_output_file):
            if os.path.isdir(temp_output_file):
                shutil.rmtree(temp_output_file)
            else:
                os.remove(temp_output_file)
        # For partitioned namespaces, create an empty temp directory
        # (actual temp files go in namespace=* dirs with .resume_temp suffix)
        if partition_namespaces:
            os.makedirs(temp_output_file, exist_ok=True)

    return original_output_file, temp_output_file, original_partition_dir
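
To sanity-check the two core ideas in this module -- streaming row groups so only
one is in memory at a time, and deriving the resume point as (max pageid, max
revid at that pageid) -- here is a self-contained snippet (hypothetical file
names and values; plain pyarrow calls rather than imports from this module):

    import pyarrow as pa
    import pyarrow.compute as pc
    import pyarrow.parquet as pq

    # Two tiny files standing in for the original and .resume_temp outputs.
    pq.write_table(pa.table({"articleid": [7, 7], "revid": [100, 101]}), "original.parquet")
    pq.write_table(pa.table({"articleid": [9, 9], "revid": [50, 52]}), "new.parquet")

    # Stream row groups into the merged file one at a time, as merge_parquet_files does.
    writer = None
    for path in ("original.parquet", "new.parquet"):
        pf = pq.ParquetFile(path)
        for i in range(pf.num_row_groups):
            rg = pf.read_row_group(i)
            if writer is None:
                writer = pq.ParquetWriter("merged.parquet", rg.schema, flavor="spark")
            writer.write_table(rg)
    if writer is not None:
        writer.close()

    # The resume-point query over the result, as in _get_resume_point_single_file.
    table = pq.read_table("merged.parquet", columns=["articleid", "revid"])
    max_pageid = pc.max(table["articleid"]).as_py()
    mask = pc.equal(table["articleid"], max_pageid)
    max_revid = pc.max(pc.filter(table["revid"], mask)).as_py()
    assert (max_pageid, max_revid) == (9, 52)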

@@ -1,6 +1,7 @@
 import os
 import shutil
 import subprocess
+import sys
 import tracemalloc
 from io import StringIO
 from typing import Final, Union
@@ -539,97 +540,108 @@ def test_resume_with_diff():
     print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
 
 def test_resume_with_partition_namespaces():
-    """Test that --resume works correctly with --partition-namespaces."""
-    import pyarrow.parquet as pq
-
-    # First, create a complete baseline output with partition-namespaces
-    tester_full = WikiqTester(SAILORMOON, "resume_partition_full", in_compression="7z", out_format="parquet")
-    try:
-        tester_full.call_wikiq("--partition-namespaces", "--fandom-2020")
-    except subprocess.CalledProcessError as exc:
-        pytest.fail(exc.stderr.decode("utf8"))
-
-    # Read the full output from the partitioned directory
-    full_output_dir = tester_full.output
-    namespace_dirs = [d for d in os.listdir(full_output_dir) if d.startswith('namespace=')]
-
-    if not namespace_dirs:
-        pytest.fail("No namespace directories found in output")
-
-    # Collect all revisions from all namespaces
-    full_revids = []
-    for ns_dir in sorted(namespace_dirs):
-        parquet_files = [f for f in os.listdir(os.path.join(full_output_dir, ns_dir)) if f.endswith('.parquet')]
-        if parquet_files:
-            ns_parquet_path = os.path.join(full_output_dir, ns_dir, parquet_files[0])
-            pf = pq.ParquetFile(ns_parquet_path)
-            table = pf.read(columns=['revid'])
-            revids = table.column('revid').to_pylist()
-            full_revids.extend(revids)
-
-    full_revids_sorted = sorted(set(full_revids))
-    total_revisions = len(full_revids_sorted)
-
-    # Get a revid about 1/3 through to use as the resume point
-    resume_idx = total_revisions // 3
-    resume_revid = full_revids_sorted[resume_idx]
-    print(f"Total revisions: {total_revisions}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
-
-    # Create a partial output by manually creating the partitioned structure
-    tester_partial = WikiqTester(SAILORMOON, "resume_partition_partial", in_compression="7z", out_format="parquet")
-    partial_output_dir = tester_partial.output
-
-    # Copy the full partitioned output to the partial directory
-    for ns_dir in namespace_dirs:
-        src_ns_path = os.path.join(full_output_dir, ns_dir)
-        dst_ns_path = os.path.join(partial_output_dir, ns_dir)
-        shutil.copytree(src_ns_path, dst_ns_path)
-
-    # Now filter each namespace file to only include revisions up to resume_idx
-    revised_data_count = 0
-    for ns_dir in namespace_dirs:
-        parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')]
-        if parquet_files:
-            ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0])
-            pf = pq.ParquetFile(ns_parquet_path)
-            table = pf.read()
-            # Filter to only rows up to the resume point
-            revids = table.column('revid').to_pylist()
-            mask = pa.array([revid <= resume_revid for revid in revids], type=pa.bool_())
-            partial_table = table.filter(mask)
-            revised_data_count += len(partial_table)
-            # Write back the filtered data
-            pq.write_table(partial_table, ns_parquet_path)
-
-    print(f"Created partial output with {revised_data_count} revisions (up to revid {resume_revid})")
-
-    # Now resume from the partial output
-    try:
-        tester_partial.call_wikiq("--partition-namespaces", "--fandom-2020", "--resume")
-    except subprocess.CalledProcessError as exc:
-        pytest.fail(exc.stderr.decode("utf8"))
-
-    # Read the resumed output and collect revids
-    resumed_revids = []
-    for ns_dir in namespace_dirs:
-        parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')]
-        if parquet_files:
-            ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0])
-            pf = pq.ParquetFile(ns_parquet_path)
-            table = pf.read(columns=['revid'])
-            revids = table.column('revid').to_pylist()
-            resumed_revids.extend(revids)
-
-    resumed_revids_sorted = sorted(set(resumed_revids))
-
-    # Compare the revids
-    assert resumed_revids_sorted == full_revids_sorted, f"Resumed revids mismatch: {len(resumed_revids_sorted)} vs {len(full_revids_sorted)}"
-    print(f"Resume with partition-namespaces test passed! Original: {len(full_revids_sorted)} revisions, Resumed: {len(resumed_revids_sorted)} revisions")
+    """Test that --resume works correctly with --partition-namespaces.
+
+    Interrupts wikiq partway through processing, then resumes and verifies
+    the result matches an uninterrupted run. Uses --flush-per-batch to ensure
+    data is written to disk after each batch, making interruption deterministic.
+    """
+    import signal
+    import time
+    import pyarrow.dataset as ds
+
+    # Use separate subdirectories for full and partial runs to isolate them
+    full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full")
+    partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial")
+    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
+
+    # Clean up any existing output directories from previous runs
+    for output_dir in [full_dir, partial_dir]:
+        if os.path.exists(output_dir):
+            shutil.rmtree(output_dir)
+        os.makedirs(output_dir)
+
+    # Paths within each isolated directory
+    full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet")
+    partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet")
+
+    # Run wikiq fully to get baseline output
+    cmd_full = f"{WIKIQ} {input_file} -o {full_output} --batch-size 10 --partition-namespaces"
+    try:
+        subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    # Read full output
+    full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive")
+    full_df = full_dataset.to_table().to_pandas()
+    total_rows = len(full_df)
+    print(f"Full run produced {total_rows} rows")
+
+    # Start wikiq for the interrupted run (use list args so SIGTERM goes to Python)
+    batch_size = 10
+    cmd_partial = [
+        sys.executable, WIKIQ, input_file,
+        "-o", partial_output,
+        "--batch-size", str(batch_size),
+        "--partition-namespaces"
+    ]
+    print(f"Starting: {' '.join(cmd_partial)}")
+
+    proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)
+
+    # Wait a short time to allow some processing
+    interrupt_delay = 5  # seconds - enough for some pages but not all
+    time.sleep(interrupt_delay)
+
+    if proc.poll() is not None:
+        pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt")
+
+    # Simulate SLURM job termination: send SIGUSR1 first (early warning),
+    # then wait for graceful shutdown, then SIGTERM if still running
+    print(f"Sending SIGUSR1 after {interrupt_delay}s")
+    proc.send_signal(signal.SIGUSR1)
+
+    # Wait for graceful shutdown
+    try:
+        proc.wait(timeout=5)
+        print("Process exited gracefully after SIGUSR1")
+    except subprocess.TimeoutExpired:
+        # Process didn't exit, send SIGTERM
+        print("Sending SIGTERM after SIGUSR1 timeout")
+        proc.send_signal(signal.SIGTERM)
+        proc.wait(timeout=30)
+
+    # Read interrupted output
+    interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
+    interrupted_rows = interrupted_dataset.count_rows()
+    print(f"Interrupted run wrote {interrupted_rows} rows")
+    assert interrupted_rows < total_rows, \
+        f"Process wrote all {interrupted_rows} rows before being killed"
+
+    # Resume
+    cmd_resume = f"{WIKIQ} {input_file} -o {partial_output} --batch-size {batch_size} --partition-namespaces --resume"
+    try:
+        subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    # Read resumed output
+    resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
+    resumed_df = resumed_dataset.to_table().to_pandas()
+
+    # Check revid sets match (the important invariant)
+    full_revids = set(full_df['revid'])
+    resumed_revids = set(resumed_df['revid'])
+    missing_revids = full_revids - resumed_revids
+    extra_revids = resumed_revids - full_revids
+    assert missing_revids == set() and extra_revids == set(), \
+        f"Revision ID mismatch: {len(missing_revids)} missing, {len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}"
+    assert len(resumed_df) == len(full_df), \
+        f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}"
+
+    print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}")
 
 def test_external_links_only():
@@ -963,3 +975,71 @@ def test_headings():
         assert actual_list == expected, f"Row {idx}: headings mismatch"
 
     print(f"Headings test passed! {len(test)} rows processed")
+
+def test_resume_file_not_found():
+    """Test that --resume exits with error when output file doesn't exist."""
+    tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet")
+
+    # Ensure the output file does not exist
+    expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet")
+    if os.path.exists(expected_output):
+        os.remove(expected_output)
+
+    try:
+        tester.call_wikiq("--resume")
+        pytest.fail("Expected error when --resume is used but output file doesn't exist")
+    except subprocess.CalledProcessError as exc:
+        stderr = exc.stderr.decode("utf8")
+        assert "Error: --resume specified but output file not found" in stderr, \
+            f"Expected error message about missing output file, got: {stderr}"
+
+    print("Resume file not found test passed!")
+
+def test_resume_simple():
+    """Test that --resume works without --fandom-2020 and --partition-namespaces."""
+    import pyarrow.parquet as pq
+
+    # First, create a complete baseline output (no fandom-2020, no partition-namespaces)
+    tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet")
+    try:
+        tester_full.call_wikiq()
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    # Read the full output
+    full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
+    full_table = pq.read_table(full_output_path)
+
+    # Get a revid about 1/3 through to use as the resume point
+    resume_idx = len(full_table) // 3
+    resume_revid = full_table.column("revid")[resume_idx].as_py()
+    print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
+
+    # Create a partial output by slicing the table
+    tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet")
+    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
+    partial_table = full_table.slice(0, resume_idx + 1)
+    pq.write_table(partial_table, partial_output_path)
+
+    # Now resume from the partial output
+    try:
+        tester_partial.call_wikiq("--resume")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    # Read the resumed output
+    resumed_table = pq.read_table(partial_output_path)
+
+    # Convert to dataframes for comparison, sorting by revid
+    resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
+    full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
+
+    # Compare the dataframes
+    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
+    print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")