37 Commits

Author SHA1 Message Date
Nathan TeBlunthuis
c7eb374ceb use signalling to timeout mwparserfromhell instead of asyncio. 2026-01-07 12:42:37 -08:00
Nathan TeBlunthuis
4b8288c016 add some debug lines. 2026-01-06 19:58:18 -08:00
Nathan TeBlunthuis
8590e5f920 fix jsonl.d output. 2025-12-30 11:26:24 -08:00
Nathan TeBlunthuis
93f6ed0ff5 fix bug by truncating corrupted jsonl lines. 2025-12-23 19:52:37 -08:00
Nathan TeBlunthuis
5ebdb26d82 make resume with jsonl output fault tolerant. 2025-12-23 09:09:51 -08:00
Nathan TeBlunthuis
9e6b0fb64c make updating the checkpoint files atomic. 2025-12-23 08:41:38 -08:00
Nathan TeBlunthuis
d822085698 support .jsonl.d 2025-12-22 20:13:04 -08:00
Nathan TeBlunthuis
618c343898 allow output dir to be jsonl.d. 2025-12-21 23:50:00 -08:00
Nathan TeBlunthuis
3f1a9ba862 refactor and enable jsonl output. 2025-12-21 23:42:18 -08:00
Nathan TeBlunthuis
6988a281dc output parquet files in chunks to avoid memory issues with parquet. 2025-12-20 21:45:39 -08:00
Nathan TeBlunthuis
6a4bf81e1a add test for two wikiq jobs in the same directory. 2025-12-19 11:50:56 -08:00
Nathan TeBlunthuis
38dabd0547 only merge the correct partitioned files. 2025-12-19 11:47:18 -08:00
Nathan TeBlunthuis
006feb795c fix interruption handling by breaking the diff loop. 2025-12-18 18:00:30 -08:00
Nathan TeBlunthuis
d7f5abef2d resume starts fresh if the first run didn't happen 2025-12-13 15:41:44 -08:00
Nathan TeBlunthuis
2c54425726 use the wikidiff2 diff timeout instead of async. 2025-12-13 14:29:16 -08:00
Nathan TeBlunthuis
5d1a246898 don't try to remove files that don't exist. 2025-12-13 11:57:47 -08:00
Nathan TeBlunthuis
70a10db228 save work after a time limit. 2025-12-11 08:30:32 -08:00
Nathan TeBlunthuis
1001c780fa start fresh if output and resume are both broken. 2025-12-10 21:20:52 -08:00
Nathan TeBlunthuis
6b4f3939a5 more work on resuming. 2025-12-10 21:07:52 -08:00
Nathan TeBlunthuis
c3d31b4ab5 handle case when we have a valid resume file, but a corrupted original. 2025-12-10 20:33:04 -08:00
Nathan TeBlunthuis
f4a9491ff2 improve print debugging. 2025-12-10 19:50:47 -08:00
Nathan TeBlunthuis
c6e96c2f54 try/catch opening original file in resume. 2025-12-10 19:49:29 -08:00
Nathan TeBlunthuis
f427291fd8 add logic for resuming after a resume. 2025-12-10 19:26:54 -08:00
Nathan TeBlunthuis
d1fc094c96 don't put checkpoint files inside namespace directories. 2025-12-07 06:24:04 -08:00
Nathan TeBlunthuis
783f5fd8bc improve resume logic. 2025-12-07 06:06:26 -08:00
Nathan TeBlunthuis
577ddc87f5 Add per-namespace resume support for partitioned parquet output.
- Implement per-namespace resume points (dict mapping namespace -> (pageid, revid))
  to correctly handle interleaved dump ordering in partitioned output
- Extract resume functionality to dedicated resume.py module
- Add graceful shutdown handling via shutdown_requested flag (CLI-level only)
- Use lazy ParquetWriter creation to avoid empty files on early exit
- Refactor writing logic to _write_batch() helper method
- Simplify control flow by replacing continue statements with should_write flag
2025-12-06 06:56:19 -08:00
Nathan TeBlunthuis
d69d8b0df2 fix baseline output for new columns. 2025-12-02 19:22:08 -08:00
Nathan TeBlunthuis
5ce9808b50 add templates and headings to wikiq. 2025-12-02 17:51:08 -08:00
Nathan TeBlunthuis
d3517ed5ca extract wikilinks. 2025-12-02 14:09:29 -08:00
Nathan TeBlunthuis
329341efb6 improve tests. 2025-12-02 13:52:12 -08:00
Nathan TeBlunthuis
76626a2785 Start working on adding columns from mwparserfromhell. 2025-12-02 12:26:03 -08:00
Nathan TeBlunthuis
b46f98a875 make --resume work with partitioned namespaces. 2025-12-01 07:19:52 -08:00
Nathan TeBlunthuis
3c26185739 enable --resuming from interrupted jobs. 2025-11-30 20:36:31 -08:00
Nathan TeBlunthuis
95b33123e3 revert previous and decrease timeout. 2025-11-28 20:29:51 -08:00
Nathan TeBlunthuis
5c4fc6d5a0 let cache capacity be large. 2025-11-28 19:22:43 -08:00
Nathan TeBlunthuis
77c7d2ba97 Merge branch 'compute-diffs' of gitea:collective/mediawiki_dump_tools into compute-diffs 2025-11-24 11:03:24 -08:00
Nathan TeBlunthuis
c40930d7d2 use ssh for gitea. 2025-11-24 11:01:36 -08:00
11 changed files with 3107 additions and 521 deletions

View File

