27 Commits

Author SHA1 Message Date
Nathan TeBlunthuis
6a4bf81e1a add test for two wikiq jobs in the same directory. 2025-12-19 11:50:56 -08:00
Nathan TeBlunthuis
38dabd0547 only merge the correct partitioned files. 2025-12-19 11:47:18 -08:00
Nathan TeBlunthuis
006feb795c fix interruption handling by breaking the diff loop. 2025-12-18 18:00:30 -08:00
Nathan TeBlunthuis
d7f5abef2d resume starts fresh if the first run didn't happen 2025-12-13 15:41:44 -08:00
Nathan TeBlunthuis
2c54425726 use the wikidiff2 diff timeout instead of async. 2025-12-13 14:29:16 -08:00
Nathan TeBlunthuis
5d1a246898 don't try to remove files that don't exist. 2025-12-13 11:57:47 -08:00
Nathan TeBlunthuis
70a10db228 save work after a time limit. 2025-12-11 08:30:32 -08:00
Nathan TeBlunthuis
1001c780fa start fresh if output and resume are both broken. 2025-12-10 21:20:52 -08:00
Nathan TeBlunthuis
6b4f3939a5 more work on resuming. 2025-12-10 21:07:52 -08:00
Nathan TeBlunthuis
c3d31b4ab5 handle case when we have a valid resume file, but a corrupted original. 2025-12-10 20:33:04 -08:00
Nathan TeBlunthuis
f4a9491ff2 improve print debugging. 2025-12-10 19:50:47 -08:00
Nathan TeBlunthuis
c6e96c2f54 try/catch opening original file in resume. 2025-12-10 19:49:29 -08:00
Nathan TeBlunthuis
f427291fd8 add logic for resuming after a resume. 2025-12-10 19:26:54 -08:00
Nathan TeBlunthuis
d1fc094c96 don't put checkpoint files inside namespace directories. 2025-12-07 06:24:04 -08:00
Nathan TeBlunthuis
783f5fd8bc improve resume logic. 2025-12-07 06:06:26 -08:00
Nathan TeBlunthuis
577ddc87f5 Add per-namespace resume support for partitioned parquet output.
- Implement per-namespace resume points (dict mapping namespace -> (pageid, revid))
  to correctly handle interleaved dump ordering in partitioned output
- Extract resume functionality to dedicated resume.py module
- Add graceful shutdown handling via shutdown_requested flag (CLI-level only)
- Use lazy ParquetWriter creation to avoid empty files on early exit
- Refactor writing logic to _write_batch() helper method
- Simplify control flow by replacing continue statements with should_write flag
2025-12-06 06:56:19 -08:00
Nathan TeBlunthuis
d69d8b0df2 fix baseline output for new columns. 2025-12-02 19:22:08 -08:00
Nathan TeBlunthuis
5ce9808b50 add templates and headings to wikiq. 2025-12-02 17:51:08 -08:00
Nathan TeBlunthuis
d3517ed5ca extract wikilinks. 2025-12-02 14:09:29 -08:00
Nathan TeBlunthuis
329341efb6 improve tests. 2025-12-02 13:52:12 -08:00
Nathan TeBlunthuis
76626a2785 Start working on adding columns from mwparserfromhell. 2025-12-02 12:26:03 -08:00
Nathan TeBlunthuis
b46f98a875 make --resume work with partitioned namespaces. 2025-12-01 07:19:52 -08:00
Nathan TeBlunthuis
3c26185739 enable --resuming from interrupted jobs. 2025-11-30 20:36:31 -08:00
Nathan TeBlunthuis
95b33123e3 revert previous and decrease timeout. 2025-11-28 20:29:51 -08:00
Nathan TeBlunthuis
5c4fc6d5a0 let cache capacity be large. 2025-11-28 19:22:43 -08:00
Nathan TeBlunthuis
77c7d2ba97 Merge branch 'compute-diffs' of gitea:collective/mediawiki_dump_tools into compute-diffs 2025-11-24 11:03:24 -08:00
Nathan TeBlunthuis
c40930d7d2 use ssh for gitea. 2025-11-24 11:01:36 -08:00
11 changed files with 2130 additions and 126 deletions

View File

@@ -8,6 +8,7 @@ dependencies = [
"deltas>=0.7.0",
"mediawiki-utilities>=0.4.18",
"more-itertools>=10.7.0",
"mwparserfromhell>=0.6.0",
"mwpersistence>=0.2.4",
"mwreverts>=0.1.5",
"mwtypes>=0.4.0",
@@ -33,7 +34,7 @@ packages = ["src/wikiq"]
yamlconf = { git = "https://github.com/groceryheist/yamlconf" }
mwxml = { git = "https://github.com/groceryheist/python-mwxml" }
deltas = { git = "https://github.com/groceryheist/deltas" }
pywikidiff2 = { git = "https://gitea.communitydata.science/groceryheist/pywikidiff2@1cd3b13aea89a8abb0cee38dfd7cc6ab4d0980f9" }
pywikidiff2 = { git = "ssh://gitea@gitea.communitydata.science:2200/groceryheist/pywikidiff2.git"}
[dependency-groups]
dev = [

View File

@@ -8,7 +8,10 @@ import gc
import json
import os.path
import re
import signal
import sys
import threading
import time
from collections import deque
from hashlib import sha1
from io import TextIOWrapper
@@ -23,14 +26,21 @@ import pywikidiff2
from deltas.tokenizers import wikitext_split
from more_itertools import ichunked
from mwxml import Dump
import asyncio
import wikiq.tables as tables
from wikiq.tables import RevisionTable
from wikiq.wiki_diff_matcher import WikiDiffMatcher
from wikiq.wikitext_parser import WikitextParser
from wikiq.resume import (
get_resume_point,
setup_resume_temp_output,
finalize_resume_merge,
get_checkpoint_path,
cleanup_interrupted_resume,
)
TO_ENCODE = ("title", "editor")
PERSISTENCE_RADIUS = 7
DIFF_TIMEOUT = 60
DIFF_TIMEOUT_MS = 60000
from pathlib import Path
import pyarrow as pa
@@ -47,15 +57,10 @@ class PersistMethod:
wikidiff2 = 4
async def diff_async(differ, last_text, text):
try:
loop = asyncio.get_running_loop()
return await asyncio.wait_for(
asyncio.to_thread(differ.inline_json_diff, last_text, text),
timeout=DIFF_TIMEOUT
)
except TimeoutError as e:
return None
def diff_with_timeout(differ, last_text, text):
    """Diff two revision texts using pywikidiff2's native timeout.

    Parameters:
        differ : pywikidiff2 differ exposing inline_json_diff() and timed_out()
        last_text : text of the previous revision
        text : text of the current revision

    Returns:
        (result, timed_out) tuple; `result` is the inline JSON diff and
        `timed_out` reports whether the differ hit DIFF_TIMEOUT_MS.
    """
    result = differ.inline_json_diff(last_text, text, timeout_ms=DIFF_TIMEOUT_MS)
    return result, differ.timed_out()
def calculate_persistence(tokens_added):
@@ -241,10 +246,21 @@ class WikiqParser:
output_parquet: bool = True,
batch_size: int = 1024,
partition_namespaces: bool = False,
resume_point: Union[tuple, dict, None] = None,
external_links: bool = False,
citations: bool = False,
wikilinks: bool = False,
templates: bool = False,
headings: bool = False,
time_limit_seconds: Union[float, None] = None,
):
"""
Parameters:
persist : what persistence method to use. Takes a PersistMethod value
resume_point : if set, either a (pageid, revid) tuple for single-file output,
or a dict mapping namespace -> (pageid, revid) for partitioned output.
For single-file: skip all revisions up to
and including this point
"""
self.input_file = input_file
@@ -255,6 +271,14 @@ class WikiqParser:
self.diff = diff
self.text = text
self.partition_namespaces = partition_namespaces
self.resume_point = resume_point
self.external_links = external_links
self.citations = citations
self.wikilinks = wikilinks
self.templates = templates
self.headings = headings
self.shutdown_requested = False
self.time_limit_seconds = time_limit_seconds
if namespaces is not None:
self.namespace_filter = set(namespaces)
else:
@@ -283,6 +307,86 @@ class WikiqParser:
else:
self.output_file = open(output_file, "wb")
# Checkpoint file for tracking resume point
self.checkpoint_file = None
self.checkpoint_state = {} # namespace -> (pageid, revid) or None -> (pageid, revid)
def request_shutdown(self):
    """Request graceful shutdown.

    Sets the shutdown_requested flag; process() checks it and exits
    after completing the current batch. Safe to call from signal
    handlers and timer threads (a simple attribute assignment).
    """
    self.shutdown_requested = True
def _time_limit_expired(self):
    """Timer callback invoked when the configured time limit elapses.

    Logs the limit (converted from seconds to hours for readability)
    and requests a graceful shutdown so process() stops at a safe point.
    """
    hours = self.time_limit_seconds / 3600
    print(f"Time limit of {hours:.2f} hours reached, requesting shutdown...", file=sys.stderr)
    self.request_shutdown()
def _start_time_limit_timer(self):
    """Start a background timer that triggers shutdown at the time limit.

    Returns:
        The started threading.Timer, or None when no time limit is
        configured. The timer is a daemon so it never blocks
        interpreter exit.
    """
    if self.time_limit_seconds is None:
        return None
    timer = threading.Timer(self.time_limit_seconds, self._time_limit_expired)
    timer.daemon = True
    timer.start()
    return timer
def _cancel_time_limit_timer(self, timer):
    """Cancel the time limit timer if it's still running.

    Parameters:
        timer : the threading.Timer returned by _start_time_limit_timer(),
            or None when no time limit was configured (then a no-op).
    """
    if timer is not None:
        timer.cancel()
def _open_checkpoint(self, output_file):
    """Open the checkpoint file for writing and keep the handle open.

    Checkpointing only applies to parquet output going to a real file;
    stdout and non-parquet output get no checkpoint. The handle stays
    open for the whole run so each _update_checkpoint() call avoids a
    reopen (performance).

    Parameters:
        output_file : path of the (original) output file from which the
            checkpoint path is derived via get_checkpoint_path().
    """
    if not self.output_parquet or output_file == sys.stdout.buffer:
        return
    checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces)
    # Checkpoint may live outside the partition dir; make sure its parent exists.
    Path(checkpoint_path).parent.mkdir(parents=True, exist_ok=True)
    self.checkpoint_file = open(checkpoint_path, 'w')
    print(f"Checkpoint file opened: {checkpoint_path}", file=sys.stderr)