@@ -8,11 +8,13 @@ dependencies = [
"deltas>=0.7.0",
"mediawiki-utilities>=0.4.18",
"more-itertools>=10.7.0",
"mwparserfromhell>=0.6.0",
"mwpersistence>=0.2.4",
"mwreverts>=0.1.5",
"mwtypes>=0.4.0",
"mwxml>=0.3.6",
"pyarrow>=20.0.0",
"pyspark>=3.5.0",
"pywikidiff2",
"sortedcontainers>=2.4.0",
"yamlconf>=0.2.6",
@@ -20,20 +22,21 @@ dependencies = [
[project.scripts]
wikiq = "wikiq:main"
wikiq-spark = "wikiq_spark:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/wikiq"]
packages = ["src/wikiq", "src/wikiq_spark"]
[tool.uv.sources]
yamlconf = { git = "https://github.com/groceryheist/yamlconf" }
mwxml = { git = "https://github.com/groceryheist/python-mwxml" }
deltas = { git = "https://github.com/groceryheist/deltas" }
pywikidiff2 = { git = "https://gitea.communitydata.science/groceryheist/pywikidiff2@1cd3b13aea89a8abb0cee38dfd7cc6ab4d0980f9" }
pywikidiff2 = { git = "ssh://gitea@gitea.communitydata.science:2200/groceryheist/pywikidiff2.git"}
[dependency-groups]
dev = [

File diff suppressed because it is too large Load Diff

531
src/wikiq/resume.py Normal file
View File

@@ -0,0 +1,531 @@
"""
Checkpoint and resume functionality for wikiq output.
This module handles:
- Finding resume points in existing output (JSONL or Parquet)
- Merging resumed data with existing output (for Parquet, streaming, memory-efficient)
- Checkpoint file management for fast resume point lookup
"""
import json
import os
import sys
from collections import deque
import pyarrow.parquet as pq
def get_checkpoint_path(output_file, partition_namespaces=False):
    """Return the checkpoint-file path associated with *output_file*.

    For partitioned output the checkpoint lives one level above the
    partition directory (so pyarrow never tries to read it as a parquet
    file) and embeds the output filename, keeping it unique per input
    file when parallel jobs share a directory.
    """
    if not partition_namespaces:
        return str(output_file) + ".checkpoint"
    partition_dir = os.path.dirname(output_file)
    base_name = os.path.basename(output_file)
    return os.path.join(os.path.dirname(partition_dir), base_name + ".checkpoint")
def read_checkpoint(checkpoint_path, partition_namespaces=False):
    """
    Load a resume point from *checkpoint_path*, if one exists.

    Checkpoint format:
        Single file:  {"pageid": 54, "revid": 325} or
                      {"pageid": 54, "revid": 325, "part": 2}
        Partitioned:  {"0": {"pageid": 54, "revid": 325, "part": 1}, ...}

    Returns:
        For single files: a tuple (pageid, revid) or (pageid, revid, part),
        or None if not found.
        For partitioned output: a dict mapping namespace -> (pageid, revid,
        part), or None. Unreadable/invalid checkpoints are reported on
        stderr and yield None.
    """
    if not os.path.exists(checkpoint_path):
        return None
    try:
        with open(checkpoint_path, 'r') as fh:
            payload = json.load(fh)
        if not payload:
            return None
        if "pageid" in payload and "revid" in payload:
            # Single-file checkpoint; "part" only included when > 0.
            part_no = payload.get("part", 0)
            if part_no > 0:
                return (payload["pageid"], payload["revid"], part_no)
            return (payload["pageid"], payload["revid"])
        # Partitioned checkpoint: one entry per namespace, keyed by string.
        points = {
            int(ns): (entry["pageid"], entry["revid"], entry.get("part", 0))
            for ns, entry in payload.items()
        }
        return points or None
    except (json.JSONDecodeError, IOError, KeyError, TypeError) as e:
        print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr)
        return None
def cleanup_interrupted_resume(output_file, partition_namespaces):
    """
    Merge any leftover .resume_temp files from a previous interrupted run.

    This should be called BEFORE get_resume_point() so the resume point
    is calculated from the merged data.

    Args:
        output_file: Path to the output file (for partitioned output, the
            per-input filename inside the partition directory).
        partition_namespaces: Whether output is partitioned by namespace.

    Returns:
        None - no temp files found or normal merge completed
        "start_fresh" - both original and temp were corrupted and deleted
    """
    # NOTE(review): shutil appears unused in this function; the file-level
    # imports may not be visible from here -- confirm before removing.
    import shutil
    if partition_namespaces:
        partition_dir = os.path.dirname(output_file)
        output_filename = os.path.basename(output_file)
        temp_suffix = ".resume_temp"
        # No partition directory means there was no previous run at all.
        if not os.path.isdir(partition_dir):
            return
        # Probe each namespace=N subdirectory for a leftover temp file.
        has_old_temp_files = False
        for ns_dir in os.listdir(partition_dir):
            if ns_dir.startswith('namespace='):
                temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix)
                if os.path.exists(temp_path):
                    has_old_temp_files = True
                    break
        if has_old_temp_files:
            print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr)
            # merge_partitioned_namespaces returns True when corruption was
            # found and corrupted data had to be discarded.
            had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename)
            # After merging, check whether any namespace still holds a
            # usable .parquet file.
            has_valid_data = False
            for ns_dir in os.listdir(partition_dir):
                if ns_dir.startswith('namespace='):
                    ns_path = os.path.join(partition_dir, ns_dir)
                    parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet') and not f.endswith('.resume_temp')]
                    if parquet_files:
                        has_valid_data = True
                        break
            if had_corruption and not has_valid_data:
                # Nothing salvageable: drop the checkpoint so the caller
                # restarts from the beginning of the dump.
                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
                if os.path.exists(checkpoint_path):
                    os.remove(checkpoint_path)
                print("All partitioned files were corrupted, will start fresh.", file=sys.stderr)
                return "start_fresh"
            print("Previous temp files merged successfully.", file=sys.stderr)
    else:
        # Single-file output: at most one temp file to reconcile.
        temp_output_file = output_file + ".resume_temp"
        if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file):
            print(f"Found leftover temp file {temp_output_file} from previous interrupted run, merging first...", file=sys.stderr)
            merged_path = output_file + ".merged"
            # merge_parquet_files returns a status string describing which
            # of the two inputs was valid; act accordingly below.
            merged = merge_parquet_files(output_file, temp_output_file, merged_path)
            if merged == "original_only":
                # Temp had no usable data; keep the original untouched.
                os.remove(temp_output_file)
            elif merged == "temp_only":
                # Original corrupted/missing; promote the temp file.
                if os.path.exists(output_file):
                    os.remove(output_file)
                os.rename(temp_output_file, output_file)
                print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr)
            elif merged == "both_invalid":
                # Neither file is readable: delete everything, including the
                # checkpoint, and tell the caller to start over.
                if os.path.exists(output_file):
                    os.remove(output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
                if os.path.exists(checkpoint_path):
                    os.remove(checkpoint_path)
                print("Both files were corrupted, will start fresh.", file=sys.stderr)
                return "start_fresh"
            elif merged == "merged":
                # Replace the original with the merged result atomically-ish.
                os.remove(output_file)
                os.rename(merged_path, output_file)
                os.remove(temp_output_file)
                print("Previous temp file merged successfully.", file=sys.stderr)
            else:
                # merge returned False: both valid but empty; drop the temp.
                os.remove(temp_output_file)
def get_jsonl_resume_point(output_file, input_file=None):
    """Get resume point from the last complete record of a JSONL file.

    For .jsonl.d directories, derives the file path from input_file using
    get_output_filename.

    Args:
        output_file: Path to a .jsonl file or a .jsonl.d directory.
        input_file: Input dump path; required to resolve the per-input
            file inside a .jsonl.d directory.

    Returns:
        (articleid, revid) of the last complete record, or None if no
        usable record exists. Corrupted trailing data after the last
        complete record is truncated from the file in place.
    """
    # Handle .jsonl.d directory output: resolve the per-input file inside it.
    if output_file.endswith('.jsonl.d'):
        if input_file is None:
            return None
        if os.path.isdir(output_file):
            # Import here to avoid circular import
            from wikiq import get_output_filename
            jsonl_filename = os.path.basename(get_output_filename(input_file, 'jsonl'))
            output_file = os.path.join(output_file, jsonl_filename)
            print(f"Looking for resume point in: {output_file}", file=sys.stderr)
        else:
            return None
    if not os.path.exists(output_file):
        return None
    try:
        # Track end positions of recent complete records for truncation.
        valid_lines = deque(maxlen=2)  # (end_position, record)
        with open(output_file, 'rb') as f:
            while True:
                line = f.readline()
                if not line:
                    break
                try:
                    record = json.loads(line.decode('utf-8'))
                except (json.JSONDecodeError, UnicodeDecodeError):
                    continue
                # Fix: only accept complete record dicts. Previously any
                # line of valid JSON (e.g. a bare number, or a dict missing
                # the id fields) counted as "valid", and the subscripts
                # below then raised an uncaught KeyError/TypeError.
                if isinstance(record, dict) and 'articleid' in record and 'revid' in record:
                    valid_lines.append((f.tell(), record))
        if not valid_lines:
            return None
        last_valid_pos, last_valid_record = valid_lines[-1]
        # Truncate if file extends past last valid line (corrupted trailing data)
        file_size = os.path.getsize(output_file)
        if last_valid_pos < file_size:
            print(f"Truncating corrupted data from {output_file} ({file_size - last_valid_pos} bytes)", file=sys.stderr)
            with open(output_file, 'r+b') as f:
                f.truncate(last_valid_pos)
        return (last_valid_record['articleid'], last_valid_record['revid'])
    except IOError as e:
        print(f"Warning: Could not read {output_file}: {e}", file=sys.stderr)
        return None
def get_resume_point(output_file, partition_namespaces=False, input_file=None):
    """
    Locate the resume point(s) from existing output.

    JSONL output is scanned directly (the last complete line is the resume
    point, so no checkpoint is needed). Parquet output first consults the
    checkpoint file (fast) and only falls back to scanning the parquet data
    itself for backwards compatibility.

    Args:
        output_file: Path to the output file.
        partition_namespaces: Whether the output uses namespace partitioning.
        input_file: Path to input file (needed for .jsonl.d directory output).

    Returns:
        For single files: a tuple (pageid, revid) or (pageid, revid, part),
        or None.
        For partitioned output: a dict mapping namespace -> (pageid, revid,
        part), or None.
    """
    if output_file.endswith(('.jsonl', '.jsonl.d')):
        point = get_jsonl_resume_point(output_file, input_file)
        if point:
            print(f"Resume point found from JSONL: pageid={point[0]}, revid={point[1]}", file=sys.stderr)
        return point
    # Parquet path: prefer the checkpoint file.
    checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
    from_checkpoint = read_checkpoint(checkpoint_path, partition_namespaces)
    if from_checkpoint is not None:
        print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr)
        return from_checkpoint
    # Slow path: scan the parquet output itself (backwards compatibility).
    print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr)
    try:
        if partition_namespaces:
            return _get_resume_point_partitioned(output_file)
        return _get_resume_point_single_file(output_file)
    except Exception as e:
        print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr)
        return None
def _get_last_row_resume_point(pq_path):
    """Resume point from the final row group of a parquet file.

    Reads only the last row group (not the whole file) and returns
    (articleid, revid, 0) of its final row, or None if the file holds
    no rows.
    """
    parquet_file = pq.ParquetFile(pq_path)
    n_groups = parquet_file.metadata.num_row_groups
    if n_groups == 0:
        return None
    tail = parquet_file.read_row_group(n_groups - 1, columns=['articleid', 'revid'])
    if tail.num_rows == 0:
        return None
    return (tail['articleid'][-1].as_py(), tail['revid'][-1].as_py(), 0)
def _get_resume_point_partitioned(output_file):
    """Collect per-namespace resume points from partitioned parquet output.

    Returns a dict mapping namespace -> (pageid, revid, part), or None when
    the partition directory is missing or holds no readable data.
    """
    partition_dir = os.path.dirname(output_file)
    base_name = os.path.basename(output_file)
    if not os.path.isdir(partition_dir):
        return None
    points = {}
    for entry in os.listdir(partition_dir):
        if not entry.startswith('namespace='):
            continue
        namespace = int(entry.split('=')[1])
        pq_path = os.path.join(partition_dir, entry, base_name)
        if not os.path.exists(pq_path):
            continue
        try:
            point = _get_last_row_resume_point(pq_path)
        except Exception as e:
            # An unreadable namespace file is skipped, not fatal.
            print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr)
            continue
        if point is not None:
            points[namespace] = point
    return points or None
def _get_resume_point_single_file(output_file):
    """Resume point from a single (non-partitioned) parquet file.

    Returns None when the path is absent or is a directory.
    """
    if not os.path.exists(output_file) or os.path.isdir(output_file):
        return None
    return _get_last_row_resume_point(output_file)
def merge_parquet_files(original_path, temp_path, merged_path):
    """
    Merge two parquet files by streaming row groups into *merged_path*.

    Neither input is modified; the caller decides what to do with the
    inputs based on the returned status.

    Returns:
        "merged" - merged file was created from both sources
        "original_only" - temp was invalid, keep original unchanged
        "temp_only" - original was corrupted but temp is valid
        "both_invalid" - both files invalid
        False - both files were valid but empty
    """
    original_pq = None
    temp_pq = None
    # Validate each input by attempting to open it as parquet.
    try:
        original_pq = pq.ParquetFile(original_path)
    except Exception as e:
        print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr)
    try:
        if not os.path.exists(temp_path):
            print(f"Note: Temp file {temp_path} does not exist", file=sys.stderr)
        else:
            temp_pq = pq.ParquetFile(temp_path)
    except Exception:
        print(f"Note: No new data in temp file {temp_path}", file=sys.stderr)
    # Dispatch on which inputs survived validation.
    if original_pq is None and temp_pq is None:
        print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr)
        return "both_invalid"
    if original_pq is None:
        print(f"Original file corrupted but temp file is valid, recovering from temp", file=sys.stderr)
        return "temp_only"
    if temp_pq is None:
        return "original_only"
    # Stream row groups from both sources in order. The writer is created
    # lazily so two valid-but-empty inputs produce no merged file at all.
    writer = None
    for source in (original_pq, temp_pq):
        for idx in range(source.num_row_groups):
            batch = source.read_row_group(idx)
            if writer is None:
                writer = pq.ParquetWriter(
                    merged_path,
                    batch.schema,
                    flavor="spark"
                )
            writer.write_table(batch)
    if writer is None:
        return False
    writer.close()
    return "merged"