def _update_checkpoint(self, pageid, revid, namespace=None):
    """Record the last written (pageid, revid) and persist it as JSON.

    Partitioned output keeps one entry per namespace; single-file output
    keeps one global position. No-op when no checkpoint file is open
    (e.g. stdout or non-parquet output).

    Parameters:
        pageid : page id of the last row written
        revid : revision id of the last row written
        namespace : namespace key for partitioned output, else None
    """
    if self.checkpoint_file is None:
        return
    if self.partition_namespaces:
        self.checkpoint_state[namespace] = {"pageid": pageid, "revid": revid}
    else:
        self.checkpoint_state = {"pageid": pageid, "revid": revid}
    # Rewrite the open file in place rather than reopening each time.
    self.checkpoint_file.seek(0)
    self.checkpoint_file.truncate()
    json.dump(self.checkpoint_state, self.checkpoint_file)
    # Flush so the checkpoint survives an abrupt kill.
    self.checkpoint_file.flush()
def _close_checkpoint(self, delete=False):
    """Close the checkpoint file, optionally deleting it.

    Parameters:
        delete : True when processing completed without interruption, in
            which case the checkpoint is removed; otherwise it is kept on
            disk so a later run can resume from it.
    """
    if self.checkpoint_file is None:
        return
    checkpoint_path = self.checkpoint_file.name
    self.checkpoint_file.close()
    self.checkpoint_file = None
    if delete and os.path.exists(checkpoint_path):
        os.remove(checkpoint_path)
        print(f"Checkpoint file deleted (processing complete): {checkpoint_path}", file=sys.stderr)
    else:
        print(f"Checkpoint file preserved for resume: {checkpoint_path}", file=sys.stderr)
def _write_batch(self, row_buffer, schema, writer, pq_writers, ns_paths, sorting_cols, namespace=None):
    """Write a batch of rows to the appropriate writer.

    For partitioned output the per-namespace ParquetWriter is created
    lazily on first use, so an early exit leaves no empty files.

    Parameters:
        row_buffer : dict of column name -> list of values
        schema : pyarrow schema for the batch
        writer : the non-partitioned writer (used when namespace is None)
        pq_writers : dict namespace -> ParquetWriter; mutated in place
        ns_paths : dict namespace -> output path, for lazy writer creation
        sorting_cols : sorting_columns for newly created ParquetWriters
        namespace : namespace of this batch for partitioned output, else None

    Returns:
        The writer actually used (for non-partitioned output, the same
        `writer` that was passed in).
    """
    if self.partition_namespaces and namespace is not None:
        if namespace not in pq_writers:
            ns_path = ns_paths[namespace]
            Path(ns_path).parent.mkdir(exist_ok=True, parents=True)
            pq_writers[namespace] = pq.ParquetWriter(
                ns_path, schema, flavor="spark", sorting_columns=sorting_cols
            )
        writer = pq_writers[namespace]
    writer.write(pa.record_batch(row_buffer, schema=schema))
    return writer
def make_matchmake_pairs(self, patterns, labels) -> list[RegexPair]:
if (patterns is not None and labels is not None) and (
len(patterns) == len(labels)
@@ -336,10 +440,32 @@ class WikiqParser:
return default_ns
def process(self):
# create a regex that creates the output filename
# output_filename = re.sub(r'^.*/(enwiki\-\d+)\-.*p(\d+)p.*$',
# r'output/wikiq-\1-\2.tsv',
# input_filename)
# Start time limit timer if configured
time_limit_timer = self._start_time_limit_timer()
# Track whether we've passed the resume point
# For partitioned output, this is a dict mapping namespace -> bool
if self.resume_point is None:
found_resume_point = True
elif self.partition_namespaces:
found_resume_point = {}
else:
found_resume_point = False
# When resuming with parquet, write new data to temp file/directory and merge at the end
original_output_file = None
temp_output_file = None
original_partition_dir = None
if self.resume_point is not None and self.output_parquet:
original_output_file, temp_output_file, original_partition_dir = \
setup_resume_temp_output(self.output_file, self.partition_namespaces)
if temp_output_file is not None:
self.output_file = temp_output_file
# Open checkpoint file for tracking resume point
# Use original_output_file if resuming, otherwise self.output_file
checkpoint_output = original_output_file if original_output_file else self.output_file
self._open_checkpoint(checkpoint_output)
# Construct dump file iterator
dump = WikiqIterator(self.input_file, collapse_user=self.collapse_user)
@@ -371,6 +497,29 @@ class WikiqParser:
if self.collapse_user:
table.columns.append(tables.RevisionCollapsed())
# Create shared parser if any wikitext feature is enabled
if self.external_links or self.citations or self.wikilinks or self.templates or self.headings:
wikitext_parser = WikitextParser()
if self.external_links:
table.columns.append(tables.RevisionExternalLinks(wikitext_parser))
if self.citations:
table.columns.append(tables.RevisionCitations(wikitext_parser))
if self.wikilinks:
table.columns.append(tables.RevisionWikilinks(wikitext_parser))
if self.templates:
table.columns.append(tables.RevisionTemplates(wikitext_parser))
if self.headings:
table.columns.append(tables.RevisionHeadings(wikitext_parser))
# Add parser timeout tracking if any wikitext feature is enabled
if self.external_links or self.citations or self.wikilinks or self.templates or self.headings:
table.columns.append(tables.RevisionParserTimeout(wikitext_parser))
# extract list of namespaces
self.namespaces = {
ns.name: ns.id for ns in dump.mwiterator.site_info.namespaces
@@ -388,6 +537,7 @@ class WikiqParser:
from wikiq.diff_pyarrow_schema import diff_field
schema = schema.append(diff_field)
schema = schema.append(pa.field("diff_timeout", pa.bool_()))
if self.diff and self.persist == PersistMethod.none:
table.columns.append(tables.RevisionText())
@@ -422,6 +572,8 @@ class WikiqParser:
flavor="spark",
sorting_columns=sorting_cols,
)
ns_paths = {}
pq_writers = {}
else:
output_path = Path(self.output_file)
if self.namespace_filter is not None:
@@ -432,14 +584,9 @@ class WikiqParser:
ns: (output_path.parent / f"namespace={ns}") / output_path.name
for ns in namespaces
}
for path in ns_paths.values():
Path(path).parent.mkdir(exist_ok=True, parents=True)
pq_writers = {
ns: pq.ParquetWriter(
path, schema, flavor="spark", sorting_columns=sorting_cols
)
for ns, path in ns_paths.items()
}
# Writers are created lazily when first needed to avoid empty files on early exit
pq_writers = {}
writer = None # Not used for partitioned output
else:
writer = pacsv.CSVWriter(
@@ -447,6 +594,9 @@ class WikiqParser:
schema,
write_options=pacsv.WriteOptions(delimiter="\t"),
)
ns_paths = {}
pq_writers = {}
sorting_cols = None
regex_matches = {}
@@ -459,6 +609,42 @@ class WikiqParser:
if page.mwpage.namespace not in self.namespace_filter:
continue
# Resume logic: skip pages that come before the resume point.
# For partitioned output, each namespace has its own resume point.
is_resume_page = False
page_resume_point = None
if self.resume_point is not None:
page_id = page.mwpage.id
page_ns = page.mwpage.namespace
if self.partition_namespaces:
# Per-namespace resume: check if we've passed this namespace's resume point
if found_resume_point.get(page_ns, False):
pass # Already past resume point for this namespace
elif page_ns not in self.resume_point:
# No resume point for this namespace, process normally
found_resume_point[page_ns] = True
else:
resume_pageid, resume_revid = self.resume_point[page_ns]
if page_id < resume_pageid:
continue
elif page_id == resume_pageid:
is_resume_page = True
page_resume_point = (resume_pageid, resume_revid)
else:
found_resume_point[page_ns] = True
else:
# Single-file resume: global resume point
if not found_resume_point:
resume_pageid, resume_revid = self.resume_point
if page_id < resume_pageid:
continue
elif page_id == resume_pageid:
is_resume_page = True
page_resume_point = (resume_pageid, resume_revid)
else:
found_resume_point = True
# Disable detecting reverts if radius is 0.
if self.revert_radius > 0:
reverts_column.rev_detector = mwreverts.Detector(
@@ -565,6 +751,15 @@ class WikiqParser:
regex_matches[k] = []
regex_matches[k].append(v)
# Check for shutdown after each revision
if self.shutdown_requested:
break
# If shutdown requested, skip all remaining processing and close writers
if self.shutdown_requested:
print("Shutdown requested, closing writers...", file=sys.stderr)
break
# Collect the set of revisions currently buffered in the table so we can run multi-revision functions on them.
batch_row_buffer = table.pop()
if self.persist != PersistMethod.none:
@@ -678,13 +873,20 @@ class WikiqParser:
if self.diff:
last_text = last_rev_text
new_diffs = []
diff_timeouts = []
for i, text in enumerate(row_buffer["text"]):
diff = asyncio.run(diff_async(differ, last_text, text))
if diff is None:
if self.shutdown_requested:
break
diff, timed_out = diff_with_timeout(differ, last_text, text)
if timed_out:
print(f"WARNING! wikidiff2 timeout for rev: {row_buffer['revid'][i]}. Falling back to default limits.", file=sys.stderr)
diff = fast_differ.inline_json_diff(last_text, text)
new_diffs.append(diff)
diff_timeouts.append(timed_out)
last_text = text
if self.shutdown_requested:
print("Shutdown requested, closing writers...", file=sys.stderr)
break
row_buffer["diff"] = [
[
entry
@@ -693,6 +895,7 @@ class WikiqParser:
]
for diff in new_diffs
]
row_buffer["diff_timeout"] = diff_timeouts
# end persistence logic
if self.diff or self.persist != PersistMethod.none:
@@ -702,12 +905,48 @@ class WikiqParser:
if not self.text and self.persist != PersistMethod.none:
del row_buffer["text"]
if self.partition_namespaces is True:
writer = pq_writers[page.mwpage.namespace]
writer.write(pa.record_batch(row_buffer, schema=schema))
# Filter for resume logic if on resume page
should_write = True
if is_resume_page:
_, resume_revid = page_resume_point
revids = row_buffer["revid"]
resume_idx = next((i for i, r in enumerate(revids) if r == resume_revid), None)
if resume_idx is not None:
# Mark resume point as found
if self.partition_namespaces:
found_resume_point[page.mwpage.namespace] = True
else:
found_resume_point = True
is_resume_page = False
# Only write revisions after the resume point
if resume_idx + 1 < len(revids):
row_buffer = {k: v[resume_idx + 1:] for k, v in row_buffer.items()}
print(f"Resuming output starting at revid {row_buffer['revid'][0]}", file=sys.stderr)
else:
should_write = False
else:
should_write = False
# Write batch if there are rows
if should_write and len(row_buffer.get("revid", [])) > 0:
namespace = page.mwpage.namespace if self.partition_namespaces else None
self._write_batch(row_buffer, schema, writer, pq_writers, ns_paths, sorting_cols, namespace)
# Update checkpoint with last written position
last_pageid = row_buffer["articleid"][-1]
last_revid = row_buffer["revid"][-1]
self._update_checkpoint(last_pageid, last_revid, namespace)
gc.collect()
# If shutdown was requested, break from page loop
if self.shutdown_requested:
break
page_count += 1
# Cancel time limit timer
self._cancel_time_limit_timer(time_limit_timer)
print(
"Done: %s revisions and %s pages." % (rev_count, page_count),
file=sys.stderr,
@@ -718,6 +957,17 @@ class WikiqParser:
else:
writer.close()
# If we were resuming, merge the original file with the new temp file
if original_output_file is not None and temp_output_file is not None:
finalize_resume_merge(
original_output_file,
temp_output_file,
self.partition_namespaces,
original_partition_dir
)
# Close checkpoint file; delete it only if we completed without interruption
self._close_checkpoint(delete=not self.shutdown_requested)
def match_archive_suffix(input_filename):
if re.match(r".*\.7z$", input_filename):
@@ -886,6 +1136,46 @@ def main():
help="Output the text of the revision.",
)
parser.add_argument(
"--external-links",
dest="external_links",
action="store_true",
default=False,
help="Extract external links from each revision using mwparserfromhell.",
)
parser.add_argument(
"--citations",
dest="citations",
action="store_true",
default=False,
help="Extract citations (ref tags and cite templates) from each revision.",
)
parser.add_argument(
"--wikilinks",
dest="wikilinks",
action="store_true",
default=False,
help="Extract internal wikilinks from each revision.",
)
parser.add_argument(
"--templates",
dest="templates",
action="store_true",
default=False,
help="Extract templates with their parameters from each revision.",
)
parser.add_argument(
"--headings",
dest="headings",
action="store_true",
default=False,
help="Extract section headings from each revision.",
)
parser.add_argument(
"-PNS",
"--partition-namespaces",
@@ -910,6 +1200,21 @@ def main():
help="How many revisions to process in each batch. This ends up being the Parquet row group size",
)
parser.add_argument(
"--resume",
dest="resume",
action="store_true",
help="Resume processing from the last successfully written revision in the output file.",
)
parser.add_argument(
"--time-limit",
dest="time_limit",
type=float,
default=0,
help="Time limit in hours before graceful shutdown. Set to 0 to disable (default).",
)
args = parser.parse_args()
# set persistence method
@@ -933,9 +1238,7 @@ def main():
print(args, file=sys.stderr)
if len(args.dumpfiles) > 0:
for filename in args.dumpfiles:
input_file = open_input_file(filename, args.fandom_2020)
# open directory for output
# Determine output file path before opening input (so resume errors are caught early)
if args.output:
output = args.output[0]
else:
@@ -943,17 +1246,69 @@ def main():
output_parquet = output.endswith(".parquet")
print("Processing file: %s" % filename, file=sys.stderr)
if args.stdout:
# Parquet libraries need a binary output, so just sys.stdout doesn't work.
output_file = sys.stdout.buffer
elif os.path.isdir(output) or output_parquet:
filename = os.path.join(output, os.path.basename(filename))
output_file = get_output_filename(filename, parquet=output_parquet)
output_filename = os.path.join(output, os.path.basename(filename))
output_file = get_output_filename(output_filename, parquet=output_parquet)
else:
output_file = output
# Handle resume functionality before opening input file
resume_point = None
start_fresh = False
if args.resume:
if output_parquet and not args.stdout:
# First, merge any leftover temp files from a previous interrupted run
cleanup_result = cleanup_interrupted_resume(output_file, args.partition_namespaces)
if cleanup_result == "start_fresh":
# All data was corrupted, start from beginning
start_fresh = True
print("Starting fresh due to data corruption.", file=sys.stderr)
else:
resume_point = get_resume_point(output_file, args.partition_namespaces)
if resume_point is not None:
if args.partition_namespaces:
ns_list = sorted(resume_point.keys())
print(f"Resuming with per-namespace resume points for {len(ns_list)} namespaces", file=sys.stderr)
for ns in ns_list:
pageid, revid = resume_point[ns]
print(f" namespace={ns}: pageid={pageid}, revid={revid}", file=sys.stderr)
else:
pageid, revid = resume_point
print(f"Resuming from last written point: pageid={pageid}, revid={revid}", file=sys.stderr)
else:
# resume_point is None - check if file exists but is corrupt
if args.partition_namespaces:
partition_dir = os.path.dirname(output_file)
output_filename = os.path.basename(output_file)
corrupt_files = []
if os.path.isdir(partition_dir):
for d in os.listdir(partition_dir):
if d.startswith('namespace='):
filepath = os.path.join(partition_dir, d, output_filename)
if os.path.exists(filepath):
corrupt_files.append(filepath)
if corrupt_files:
print("Output files exist but are corrupt, deleting and starting fresh.", file=sys.stderr)
for filepath in corrupt_files:
os.remove(filepath)
start_fresh = True
else:
if os.path.exists(output_file):
# File exists but is corrupt - start fresh
print(f"Output file {output_file} exists but is corrupt, starting fresh.", file=sys.stderr)
os.remove(output_file)
start_fresh = True
else:
sys.exit("Error: --resume only works with parquet output (not stdout or TSV)")
# Now open the input file
print("Processing file: %s" % filename, file=sys.stderr)
input_file = open_input_file(filename, args.fandom_2020)
time_limit_seconds = args.time_limit * 3600 if args.time_limit > 0 else None
wikiq = WikiqParser(
input_file,
output_file,
@@ -970,14 +1325,44 @@ def main():
output_parquet=output_parquet,
partition_namespaces=args.partition_namespaces,
batch_size=args.batch_size,
resume_point=resume_point,
external_links=args.external_links,
citations=args.citations,
wikilinks=args.wikilinks,
templates=args.templates,
headings=args.headings,
time_limit_seconds=time_limit_seconds,
)
# Register signal handlers for graceful shutdown (CLI only)
def handle_shutdown(signum, frame):
sig_name = signal.Signals(signum).name
print(f"\nReceived {sig_name}, requesting graceful shutdown...", file=sys.stderr)
wikiq.request_shutdown()
original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown)
original_sigint = signal.signal(signal.SIGINT, handle_shutdown)
original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown)
original_sigusr2 = signal.signal(signal.SIGUSR2, handle_shutdown)
try:
wikiq.process()
finally:
# Restore original signal handlers
signal.signal(signal.SIGTERM, original_sigterm)
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGUSR1, original_sigusr1)
signal.signal(signal.SIGUSR2, original_sigusr2)
# close things
input_file.close()
else:
if args.resume:
print("Warning: --resume cannot be used with stdin/stdout", file=sys.stderr)
time_limit_seconds = args.time_limit * 3600 if args.time_limit > 0 else None
wikiq = WikiqParser(
sys.stdin,
sys.stdout,
@@ -993,9 +1378,32 @@ def main():
diff=args.diff,
text=args.text,
batch_size=args.batch_size,
resume_point=None,
external_links=args.external_links,
citations=args.citations,
wikilinks=args.wikilinks,
templates=args.templates,
headings=args.headings,
time_limit_seconds=time_limit_seconds,
)
# Register signal handlers for graceful shutdown (CLI only)
def handle_shutdown(signum, frame):
sig_name = signal.Signals(signum).name
print(f"\nReceived {sig_name}, requesting graceful shutdown...", file=sys.stderr)
wikiq.request_shutdown()
original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown)
original_sigint = signal.signal(signal.SIGINT, handle_shutdown)
original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown)
try:
wikiq.process()
finally:
# Restore original signal handlers
signal.signal(signal.SIGTERM, original_sigterm)
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGUSR1, original_sigusr1)
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
# stop_words = stop_words.split(",")

542
src/wikiq/resume.py Normal file
View File

@@ -0,0 +1,542 @@
"""
Checkpoint and resume functionality for wikiq parquet output.
This module handles:
- Finding resume points in existing parquet output
- Merging resumed data with existing output (streaming, memory-efficient)
- Checkpoint file management for fast resume point lookup
"""
import json
import os
import sys
import pyarrow.parquet as pq
def cleanup_interrupted_resume(output_file, partition_namespaces):
    """
    Merge any leftover .resume_temp files from a previous interrupted run.

    This should be called BEFORE get_resume_point() so the resume point
    is calculated from the merged data.

    Parameters:
        output_file : path of the output parquet file (for partitioned
            output, the path whose parent holds the namespace=* dirs)
        partition_namespaces : whether output uses namespace partitioning

    Returns:
        None - no temp files found or normal merge completed
        "start_fresh" - both original and temp were corrupted and deleted
    """
    # NOTE: removed an unused `import shutil` that previously sat here.
    if partition_namespaces:
        partition_dir = os.path.dirname(output_file)
        output_filename = os.path.basename(output_file)
        temp_suffix = ".resume_temp"
        if not os.path.isdir(partition_dir):
            return
        # Probe the namespace=* subdirectories for leftover temp files.
        has_old_temp_files = False
        for ns_dir in os.listdir(partition_dir):
            if ns_dir.startswith('namespace='):
                temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix)
                if os.path.exists(temp_path):
                    has_old_temp_files = True
                    break
        if has_old_temp_files:
            print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr)
            had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename)
            # Check if any valid data remains after merge
            has_valid_data = False
            for ns_dir in os.listdir(partition_dir):
                if ns_dir.startswith('namespace='):
                    ns_path = os.path.join(partition_dir, ns_dir)
                    parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet') and not f.endswith('.resume_temp')]
                    if parquet_files:
                        has_valid_data = True
                        break
            if had_corruption and not has_valid_data:
                # All data was corrupted, remove checkpoint and start fresh
                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
                if os.path.exists(checkpoint_path):
                    os.remove(checkpoint_path)
                print("All partitioned files were corrupted, will start fresh.", file=sys.stderr)
                return "start_fresh"
            print("Previous temp files merged successfully.", file=sys.stderr)
    else:
        temp_output_file = output_file + ".resume_temp"
        if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file):
            print(f"Found leftover temp file {temp_output_file} from previous interrupted run, merging first...", file=sys.stderr)
            merged_path = output_file + ".merged"
            merged = merge_parquet_files(output_file, temp_output_file, merged_path)
            if merged == "original_only":
                # Temp file was invalid, just remove it
                os.remove(temp_output_file)
            elif merged == "temp_only":
                # Original was corrupted or missing, use temp as new base
                if os.path.exists(output_file):
                    os.remove(output_file)
                os.rename(temp_output_file, output_file)
                print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr)
            elif merged == "both_invalid":
                # Both files corrupted or missing, remove both and start fresh
                if os.path.exists(output_file):
                    os.remove(output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
                # Also remove stale checkpoint file
                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
                if os.path.exists(checkpoint_path):
                    os.remove(checkpoint_path)
                print("Both files were corrupted, will start fresh.", file=sys.stderr)
                return "start_fresh"
            elif merged == "merged":
                os.remove(output_file)
                os.rename(merged_path, output_file)
                os.remove(temp_output_file)
                print("Previous temp file merged successfully.", file=sys.stderr)
            else:
                # Both empty - unusual
                os.remove(temp_output_file)
def get_checkpoint_path(output_file, partition_namespaces=False):
    """Return the checkpoint-file path associated with an output file.

    Single-file output stores the checkpoint next to the output file as
    "<output>.checkpoint". Partitioned output instead places it one level
    above the partition directory, so pyarrow never tries to read it as a
    parquet part; it is still named after the output file, keeping it
    unique per input file when jobs run in parallel.
    """
    if not partition_namespaces:
        return str(output_file) + ".checkpoint"
    # output_file looks like <partition_dir>/<name>.parquet; the checkpoint
    # lives at <partition_dir>/../<name>.parquet.checkpoint.
    partition_dir, leaf = os.path.split(output_file)
    grandparent = os.path.dirname(partition_dir)
    return os.path.join(grandparent, leaf + ".checkpoint")
def read_checkpoint(output_file, partition_namespaces=False):
    """
    Read the resume point from the checkpoint file, if one exists.

    Checkpoint format:
        Single file:  {"pageid": 54, "revid": 325}
        Partitioned:  {"0": {"pageid": 54, "revid": 325}, "1": {...}}

    Returns:
        For single files: a (pageid, revid) tuple, or None if not found.
        For partitioned output: a dict mapping namespace (int) ->
        (pageid, revid), or None when the checkpoint is absent, empty,
        or unreadable.
    """
    checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
    if not os.path.exists(checkpoint_path):
        return None
    try:
        with open(checkpoint_path, 'r') as fh:
            data = json.load(fh)
        if not data:
            return None
        # Single-file layout keeps pageid/revid at the top level.
        if "pageid" in data and "revid" in data:
            return (data["pageid"], data["revid"])
        # Otherwise each top-level key is a namespace id (stored as a string).
        parsed = {int(ns): (entry["pageid"], entry["revid"]) for ns, entry in data.items()}
        return parsed if parsed else None
    except (json.JSONDecodeError, IOError, KeyError, TypeError) as e:
        print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr)
        return None