def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter):
    """
    Merge partitioned namespace directories after resume.

    For every namespace=N subdirectory that holds a "<file_filter><temp_suffix>"
    temp file, reconcile it with the original "<file_filter>" parquet file
    (merge, recover, or discard, mirroring merge_parquet_files statuses).

    Returns:
        True if any namespace contained corrupted data that had to be
        discarded; False if every merge completed cleanly.
    """
    namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
    had_corruption = False
    expected_temp = file_filter + temp_suffix
    for ns_dir in namespace_dirs:
        ns_path = os.path.join(partition_dir, ns_dir)
        temp_path = os.path.join(ns_path, expected_temp)
        # Namespaces without a temp file need no reconciliation.
        if not os.path.exists(temp_path):
            continue
        original_file = file_filter
        original_path = os.path.join(ns_path, original_file)
        if os.path.exists(original_path):
            merged_path = original_path + ".merged"
            merged = merge_parquet_files(original_path, temp_path, merged_path)
            if merged == "original_only":
                # Temp had nothing usable; keep the original as-is.
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            elif merged == "temp_only":
                # Original corrupted; promote the temp file in its place.
                os.remove(original_path)
                os.rename(temp_path, original_path)
            elif merged == "both_invalid":
                # Nothing usable in this namespace; discard both.
                if os.path.exists(original_path):
                    os.remove(original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                had_corruption = True
            elif merged == "merged":
                # Replace the original with the merged result.
                os.remove(original_path)
                os.rename(merged_path, original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            else:
                # merge returned False: both valid but empty. Both are
                # removed without flagging corruption.
                if os.path.exists(original_path):
                    os.remove(original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        else:
            # No original: the temp file becomes the original, but only if
            # it is readable parquet; otherwise discard it as corruption.
            try:
                pq.ParquetFile(temp_path)
                os.rename(temp_path, original_path)
            except Exception:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                had_corruption = True
    return had_corruption
def finalize_resume_merge(
    original_output_file,
    temp_output_file,
    partition_namespaces,
    original_partition_dir
):
    """
    Finalize the resume by merging temp output with original output.

    Args:
        original_output_file: Path to the pre-resume output file.
        temp_output_file: Path to the ".resume_temp" output written during
            this run (a directory when partitioned).
        partition_namespaces: Whether output is partitioned by namespace.
        original_partition_dir: Partition directory of the original output,
            or None.

    Raises:
        Re-raises any merge error after reporting where the new data is
        preserved (the temp file is left intact in that case).
    """
    import shutil
    print("Merging resumed data with existing output...", file=sys.stderr)
    try:
        if partition_namespaces and original_partition_dir is not None:
            # Per-namespace reconciliation; then drop the temp directory.
            file_filter = os.path.basename(original_output_file)
            merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter)
            if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file):
                shutil.rmtree(temp_output_file)
        else:
            # Single-file reconciliation, mirroring merge_parquet_files
            # statuses (see that function for their meanings).
            merged_output_file = original_output_file + ".merged"
            merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
            if merged == "original_only":
                # No new data; keep the original untouched.
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            elif merged == "temp_only":
                # Original corrupted; promote the temp file.
                os.remove(original_output_file)
                os.rename(temp_output_file, original_output_file)
            elif merged == "both_invalid":
                # Nothing usable; remove both files.
                os.remove(original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            elif merged == "merged":
                # Swap the merged file into place.
                os.remove(original_output_file)
                os.rename(merged_output_file, original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            else:
                # merge returned False: both valid but empty; remove both.
                os.remove(original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
        print("Merge complete.", file=sys.stderr)
    except Exception as e:
        # Preserve the temp output so the new data is not lost, then
        # propagate the failure to the caller.
        print(f"Error merging resume data for {original_output_file}: {e}", file=sys.stderr)
        print(f"New data saved in: {temp_output_file}", file=sys.stderr)
        raise
def setup_resume_temp_output(output_file, partition_namespaces):
    """
    Prepare temp output paths for resume mode (Parquet only).

    Any stale ".resume_temp" file or directory from a prior run is removed,
    and for partitioned output a fresh temp directory is created.

    Returns:
        Tuple (original_output_file, temp_output_file, original_partition_dir).
        The first and third entries are None when there is no existing
        output to resume from; temp_output_file is always the
        ".resume_temp" path derived from *output_file*.
    """
    import shutil
    original_output_file = None
    original_partition_dir = None
    if partition_namespaces:
        # Existing output means at least one namespace=N dir already holds
        # a file with our output filename.
        partition_dir = os.path.dirname(output_file)
        base_name = os.path.basename(output_file)
        found_existing = False
        if os.path.isdir(partition_dir):
            for entry in os.listdir(partition_dir):
                if not entry.startswith('namespace='):
                    continue
                if os.path.exists(os.path.join(partition_dir, entry, base_name)):
                    found_existing = True
                    break
        if found_existing:
            original_partition_dir = partition_dir
    elif isinstance(output_file, str) and os.path.exists(output_file):
        original_output_file = output_file
    temp_output_file = output_file + ".resume_temp"
    # Clear out any stale temp output left over from an earlier attempt.
    if os.path.exists(temp_output_file):
        if os.path.isdir(temp_output_file):
            shutil.rmtree(temp_output_file)
        else:
            os.remove(temp_output_file)
    if partition_namespaces:
        os.makedirs(temp_output_file, exist_ok=True)
    return original_output_file, temp_output_file, original_partition_dir

View File

@@ -2,7 +2,7 @@ import sys
from abc import abstractmethod, ABC
from datetime import datetime, timezone
from hashlib import sha1
from typing import Generic, TypeVar, Union
from typing import Generic, TypeVar, Union, TYPE_CHECKING
import mwreverts
import mwtypes
@@ -10,13 +10,13 @@ import mwxml
import pyarrow as pa
if TYPE_CHECKING:
from wikiq.wikitext_parser import WikitextParser
T = TypeVar('T')
class RevisionField(ABC, Generic[T]):
def __init__(self):
self.data: list[T] = []
"""
Abstract type which represents a field in a table of page revisions.
"""
@@ -40,14 +40,6 @@ class RevisionField(ABC, Generic[T]):
"""
pass
def add(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> None:
self.data.append(self.extract(page, revisions))
def pop(self) -> list[T]:
data = self.data
self.data = []
return data
class RevisionTable:
columns: list[RevisionField]
@@ -55,19 +47,15 @@ class RevisionTable:
def __init__(self, columns: list[RevisionField]):
self.columns = columns
def add(self, page: mwtypes.Page, revisions: list[mwxml.Revision]):
for column in self.columns:
column.add(page=page, revisions=revisions)
def schema(self) -> pa.Schema:
return pa.schema([c.field for c in self.columns])
def pop(self) -> dict:
data = dict()
for column in self.columns:
data[column.field.name] = column.pop()
return data
def extract_row(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> dict:
"""Extract a single row dict for the given page and revisions."""
return {
column.field.name: column.extract(page, revisions)
for column in self.columns
}
class RevisionId(RevisionField[int]):
@@ -217,3 +205,108 @@ class RevisionText(RevisionField[str]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str:
revision = revisions[-1]
return revision.text
class RevisionExternalLinks(RevisionField[Union[list[str], None]]):
    """Extract all external links from revision text.

    Emits a nullable list<string> column named "external_links"; NULL when
    the revision text is deleted or parsing fails.
    """
    field = pa.field("external_links", pa.list_(pa.string()), nullable=True)
    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared WikitextParser (caches the parsed wikicode per text).
        self.wikitext_parser = wikitext_parser
    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[str], None]:
        # Only the most recent revision of the batch is inspected.
        revision = revisions[-1]
        if revision.deleted.text:
            # Suppressed text: NULL rather than an empty list.
            return None
        return self.wikitext_parser.extract_external_links(revision.text)
class RevisionCitations(RevisionField[Union[list[str], None]]):
    """Extract citations from ref tags and cite templates.

    Emits a nullable list<string> column named "citations"; NULL when the
    revision text is deleted or parsing fails.
    """
    field = pa.field("citations", pa.list_(pa.string()), nullable=True)
    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared WikitextParser (caches the parsed wikicode per text).
        self.wikitext_parser = wikitext_parser
    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[str], None]:
        # Only the most recent revision of the batch is inspected.
        revision = revisions[-1]
        if revision.deleted.text:
            # Suppressed text: NULL rather than an empty list.
            return None
        return self.wikitext_parser.extract_citations(revision.text)
class RevisionWikilinks(RevisionField[Union[list[dict], None]]):
    """Extract all internal wikilinks from revision text.

    Emits a nullable list<struct{title, text}> column named "wikilinks";
    "text" is the display text and may be null (unpiped links).
    """
    # Struct type with title and optional display text
    field = pa.field("wikilinks", pa.list_(pa.struct([
        pa.field("title", pa.string()),
        pa.field("text", pa.string(), nullable=True),
    ])), nullable=True)
    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared WikitextParser (caches the parsed wikicode per text).
        self.wikitext_parser = wikitext_parser
    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        # Only the most recent revision of the batch is inspected.
        revision = revisions[-1]
        if revision.deleted.text:
            # Suppressed text: NULL rather than an empty list.
            return None
        return self.wikitext_parser.extract_wikilinks(revision.text)
class RevisionTemplates(RevisionField[Union[list[dict], None]]):
    """Extract all templates from revision text.

    Emits a nullable list<struct{name, params}> column named "templates",
    where params is a map of parameter name -> value.
    """
    # Struct type with name and params map
    field = pa.field("templates", pa.list_(pa.struct([
        pa.field("name", pa.string()),
        pa.field("params", pa.map_(pa.string(), pa.string())),
    ])), nullable=True)
    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared WikitextParser (caches the parsed wikicode per text).
        self.wikitext_parser = wikitext_parser
    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        # Only the most recent revision of the batch is inspected.
        revision = revisions[-1]
        if revision.deleted.text:
            # Suppressed text: NULL rather than an empty list.
            return None
        return self.wikitext_parser.extract_templates(revision.text)
class RevisionHeadings(RevisionField[Union[list[dict], None]]):
    """Extract all section headings from revision text.

    Emits a nullable list<struct{level, text}> column named "headings".
    """
    # Struct type with level and text
    field = pa.field("headings", pa.list_(pa.struct([
        pa.field("level", pa.int8()),
        pa.field("text", pa.string()),
    ])), nullable=True)
    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared WikitextParser (caches the parsed wikicode per text).
        self.wikitext_parser = wikitext_parser
    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        # Only the most recent revision of the batch is inspected.
        revision = revisions[-1]
        if revision.deleted.text:
            # Suppressed text: NULL rather than an empty list.
            return None
        return self.wikitext_parser.extract_headings(revision.text)
class RevisionParserTimeout(RevisionField[bool]):
    """Track whether the wikitext parser timed out for this revision.

    Emits a non-nullable bool column named "parser_timeout".
    """
    field = pa.field("parser_timeout", pa.bool_())
    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        # Shared WikitextParser whose timeout flag this column reports.
        self.wikitext_parser = wikitext_parser
    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> bool:
        # Reflects the flag left by the parser's most recent parse.
        # NOTE(review): assumes the parsing columns for this revision run
        # before this one -- confirm column ordering in the table builder.
        return self.wikitext_parser.last_parse_timed_out

View File

@@ -0,0 +1,133 @@
"""Shared wikitext parser with caching to avoid duplicate parsing."""
from __future__ import annotations
import signal
import mwparserfromhell
PARSER_TIMEOUT = 60 # seconds
class WikitextParser:
    """Caches parsed wikicode to avoid re-parsing the same text.

    Holds exactly one cache entry (the most recently parsed text), so
    repeated extract_* calls on the same revision text parse it only once.
    Parsing is guarded by a SIGALRM-based timeout of PARSER_TIMEOUT
    seconds; a timed-out parse is cached as None and all extractors then
    return None for that text.

    NOTE(review): SIGALRM only exists on Unix and signal handlers can only
    be installed in the main thread -- confirm wikiq never parses from a
    worker thread.
    """
    # Template names treated as citations; matched as lowercase prefixes
    # in extract_citations.
    CITE_TEMPLATES = {
        'cite web', 'cite book', 'cite journal', 'cite news',
        'cite magazine', 'cite conference', 'cite encyclopedia',
        'cite report', 'cite thesis', 'cite press release',
        'citation', 'sfn', 'harvnb', 'harv'
    }
    def __init__(self):
        # One-entry cache: the last text parsed and its parse result.
        self._cached_text: str | None = None
        self._cached_wikicode = None
        # True when the most recent parse hit PARSER_TIMEOUT.
        self.last_parse_timed_out: bool = False
    def _timeout_handler(self, signum, frame):
        # SIGALRM handler armed around mwparserfromhell.parse().
        raise TimeoutError("mwparserfromhell parse exceeded timeout")
    def _get_wikicode(self, text: str):
        """Parse text and cache result. Returns cached result if text unchanged.

        Returns None (and sets last_parse_timed_out) when parsing exceeded
        PARSER_TIMEOUT seconds; the failure itself is cached so sibling
        extractors do not re-trigger the timeout on the same text.
        """
        if text == self._cached_text:
            return self._cached_wikicode
        # Arm a one-shot alarm so a pathological page cannot hang the job.
        old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
        signal.alarm(PARSER_TIMEOUT)
        try:
            self._cached_wikicode = mwparserfromhell.parse(text)
            self._cached_text = text
            self.last_parse_timed_out = False
        except TimeoutError:
            # Cache the failure too (text is recorded with a None result).
            self._cached_wikicode = None
            self._cached_text = text
            self.last_parse_timed_out = True
        finally:
            # Always disarm the alarm and restore the previous handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)
        return self._cached_wikicode
    def extract_external_links(self, text: str | None) -> list[str] | None:
        """Extract all external link URLs from wikitext."""
        if text is None:
            return None
        try:
            wikicode = self._get_wikicode(text)
            return [str(link.url) for link in wikicode.filter_external_links()]
        except Exception:
            # Also covers the timed-out case where wikicode is None.
            return None
    def extract_citations(self, text: str | None) -> list[str] | None:
        """Extract citations from ref tags and cite templates.

        Ref-tag contents are prefixed "ref:"; cite templates are prefixed
        "template:".
        """
        if text is None:
            return None
        try:
            wikicode = self._get_wikicode(text)
            citations = []
            # Extract ref tag contents
            for tag in wikicode.filter_tags():
                if tag.tag.lower() == 'ref':
                    content = str(tag.contents).strip()
                    if content:
                        citations.append(f"ref:{content}")
            # Extract cite templates
            for template in wikicode.filter_templates():
                template_name = str(template.name).strip().lower()
                if any(template_name.startswith(cite) for cite in self.CITE_TEMPLATES):
                    citations.append(f"template:{str(template)}")
            return citations
        except Exception:
            # Also covers the timed-out case where wikicode is None.
            return None
    def extract_wikilinks(self, text: str | None) -> list[dict[str, str | None]] | None:
        """Extract all internal wikilinks with title and display text."""
        if text is None:
            return None
        try:
            wikicode = self._get_wikicode(text)
            result = []
            for link in wikicode.filter_wikilinks():
                title = str(link.title).strip()
                # text is None if no pipe, otherwise the display text
                display_text = str(link.text).strip() if link.text else None
                result.append({"title": title, "text": display_text})
            return result
        except Exception:
            # Also covers the timed-out case where wikicode is None.
            return None
    def extract_templates(self, text: str | None) -> list[dict] | None:
        """Extract all templates with their names and parameters."""
        if text is None:
            return None
        try:
            wikicode = self._get_wikicode(text)
            result = []
            for template in wikicode.filter_templates():
                name = str(template.name).strip()
                params = {}
                for param in template.params:
                    param_name = str(param.name).strip()
                    param_value = str(param.value).strip()
                    params[param_name] = param_value
                result.append({"name": name, "params": params})
            return result
        except Exception:
            # Also covers the timed-out case where wikicode is None.
            return None
    def extract_headings(self, text: str | None) -> list[dict] | None:
        """Extract all section headings with their levels."""
        if text is None:
            return None
        try:
            wikicode = self._get_wikicode(text)
            result = []
            for heading in wikicode.filter_headings():
                level = heading.level
                heading_text = str(heading.title).strip()
                result.append({"level": level, "text": heading_text})
            return result
        except Exception:
            # Also covers the timed-out case where wikicode is None.
            return None

View File

@@ -1,110 +1,61 @@
import os
import shutil
import subprocess
import sys
import tracemalloc
from io import StringIO
from typing import Final, Union
import pytest
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.json as pj
import pytest
from pandas import DataFrame
from pandas.testing import assert_frame_equal, assert_series_equal
# Make references to files and wikiq relative to this file, not to the current working directory.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR,".."), "src/wikiq/__init__.py")
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"
from wikiq import build_table, build_schema
from wikiq_test_utils import (
BASELINE_DIR,
IKWIKI,
REGEXTEST,
SAILORMOON,
TEST_DIR,
TEST_OUTPUT_DIR,
TWINPEAKS,
WIKIQ,
WikiqTester,
)
def setup():
tracemalloc.start()
# Perform directory check and reset here as this is a one-time setup step as opposed to per-test setup.
if not os.path.exists(TEST_OUTPUT_DIR):
os.mkdir(TEST_OUTPUT_DIR)
# Always run setup, even if this is executed via "python -m unittest" rather
# than as __main__.
setup()
class WikiqTester:
def __init__(
self,
wiki: str,
case_name: str,
suffix: Union[str, None] = None,
in_compression: str = "bz2",
baseline_format: str = "tsv",
out_format: str = "tsv",
):
self.input_file = os.path.join(
TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
def read_jsonl_with_schema(filepath: str, **schema_kwargs) -> pd.DataFrame:
    """Read a JSONL file into pandas via PyArrow, using wikiq's explicit schema.

    schema_kwargs are forwarded to wikiq's build_table/build_schema so the
    parsed columns carry the same types wikiq would write.
    """
    table, _ = build_table(**schema_kwargs)
    schema = build_schema(table, **schema_kwargs)
    parse_options = pj.ParseOptions(explicit_schema=schema)
    arrow_table = pj.read_json(filepath, parse_options=parse_options)
    return arrow_table.to_pandas()
basename = "{0}_{1}".format(case_name, wiki)
if suffix:
basename = "{0}_{1}".format(basename, suffix)
self.output = os.path.join(
TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
)
if os.path.exists(self.output):
if os.path.isfile(self.output):
os.remove(self.output)
else:
shutil.rmtree(self.output)
if out_format == "parquet":
os.makedirs(self.output, exist_ok=True)
if suffix is None:
self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
else:
self.wikiq_baseline_name = "{0}_{1}.{2}".format(
wiki, suffix, baseline_format
)
self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
# If case_name is unset, there are no relevant baseline or test files.
if case_name is not None:
self.baseline_file = os.path.join(
BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
)
def call_wikiq(self, *args: str, out: bool = True):
    """
    Calls wikiq with the passed arguments on the input file relevant to the test.
    :param args: The command line arguments to pass to wikiq.
    :param out: Whether to pass an output argument to wikiq.
    :return: The output of the wikiq call.
    :raises subprocess.CalledProcessError: if wikiq exits non-zero (stderr is
        captured on the exception via stderr=subprocess.PIPE).
    """
    # --batch-size 10 is always passed so tests exercise wikiq's batched
    # output path on these small dumps.
    if out:
        call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
    else:
        call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
    print(call)
    # NOTE(review): shell=True with a space-joined string is safe only because
    # all paths/args here are test-controlled; do not reuse with untrusted input.
    return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
def test_WP_noargs():
tester = WikiqTester(IKWIKI, "noargs")
@@ -187,6 +138,61 @@ def test_noargs():
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_jsonl_noargs():
    """Test JSONL output format with baseline comparison."""
    tester = WikiqTester(
        SAILORMOON,
        "noargs",
        in_compression="7z",
        out_format="jsonl",
        baseline_format="jsonl",
    )
    try:
        tester.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    # Read both files through the same schema-aware reader so dtypes match.
    actual = read_jsonl_with_schema(tester.output)
    expected = read_jsonl_with_schema(tester.baseline_file)
    assert_frame_equal(actual, expected, check_like=True)
def test_jsonl_tsv_equivalence():
    """Test that JSONL and TSV outputs contain equivalent data.

    Runs wikiq twice on the same dump (once per format), then compares row
    counts, column sets, and per-column values after normalizing the
    representational differences between the two readers.
    """
    tester_tsv = WikiqTester(SAILORMOON, "equiv_tsv", in_compression="7z", out_format="tsv")
    tester_jsonl = WikiqTester(SAILORMOON, "equiv_jsonl", in_compression="7z", out_format="jsonl")
    try:
        tester_tsv.call_wikiq()
        tester_jsonl.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    tsv_df = pd.read_table(tester_tsv.output)
    jsonl_df = read_jsonl_with_schema(tester_jsonl.output)
    # Row counts must match
    assert len(tsv_df) == len(jsonl_df), f"Row count mismatch: TSV={len(tsv_df)}, JSONL={len(jsonl_df)}"
    # Column sets must match
    assert set(tsv_df.columns) == set(jsonl_df.columns), \
        f"Column mismatch: TSV={set(tsv_df.columns)}, JSONL={set(jsonl_df.columns)}"
    # Sort both by revid for comparison (output order is not part of the contract here)
    tsv_df = tsv_df.sort_values("revid").reset_index(drop=True)
    jsonl_df = jsonl_df.sort_values("revid").reset_index(drop=True)
    # Normalize null values: TSV uses nan, schema-based JSONL uses None
    jsonl_df = jsonl_df.replace({None: np.nan})
    # Compare columns - schema-based reading handles types correctly
    for col in tsv_df.columns:
        if col == "date_time":
            # TSV reads as string, JSONL with schema reads as datetime;
            # render both to the same ISO-8601 string before comparing.
            tsv_dates = pd.to_datetime(tsv_df[col]).dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            jsonl_dates = jsonl_df[col].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            assert_series_equal(tsv_dates, jsonl_dates, check_names=False)
        else:
            # Allow dtype differences (TSV infers int64, schema uses int32)
            assert_series_equal(tsv_df[col], jsonl_df[col], check_names=False, check_dtype=False)
def test_collapse_user():
tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z")
@@ -199,19 +205,6 @@ def test_collapse_user():
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_partition_namespaces():
    """Check --partition-namespaces writes per-namespace parquet partitions."""
    tester = WikiqTester(
        SAILORMOON,
        "collapse-user",
        in_compression="7z",
        out_format='parquet',
        baseline_format='parquet',
    )
    try:
        tester.call_wikiq("--collapse-user", "--fandom-2020", "--partition-namespaces")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    # Partitioned output nests files under namespace=<id>/ subdirectories.
    partition_path = os.path.join(tester.output, "namespace=10/sailormoon.parquet")
    test = pd.read_parquet(partition_path)
    baseline = pd.read_parquet(tester.baseline_file)
    assert_frame_equal(test, baseline, check_like=True)
def test_pwr_wikidiff2():
tester = WikiqTester(SAILORMOON, "persistence_wikidiff2", in_compression="7z")
@@ -263,46 +256,43 @@ def test_pwr():
assert_frame_equal(test, baseline, check_like=True)
def test_diff():
tester = WikiqTester(SAILORMOON, "diff", in_compression="7z", out_format='parquet', baseline_format='parquet')
tester = WikiqTester(SAILORMOON, "diff", in_compression="7z", out_format='jsonl')
try:
tester.call_wikiq("--diff", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
baseline = pd.read_parquet(tester.baseline_file)
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
test = pd.read_json(tester.output, lines=True)
assert "diff" in test.columns, "diff column should exist"
assert "diff_timeout" in test.columns, "diff_timeout column should exist"
assert len(test) > 0, "Should have output rows"
def test_diff_plus_pwr():
tester = WikiqTester(SAILORMOON, "diff_pwr", in_compression="7z", out_format='parquet', baseline_format='parquet')
tester = WikiqTester(SAILORMOON, "diff_pwr", in_compression="7z", out_format='jsonl')
try:
tester.call_wikiq("--diff --persistence wikidiff2", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
baseline = pd.read_parquet(tester.baseline_file)
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
test = pd.read_json(tester.output, lines=True)
assert "diff" in test.columns, "diff column should exist"
assert "token_revs" in test.columns, "token_revs column should exist"
assert len(test) > 0, "Should have output rows"
def test_text():
tester = WikiqTester(SAILORMOON, "text", in_compression="7z", out_format='parquet', baseline_format='parquet')
tester = WikiqTester(SAILORMOON, "text", in_compression="7z", out_format='jsonl')
try:
tester.call_wikiq("--diff", "--text","--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
baseline = pd.read_parquet(tester.baseline_file)
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
test = pd.read_json(tester.output, lines=True)
assert "text" in test.columns, "text column should exist"
assert "diff" in test.columns, "diff column should exist"
assert len(test) > 0, "Should have output rows"
def test_malformed_noargs():
tester = WikiqTester(wiki=TWINPEAKS, case_name="noargs", in_compression="7z")
@@ -401,41 +391,370 @@ def test_capturegroup_regex():
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_parquet():
tester = WikiqTester(IKWIKI, "noargs", out_format="parquet")
def test_external_links_only():
"""Test that --external-links extracts external links correctly."""
import mwparserfromhell
tester = WikiqTester(SAILORMOON, "external_links_only", in_compression="7z", out_format="jsonl")
try:
tester.call_wikiq()
# Also include --text so we can verify extraction against actual wikitext
tester.call_wikiq("--external-links", "--text", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# as a test let's make sure that we get equal data frames
test: DataFrame = pd.read_parquet(tester.output)
# test = test.drop(['reverteds'], axis=1)
test = pd.read_json(tester.output, lines=True)
baseline: DataFrame = pd.read_table(tester.baseline_file)
# Verify external_links column exists
assert "external_links" in test.columns, "external_links column should exist"
# Pandas does not read timestamps as the desired datetime type.
baseline["date_time"] = pd.to_datetime(baseline["date_time"])
# Split strings to the arrays of reverted IDs so they can be compared.
baseline["revert"] = baseline["revert"].replace(np.nan, None)
baseline["reverteds"] = baseline["reverteds"].replace(np.nan, None)
# baseline['reverteds'] = [None if i is np.nan else [int(j) for j in str(i).split(",")] for i in baseline['reverteds']]
baseline["sha1"] = baseline["sha1"].replace(np.nan, None)
baseline["editor"] = baseline["editor"].replace(np.nan, None)
baseline["anon"] = baseline["anon"].replace(np.nan, None)
# Verify citations column does NOT exist
assert "citations" not in test.columns, "citations column should NOT exist when only --external-links is used"
for index, row in baseline.iterrows():
if row["revert"] != test["revert"][index]:
print(row["revid"], ":", row["revert"], "!=", test["revert"][index])
# Verify column has list/array type (pandas reads parquet lists as numpy arrays)
assert test["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
"external_links should be a list/array type or None"
# Verify that extracted URLs look like valid URIs (have a scheme or are protocol-relative)
all_urls = []
for links in test["external_links"]:
if links is not None and len(links) > 0:
all_urls.extend(links)
for url in all_urls:
# External links can be http, https, mailto, ftp, etc. or protocol-relative (//)
has_scheme = ":" in url and url.index(":") < 10 # scheme:... with short scheme
is_protocol_relative = url.startswith("//")
assert has_scheme or is_protocol_relative, \
f"External link should be a valid URI, got: {url}"
# Verify extraction matches mwparserfromhell for a sample of rows with text
rows_with_links = test[test["external_links"].apply(lambda x: x is not None and len(x) > 0)]
if len(rows_with_links) > 0:
# Test up to 5 rows
sample = rows_with_links.head(5)
for idx, row in sample.iterrows():
text = row["text"]
if text:
wikicode = mwparserfromhell.parse(text)
expected_links = [str(link.url) for link in wikicode.filter_external_links()]
actual_links = list(row["external_links"])
assert actual_links == expected_links, \
f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"
print(f"External links only test passed! {len(test)} rows, {len(all_urls)} total URLs extracted")
def test_citations_only():
"""Test that --citations extracts citations correctly."""
import mwparserfromhell
from wikiq.wikitext_parser import WikitextParser
tester = WikiqTester(SAILORMOON, "citations_only", in_compression="7z", out_format="jsonl")
for col in baseline.columns:
try:
assert_series_equal(
test[col], baseline[col], check_like=True, check_dtype=False
)
except ValueError as exc:
print(f"Error comparing column {col}")
pytest.fail(exc)
# Also include --text so we can verify extraction against actual wikitext
tester.call_wikiq("--citations", "--text", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_json(tester.output, lines=True)
# Verify citations column exists
assert "citations" in test.columns, "citations column should exist"
# Verify external_links column does NOT exist
assert "external_links" not in test.columns, "external_links column should NOT exist when only --citations is used"
# Verify column has list/array type (pandas reads parquet lists as numpy arrays)
assert test["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
"citations should be a list/array type or None"
# Verify that extracted citations have correct prefixes (ref: or template:)
all_citations = []
for citations in test["citations"]:
if citations is not None and len(citations) > 0:
all_citations.extend(citations)
for citation in all_citations:
assert citation.startswith("ref:") or citation.startswith("template:"), \
f"Citation should start with 'ref:' or 'template:', got: {citation}"
# Verify extraction matches WikitextParser for a sample of rows with text
rows_with_citations = test[test["citations"].apply(lambda x: x is not None and len(x) > 0)]
if len(rows_with_citations) > 0:
parser = WikitextParser()
# Test up to 5 rows
sample = rows_with_citations.head(5)
for idx, row in sample.iterrows():
text = row["text"]
if text:
expected_citations = parser.extract_citations(text)
actual_citations = list(row["citations"])
assert actual_citations == expected_citations, \
f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"
print(f"Citations only test passed! {len(test)} rows, {len(all_citations)} total citations extracted")
def test_external_links_and_citations():
    """Test that both --external-links and --citations work together (shared parser).

    Validates column presence and value shape, then re-parses a sample of the
    stored wikitext with mwparserfromhell / WikitextParser and checks the
    extracted values agree with what wikiq wrote.
    """
    import mwparserfromhell
    from wikiq.wikitext_parser import WikitextParser
    tester = WikiqTester(SAILORMOON, "external_links_and_citations", in_compression="7z", out_format="jsonl")
    try:
        # Also include --text so we can verify extraction against actual wikitext
        tester.call_wikiq("--external-links", "--citations", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    test = pd.read_json(tester.output, lines=True)
    # Verify both columns exist
    assert "external_links" in test.columns, "external_links column should exist"
    assert "citations" in test.columns, "citations column should exist"
    # Verify both columns have list/array types (pandas reads parquet lists as numpy arrays)
    assert test["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
        "external_links should be a list/array type or None"
    assert test["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
        "citations should be a list/array type or None"
    # Verify URLs look like valid URIs (have a scheme or are protocol-relative)
    all_urls = []
    for links in test["external_links"]:
        if links is not None and len(links) > 0:
            all_urls.extend(links)
    for url in all_urls:
        # External links can be http, https, mailto, ftp, etc. or protocol-relative (//)
        has_scheme = ":" in url and url.index(":") < 10  # scheme:... with short scheme
        is_protocol_relative = url.startswith("//")
        assert has_scheme or is_protocol_relative, \
            f"External link should be a valid URI, got: {url}"
    # Verify citations have correct prefixes
    all_citations = []
    for citations in test["citations"]:
        if citations is not None and len(citations) > 0:
            all_citations.extend(citations)
    for citation in all_citations:
        assert citation.startswith("ref:") or citation.startswith("template:"), \
            f"Citation should start with 'ref:' or 'template:', got: {citation}"
    # Verify extraction matches WikitextParser for a sample of rows with text
    # This tests that the shared parser optimization works correctly
    parser = WikitextParser()
    rows_with_content = test[
        (test["external_links"].apply(lambda x: x is not None and len(x) > 0)) |
        (test["citations"].apply(lambda x: x is not None and len(x) > 0))
    ]
    if len(rows_with_content) > 0:
        # Test up to 5 rows (re-parsing every revision would be slow)
        sample = rows_with_content.head(5)
        for idx, row in sample.iterrows():
            text = row["text"]
            if text:
                # Verify external links
                wikicode = mwparserfromhell.parse(text)
                expected_links = [str(link.url) for link in wikicode.filter_external_links()]
                actual_links = list(row["external_links"]) if row["external_links"] is not None else []
                assert actual_links == expected_links, \
                    f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"
                # Verify citations
                expected_citations = parser.extract_citations(text)
                actual_citations = list(row["citations"]) if row["citations"] is not None else []
                assert actual_citations == expected_citations, \
                    f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"
    print(f"External links and citations test passed! {len(test)} rows, {len(all_urls)} URLs, {len(all_citations)} citations")
def test_no_wikitext_columns():
    """Test that neither external_links nor citations columns exist without flags."""
    tester = WikiqTester(SAILORMOON, "no_wikitext_columns", in_compression="7z", out_format="jsonl")
    try:
        tester.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    test = pd.read_json(tester.output, lines=True)
    columns = set(test.columns)
    # Neither wikitext-extraction column may appear without its flag.
    assert "external_links" not in columns, "external_links column should NOT exist without --external-links flag"
    assert "citations" not in columns, "citations column should NOT exist without --citations flag"
    print(f"No wikitext columns test passed! {len(test)} rows processed")
def test_wikilinks():
    """Test that --wikilinks extracts internal wikilinks correctly.

    Runs wikiq with --wikilinks and --text, then re-parses the stored wikitext
    with mwparserfromhell and checks the extracted {title, text} pairs agree
    for a small sample of rows.
    """
    import mwparserfromhell
    tester = WikiqTester(SAILORMOON, "wikilinks", in_compression="7z", out_format="jsonl")
    try:
        tester.call_wikiq("--wikilinks", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    test = pd.read_json(tester.output, lines=True)
    # Verify wikilinks column exists
    assert "wikilinks" in test.columns, "wikilinks column should exist"
    # Verify column has list/array type
    assert test["wikilinks"].apply(lambda x: x is None or hasattr(x, '__len__')).all()
    # Verify extraction matches mwparserfromhell for sample rows
    rows_with_links = test[test["wikilinks"].apply(lambda x: x is not None and len(x) > 0)]
    if len(rows_with_links) > 0:
        # 5 rows keeps the re-parse cheap while still giving real coverage.
        sample = rows_with_links.head(5)
        for idx, row in sample.iterrows():
            text = row["text"]
            if text:
                wikicode = mwparserfromhell.parse(text)
                expected = []
                for link in wikicode.filter_wikilinks():
                    title = str(link.title).strip()
                    display_text = str(link.text).strip() if link.text else None
                    expected.append({"title": title, "text": display_text})
                actual = list(row["wikilinks"])
                # Convert to comparable format (pandas may read as dicts or named tuples)
                actual_dicts = [{"title": item["title"], "text": item["text"]} for item in actual]
                assert actual_dicts == expected, f"Row {idx}: wikilinks mismatch"
    print(f"Wikilinks test passed! {len(test)} rows processed")
def test_templates():
    """Test that --templates extracts templates correctly.

    Runs wikiq with --templates and --text, then re-parses the stored wikitext
    with mwparserfromhell and checks the extracted template names and params
    agree for a small sample of rows.
    """
    import mwparserfromhell
    tester = WikiqTester(SAILORMOON, "templates", in_compression="7z", out_format="jsonl")
    try:
        tester.call_wikiq("--templates", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    test = pd.read_json(tester.output, lines=True)
    # Verify templates column exists
    assert "templates" in test.columns, "templates column should exist"
    # Verify column has list/array type
    assert test["templates"].apply(lambda x: x is None or hasattr(x, '__len__')).all()
    # Verify extraction matches mwparserfromhell for sample rows
    rows_with_templates = test[test["templates"].apply(lambda x: x is not None and len(x) > 0)]
    if len(rows_with_templates) > 0:
        # Re-parsing every revision would be slow; 5 rows is enough signal.
        sample = rows_with_templates.head(5)
        for idx, row in sample.iterrows():
            text = row["text"]
            if text:
                wikicode = mwparserfromhell.parse(text)
                expected = []
                for template in wikicode.filter_templates():
                    name = str(template.name).strip()
                    params = {}
                    for param in template.params:
                        param_name = str(param.name).strip()
                        param_value = str(param.value).strip()
                        params[param_name] = param_value
                    expected.append({"name": name, "params": params})
                actual = list(row["templates"])
                # Convert to comparable format
                actual_list = []
                for item in actual:
                    actual_list.append({
                        "name": item["name"],
                        "params": dict(item["params"]) if item["params"] else {}
                    })
                assert actual_list == expected, f"Row {idx}: templates mismatch"
    print(f"Templates test passed! {len(test)} rows processed")
def test_headings():
    """Test that --headings extracts section headings correctly.

    Runs wikiq with --headings and --text, then re-parses the stored wikitext
    with mwparserfromhell and checks the extracted {level, text} pairs agree
    for a small sample of rows.
    """
    import mwparserfromhell
    tester = WikiqTester(SAILORMOON, "headings", in_compression="7z", out_format="jsonl")
    try:
        tester.call_wikiq("--headings", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    test = pd.read_json(tester.output, lines=True)
    # Verify headings column exists
    assert "headings" in test.columns, "headings column should exist"
    # Verify column has list/array type
    assert test["headings"].apply(lambda x: x is None or hasattr(x, '__len__')).all()
    # Verify extraction matches mwparserfromhell for sample rows
    rows_with_headings = test[test["headings"].apply(lambda x: x is not None and len(x) > 0)]
    if len(rows_with_headings) > 0:
        # Sample 5 rows to keep the cross-check fast.
        sample = rows_with_headings.head(5)
        for idx, row in sample.iterrows():
            text = row["text"]
            if text:
                wikicode = mwparserfromhell.parse(text)
                expected = []
                for heading in wikicode.filter_headings():
                    level = heading.level
                    heading_text = str(heading.title).strip()
                    expected.append({"level": level, "text": heading_text})
                actual = list(row["headings"])
                # Convert to comparable format
                actual_list = [{"level": item["level"], "text": item["text"]} for item in actual]
                assert actual_list == expected, f"Row {idx}: headings mismatch"
    print(f"Headings test passed! {len(test)} rows processed")
def test_parquet_output():
    """Test that Parquet output format works correctly.

    Writes the same dump as both Parquet and JSONL and checks the Parquet
    file exists, carries the core columns, and has the same row count as
    the JSONL run.
    """
    tester = WikiqTester(SAILORMOON, "parquet_output", in_compression="7z", out_format="parquet")
    try:
        tester.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    # Verify output file exists
    assert os.path.exists(tester.output), f"Parquet output file should exist at {tester.output}"
    # Read and verify content
    test = pd.read_parquet(tester.output)
    # Verify expected columns exist
    assert "revid" in test.columns
    assert "articleid" in test.columns
    assert "title" in test.columns
    assert "namespace" in test.columns
    # Verify row count matches JSONL output (same dump, different writer)
    tester_jsonl = WikiqTester(SAILORMOON, "parquet_compare", in_compression="7z", out_format="jsonl")
    try:
        tester_jsonl.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    test_jsonl = pd.read_json(tester_jsonl.output, lines=True)
    assert len(test) == len(test_jsonl), f"Parquet and JSONL should have same row count: {len(test)} vs {len(test_jsonl)}"
    print(f"Parquet output test passed! {len(test)} rows")
# assert_frame_equal(test, baseline, check_like=True, check_dtype=False)

5
test/conftest.py Normal file
View File

@@ -0,0 +1,5 @@
import os
import sys

# Add the test directory to Python path so test utilities can be imported.
# pytest imports conftest.py before collecting tests, so this runs first.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

808
test/test_resume.py Normal file
View File

@@ -0,0 +1,808 @@
import json
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import pytest
from wikiq.resume import (
get_checkpoint_path,
read_checkpoint,
)
from wikiq_test_utils import (
SAILORMOON,
TEST_DIR,
TEST_OUTPUT_DIR,
WIKIQ,
WikiqTester,
)
def read_jsonl(filepath):
    """Parse a JSON-lines file into a list of dicts, skipping blank lines."""
    with open(filepath, 'r') as f:
        return [json.loads(line) for line in f if line.strip()]
def test_resume():
    """Test that --resume properly resumes processing from the last checkpoint.

    Does a full run, then fabricates a half-finished run (first half of the
    rows plus a matching checkpoint) and resumes it; the resumed output must
    equal the uninterrupted output exactly.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal
    tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="jsonl")
    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)
    # Resume point: the middle row of the full run.
    middle_idx = len(full_rows) // 2
    resume_revid = full_rows[middle_idx]["revid"]
    tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="jsonl")
    partial_output_path = tester_partial.output
    # Simulate an interrupted run: write only the first half of the rows...
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:middle_idx + 1]:
            f.write(json.dumps(row) + "\n")
    # ...plus a checkpoint pointing at the last written page/revision.
    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[middle_idx]["articleid"], "revid": resume_revid}, f)
    try:
        tester_partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    resumed_rows = read_jsonl(partial_output_path)
    # The resumed run must reproduce the uninterrupted run byte-for-byte in value.
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)
def test_resume_with_diff():
    """Test that --resume correctly computes diff values after resume.

    The diff computation depends on having the correct prev_text state.
    This test verifies that diff values (text_chars, added_chars, etc.)
    are identical between a full run and a resumed run.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal
    tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="jsonl")
    try:
        tester_full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)
    # Resume from one third of the way in (a different cut point than test_resume).
    resume_idx = len(full_rows) // 3
    resume_revid = full_rows[resume_idx]["revid"]
    tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="jsonl")
    partial_output_path = tester_partial.output
    # Fabricate the partial output and matching checkpoint.
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:resume_idx + 1]:
            f.write(json.dumps(row) + "\n")
    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[resume_idx]["articleid"], "revid": resume_revid}, f)
    try:
        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    resumed_rows = read_jsonl(partial_output_path)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    # Verify diff columns are present
    diff_columns = ["text_chars", "diff", "diff_timeout"]
    for col in diff_columns:
        assert col in df_full.columns, f"Diff column {col} should exist in full output"
        assert col in df_resumed.columns, f"Diff column {col} should exist in resumed output"
    assert_frame_equal(df_full, df_resumed)
def test_resume_file_not_found():
    """Test that --resume starts fresh when output file doesn't exist."""
    tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="jsonl")
    expected_output = tester.output
    if os.path.exists(expected_output):
        os.remove(expected_output)
    # Should succeed by starting fresh. Wrapped like the sibling tests so a
    # wikiq failure reports its captured stderr rather than an opaque
    # CalledProcessError traceback.
    try:
        tester.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    # Verify output was created
    assert os.path.exists(expected_output), "Output file should be created when starting fresh"
    rows = read_jsonl(expected_output)
    assert len(rows) > 0, "Output should have data"
    print("Resume file not found test passed - started fresh!")