def get_resume_point(output_file, partition_namespaces=False):
    """
    Find the resume point(s) from existing parquet output.

    First checks for a checkpoint file (fast), then falls back to scanning
    the parquet output (slow, for backwards compatibility).

    Args:
        output_file: Path to the output file. For single files, this is the parquet
            file path. For partitioned namespaces, this is a path like
            dir/dump.parquet where namespace=* subdirectories are in the parent dir.
        partition_namespaces: Whether the output uses namespace partitioning.

    Returns:
        For single files: A tuple (pageid, revid) for the row with the highest
            pageid, or None if not found.
        For partitioned: A dict mapping namespace -> (pageid, revid) for each
            partition, or None if no partitions exist.
    """
    checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)

    # Fast path: a checkpoint file written during the previous run.
    from_checkpoint = read_checkpoint(output_file, partition_namespaces)
    if from_checkpoint is not None:
        print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr)
        return from_checkpoint

    # Slow path: scan the parquet output itself (backwards compatibility).
    print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr)
    scanner = (_get_resume_point_partitioned if partition_namespaces
               else _get_resume_point_single_file)
    try:
        return scanner(output_file)
    except Exception as e:
        # Best effort: any failure here just means we start from scratch.
        print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr)
        return None
def _get_last_row_resume_point(pq_path):
    """Get resume point by reading only the last row group of a parquet file.

    Since data is written in page/revision order, the last row group contains
    the highest pageid/revid, and the last row in that group is the resume point.
    Returns (pageid, revid) or None for an empty file.
    """
    parquet_file = pq.ParquetFile(pq_path)
    n_groups = parquet_file.metadata.num_row_groups
    if n_groups == 0:
        return None
    # Only the two id columns of the final row group are materialized.
    tail = parquet_file.read_row_group(n_groups - 1, columns=['articleid', 'revid'])
    if tail.num_rows == 0:
        return None
    return (tail['articleid'][-1].as_py(), tail['revid'][-1].as_py())
def _get_resume_point_partitioned(output_file):
    """Find per-namespace resume points from partitioned output.

    Only looks for the specific output file in each namespace directory.

    Returns a dict mapping namespace -> (max_pageid, max_revid) for each
    partition where the output file exists, or None when nothing is found.

    Args:
        output_file: Path like 'dir/output.parquet' where namespace=* subdirectories
            contain files named 'output.parquet'.
    """
    partition_dir = os.path.dirname(output_file)
    output_filename = os.path.basename(output_file)
    # isdir() is False for missing paths too, so one check suffices.
    if not os.path.isdir(partition_dir):
        return None
    namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
    if not namespace_dirs:
        return None
    resume_points = {}
    for ns_dir in namespace_dirs:
        try:
            ns = int(ns_dir.split('=', 1)[1])
        except ValueError:
            # e.g. 'namespace=__HIVE_DEFAULT_PARTITION__' or a stray directory:
            # previously this crashed the whole scan; skip it instead.
            print(f"Warning: Skipping non-numeric partition directory {ns_dir}", file=sys.stderr)
            continue
        pq_path = os.path.join(partition_dir, ns_dir, output_filename)
        if not os.path.exists(pq_path):
            continue
        try:
            result = _get_last_row_resume_point(pq_path)
            if result is not None:
                resume_points[ns] = result
        except Exception as e:
            # Unreadable partitions are reported but don't abort the others.
            print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr)
            continue
    return resume_points if resume_points else None
def _get_resume_point_single_file(output_file):
    """Find resume point from a single parquet file.

    Returns None when the path is missing or is a directory.
    """
    missing = not os.path.exists(output_file)
    if missing or os.path.isdir(output_file):
        return None
    return _get_last_row_resume_point(output_file)
def merge_parquet_files(original_path, temp_path, merged_path):
    """
    Merge two parquet files by streaming row groups from original and temp into merged.

    This is memory-efficient: only one row group is loaded at a time.

    Returns:
        "merged" - merged file was created from both sources
        "original_only" - temp was invalid, keep original unchanged
        "temp_only" - original was corrupted but temp is valid, use temp
        "both_invalid" - both files invalid, delete both and start fresh
        False - both files were valid but empty
    """
    # A ParquetFile handle doubles as the validity flag: None means the file
    # was missing, unreadable, or corrupted.
    original_pq = None
    temp_pq = None
    try:
        original_pq = pq.ParquetFile(original_path)
    except Exception as e:
        print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr)
    try:
        if not os.path.exists(temp_path):
            print(f"Note: Temp file {temp_path} does not exist (namespace had no records after resume point)", file=sys.stderr)
        else:
            temp_pq = pq.ParquetFile(temp_path)
    except Exception:
        print(f"Note: No new data in temp file {temp_path} (file exists but is invalid)", file=sys.stderr)

    if original_pq is None and temp_pq is None:
        print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr)
        return "both_invalid"
    if original_pq is None:
        print(f"Original file corrupted but temp file is valid, recovering from temp", file=sys.stderr)
        return "temp_only"
    if temp_pq is None:
        return "original_only"

    def _copy_row_groups(source, writer):
        """Stream every row group of *source* into *writer*, creating the
        writer lazily so an empty source never produces an empty output file."""
        for i in range(source.num_row_groups):
            row_group = source.read_row_group(i)
            if writer is None:
                writer = pq.ParquetWriter(
                    merged_path,
                    row_group.schema,
                    flavor="spark"
                )
            writer.write_table(row_group)
        return writer

    # Copy all row groups from the original file, then append the temp file's.
    merged_writer = _copy_row_groups(original_pq, None)
    merged_writer = _copy_row_groups(temp_pq, merged_writer)

    if merged_writer is not None:
        merged_writer.close()
        return "merged"
    # Both inputs were readable but contained zero row groups.
    return False