def test_resume_simple():
    """Test that --resume works without --fandom-2020.

    Same fabricated-checkpoint strategy as test_resume, but exercising the
    default (non-fandom) processing path.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal
    tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="jsonl")
    try:
        tester_full.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)
    # Resume point: one third of the way through the full run.
    resume_idx = len(full_rows) // 3
    resume_revid = full_rows[resume_idx]["revid"]
    tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="jsonl")
    partial_output_path = tester_partial.output
    # Fabricate the partial output and the checkpoint it would have written.
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:resume_idx + 1]:
            f.write(json.dumps(row) + "\n")
    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[resume_idx]["articleid"], "revid": resume_revid}, f)
    try:
        tester_partial.call_wikiq("--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    resumed_rows = read_jsonl(partial_output_path)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)
def test_checkpoint_read():
    """Test that read_checkpoint correctly reads checkpoint files."""
    with tempfile.TemporaryDirectory() as tmpdir:

        def write_file(name, content):
            # Small helper: write a checkpoint fixture and return its path.
            path = os.path.join(tmpdir, name)
            with open(path, 'w') as f:
                f.write(content)
            return path

        # Valid checkpoint round-trips to a (pageid, revid) tuple.
        checkpoint_path = write_file(
            "test.jsonl.checkpoint", json.dumps({"pageid": 100, "revid": 200})
        )
        result = read_checkpoint(checkpoint_path)
        assert result == (100, 200), f"Expected (100, 200), got {result}"
        # Missing file yields None.
        result = read_checkpoint(os.path.join(tmpdir, "nonexistent.checkpoint"))
        assert result is None, f"Expected None for non-existent file, got {result}"
        # Empty JSON object yields None.
        result = read_checkpoint(write_file("empty.checkpoint", "{}"))
        assert result is None, f"Expected None for empty checkpoint, got {result}"
        # Corrupted content yields None rather than raising.
        result = read_checkpoint(write_file("corrupt.checkpoint", "not valid json"))
        assert result is None, f"Expected None for corrupted checkpoint, got {result}"
    print("Checkpoint read test passed!")
def test_resume_with_interruption():
    """Test that resume works correctly after interruption.

    Runs wikiq to completion for a reference, then launches a second run,
    interrupts it with a signal mid-flight, and resumes; the resumed output
    must match the reference. The test degrades gracefully (asserts equality
    and returns) if the process finishes before it can be interrupted.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal
    output_dir = os.path.join(TEST_OUTPUT_DIR, "resume_interrupt")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)
    output_file = os.path.join(output_dir, f"{SAILORMOON}.jsonl")
    # First, run to completion to know expected output
    cmd_full = f"{WIKIQ} {input_file} -o {output_file} --fandom-2020"
    try:
        subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_rows = read_jsonl(output_file)
    # Clean up for interrupted run
    if os.path.exists(output_file):
        os.remove(output_file)
    checkpoint_path = get_checkpoint_path(output_file)
    if os.path.exists(checkpoint_path):
        os.remove(checkpoint_path)
    # Start wikiq and interrupt it
    cmd_partial = [
        sys.executable, WIKIQ, input_file,
        "-o", output_file,
        "--batch-size", "10",
        "--fandom-2020"
    ]
    proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)
    interrupt_delay = 3
    time.sleep(interrupt_delay)
    if proc.poll() is not None:
        # Process completed before we could interrupt
        interrupted_rows = read_jsonl(output_file)
        df_full = pd.DataFrame(full_rows)
        df_interrupted = pd.DataFrame(interrupted_rows)
        assert_frame_equal(df_full, df_interrupted)
        return
    # NOTE(review): SIGUSR1 appears to ask wikiq to checkpoint and exit
    # cleanly — confirm against wikiq's signal handler.
    proc.send_signal(signal.SIGUSR1)
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # Escalate to SIGTERM if the graceful stop did not finish in time.
        proc.send_signal(signal.SIGTERM)
        proc.wait(timeout=30)
    interrupted_rows = read_jsonl(output_file)
    if len(interrupted_rows) >= len(full_rows):
        # Process completed before interrupt
        df_full = pd.DataFrame(full_rows)
        df_interrupted = pd.DataFrame(interrupted_rows)
        assert_frame_equal(df_full, df_interrupted)
        return
    # Now resume
    cmd_resume = f"{WIKIQ} {input_file} -o {output_file} --batch-size 10 --fandom-2020 --resume"
    try:
        subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    resumed_rows = read_jsonl(output_file)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)