def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter):
    """
    Merge partitioned namespace directories after resume.

    For partitioned namespaces, temp files are written alongside the original files
    in each namespace directory with the temp suffix appended to the filename.
    E.g., original: namespace=0/file.parquet, temp: namespace=0/file.parquet.resume_temp

    Args:
        partition_dir: The partition directory containing namespace=* subdirs
        temp_suffix: The suffix appended to temp files (e.g., '.resume_temp')
        file_filter: Only process temp files matching this base name
            (e.g., 'enwiki-20250123-pages-meta-history24-p53238682p53445302.parquet')

    Returns:
        True if any namespace had corrupted data that had to be removed
        False if every namespace merged (or was skipped) cleanly
    """
    namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
    had_corruption = False
    # Only this dump file's temp outputs are merged, so concurrent wikiq jobs
    # writing other dump files into the same partition dir are left alone.
    expected_temp = file_filter + temp_suffix
    for ns_dir in namespace_dirs:
        ns_path = os.path.join(partition_dir, ns_dir)
        temp_path = os.path.join(ns_path, expected_temp)
        # No temp file means this namespace received no new data after resume.
        if not os.path.exists(temp_path):
            continue
        # Original file is the temp file without the suffix
        original_file = file_filter
        original_path = os.path.join(ns_path, original_file)
        if os.path.exists(original_path):
            # Merge the files
            merged_path = original_path + ".merged"
            merged = merge_parquet_files(original_path, temp_path, merged_path)
            if merged == "original_only":
                # Temp file was invalid (no new data), keep original unchanged
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            elif merged == "temp_only":
                # Original was corrupted, use temp as new base
                os.remove(original_path)
                os.rename(temp_path, original_path)
            elif merged == "both_invalid":
                # Both files corrupted, remove both
                if os.path.exists(original_path):
                    os.remove(original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                had_corruption = True
            elif merged == "merged":
                # Replace the original file with the merged file
                os.remove(original_path)
                os.rename(merged_path, original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            else:
                # Both files were empty (False), just remove them
                if os.path.exists(original_path):
                    os.remove(original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        else:
            # No original file, rename temp to original only if valid
            try:
                pq.ParquetFile(temp_path)
                os.rename(temp_path, original_path)
            except Exception:
                # Temp file invalid or missing, just remove it if it exists
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                had_corruption = True
    return had_corruption
def finalize_resume_merge(
    original_output_file,
    temp_output_file,
    partition_namespaces,
    original_partition_dir
):
    """
    Finalize the resume by merging temp output with original output.

    Args:
        original_output_file: Path to the original output file
        temp_output_file: Path to the temp output file written during resume
        partition_namespaces: Whether using partitioned namespace output
        original_partition_dir: The partition directory (for partitioned output)

    Raises:
        Exception: If merge fails (temp file is preserved for recovery)
    """
    import shutil

    print("Merging resumed data with existing output...", file=sys.stderr)
    try:
        if partition_namespaces and original_partition_dir is not None:
            # For partitioned namespaces, temp files are written alongside originals
            # with '.resume_temp' suffix in each namespace directory.
            # Only merge temp files for the current dump file, not other concurrent jobs.
            file_filter = os.path.basename(original_output_file)
            merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter)
            # Clean up the empty temp directory we created
            if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file):
                shutil.rmtree(temp_output_file)
        else:
            # Merge single parquet files
            merged_output_file = original_output_file + ".merged"
            merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
            if merged == "original_only":
                # Temp file was invalid (no new data), keep original unchanged
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            elif merged == "temp_only":
                # Original was corrupted, use temp as new base.
                # Guard the remove: "corrupted" also covers an original that
                # never existed (opening a missing path fails the same way).
                if os.path.exists(original_output_file):
                    os.remove(original_output_file)
                os.rename(temp_output_file, original_output_file)
            elif merged == "both_invalid":
                # Both files corrupted, remove whichever actually exist
                if os.path.exists(original_output_file):
                    os.remove(original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            elif merged == "merged":
                # Replace the original file with the merged file
                os.remove(original_output_file)
                os.rename(merged_output_file, original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            else:
                # Both files were empty (False) - unusual, but clean up
                if os.path.exists(original_output_file):
                    os.remove(original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
        print("Merge complete.", file=sys.stderr)
    except Exception as e:
        print(f"Error merging resume data for {original_output_file}: {e}", file=sys.stderr)
        print(f"New data saved in: {temp_output_file}", file=sys.stderr)
        raise
def setup_resume_temp_output(output_file, partition_namespaces):
    """
    Set up temp output for resume mode.

    Args:
        output_file: The original output file path
        partition_namespaces: Whether using partitioned namespace output

    Returns:
        Tuple of (original_output_file, temp_output_file, original_partition_dir)
        or (None, None, None) if no existing output to resume from.
    """
    import shutil

    def _partition_has_output(partition_dir, filename):
        # True when any namespace=* subdir already contains this output file.
        if not os.path.isdir(partition_dir):
            return False
        return any(
            entry.startswith('namespace=')
            and os.path.exists(os.path.join(partition_dir, entry, filename))
            for entry in os.listdir(partition_dir)
        )

    original_partition_dir = None
    if partition_namespaces:
        partition_dir = os.path.dirname(output_file)
        found = _partition_has_output(partition_dir, os.path.basename(output_file))
        if found:
            original_partition_dir = partition_dir
    else:
        found = isinstance(output_file, str) and os.path.exists(output_file)

    if not found:
        # Nothing to resume from.
        return None, None, None

    temp_output_file = output_file + ".resume_temp"
    # Note: cleanup_interrupted_resume() should have been called before this
    # to merge any leftover temp files from a previous interrupted run.
    # Here we just clean up any remaining temp directory markers.
    if os.path.isdir(temp_output_file):
        shutil.rmtree(temp_output_file)
    elif os.path.exists(temp_output_file):
        os.remove(temp_output_file)
    if partition_namespaces:
        os.makedirs(temp_output_file, exist_ok=True)
    return output_file, temp_output_file, original_partition_dir

View File

@@ -2,7 +2,7 @@ import sys
from abc import abstractmethod, ABC
from datetime import datetime, timezone
from hashlib import sha1
from typing import Generic, TypeVar, Union
from typing import Generic, TypeVar, Union, TYPE_CHECKING
import mwreverts
import mwtypes
@@ -10,6 +10,9 @@ import mwxml
import pyarrow as pa
if TYPE_CHECKING:
from wikiq.wikitext_parser import WikitextParser
T = TypeVar('T')
@@ -217,3 +220,108 @@ class RevisionText(RevisionField[str]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str:
revision = revisions[-1]
return revision.text
class RevisionExternalLinks(RevisionField[Union[list[str], None]]):
    """External-link URLs extracted from the latest revision's wikitext."""

    field = pa.field("external_links", pa.list_(pa.string()), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared parser so the wikitext is parsed at most once per revision.
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[str], None]:
        current = revisions[-1]
        return (None if current.deleted.text
                else self.wikitext_parser.extract_external_links(current.text))
class RevisionCitations(RevisionField[Union[list[str], None]]):
    """Citations harvested from ref tags and cite templates."""

    field = pa.field("citations", pa.list_(pa.string()), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared parser so the wikitext is parsed at most once per revision.
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[str], None]:
        current = revisions[-1]
        return (None if current.deleted.text
                else self.wikitext_parser.extract_citations(current.text))
class RevisionWikilinks(RevisionField[Union[list[dict], None]]):
    """Internal wikilinks: one struct per link, with target title and
    optional display text."""

    field = pa.field("wikilinks", pa.list_(pa.struct([
        pa.field("title", pa.string()),
        pa.field("text", pa.string(), nullable=True),
    ])), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared parser so the wikitext is parsed at most once per revision.
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        current = revisions[-1]
        return (None if current.deleted.text
                else self.wikitext_parser.extract_wikilinks(current.text))
class RevisionTemplates(RevisionField[Union[list[dict], None]]):
    """Templates used by the revision: one struct per template, with its
    name and a map of parameter name -> value."""

    field = pa.field("templates", pa.list_(pa.struct([
        pa.field("name", pa.string()),
        pa.field("params", pa.map_(pa.string(), pa.string())),
    ])), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared parser so the wikitext is parsed at most once per revision.
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        current = revisions[-1]
        return (None if current.deleted.text
                else self.wikitext_parser.extract_templates(current.text))
class RevisionHeadings(RevisionField[Union[list[dict], None]]):
    """Section headings of the revision: one struct per heading, with its
    level and text."""

    field = pa.field("headings", pa.list_(pa.struct([
        pa.field("level", pa.int8()),
        pa.field("text", pa.string()),
    ])), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared parser so the wikitext is parsed at most once per revision.
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        current = revisions[-1]
        return (None if current.deleted.text
                else self.wikitext_parser.extract_headings(current.text))
class RevisionParserTimeout(RevisionField[bool]):
    """Track whether the wikitext parser timed out for this revision.

    NOTE(review): this reads the shared parser's last_parse_timed_out flag,
    so the value reflects the most recent parse at extraction time — it
    assumes the text-derived fields for this revision are extracted before
    this one; verify the field ordering in the schema builder.
    """
    field = pa.field("parser_timeout", pa.bool_())

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # The same WikitextParser instance used by the extraction fields.
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> bool:
        return self.wikitext_parser.last_parse_timed_out

View File

@@ -0,0 +1,126 @@
"""Shared wikitext parser with caching to avoid duplicate parsing."""
from __future__ import annotations
import asyncio
import mwparserfromhell
PARSER_TIMEOUT = 60 # seconds
class WikitextParser:
"""Caches parsed wikicode to avoid re-parsing the same text."""
CITE_TEMPLATES = {
'cite web', 'cite book', 'cite journal', 'cite news',
'cite magazine', 'cite conference', 'cite encyclopedia',
'cite report', 'cite thesis', 'cite press release',
'citation', 'sfn', 'harvnb', 'harv'
}
def __init__(self):
self._cached_text: str | None = None
self._cached_wikicode = None
self.last_parse_timed_out: bool = False
async def _parse_async(self, text: str):
"""Parse wikitext with timeout protection."""
try:
result = await asyncio.wait_for(
asyncio.to_thread(mwparserfromhell.parse, text),
timeout=PARSER_TIMEOUT
)
return result, False
except TimeoutError:
return None, True
def _get_wikicode(self, text: str):
"""Parse text and cache result. Returns cached result if text unchanged."""
if text != self._cached_text:
self._cached_text = text
self._cached_wikicode, self.last_parse_timed_out = asyncio.run(self._parse_async(text))
return self._cached_wikicode
def extract_external_links(self, text: str | None) -> list[str] | None:
"""Extract all external link URLs from wikitext."""
if text is None:
return None
try:
wikicode = self._get_wikicode(text)
return [str(link.url) for link in wikicode.filter_external_links()]
except Exception:
return None
def extract_citations(self, text: str | None) -> list[str] | None:
"""Extract citations from ref tags and cite templates."""
if text is None:
return None
try:
wikicode = self._get_wikicode(text)
citations = []
# Extract ref tag contents
for tag in wikicode.filter_tags():
if tag.tag.lower() == 'ref':
content = str(tag.contents).strip()
if content:
citations.append(f"ref:{content}")
# Extract cite templates
for template in wikicode.filter_templates():
template_name = str(template.name).strip().lower()
if any(template_name.startswith(cite) for cite in self.CITE_TEMPLATES):
citations.append(f"template:{str(template)}")
return citations
except Exception:
return None
def extract_wikilinks(self, text: str | None) -> list[dict[str, str | None]] | None:
"""Extract all internal wikilinks with title and display text."""
if text is None:
return None
try:
wikicode = self._get_wikicode(text)
result = []
for link in wikicode.filter_wikilinks():
title = str(link.title).strip()
# text is None if no pipe, otherwise the display text
display_text = str(link.text).strip() if link.text else None
result.append({"title": title, "text": display_text})
return result
except Exception:
return None
def extract_templates(self, text: str | None) -> list[dict] | None:
"""Extract all templates with their names and parameters."""
if text is None:
return None
try:
wikicode = self._get_wikicode(text)
result = []
for template in wikicode.filter_templates():
name = str(template.name).strip()
params = {}
for param in template.params:
param_name = str(param.name).strip()
param_value = str(param.value).strip()
params[param_name] = param_value
result.append({"name": name, "params": params})
return result
except Exception:
return None
def extract_headings(self, text: str | None) -> list[dict] | None:
"""Extract all section headings with their levels."""
if text is None:
return None
try:
wikicode = self._get_wikicode(text)
result = []
for heading in wikicode.filter_headings():
level = heading.level
heading_text = str(heading.title).strip()
result.append({"level": level, "text": heading_text})
return result
except Exception:
return None

View File

@@ -1,110 +1,48 @@
import os
import shutil
import subprocess
import sys
import tracemalloc
from io import StringIO
from typing import Final, Union
import pytest
import numpy as np
import pandas as pd
import pyarrow as pa
import pytest
from pandas import DataFrame
from pandas.testing import assert_frame_equal, assert_series_equal
# Make references to files and wikiq relative to this file, not to the current working directory.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR,".."), "src/wikiq/__init__.py")
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"
from wikiq_test_utils import (
BASELINE_DIR,
IKWIKI,
REGEXTEST,
SAILORMOON,
TEST_DIR,
TEST_OUTPUT_DIR,
TWINPEAKS,
WIKIQ,
WikiqTester,
)
def setup():
    """One-time test-session setup: start tracemalloc and ensure the shared
    output directory exists (per-case output cleanup happens elsewhere)."""
    tracemalloc.start()

    # Perform directory check and reset here as this is a one-time setup step as opposed to per-test setup.
    if not os.path.exists(TEST_OUTPUT_DIR):
        os.mkdir(TEST_OUTPUT_DIR)
# Always run setup, even if this is executed via "python -m unittest" rather
# than as __main__.
setup()
class WikiqTester:
    """Helper that wires up the input dump, baseline, and output paths for a
    single wikiq test case and shells out to the wikiq CLI.

    NOTE(review): the surrounding imports also bring in WikiqTester from
    wikiq_test_utils — confirm which definition is the canonical one.
    """

    def __init__(
        self,
        wiki: str,
        case_name: str,
        suffix: Union[str, None] = None,
        in_compression: str = "bz2",
        baseline_format: str = "tsv",
        out_format: str = "tsv",
    ):
        # Input dump lives under tests/dumps as <wiki>.xml.<compression>.
        self.input_file = os.path.join(
            TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
        )

        basename = "{0}_{1}".format(case_name, wiki)
        if suffix:
            basename = "{0}_{1}".format(basename, suffix)

        self.output = os.path.join(
            TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
        )

        # Start from a clean slate: remove anything left by a previous run.
        if os.path.exists(self.output):
            if os.path.isfile(self.output):
                os.remove(self.output)
            else:
                shutil.rmtree(self.output)
        # Parquet output is a directory, so pre-create it.
        if out_format == "parquet":
            os.makedirs(self.output, exist_ok=True)

        if suffix is None:
            self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
            self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
        else:
            self.wikiq_baseline_name = "{0}_{1}.{2}".format(
                wiki, suffix, baseline_format
            )
            self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)

        # If case_name is unset, there are no relevant baseline or test files.
        if case_name is not None:
            self.baseline_file = os.path.join(
                BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
            )

    def call_wikiq(self, *args: str, out: bool = True):
        """
        Calls wikiq with the passed arguments on the input file relevant to the test.

        :param args: The command line arguments to pass to wikiq.
        :param out: Whether to pass an output argument to wikiq.
        :return: The output of the wikiq call.
        """
        if out:
            call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
        else:
            call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
        print(call)
        return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
def test_WP_noargs():
tester = WikiqTester(IKWIKI, "noargs")
@@ -439,3 +377,338 @@ def test_parquet():
pytest.fail(exc)
# assert_frame_equal(test, baseline, check_like=True, check_dtype=False)
def test_external_links_only():
    """Test that --external-links extracts external links correctly.

    Runs wikiq on the sailormoon dump with --external-links and --text, then
    checks: the external_links column exists (and citations does not), its
    values are list-like or None, every URL has a scheme or is
    protocol-relative, and a sample of rows matches a fresh mwparserfromhell
    extraction over the stored wikitext.
    """
    import mwparserfromhell

    tester = WikiqTester(SAILORMOON, "external_links_only", in_compression="7z", out_format="parquet")

    try:
        # Also include --text so we can verify extraction against actual wikitext
        tester.call_wikiq("--external-links", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")

    # Verify external_links column exists
    assert "external_links" in test.columns, "external_links column should exist"
    # Verify citations column does NOT exist
    assert "citations" not in test.columns, "citations column should NOT exist when only --external-links is used"

    # Verify column has list/array type (pandas reads parquet lists as numpy arrays)
    assert test["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
        "external_links should be a list/array type or None"

    # Verify that extracted URLs look like valid URIs (have a scheme or are protocol-relative)
    all_urls = []
    for links in test["external_links"]:
        if links is not None and len(links) > 0:
            all_urls.extend(links)
    for url in all_urls:
        # External links can be http, https, mailto, ftp, etc. or protocol-relative (//)
        has_scheme = ":" in url and url.index(":") < 10  # scheme:... with short scheme
        is_protocol_relative = url.startswith("//")
        assert has_scheme or is_protocol_relative, \
            f"External link should be a valid URI, got: {url}"

    # Verify extraction matches mwparserfromhell for a sample of rows with text
    rows_with_links = test[test["external_links"].apply(lambda x: x is not None and len(x) > 0)]
    if len(rows_with_links) > 0:
        # Test up to 5 rows
        sample = rows_with_links.head(5)
        for idx, row in sample.iterrows():
            text = row["text"]
            if text:
                wikicode = mwparserfromhell.parse(text)
                expected_links = [str(link.url) for link in wikicode.filter_external_links()]
                actual_links = list(row["external_links"])
                assert actual_links == expected_links, \
                    f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"

    print(f"External links only test passed! {len(test)} rows, {len(all_urls)} total URLs extracted")
def test_citations_only():
    """Test that --citations extracts citations correctly.

    Runs wikiq with --citations and --text on the sailormoon dump, then
    checks that only the citations column is added, that each entry is
    prefixed with 'ref:' or 'template:', and that a sample of rows matches a
    fresh WikitextParser.extract_citations() run over the stored wikitext.
    """
    # NOTE(review): mwparserfromhell appears unused in this test — only
    # WikitextParser is exercised; confirm and drop the import if so.
    import mwparserfromhell
    from wikiq.wikitext_parser import WikitextParser

    tester = WikiqTester(SAILORMOON, "citations_only", in_compression="7z", out_format="parquet")

    try:
        # Also include --text so we can verify extraction against actual wikitext
        tester.call_wikiq("--citations", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")

    # Verify citations column exists
    assert "citations" in test.columns, "citations column should exist"
    # Verify external_links column does NOT exist
    assert "external_links" not in test.columns, "external_links column should NOT exist when only --citations is used"

    # Verify column has list/array type (pandas reads parquet lists as numpy arrays)
    assert test["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
        "citations should be a list/array type or None"

    # Verify that extracted citations have correct prefixes (ref: or template:)
    all_citations = []
    for citations in test["citations"]:
        if citations is not None and len(citations) > 0:
            all_citations.extend(citations)
    for citation in all_citations:
        assert citation.startswith("ref:") or citation.startswith("template:"), \
            f"Citation should start with 'ref:' or 'template:', got: {citation}"

    # Verify extraction matches WikitextParser for a sample of rows with text
    rows_with_citations = test[test["citations"].apply(lambda x: x is not None and len(x) > 0)]
    if len(rows_with_citations) > 0:
        parser = WikitextParser()
        # Test up to 5 rows
        sample = rows_with_citations.head(5)
        for idx, row in sample.iterrows():
            text = row["text"]
            if text:
                expected_citations = parser.extract_citations(text)
                actual_citations = list(row["citations"])
                assert actual_citations == expected_citations, \
                    f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"

    print(f"Citations only test passed! {len(test)} rows, {len(all_citations)} total citations extracted")
def test_external_links_and_citations():
    """Test that both --external-links and --citations work together (shared parser).

    Combines the checks of the two single-flag tests and additionally
    re-extracts both columns from the stored wikitext for a sample of rows,
    which exercises the shared-parser caching path used when several
    wikitext fields are enabled at once.
    """
    import mwparserfromhell
    from wikiq.wikitext_parser import WikitextParser

    tester = WikiqTester(SAILORMOON, "external_links_and_citations", in_compression="7z", out_format="parquet")

    try:
        # Also include --text so we can verify extraction against actual wikitext
        tester.call_wikiq("--external-links", "--citations", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")

    # Verify both columns exist
    assert "external_links" in test.columns, "external_links column should exist"
    assert "citations" in test.columns, "citations column should exist"

    # Verify both columns have list/array types (pandas reads parquet lists as numpy arrays)
    assert test["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
        "external_links should be a list/array type or None"
    assert test["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
        "citations should be a list/array type or None"

    # Verify URLs look like valid URIs (have a scheme or are protocol-relative)
    all_urls = []
    for links in test["external_links"]:
        if links is not None and len(links) > 0:
            all_urls.extend(links)
    for url in all_urls:
        # External links can be http, https, mailto, ftp, etc. or protocol-relative (//)
        has_scheme = ":" in url and url.index(":") < 10  # scheme:... with short scheme
        is_protocol_relative = url.startswith("//")
        assert has_scheme or is_protocol_relative, \
            f"External link should be a valid URI, got: {url}"

    # Verify citations have correct prefixes
    all_citations = []
    for citations in test["citations"]:
        if citations is not None and len(citations) > 0:
            all_citations.extend(citations)
    for citation in all_citations:
        assert citation.startswith("ref:") or citation.startswith("template:"), \
            f"Citation should start with 'ref:' or 'template:', got: {citation}"

    # Verify extraction matches WikitextParser for a sample of rows with text
    # This tests that the shared parser optimization works correctly
    parser = WikitextParser()
    rows_with_content = test[
        (test["external_links"].apply(lambda x: x is not None and len(x) > 0)) |
        (test["citations"].apply(lambda x: x is not None and len(x) > 0))
    ]
    if len(rows_with_content) > 0:
        # Test up to 5 rows
        sample = rows_with_content.head(5)
        for idx, row in sample.iterrows():
            text = row["text"]
            if text:
                # Verify external links
                wikicode = mwparserfromhell.parse(text)
                expected_links = [str(link.url) for link in wikicode.filter_external_links()]
                actual_links = list(row["external_links"]) if row["external_links"] is not None else []
                assert actual_links == expected_links, \
                    f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"
                # Verify citations
                expected_citations = parser.extract_citations(text)
                actual_citations = list(row["citations"]) if row["citations"] is not None else []
                assert actual_citations == expected_citations, \
                    f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"

    print(f"External links and citations test passed! {len(test)} rows, {len(all_urls)} URLs, {len(all_citations)} citations")
def test_no_wikitext_columns():
    """Without any wikitext flags, no wikitext-derived columns may appear."""
    tester = WikiqTester(SAILORMOON, "no_wikitext_columns", in_compression="7z", out_format="parquet")

    try:
        tester.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
    columns = set(result.columns)

    # Both columns are strictly opt-in.
    assert "external_links" not in columns, "external_links column should NOT exist without --external-links flag"
    assert "citations" not in columns, "citations column should NOT exist without --citations flag"

    print(f"No wikitext columns test passed! {len(result)} rows processed")
def test_wikilinks():
    """Test that --wikilinks extracts internal wikilinks correctly.

    Runs wikiq with --wikilinks and --text, checks the wikilinks column
    exists and is list-like, and verifies a sample of rows against a fresh
    mwparserfromhell extraction (target title plus optional display text).
    """
    import mwparserfromhell

    tester = WikiqTester(SAILORMOON, "wikilinks", in_compression="7z", out_format="parquet")

    try:
        tester.call_wikiq("--wikilinks", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")

    # Verify wikilinks column exists
    assert "wikilinks" in test.columns, "wikilinks column should exist"
    # Verify column has list/array type
    assert test["wikilinks"].apply(lambda x: x is None or hasattr(x, '__len__')).all()

    # Verify extraction matches mwparserfromhell for sample rows
    rows_with_links = test[test["wikilinks"].apply(lambda x: x is not None and len(x) > 0)]
    if len(rows_with_links) > 0:
        sample = rows_with_links.head(5)
        for idx, row in sample.iterrows():
            text = row["text"]
            if text:
                wikicode = mwparserfromhell.parse(text)
                expected = []
                for link in wikicode.filter_wikilinks():
                    title = str(link.title).strip()
                    display_text = str(link.text).strip() if link.text else None
                    expected.append({"title": title, "text": display_text})
                actual = list(row["wikilinks"])
                # Convert to comparable format (pandas may read as dicts or named tuples)
                actual_dicts = [{"title": item["title"], "text": item["text"]} for item in actual]
                assert actual_dicts == expected, f"Row {idx}: wikilinks mismatch"

    print(f"Wikilinks test passed! {len(test)} rows processed")
def test_templates():
    """Test that --templates extracts templates correctly."""
    import mwparserfromhell
    tester = WikiqTester(SAILORMOON, "templates", in_compression="7z", out_format="parquet")
    try:
        tester.call_wikiq("--templates", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
    # Column must exist and every cell must be null or list-like.
    assert "templates" in test.columns, "templates column should exist"
    assert test["templates"].apply(lambda cell: cell is None or hasattr(cell, '__len__')).all()
    # Cross-check up to five non-empty rows against mwparserfromhell.
    nonempty = test[test["templates"].apply(lambda cell: cell is not None and len(cell) > 0)]
    for idx, row in nonempty.head(5).iterrows():
        text = row["text"]
        if not text:
            continue
        expected = []
        for template in mwparserfromhell.parse(text).filter_templates():
            params = {str(param.name).strip(): str(param.value).strip()
                      for param in template.params}
            expected.append({"name": str(template.name).strip(), "params": params})
        # Normalize whatever container pandas hands back into plain dicts.
        actual_list = [
            {"name": item["name"],
             "params": dict(item["params"]) if item["params"] else {}}
            for item in list(row["templates"])
        ]
        assert actual_list == expected, f"Row {idx}: templates mismatch"
    print(f"Templates test passed! {len(test)} rows processed")
def test_headings():
    """Test that --headings extracts section headings correctly."""
    import mwparserfromhell
    tester = WikiqTester(SAILORMOON, "headings", in_compression="7z", out_format="parquet")
    try:
        tester.call_wikiq("--headings", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
    # Column must exist and every cell must be null or list-like.
    assert "headings" in test.columns, "headings column should exist"
    assert test["headings"].apply(lambda cell: cell is None or hasattr(cell, '__len__')).all()
    # Cross-check up to five non-empty rows against mwparserfromhell.
    nonempty = test[test["headings"].apply(lambda cell: cell is not None and len(cell) > 0)]
    for idx, row in nonempty.head(5).iterrows():
        text = row["text"]
        if not text:
            continue
        expected = [
            {"level": heading.level, "text": str(heading.title).strip()}
            for heading in mwparserfromhell.parse(text).filter_headings()
        ]
        actual_list = [{"level": item["level"], "text": item["text"]}
                       for item in list(row["headings"])]
        assert actual_list == expected, f"Row {idx}: headings mismatch"
    print(f"Headings test passed! {len(test)} rows processed")

5
test/conftest.py Normal file
View File

@@ -0,0 +1,5 @@
import os
import sys
# Add the test directory to Python path so test utilities can be imported
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

466
test/test_resume.py Normal file
View File

@@ -0,0 +1,466 @@
import json
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest
from pandas.testing import assert_frame_equal
from wikiq.resume import (
cleanup_interrupted_resume,
get_checkpoint_path,
get_resume_point,
merge_parquet_files,
)
from wikiq_test_utils import (
SAILORMOON,
TEST_DIR,
TEST_OUTPUT_DIR,
WIKIQ,
WikiqTester,
)
def test_resume():
    """Test that --resume properly resumes processing from the last written revid."""
    # Reference: a complete, uninterrupted run.
    tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="parquet")
    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_table = pq.read_table(os.path.join(tester_full.output, f"{SAILORMOON}.parquet"))
    middle_idx = len(full_table) // 2
    resume_revid = full_table.column("revid")[middle_idx].as_py()
    print(f"Total revisions: {len(full_table)}, Resume point: {middle_idx}, Resume revid: {resume_revid}")
    # Seed a second output directory with only the first half, then resume.
    tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="parquet")
    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
    pq.write_table(full_table.slice(0, middle_idx + 1), partial_output_path)
    try:
        tester_partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    def canonical(table):
        # Order-independent comparison form of a table.
        return table.to_pandas().sort_values("revid").reset_index(drop=True)

    resumed_df = canonical(pq.read_table(partial_output_path))
    full_df = canonical(full_table)
    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
    print(f"Resume test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_with_diff():
    """Test that --resume works correctly with diff computation."""
    # Reference: a complete, uninterrupted run with --diff.
    tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="parquet")
    try:
        tester_full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_table = pq.read_table(os.path.join(tester_full.output, f"{SAILORMOON}.parquet"))
    # Resume from one third of the way through the reference output.
    resume_idx = len(full_table) // 3
    resume_revid = full_table.column("revid")[resume_idx].as_py()
    print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
    tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="parquet")
    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
    pq.write_table(full_table.slice(0, resume_idx + 1), partial_output_path)
    try:
        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    def canonical(table):
        # Order-independent comparison form of a table.
        return table.to_pandas().sort_values("revid").reset_index(drop=True)

    resumed_df = canonical(pq.read_table(partial_output_path))
    full_df = canonical(full_table)
    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
    print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_with_partition_namespaces():
    """Test that --resume works correctly with --partition-namespaces.

    Interrupts wikiq partway through processing, then resumes and verifies
    the result matches an uninterrupted run.
    """
    full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full")
    partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
    for output_dir in [full_dir, partial_dir]:
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)
        os.makedirs(output_dir)
    full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet")
    partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet")
    batch_size = 10
    # Reference run. Use an argument list (shell=False) so paths containing
    # spaces or shell metacharacters cannot break or alter the command.
    cmd_full = [
        sys.executable, WIKIQ, input_file,
        "-o", full_output,
        "--batch-size", str(batch_size),
        "--partition-namespaces"
    ]
    try:
        subprocess.run(cmd_full, check=True, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive")
    full_df = full_dataset.to_table().to_pandas()
    total_rows = len(full_df)
    print(f"Full run produced {total_rows} rows")
    # Second run, to be interrupted partway through.
    cmd_partial = [
        sys.executable, WIKIQ, input_file,
        "-o", partial_output,
        "--batch-size", str(batch_size),
        "--partition-namespaces"
    ]
    print(f"Starting: {' '.join(cmd_partial)}")
    proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)
    interrupt_delay = 5
    time.sleep(interrupt_delay)
    if proc.poll() is not None:
        pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt")
    print(f"Sending SIGUSR1 after {interrupt_delay}s")
    proc.send_signal(signal.SIGUSR1)
    try:
        # communicate() drains stderr while waiting; a bare wait() with
        # stderr=PIPE can deadlock once the child fills the pipe buffer.
        proc.communicate(timeout=5)
        print("Process exited gracefully after SIGUSR1")
    except subprocess.TimeoutExpired:
        print("Sending SIGTERM after SIGUSR1 timeout")
        proc.send_signal(signal.SIGTERM)
        proc.communicate(timeout=30)
    interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
    interrupted_rows = interrupted_dataset.count_rows()
    print(f"Interrupted run wrote {interrupted_rows} rows")
    assert interrupted_rows < total_rows, \
        f"Process wrote all {interrupted_rows} rows before being killed"
    # Resume the interrupted run with the same command plus --resume.
    cmd_resume = cmd_partial + ["--resume"]
    try:
        subprocess.run(cmd_resume, check=True, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
    resumed_df = resumed_dataset.to_table().to_pandas()
    # The resumed output must contain exactly the reference run's revids.
    full_revids = set(full_df['revid'])
    resumed_revids = set(resumed_df['revid'])
    missing_revids = full_revids - resumed_revids
    extra_revids = resumed_revids - full_revids
    assert missing_revids == set() and extra_revids == set(), \
        f"Revision ID mismatch: {len(missing_revids)} missing, {len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}"
    assert len(resumed_df) == len(full_df), \
        f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}"
    print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}")
def test_resume_file_not_found():
    """Test that --resume starts fresh when output file doesn't exist."""
    tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet")
    expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet")
    if os.path.exists(expected_output):
        os.remove(expected_output)
    # Should succeed by starting fresh. Wrap like the other resume tests so
    # a failure reports wikiq's stderr instead of an opaque CalledProcessError.
    try:
        tester.call_wikiq("--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    # Verify output was created
    assert os.path.exists(expected_output), "Output file should be created when starting fresh"
    table = pq.read_table(expected_output)
    assert table.num_rows > 0, "Output should have data"
    print("Resume file not found test passed - started fresh!")
def test_resume_simple():
    """Test that --resume works without --fandom-2020 and --partition-namespaces."""
    # Reference: a complete run with no extra flags.
    tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet")
    try:
        tester_full.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_table = pq.read_table(os.path.join(tester_full.output, f"{SAILORMOON}.parquet"))
    # Resume from one third of the way through the reference output.
    resume_idx = len(full_table) // 3
    resume_revid = full_table.column("revid")[resume_idx].as_py()
    print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
    tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet")
    partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
    pq.write_table(full_table.slice(0, resume_idx + 1), partial_output_path)
    try:
        tester_partial.call_wikiq("--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    def canonical(table):
        # Order-independent comparison form of a table.
        return table.to_pandas().sort_values("revid").reset_index(drop=True)

    resumed_df = canonical(pq.read_table(partial_output_path))
    full_df = canonical(full_table)
    assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
    print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_merge_with_invalid_temp_file():
    """Test that resume handles invalid/empty temp files gracefully.

    This can happen when a namespace has no records after the resume point,
    resulting in a temp file that was created but never written to.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        original_path = os.path.join(tmpdir, "original.parquet")
        temp_path = os.path.join(tmpdir, "temp.parquet")
        merged_path = os.path.join(tmpdir, "merged.parquet")
        pq.write_table(pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]}), original_path)
        # An empty file is not valid parquet.
        with open(temp_path, 'w'):
            pass
        result = merge_parquet_files(original_path, temp_path, merged_path)
        assert result == "original_only", f"Expected 'original_only' when temp file is invalid, got {result}"
        assert os.path.exists(original_path), "Original file should still exist"
        assert len(pq.read_table(original_path)) == 3, "Original file should be unchanged"
        assert not os.path.exists(merged_path), "Merged file should not be created"
        print("Resume merge with invalid temp file test passed!")
def test_resume_merge_with_corrupted_original():
    """Test that resume recovers from a corrupted original file if temp is valid.

    This can happen if the original file was being written when the process
    was killed, leaving it in a corrupted state.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        original_path = os.path.join(tmpdir, "original.parquet")
        temp_path = os.path.join(tmpdir, "temp.parquet")
        merged_path = os.path.join(tmpdir, "merged.parquet")
        # Garbage where a parquet file should be.
        with open(original_path, 'w') as handle:
            handle.write("corrupted data")
        pq.write_table(pa.table({"articleid": [4, 5, 6], "revid": [40, 50, 60]}), temp_path)
        result = merge_parquet_files(original_path, temp_path, merged_path)
        assert result == "temp_only", f"Expected 'temp_only' when original is corrupted, got {result}"
        assert not os.path.exists(merged_path), "Merged file should not be created for temp_only case"
        print("Resume merge with corrupted original test passed!")
def test_resume_merge_both_invalid():
    """Test that resume handles both files being invalid."""
    with tempfile.TemporaryDirectory() as tmpdir:
        original_path = os.path.join(tmpdir, "original.parquet")
        temp_path = os.path.join(tmpdir, "temp.parquet")
        merged_path = os.path.join(tmpdir, "merged.parquet")
        # Neither side is readable parquet.
        for path, junk in ((original_path, "corrupted original"),
                           (temp_path, "corrupted temp")):
            with open(path, 'w') as handle:
                handle.write(junk)
        result = merge_parquet_files(original_path, temp_path, merged_path)
        assert result == "both_invalid", f"Expected 'both_invalid' when both files corrupted, got {result}"
        print("Resume merge with both invalid test passed!")
def test_cleanup_interrupted_resume_both_corrupted():
    """Test that cleanup_interrupted_resume returns 'start_fresh' when both files are corrupted."""
    with tempfile.TemporaryDirectory() as tmpdir:
        output_file = os.path.join(tmpdir, "output.parquet")
        temp_file = output_file + ".resume_temp"
        checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)
        # Corrupt both parquet files and leave a now-stale checkpoint behind.
        with open(output_file, 'w') as handle:
            handle.write("corrupted original")
        with open(temp_file, 'w') as handle:
            handle.write("corrupted temp")
        with open(checkpoint_path, 'w') as handle:
            json.dump({"pageid": 100, "revid": 200}, handle)
        result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
        assert result == "start_fresh", f"Expected 'start_fresh', got {result}"
        # Everything stale must be gone so the next run starts clean.
        for leftover, message in (
            (output_file, "Corrupted original should be deleted"),
            (temp_file, "Corrupted temp should be deleted"),
            (checkpoint_path, "Stale checkpoint should be deleted"),
        ):
            assert not os.path.exists(leftover), message
        print("Cleanup interrupted resume with both corrupted test passed!")
def test_cleanup_interrupted_resume_original_corrupted_temp_valid():
    """Test that cleanup recovers from temp when original is corrupted."""
    with tempfile.TemporaryDirectory() as tmpdir:
        output_file = os.path.join(tmpdir, "output.parquet")
        temp_file = output_file + ".resume_temp"
        # Corrupted original alongside a valid temp file.
        with open(output_file, 'w') as handle:
            handle.write("corrupted original")
        pq.write_table(pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]}), temp_file)
        result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
        assert result is None, f"Expected None (normal recovery), got {result}"
        assert os.path.exists(output_file), "Output file should exist after recovery"
        assert not os.path.exists(temp_file), "Temp file should be renamed to output"
        assert len(pq.read_table(output_file)) == 3, "Recovered file should have 3 rows"
        resume_point = get_resume_point(output_file, partition_namespaces=False)
        assert resume_point is not None, "Should find resume point from recovered file"
        assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
        print("Cleanup with original corrupted, temp valid test passed!")
def test_cleanup_original_missing_temp_valid_no_checkpoint():
    """Test recovery when original is missing, temp is valid, and no checkpoint exists."""
    with tempfile.TemporaryDirectory() as tmpdir:
        output_file = os.path.join(tmpdir, "output.parquet")
        temp_file = output_file + ".resume_temp"
        checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)
        # Precondition: only the temp file exists.
        assert not os.path.exists(output_file)
        assert not os.path.exists(checkpoint_path)
        pq.write_table(pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]}), temp_file)
        result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
        assert result is None, f"Expected None (normal recovery), got {result}"
        assert os.path.exists(output_file), "Output file should exist after recovery"
        assert not os.path.exists(temp_file), "Temp file should be renamed to output"
        resume_point = get_resume_point(output_file, partition_namespaces=False)
        assert resume_point is not None, "Should find resume point from recovered file"
        assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
        print("Original missing, temp valid, no checkpoint test passed!")
def test_concurrent_jobs_different_input_files():
    """Test that merge only processes temp files for the current input file.

    When multiple wikiq processes write to the same partitioned output directory
    with different input files, each process should only merge its own temp files.
    """
    from wikiq.resume import merge_partitioned_namespaces
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create partitioned output structure
        ns0_dir = os.path.join(tmpdir, "namespace=0")
        ns1_dir = os.path.join(tmpdir, "namespace=1")
        for ns_dir in (ns0_dir, ns1_dir):
            os.makedirs(ns_dir)
        # Simulate two different input files producing output
        file1 = "enwiki-20250123-pages-meta-history24-p1p100.parquet"
        file2 = "enwiki-20250123-pages-meta-history24-p101p200.parquet"
        # Lay down original + temp pairs in both namespaces for each job.
        fixtures = [
            (file1,
             pa.table({"articleid": [1, 2], "revid": [10, 20]}),
             pa.table({"articleid": [3, 4], "revid": [30, 40]})),
            (file2,
             pa.table({"articleid": [100, 200], "revid": [1000, 2000]}),
             pa.table({"articleid": [300, 400], "revid": [3000, 4000]})),
        ]
        for filename, orig_table, temp_table in fixtures:
            for ns_dir in (ns0_dir, ns1_dir):
                pq.write_table(orig_table, os.path.join(ns_dir, filename))
                pq.write_table(temp_table, os.path.join(ns_dir, filename + ".resume_temp"))
        # Merge only file1's temp files
        merge_partitioned_namespaces(tmpdir, ".resume_temp", file1)
        # Verify file1's temp files were merged and removed
        assert not os.path.exists(os.path.join(ns0_dir, file1 + ".resume_temp")), \
            "file1 temp should be merged in ns0"
        assert not os.path.exists(os.path.join(ns1_dir, file1 + ".resume_temp")), \
            "file1 temp should be merged in ns1"
        # Verify file1's original now has merged data
        merged1_ns0 = pq.read_table(os.path.join(ns0_dir, file1))
        merged1_ns1 = pq.read_table(os.path.join(ns1_dir, file1))
        assert merged1_ns0.num_rows == 4, f"file1 ns0 should have 4 rows after merge, got {merged1_ns0.num_rows}"
        assert merged1_ns1.num_rows == 4, f"file1 ns1 should have 4 rows after merge, got {merged1_ns1.num_rows}"
        # Verify file2's temp files are UNTOUCHED (still exist)
        assert os.path.exists(os.path.join(ns0_dir, file2 + ".resume_temp")), \
            "file2 temp should NOT be touched in ns0"
        assert os.path.exists(os.path.join(ns1_dir, file2 + ".resume_temp")), \
            "file2 temp should NOT be touched in ns1"
        # Verify file2's original is unchanged
        assert pq.read_table(os.path.join(ns0_dir, file2)).num_rows == 2, "file2 ns0 should still have 2 rows"
        assert pq.read_table(os.path.join(ns1_dir, file2)).num_rows == 2, "file2 ns1 should still have 2 rows"
        print("Concurrent jobs with different input files test passed!")

75
test/wikiq_test_utils.py Normal file
View File

@@ -0,0 +1,75 @@
import os
import shutil
import subprocess
from typing import Final, Union
# Absolute path of the directory containing this test module.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
# Entry-point script of wikiq; invoked as a subprocess by the tests.
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR, ".."), "src/wikiq/__init__.py")
# Directory where test runs write their output files/directories.
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
# Directory of baseline files referenced by WikiqTester.baseline_file.
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
# Base names of the wiki dump fixtures (presumably under test/dumps/ — see
# WikiqTester.input_file, which appends .xml.<compression>).
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"
class WikiqTester:
    """Fixture that wires up input/output/baseline paths and shells out to wikiq."""

    def __init__(
        self,
        wiki: str,
        case_name: str,
        suffix: Union[str, None] = None,
        in_compression: str = "bz2",
        baseline_format: str = "tsv",
        out_format: str = "tsv",
    ):
        # Dump file under test/dumps, e.g. sailormoon.xml.7z.
        self.input_file = os.path.join(
            TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
        )
        # Output path is namespaced by case name (and optional suffix).
        basename = "{0}_{1}".format(case_name, wiki)
        if suffix:
            basename = "{0}_{1}".format(basename, suffix)
        self.output = os.path.join(
            TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
        )
        # Start each case from a clean slate, whether the leftover is a
        # single file or a directory tree.
        if os.path.exists(self.output):
            remover = os.remove if os.path.isfile(self.output) else shutil.rmtree
            remover(self.output)
        # Parquet output is written into a directory, which must pre-exist.
        if out_format == "parquet":
            os.makedirs(self.output, exist_ok=True)
        stem = wiki if suffix is None else "{0}_{1}".format(wiki, suffix)
        self.wikiq_baseline_name = "{0}.{1}".format(stem, baseline_format)
        self.wikiq_out_name = "{0}.{1}".format(stem, out_format)
        if case_name is not None:
            self.baseline_file = os.path.join(
                BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
            )

    def call_wikiq(self, *args: str, out: bool = True):
        """
        Calls wikiq with the passed arguments on the input file relevant to the test.

        :param args: The command line arguments to pass to wikiq.
        :param out: Whether to pass an output argument to wikiq.
        :return: The output of the wikiq call.
        """
        parts = [WIKIQ, self.input_file]
        if out:
            parts += ["-o", self.output]
        parts += ["--batch-size", "10", *args]
        call = " ".join(parts)
        print(call)
        return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)