def test_resume_parquet():
    """Test that --resume works correctly with Parquet output format."""
    import pandas as pd
    import pyarrow.parquet as pq
    from pandas.testing import assert_frame_equal

    # Produce a complete run to serve as the ground-truth output.
    full_run = WikiqTester(SAILORMOON, "resume_parquet_full", in_compression="7z", out_format="parquet")
    try:
        full_run.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    reference_table = pq.read_table(full_run.output)

    # Use unsorted indices consistently - slice the table and get checkpoint from same position
    cut = len(reference_table) // 3
    checkpoint_revid = int(reference_table.column("revid")[cut].as_py())
    checkpoint_pageid = int(reference_table.column("articleid")[cut].as_py())

    partial_run = WikiqTester(SAILORMOON, "resume_parquet_partial", in_compression="7z", out_format="parquet")
    # Simulate an interrupted run: write a truncated Parquet file using the
    # SAME schema as the full file, plus a matching checkpoint.
    pq.write_table(reference_table.slice(0, cut + 1), partial_run.output)
    with open(get_checkpoint_path(partial_run.output), 'w') as fh:
        json.dump({"pageid": checkpoint_pageid, "revid": checkpoint_revid}, fh)

    try:
        partial_run.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    # Resumed output must be row-for-row identical to the full run.
    assert_frame_equal(reference_table.to_pandas(), pd.read_parquet(partial_run.output))
def test_resume_tsv_error():
    """Test that --resume with TSV output produces a proper error message."""
    runner = WikiqTester(SAILORMOON, "resume_tsv_error", in_compression="7z", out_format="tsv")
    try:
        runner.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        # Expected path: wikiq must refuse TSV resume with a clear message.
        stderr = exc.stderr.decode("utf8")
        assert "Error: --resume only works with JSONL or Parquet" in stderr, \
            f"Expected proper error message, got: {stderr}"
        print("TSV resume error test passed!")
    else:
        pytest.fail("Expected error for --resume with TSV output")
def test_resume_data_equivalence():
    """Test that resumed output produces exactly equivalent data to a full run.

    The revert detector state is maintained during the skip phase, so
    revert detection should be identical to a full run.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    full = WikiqTester(SAILORMOON, "resume_equiv_full", in_compression="7z", out_format="jsonl")
    try:
        full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    expected_rows = read_jsonl(full.output)

    # Truncate the full output one third of the way through and record a
    # checkpoint for that row so the next run resumes from it.
    cut = len(expected_rows) // 3
    partial = WikiqTester(SAILORMOON, "resume_equiv_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as fh:
        fh.writelines(json.dumps(row) + "\n" for row in expected_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as fh:
        json.dump({"pageid": expected_rows[cut]["articleid"], "revid": expected_rows[cut]["revid"]}, fh)

    try:
        partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    assert_frame_equal(pd.DataFrame(expected_rows), pd.DataFrame(read_jsonl(partial.output)))
def test_resume_with_persistence():
    """Test that --resume correctly handles persistence state after resume.

    Persistence (PWR) depends on maintaining token state across revisions.
    This test verifies that persistence values (token_revs) are identical
    between a full run and a resumed run.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    full = WikiqTester(SAILORMOON, "resume_persist_full", in_compression="7z", out_format="jsonl")
    try:
        full.call_wikiq("--persistence wikidiff2", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    expected_rows = read_jsonl(full.output)

    # Truncate a quarter of the way in and write the matching checkpoint.
    cut = len(expected_rows) // 4
    partial = WikiqTester(SAILORMOON, "resume_persist_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as fh:
        fh.writelines(json.dumps(row) + "\n" for row in expected_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as fh:
        json.dump({"pageid": expected_rows[cut]["articleid"], "revid": expected_rows[cut]["revid"]}, fh)

    try:
        partial.call_wikiq("--persistence wikidiff2", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    df_full = pd.DataFrame(expected_rows)
    df_resumed = pd.DataFrame(read_jsonl(partial.output))
    # Check persistence columns are present
    assert "token_revs" in df_full.columns, "token_revs should exist in full output"
    assert "token_revs" in df_resumed.columns, "token_revs should exist in resumed output"
    assert_frame_equal(df_full, df_resumed)
def test_resume_corrupted_jsonl_last_line():
    """Test that JSONL resume correctly handles corrupted/incomplete last line.

    When the previous run was interrupted mid-write leaving an incomplete JSON
    line, the resume should:
    1. Find the resume point from the last valid line (no checkpoint file needed)
    2. Truncate the corrupted trailing data
    3. Append new data, resulting in valid JSONL
    """
    # Imports hoisted to the top of the test (they were previously mid-function).
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_corrupt_full", in_compression="7z", out_format="jsonl")
    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_rows = read_jsonl(tester_full.output)

    # Create a partial file with a corrupted last line
    tester_corrupt = WikiqTester(SAILORMOON, "resume_corrupt_test", in_compression="7z", out_format="jsonl")
    corrupt_output_path = tester_corrupt.output
    resume_idx = len(full_rows) // 2
    with open(corrupt_output_path, 'w') as f:
        for row in full_rows[:resume_idx]:
            f.write(json.dumps(row) + "\n")
        # Write incomplete JSON (simulates crash mid-write)
        f.write('{"revid": 999, "articleid": 123, "incomplet')

    # NOTE: the previously recorded `size_before` was never asserted against,
    # so it has been removed.
    # NO checkpoint file - JSONL resume works from last valid line in the file
    checkpoint_path = get_checkpoint_path(corrupt_output_path)
    assert not os.path.exists(checkpoint_path), "Test setup error: checkpoint should not exist"

    # Resume should detect corrupted line, truncate it, then append new data
    try:
        tester_corrupt.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(f"Resume failed unexpectedly: {exc.stderr.decode('utf8')}")

    # Verify the file is valid JSONL and readable (no corrupted lines),
    # and exactly equivalent to the uninterrupted run.
    resumed_rows = read_jsonl(corrupt_output_path)
    assert_frame_equal(pd.DataFrame(full_rows), pd.DataFrame(resumed_rows))
def test_resume_diff_persistence_combined():
    """Test that --resume correctly handles both diff and persistence state together.

    This tests that multiple stateful features work correctly when combined.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    full = WikiqTester(SAILORMOON, "resume_combined_full", in_compression="7z", out_format="jsonl")
    try:
        full.call_wikiq("--diff", "--persistence wikidiff2", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    expected_rows = read_jsonl(full.output)

    # Truncate one third of the way in and write the matching checkpoint.
    cut = len(expected_rows) // 3
    partial = WikiqTester(SAILORMOON, "resume_combined_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as fh:
        fh.writelines(json.dumps(row) + "\n" for row in expected_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as fh:
        json.dump({"pageid": expected_rows[cut]["articleid"], "revid": expected_rows[cut]["revid"]}, fh)

    try:
        partial.call_wikiq("--diff", "--persistence wikidiff2", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    df_full = pd.DataFrame(expected_rows)
    df_resumed = pd.DataFrame(read_jsonl(partial.output))
    # Verify both diff and persistence columns exist
    assert "diff" in df_full.columns
    assert "token_revs" in df_full.columns
    assert_frame_equal(df_full, df_resumed)
def test_resume_mid_page():
    """Test resume from the middle of a page with many revisions.

    This specifically tests that state restoration works when resuming
    partway through a page's revision history.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    full = WikiqTester(SAILORMOON, "resume_midpage_full", in_compression="7z", out_format="jsonl")
    try:
        full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    all_rows = read_jsonl(full.output)
    df_full = pd.DataFrame(all_rows)

    # Prefer a page with at least ten revisions; otherwise fall back to the
    # largest page available.
    page_counts = df_full.groupby("articleid").size()
    if any(page_counts >= 10):
        large_page_id = page_counts[page_counts >= 10].index[0]
    else:
        large_page_id = page_counts.idxmax()
    page_revs = df_full[df_full["articleid"] == large_page_id].sort_values("revid")

    # Resume from the middle of this page's revision history.
    midpoint = page_revs.iloc[len(page_revs) // 2]
    resume_revid = int(midpoint["revid"])
    resume_pageid = int(midpoint["articleid"])
    # Find global index for checkpoint
    global_idx = df_full[df_full["revid"] == resume_revid].index[0]

    partial = WikiqTester(SAILORMOON, "resume_midpage_partial", in_compression="7z", out_format="jsonl")
    # Write all rows up to and including the resume point
    with open(partial.output, 'w') as fh:
        fh.writelines(json.dumps(row) + "\n" for row in all_rows[:global_idx + 1])
    with open(get_checkpoint_path(partial.output), 'w') as fh:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, fh)

    try:
        partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    assert_frame_equal(df_full, pd.DataFrame(read_jsonl(partial.output)))
def test_resume_page_boundary():
    """Test resume at the exact start of a new page.

    This tests for off-by-one errors at page boundaries.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_boundary_full", in_compression="7z", out_format="jsonl")
    try:
        tester_full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_rows = read_jsonl(tester_full.output)
    df_full = pd.DataFrame(full_rows)

    # Find a page boundary - last revision of one page
    page_last_revs = df_full.groupby("articleid")["revid"].max()
    # Guard: with a single page, index[:-1] is empty and the loop below would
    # never bind its variables (NameError in the original implementation).
    if len(page_last_revs) < 2:
        pytest.skip("Need at least two pages to test a page boundary")
    # Pick a page that's not the very last one, preferring one whose final
    # revision is not the final row of the whole dump so the resumed run has
    # work left to do.
    for page_id in page_last_revs.index[:-1]:
        last_rev_of_page = page_last_revs[page_id]
        row_idx = df_full[df_full["revid"] == last_rev_of_page].index[0]
        if row_idx < len(df_full) - 1:
            break
    resume_revid = int(last_rev_of_page)
    resume_pageid = int(page_id)

    tester_partial = WikiqTester(SAILORMOON, "resume_boundary_partial", in_compression="7z", out_format="jsonl")
    partial_output_path = tester_partial.output
    # Write all rows up to and including the page's last revision.
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:row_idx + 1]:
            f.write(json.dumps(row) + "\n")
    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    assert_frame_equal(df_full, pd.DataFrame(resumed_rows))
def test_jsonl_dir_output():
    """Test that .jsonl.d output creates files named after input files.

    When output is a .jsonl.d directory, each input file should write to
    a separate JSONL file named after the input (e.g., sailormoon.jsonl),
    not a generic data.jsonl.
    """
    # NOTE: the unused pandas / assert_frame_equal imports were removed; this
    # test only checks file naming and row count, not frame equality.
    output_dir = os.path.join(TEST_OUTPUT_DIR, "jsonl_dir_test.jsonl.d")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # Run wikiq with .jsonl.d output
    cmd = f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10"
    try:
        subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    # Verify output file is named after input, not "data.jsonl"
    expected_output = os.path.join(output_dir, f"{SAILORMOON}.jsonl")
    wrong_output = os.path.join(output_dir, "data.jsonl")
    assert os.path.exists(expected_output), \
        f"Expected {expected_output} to exist, but it doesn't. Directory contents: {os.listdir(output_dir)}"
    assert not os.path.exists(wrong_output), \
        f"Expected {wrong_output} NOT to exist (should be named after input file)"

    # Verify output has data
    rows = read_jsonl(expected_output)
    assert len(rows) > 0, "Output file should have data"
def test_jsonl_dir_resume():
    """Test that resume works correctly with .jsonl.d directory output.

    The resume logic must derive the same filename from the input file
    as the write logic does.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    output_dir = os.path.join(TEST_OUTPUT_DIR, "jsonl_dir_resume.jsonl.d")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # First run: complete
    try:
        subprocess.check_output(
            f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10",
            stderr=subprocess.PIPE, shell=True,
        )
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    per_input_output = os.path.join(output_dir, f"{SAILORMOON}.jsonl")
    full_rows = read_jsonl(per_input_output)

    # Truncate to partial
    keep = len(full_rows) // 2
    with open(per_input_output, 'w') as fh:
        fh.writelines(json.dumps(row) + "\n" for row in full_rows[:keep])

    # Resume
    try:
        subprocess.check_output(
            f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10 --resume",
            stderr=subprocess.PIPE, shell=True,
        )
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    assert_frame_equal(pd.DataFrame(full_rows), pd.DataFrame(read_jsonl(per_input_output)))
def test_resume_revert_detection():
    """Test that revert detection works correctly after resume.

    Verifies that the revert detector state is properly maintained during
    the skip phase so that reverts are correctly detected after resume.
    """
    import pandas as pd
    from pandas.testing import assert_series_equal

    full = WikiqTester(SAILORMOON, "resume_revert_full", in_compression="7z", out_format="jsonl")
    try:
        full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_rows = read_jsonl(full.output)
    df_full = pd.DataFrame(full_rows)

    # Locate the first revert so we can resume from just before it and
    # verify it is still detected.
    revert_rows = df_full[df_full["revert"] == True]
    if len(revert_rows) == 0:
        pytest.skip("No reverts found in test data")
    first_revert_idx = revert_rows.index[0]
    if first_revert_idx < 2:
        pytest.skip("First revert too early in dataset")
    cut = first_revert_idx - 1

    partial = WikiqTester(SAILORMOON, "resume_revert_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as fh:
        fh.writelines(json.dumps(row) + "\n" for row in full_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as fh:
        json.dump({"pageid": full_rows[cut]["articleid"], "revid": full_rows[cut]["revid"]}, fh)

    try:
        partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    df_resumed = pd.DataFrame(read_jsonl(partial.output))
    # Verify revert column matches exactly
    assert_series_equal(df_full["revert"], df_resumed["revert"])
    assert_series_equal(df_full["reverteds"], df_resumed["reverteds"])

87
test/wikiq_test_utils.py Normal file
View File

@@ -0,0 +1,87 @@
import os
import shutil
import subprocess
from typing import Final, Union
# Directory containing this test module; all fixture paths below are relative to it.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
# Path to the wikiq entry-point script that tests invoke via subprocess.
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR, ".."), "src/wikiq/__init__.py")
# Where test runs write their output files.
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
# Pre-generated expected outputs used for baseline comparisons.
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
# Base names of the wiki dump fixtures (under the test dumps directory).
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"
class WikiqTester:
def __init__(
self,
wiki: str,
case_name: str,
suffix: Union[str, None] = None,
in_compression: str = "bz2",
baseline_format: str = "tsv",
out_format: str = "tsv",
):
self.input_file = os.path.join(
TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
)
basename = "{0}_{1}".format(case_name, wiki)
if suffix:
basename = "{0}_{1}".format(basename, suffix)
self.output = os.path.join(
TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
)
if os.path.exists(self.output):
if os.path.isfile(self.output):
os.remove(self.output)
else:
shutil.rmtree(self.output)
# Also clean up resume-related files
for temp_suffix in [".resume_temp", ".checkpoint", ".merged"]:
temp_path = self.output + temp_suffix
if os.path.exists(temp_path):
if os.path.isfile(temp_path):
os.remove(temp_path)
else:
shutil.rmtree(temp_path)
# For JSONL and Parquet, self.output is a file path. Create parent directory if needed.
if out_format in ("jsonl", "parquet"):
parent_dir = os.path.dirname(self.output)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)
if suffix is None:
self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
else:
self.wikiq_baseline_name = "{0}_{1}.{2}".format(
wiki, suffix, baseline_format
)
self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
if case_name is not None:
self.baseline_file = os.path.join(
BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
)
def call_wikiq(self, *args: str, out: bool = True):
"""
Calls wikiq with the passed arguments on the input file relevant to the test.
:param args: The command line arguments to pass to wikiq.
:param out: Whether to pass an output argument to wikiq.
:return: The output of the wikiq call.
"""
if out:
call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
else:
call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
print(call)
return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)