Compare commits
37 Commits
compute-di
...
jsonl-outp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c7eb374ceb | ||
|
|
4b8288c016 | ||
|
|
8590e5f920 | ||
|
|
93f6ed0ff5 | ||
|
|
5ebdb26d82 | ||
|
|
9e6b0fb64c | ||
|
|
d822085698 | ||
|
|
618c343898 | ||
|
|
3f1a9ba862 | ||
|
|
6988a281dc | ||
|
|
6a4bf81e1a | ||
|
|
38dabd0547 | ||
|
|
006feb795c | ||
|
|
d7f5abef2d | ||
|
|
2c54425726 | ||
|
|
5d1a246898 | ||
|
|
70a10db228 | ||
|
|
1001c780fa | ||
|
|
6b4f3939a5 | ||
|
|
c3d31b4ab5 | ||
|
|
f4a9491ff2 | ||
|
|
c6e96c2f54 | ||
|
|
f427291fd8 | ||
|
|
d1fc094c96 | ||
|
|
783f5fd8bc | ||
|
|
577ddc87f5 | ||
|
|
d69d8b0df2 | ||
|
|
5ce9808b50 | ||
|
|
d3517ed5ca | ||
|
|
329341efb6 | ||
|
|
76626a2785 | ||
|
|
b46f98a875 | ||
|
|
3c26185739 | ||
|
|
95b33123e3 | ||
|
|
5c4fc6d5a0 | ||
|
|
77c7d2ba97 | ||
|
|
c40930d7d2 |
@@ -8,11 +8,13 @@ dependencies = [
|
||||
"deltas>=0.7.0",
|
||||
"mediawiki-utilities>=0.4.18",
|
||||
"more-itertools>=10.7.0",
|
||||
"mwparserfromhell>=0.6.0",
|
||||
"mwpersistence>=0.2.4",
|
||||
"mwreverts>=0.1.5",
|
||||
"mwtypes>=0.4.0",
|
||||
"mwxml>=0.3.6",
|
||||
"pyarrow>=20.0.0",
|
||||
"pyspark>=3.5.0",
|
||||
"pywikidiff2",
|
||||
"sortedcontainers>=2.4.0",
|
||||
"yamlconf>=0.2.6",
|
||||
@@ -20,20 +22,21 @@ dependencies = [
|
||||
|
||||
[project.scripts]
|
||||
wikiq = "wikiq:main"
|
||||
wikiq-spark = "wikiq_spark:main"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/wikiq"]
|
||||
packages = ["src/wikiq", "src/wikiq_spark"]
|
||||
|
||||
[tool.uv.sources]
|
||||
|
||||
yamlconf = { git = "https://github.com/groceryheist/yamlconf" }
|
||||
mwxml = { git = "https://github.com/groceryheist/python-mwxml" }
|
||||
deltas = { git = "https://github.com/groceryheist/deltas" }
|
||||
pywikidiff2 = { git = "https://gitea.communitydata.science/groceryheist/pywikidiff2@1cd3b13aea89a8abb0cee38dfd7cc6ab4d0980f9" }
|
||||
pywikidiff2 = { git = "ssh://gitea@gitea.communitydata.science:2200/groceryheist/pywikidiff2.git"}
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
531
src/wikiq/resume.py
Normal file
531
src/wikiq/resume.py
Normal file
@@ -0,0 +1,531 @@
|
||||
"""
|
||||
Checkpoint and resume functionality for wikiq output.
|
||||
|
||||
This module handles:
|
||||
- Finding resume points in existing output (JSONL or Parquet)
|
||||
- Merging resumed data with existing output (for Parquet, streaming, memory-efficient)
|
||||
- Checkpoint file management for fast resume point lookup
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import deque
|
||||
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
|
||||
def get_checkpoint_path(output_file, partition_namespaces=False):
    """Return the checkpoint-file path associated with *output_file*.

    For partitioned output the checkpoint lives one level above the
    partition directory (so pyarrow never tries to read it as a parquet
    file), and its name embeds the output filename so parallel jobs
    writing into the same tree get distinct checkpoints.
    """
    suffix = ".checkpoint"
    if not partition_namespaces:
        return str(output_file) + suffix
    base_name = os.path.basename(output_file)
    grandparent = os.path.dirname(os.path.dirname(output_file))
    return os.path.join(grandparent, base_name + suffix)
|
||||
|
||||
|
||||
def read_checkpoint(checkpoint_path, partition_namespaces=False):
    """Load a previously written resume point, if any.

    Two on-disk formats are supported:
      * single file:  {"pageid": 54, "revid": 325[, "part": 2]}
      * partitioned:  {"0": {"pageid": 54, "revid": 325, "part": 1}, ...}

    Returns:
        For single files: (pageid, revid) or (pageid, revid, part).
        For partitioned output: {namespace: (pageid, revid, part)}.
        None when the checkpoint is absent, empty, or unreadable.
    """
    if not os.path.exists(checkpoint_path):
        return None

    try:
        with open(checkpoint_path, 'r') as fh:
            payload = json.load(fh)

        if not payload:
            return None

        # Single-file checkpoints carry pageid/revid at the top level.
        if "pageid" in payload and "revid" in payload:
            part_no = payload.get("part", 0)
            base = (payload["pageid"], payload["revid"])
            return base + (part_no,) if part_no > 0 else base

        # Otherwise every key is a namespace number mapping to its own record.
        per_namespace = {
            int(ns): (entry["pageid"], entry["revid"], entry.get("part", 0))
            for ns, entry in payload.items()
        }
        return per_namespace or None

    except (json.JSONDecodeError, IOError, KeyError, TypeError) as e:
        print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr)
        return None
|
||||
|
||||
|
||||
def cleanup_interrupted_resume(output_file, partition_namespaces):
    """
    Merge any leftover .resume_temp files from a previous interrupted run.

    This should be called BEFORE get_resume_point() so the resume point
    is calculated from the merged data.

    Args:
        output_file: Path to the (possibly partitioned) parquet output.
        partition_namespaces: True when output lives under namespace=<n>
            partition directories.

    Returns:
        None - no temp files found or normal merge completed
        "start_fresh" - both original and temp were corrupted and deleted
    """
    import shutil  # NOTE(review): unused in this function; kept as-is

    if partition_namespaces:
        partition_dir = os.path.dirname(output_file)
        output_filename = os.path.basename(output_file)
        temp_suffix = ".resume_temp"

        # No partition directory at all means nothing to clean up.
        if not os.path.isdir(partition_dir):
            return

        # Cheap scan: stop at the first namespace dir holding a temp file.
        has_old_temp_files = False
        for ns_dir in os.listdir(partition_dir):
            if ns_dir.startswith('namespace='):
                temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix)
                if os.path.exists(temp_path):
                    has_old_temp_files = True
                    break

        if has_old_temp_files:
            print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr)
            # Returns True when at least one namespace's data had to be
            # deleted because it was corrupted.
            had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename)

            # After the merge, check whether ANY namespace still holds a
            # usable parquet file (excluding leftover temp files).
            has_valid_data = False
            for ns_dir in os.listdir(partition_dir):
                if ns_dir.startswith('namespace='):
                    ns_path = os.path.join(partition_dir, ns_dir)
                    parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet') and not f.endswith('.resume_temp')]
                    if parquet_files:
                        has_valid_data = True
                        break

            # Everything was corrupted: drop the checkpoint too, otherwise a
            # later run would try to resume from data that no longer exists.
            if had_corruption and not has_valid_data:
                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
                if os.path.exists(checkpoint_path):
                    os.remove(checkpoint_path)
                print("All partitioned files were corrupted, will start fresh.", file=sys.stderr)
                return "start_fresh"

            print("Previous temp files merged successfully.", file=sys.stderr)
    else:
        temp_output_file = output_file + ".resume_temp"
        # A temp *directory* here would belong to partitioned output;
        # only merge plain files in single-file mode.
        if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file):
            print(f"Found leftover temp file {temp_output_file} from previous interrupted run, merging first...", file=sys.stderr)
            merged_path = output_file + ".merged"
            merged = merge_parquet_files(output_file, temp_output_file, merged_path)
            if merged == "original_only":
                # Temp held nothing usable; just discard it.
                os.remove(temp_output_file)
            elif merged == "temp_only":
                # Original was corrupted/missing: promote temp in its place.
                if os.path.exists(output_file):
                    os.remove(output_file)
                os.rename(temp_output_file, output_file)
                print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr)
            elif merged == "both_invalid":
                # Nothing salvageable: delete both plus the checkpoint so
                # the next run starts from scratch.
                if os.path.exists(output_file):
                    os.remove(output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
                checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
                if os.path.exists(checkpoint_path):
                    os.remove(checkpoint_path)
                print("Both files were corrupted, will start fresh.", file=sys.stderr)
                return "start_fresh"
            elif merged == "merged":
                # Replace original with the merged file, then drop temp.
                os.remove(output_file)
                os.rename(merged_path, output_file)
                os.remove(temp_output_file)
                print("Previous temp file merged successfully.", file=sys.stderr)
            else:
                # merge returned False: both valid but empty; drop temp.
                os.remove(temp_output_file)
|
||||
|
||||
|
||||
def get_jsonl_resume_point(output_file, input_file=None):
    """Find the resume point from the last complete line of a JSONL file.

    Scans the file for the last line that is valid JSON AND carries both
    resume keys ('articleid', 'revid'), then truncates any trailing bytes
    past that line (partial writes / corruption from an interrupted run).

    For .jsonl.d directories, derives the file path from input_file using
    get_output_filename.

    Args:
        output_file: Path to a .jsonl file or a .jsonl.d directory.
        input_file: Input dump path; required for .jsonl.d output.

    Returns:
        (articleid, revid) tuple, or None when no usable record exists.
    """
    # Handle .jsonl.d directory output
    if output_file.endswith('.jsonl.d'):
        if input_file is None:
            return None
        if os.path.isdir(output_file):
            # Import here to avoid circular import
            from wikiq import get_output_filename
            jsonl_filename = os.path.basename(get_output_filename(input_file, 'jsonl'))
            output_file = os.path.join(output_file, jsonl_filename)
            print(f"Looking for resume point in: {output_file}", file=sys.stderr)
        else:
            return None

    if not os.path.exists(output_file):
        return None

    try:
        # Track end offset and record of the last USABLE line, so trailing
        # garbage can be truncated away afterwards.
        last_valid_pos = None
        last_valid_record = None
        with open(output_file, 'rb') as f:
            while True:
                line = f.readline()
                if not line:
                    break
                try:
                    record = json.loads(line.decode('utf-8'))
                    # Require both resume keys: previously a parseable line
                    # missing 'articleid'/'revid' was accepted here and then
                    # crashed the lookup below with an uncaught KeyError.
                    if (isinstance(record, dict)
                            and 'articleid' in record and 'revid' in record):
                        last_valid_pos = f.tell()
                        last_valid_record = record
                except (json.JSONDecodeError, UnicodeDecodeError):
                    pass

        if last_valid_record is None:
            return None

        # Truncate if file extends past last valid line (corrupted trailing data)
        file_size = os.path.getsize(output_file)
        if last_valid_pos < file_size:
            print(f"Truncating corrupted data from {output_file} ({file_size - last_valid_pos} bytes)", file=sys.stderr)
            with open(output_file, 'r+b') as f:
                f.truncate(last_valid_pos)

        return (last_valid_record['articleid'], last_valid_record['revid'])
    except IOError as e:
        print(f"Warning: Could not read {output_file}: {e}", file=sys.stderr)
        return None
|
||||
|
||||
|
||||
def get_resume_point(output_file, partition_namespaces=False, input_file=None):
    """
    Locate where a previous run stopped so processing can continue.

    JSONL output encodes its own resume point in its last complete line,
    so no checkpoint is consulted.  Parquet output first tries the (fast)
    checkpoint file, and only falls back to scanning the parquet data
    itself for outputs written before checkpoints existed.

    Args:
        output_file: Path to the output file.
        partition_namespaces: Whether the output uses namespace partitioning.
        input_file: Path to input file (needed for .jsonl.d directory output).

    Returns:
        For single files: (pageid, revid) or (pageid, revid, part), or None.
        For partitioned output: {namespace: (pageid, revid, part)}, or None.
    """
    # JSONL: resume point comes straight from the file's last line.
    if output_file.endswith(('.jsonl', '.jsonl.d')):
        point = get_jsonl_resume_point(output_file, input_file)
        if point:
            print(f"Resume point found from JSONL: pageid={point[0]}, revid={point[1]}", file=sys.stderr)
        return point

    # Parquet: prefer the checkpoint file.
    checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
    from_checkpoint = read_checkpoint(checkpoint_path, partition_namespaces)
    if from_checkpoint is not None:
        print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr)
        return from_checkpoint

    # Fall back to scanning parquet (slow, for backwards compatibility).
    print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr)
    scan = _get_resume_point_partitioned if partition_namespaces else _get_resume_point_single_file
    try:
        return scan(output_file)
    except Exception as e:
        print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr)
        return None
|
||||
|
||||
|
||||
def _get_last_row_resume_point(pq_path):
    """Return (articleid, revid, 0) from the final row of the parquet file
    at *pq_path*, or None when the file holds no rows.

    Only the last row group is materialized (and only the two id columns),
    keeping the lookup cheap even for large files.  The trailing 0 is a
    placeholder "part" index.
    """
    parquet_file = pq.ParquetFile(pq_path)
    n_groups = parquet_file.metadata.num_row_groups
    if n_groups == 0:
        return None

    tail = parquet_file.read_row_group(n_groups - 1, columns=['articleid', 'revid'])
    if tail.num_rows == 0:
        return None

    return (tail['articleid'][-1].as_py(), tail['revid'][-1].as_py(), 0)
|
||||
|
||||
|
||||
def _get_resume_point_partitioned(output_file):
    """Collect per-namespace resume points from namespace=<n> partitions.

    Returns:
        {namespace: (articleid, revid, part)} for every partition whose
        parquet file yields a last row, or None when nothing is readable.
    """
    partition_dir = os.path.dirname(output_file)
    output_filename = os.path.basename(output_file)

    if not os.path.isdir(partition_dir):
        return None

    points = {}
    for entry in os.listdir(partition_dir):
        if not entry.startswith('namespace='):
            continue
        ns_id = int(entry.split('=')[1])
        pq_path = os.path.join(partition_dir, entry, output_filename)
        if not os.path.exists(pq_path):
            continue
        try:
            point = _get_last_row_resume_point(pq_path)
        except Exception as e:
            # Unreadable partition: warn and keep scanning the others.
            print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr)
            continue
        if point is not None:
            points[ns_id] = point

    return points if points else None
|
||||
|
||||
|
||||
def _get_resume_point_single_file(output_file):
    """Resume point for non-partitioned output: the file's last row.

    Returns None when the path is missing or is (unexpectedly) a directory.
    """
    if not os.path.exists(output_file) or os.path.isdir(output_file):
        return None
    return _get_last_row_resume_point(output_file)
|
||||
|
||||
|
||||
def merge_parquet_files(original_path, temp_path, merged_path):
    """
    Merge two parquet files by streaming row groups into *merged_path*.

    Row groups are copied one at a time (original first, then temp), so
    the merge never holds more than one row group in memory.

    Returns:
        "merged" - merged file was created from both sources
        "original_only" - temp was invalid, keep original unchanged
        "temp_only" - original was corrupted but temp is valid
        "both_invalid" - both files invalid
        False - both files were valid but contained no row groups
    """
    original_pq = None
    temp_pq = None

    try:
        original_pq = pq.ParquetFile(original_path)
    except Exception as e:
        print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr)

    try:
        if not os.path.exists(temp_path):
            print(f"Note: Temp file {temp_path} does not exist", file=sys.stderr)
        else:
            temp_pq = pq.ParquetFile(temp_path)
    except Exception:
        print(f"Note: No new data in temp file {temp_path}", file=sys.stderr)

    if original_pq is None and temp_pq is None:
        print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr)
        return "both_invalid"

    if original_pq is None:
        print(f"Original file corrupted but temp file is valid, recovering from temp", file=sys.stderr)
        return "temp_only"

    if temp_pq is None:
        return "original_only"

    merged_writer = None
    try:
        # Stream row groups from both sources in order.  The writer is
        # created lazily from the first row group's schema, so zero total
        # row groups means no merged file is written at all.
        for source in (original_pq, temp_pq):
            for i in range(source.num_row_groups):
                row_group = source.read_row_group(i)
                if merged_writer is None:
                    merged_writer = pq.ParquetWriter(
                        merged_path,
                        row_group.schema,
                        flavor="spark"
                    )
                merged_writer.write_table(row_group)
    finally:
        # Always close the writer so the merge file handle is released
        # even when a read fails mid-way (previously it leaked).
        if merged_writer is not None:
            merged_writer.close()

    return "merged" if merged_writer is not None else False
|
||||
|
||||
|
||||
def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter):
    """
    Merge leftover per-namespace temp files into their partitioned output.

    For each namespace=<n> directory containing "<file_filter><temp_suffix>",
    merges that temp file into the namespace's existing parquet file (or
    promotes the temp file when no original exists and it is readable).

    Args:
        partition_dir: Directory containing namespace=<n> subdirectories.
        temp_suffix: Suffix of temp files (e.g. ".resume_temp").
        file_filter: Base output filename to merge within each namespace.

    Returns:
        True if corruption was encountered (at least one namespace's data
        had to be deleted because both original and temp were invalid),
        False otherwise.
    """
    namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
    had_corruption = False
    expected_temp = file_filter + temp_suffix

    for ns_dir in namespace_dirs:
        ns_path = os.path.join(partition_dir, ns_dir)
        temp_path = os.path.join(ns_path, expected_temp)

        # Nothing to merge for this namespace.
        if not os.path.exists(temp_path):
            continue

        original_file = file_filter
        original_path = os.path.join(ns_path, original_file)

        if os.path.exists(original_path):
            merged_path = original_path + ".merged"
            merged = merge_parquet_files(original_path, temp_path, merged_path)

            if merged == "original_only":
                # Temp held nothing usable; discard it.
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            elif merged == "temp_only":
                # Original corrupted: promote the temp file in its place.
                os.remove(original_path)
                os.rename(temp_path, original_path)
            elif merged == "both_invalid":
                # Neither file salvageable: delete both, flag corruption.
                if os.path.exists(original_path):
                    os.remove(original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                had_corruption = True
            elif merged == "merged":
                # Swap the merged file in, then drop the temp file.
                os.remove(original_path)
                os.rename(merged_path, original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            else:
                # merge returned False: both valid but empty; drop both.
                if os.path.exists(original_path):
                    os.remove(original_path)
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        else:
            # No original file: keep the temp file only if it is a valid
            # parquet file, otherwise delete it and flag corruption.
            try:
                pq.ParquetFile(temp_path)
                os.rename(temp_path, original_path)
            except Exception:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
                had_corruption = True

    return had_corruption
|
||||
|
||||
|
||||
def finalize_resume_merge(
    original_output_file,
    temp_output_file,
    partition_namespaces,
    original_partition_dir
):
    """
    Finalize a resumed run by merging temp output into the original output.

    For partitioned output, every namespace=<n> directory is merged in
    place.  For single-file output, the two parquet files are streamed
    into a ".merged" file which then replaces the original.

    Raises:
        Exception: re-raises any merge failure after reporting where the
        unmerged new data lives, so callers can recover manually.
    """
    import shutil

    print("Merging resumed data with existing output...", file=sys.stderr)
    try:
        if partition_namespaces and original_partition_dir is not None:
            file_filter = os.path.basename(original_output_file)
            merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter)
            if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file):
                shutil.rmtree(temp_output_file)
        else:
            merged_output_file = original_output_file + ".merged"
            merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)

            if merged == "original_only":
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            elif merged == "temp_only":
                # Guard the remove: "temp_only" also covers the case where
                # the original file never existed, in which case an
                # unguarded os.remove raises FileNotFoundError (matches the
                # guards used in cleanup_interrupted_resume).
                if os.path.exists(original_output_file):
                    os.remove(original_output_file)
                os.rename(temp_output_file, original_output_file)
            elif merged == "both_invalid":
                # Same guard: the original may be missing rather than corrupt.
                if os.path.exists(original_output_file):
                    os.remove(original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            elif merged == "merged":
                os.remove(original_output_file)
                os.rename(merged_output_file, original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)
            else:
                # merge returned False: both valid but empty; drop both.
                os.remove(original_output_file)
                if os.path.exists(temp_output_file):
                    os.remove(temp_output_file)

        print("Merge complete.", file=sys.stderr)
    except Exception as e:
        print(f"Error merging resume data for {original_output_file}: {e}", file=sys.stderr)
        print(f"New data saved in: {temp_output_file}", file=sys.stderr)
        raise
|
||||
|
||||
|
||||
def setup_resume_temp_output(output_file, partition_namespaces):
    """
    Prepare a fresh ".resume_temp" destination for resumed Parquet output.

    Any stale temp output left by an earlier attempt is deleted first.
    For partitioned output the temp destination is created as a directory.

    Returns:
        Tuple of (original_output_file, temp_output_file, original_partition_dir),
        or (None, None, None) if no existing output to resume from.
    """
    import shutil

    original_partition_dir = None

    if partition_namespaces:
        partition_dir = os.path.dirname(output_file)
        base_name = os.path.basename(output_file)
        # Partitioned output "exists" when any namespace dir holds the file.
        found = False
        if os.path.isdir(partition_dir):
            for entry in os.listdir(partition_dir):
                if entry.startswith('namespace=') and \
                        os.path.exists(os.path.join(partition_dir, entry, base_name)):
                    found = True
                    break
        if found:
            original_partition_dir = partition_dir
    else:
        found = isinstance(output_file, str) and os.path.exists(output_file)

    if not found:
        return None, None, None

    temp_output_file = output_file + ".resume_temp"

    # Discard any stale temp output from an earlier interrupted attempt.
    if os.path.isdir(temp_output_file):
        shutil.rmtree(temp_output_file)
    elif os.path.exists(temp_output_file):
        os.remove(temp_output_file)

    if partition_namespaces:
        os.makedirs(temp_output_file, exist_ok=True)

    return output_file, temp_output_file, original_partition_dir
|
||||
@@ -2,7 +2,7 @@ import sys
|
||||
from abc import abstractmethod, ABC
|
||||
from datetime import datetime, timezone
|
||||
from hashlib import sha1
|
||||
from typing import Generic, TypeVar, Union
|
||||
from typing import Generic, TypeVar, Union, TYPE_CHECKING
|
||||
|
||||
import mwreverts
|
||||
import mwtypes
|
||||
@@ -10,13 +10,13 @@ import mwxml
|
||||
|
||||
import pyarrow as pa
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from wikiq.wikitext_parser import WikitextParser
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
class RevisionField(ABC, Generic[T]):
|
||||
def __init__(self):
|
||||
self.data: list[T] = []
|
||||
|
||||
"""
|
||||
Abstract type which represents a field in a table of page revisions.
|
||||
"""
|
||||
@@ -40,14 +40,6 @@ class RevisionField(ABC, Generic[T]):
|
||||
"""
|
||||
pass
|
||||
|
||||
def add(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> None:
|
||||
self.data.append(self.extract(page, revisions))
|
||||
|
||||
def pop(self) -> list[T]:
|
||||
data = self.data
|
||||
self.data = []
|
||||
return data
|
||||
|
||||
|
||||
class RevisionTable:
|
||||
columns: list[RevisionField]
|
||||
@@ -55,19 +47,15 @@ class RevisionTable:
|
||||
def __init__(self, columns: list[RevisionField]):
|
||||
self.columns = columns
|
||||
|
||||
def add(self, page: mwtypes.Page, revisions: list[mwxml.Revision]):
|
||||
for column in self.columns:
|
||||
column.add(page=page, revisions=revisions)
|
||||
|
||||
def schema(self) -> pa.Schema:
|
||||
return pa.schema([c.field for c in self.columns])
|
||||
|
||||
def pop(self) -> dict:
|
||||
data = dict()
|
||||
for column in self.columns:
|
||||
data[column.field.name] = column.pop()
|
||||
|
||||
return data
|
||||
def extract_row(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> dict:
|
||||
"""Extract a single row dict for the given page and revisions."""
|
||||
return {
|
||||
column.field.name: column.extract(page, revisions)
|
||||
for column in self.columns
|
||||
}
|
||||
|
||||
|
||||
class RevisionId(RevisionField[int]):
|
||||
@@ -217,3 +205,108 @@ class RevisionText(RevisionField[str]):
|
||||
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str:
|
||||
revision = revisions[-1]
|
||||
return revision.text
|
||||
|
||||
|
||||
class RevisionExternalLinks(RevisionField[Union[list[str], None]]):
    """External link URLs found in the latest revision's text.

    Null when the revision text is deleted/suppressed.
    """

    field = pa.field("external_links", pa.list_(pa.string()), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[str], None]:
        latest = revisions[-1]
        if latest.deleted.text:
            return None
        return self.wikitext_parser.extract_external_links(latest.text)
||||
|
||||
|
||||
class RevisionCitations(RevisionField[Union[list[str], None]]):
    """Citation strings pulled from ref tags and cite templates of the
    latest revision; null when the revision text is deleted."""

    field = pa.field("citations", pa.list_(pa.string()), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[str], None]:
        latest = revisions[-1]
        if latest.deleted.text:
            return None
        return self.wikitext_parser.extract_citations(latest.text)
||||
|
||||
|
||||
class RevisionWikilinks(RevisionField[Union[list[dict], None]]):
    """Internal wikilinks of the latest revision as {title, text} structs;
    null when the revision text is deleted."""

    # Struct type with title and optional display text
    field = pa.field("wikilinks", pa.list_(pa.struct([
        pa.field("title", pa.string()),
        pa.field("text", pa.string(), nullable=True),
    ])), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        latest = revisions[-1]
        if latest.deleted.text:
            return None
        return self.wikitext_parser.extract_wikilinks(latest.text)
||||
|
||||
|
||||
class RevisionTemplates(RevisionField[Union[list[dict], None]]):
    """Templates of the latest revision as {name, params} structs;
    null when the revision text is deleted."""

    # Struct type with name and params map
    field = pa.field("templates", pa.list_(pa.struct([
        pa.field("name", pa.string()),
        pa.field("params", pa.map_(pa.string(), pa.string())),
    ])), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        latest = revisions[-1]
        if latest.deleted.text:
            return None
        return self.wikitext_parser.extract_templates(latest.text)
||||
|
||||
|
||||
class RevisionHeadings(RevisionField[Union[list[dict], None]]):
    """Section headings of the latest revision as {level, text} structs;
    null when the revision text is deleted."""

    # Struct type with level and text
    field = pa.field("headings", pa.list_(pa.struct([
        pa.field("level", pa.int8()),
        pa.field("text", pa.string()),
    ])), nullable=True)

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
        latest = revisions[-1]
        if latest.deleted.text:
            return None
        return self.wikitext_parser.extract_headings(latest.text)
||||
|
||||
|
||||
class RevisionParserTimeout(RevisionField[bool]):
    """Flag recording whether the shared wikitext parser timed out."""

    field = pa.field("parser_timeout", pa.bool_())

    def __init__(self, wikitext_parser: "WikitextParser"):
        super().__init__()
        self.wikitext_parser = wikitext_parser

    def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> bool:
        # NOTE(review): reports the parser's most recent parse outcome --
        # presumably the parser-backed columns for this revision were
        # extracted first; confirm column ordering in the caller.
        return self.wikitext_parser.last_parse_timed_out
|
||||
|
||||
133
src/wikiq/wikitext_parser.py
Normal file
133
src/wikiq/wikitext_parser.py
Normal file
@@ -0,0 +1,133 @@
|
||||
"""Shared wikitext parser with caching to avoid duplicate parsing."""
|
||||
from __future__ import annotations
|
||||
|
||||
import signal
|
||||
|
||||
import mwparserfromhell
|
||||
|
||||
PARSER_TIMEOUT = 60 # seconds
|
||||
|
||||
|
||||
class WikitextParser:
|
||||
"""Caches parsed wikicode to avoid re-parsing the same text."""
|
||||
|
||||
CITE_TEMPLATES = {
|
||||
'cite web', 'cite book', 'cite journal', 'cite news',
|
||||
'cite magazine', 'cite conference', 'cite encyclopedia',
|
||||
'cite report', 'cite thesis', 'cite press release',
|
||||
'citation', 'sfn', 'harvnb', 'harv'
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self._cached_text: str | None = None
|
||||
self._cached_wikicode = None
|
||||
self.last_parse_timed_out: bool = False
|
||||
|
||||
def _timeout_handler(self, signum, frame):
|
||||
raise TimeoutError("mwparserfromhell parse exceeded timeout")
|
||||
|
||||
def _get_wikicode(self, text: str):
|
||||
"""Parse text and cache result. Returns cached result if text unchanged."""
|
||||
if text == self._cached_text:
|
||||
return self._cached_wikicode
|
||||
|
||||
old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
|
||||
signal.alarm(PARSER_TIMEOUT)
|
||||
try:
|
||||
self._cached_wikicode = mwparserfromhell.parse(text)
|
||||
self._cached_text = text
|
||||
self.last_parse_timed_out = False
|
||||
except TimeoutError:
|
||||
self._cached_wikicode = None
|
||||
self._cached_text = text
|
||||
self.last_parse_timed_out = True
|
||||
finally:
|
||||
signal.alarm(0)
|
||||
signal.signal(signal.SIGALRM, old_handler)
|
||||
|
||||
return self._cached_wikicode
|
||||
|
||||
def extract_external_links(self, text: str | None) -> list[str] | None:
|
||||
"""Extract all external link URLs from wikitext."""
|
||||
if text is None:
|
||||
return None
|
||||
try:
|
||||
wikicode = self._get_wikicode(text)
|
||||
return [str(link.url) for link in wikicode.filter_external_links()]
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def extract_citations(self, text: str | None) -> list[str] | None:
|
||||
"""Extract citations from ref tags and cite templates."""
|
||||
if text is None:
|
||||
return None
|
||||
try:
|
||||
wikicode = self._get_wikicode(text)
|
||||
citations = []
|
||||
|
||||
# Extract ref tag contents
|
||||
for tag in wikicode.filter_tags():
|
||||
if tag.tag.lower() == 'ref':
|
||||
content = str(tag.contents).strip()
|
||||
if content:
|
||||
citations.append(f"ref:{content}")
|
||||
|
||||
# Extract cite templates
|
||||
for template in wikicode.filter_templates():
|
||||
template_name = str(template.name).strip().lower()
|
||||
if any(template_name.startswith(cite) for cite in self.CITE_TEMPLATES):
|
||||
citations.append(f"template:{str(template)}")
|
||||
|
||||
return citations
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def extract_wikilinks(self, text: str | None) -> list[dict[str, str | None]] | None:
|
||||
"""Extract all internal wikilinks with title and display text."""
|
||||
if text is None:
|
||||
return None
|
||||
try:
|
||||
wikicode = self._get_wikicode(text)
|
||||
result = []
|
||||
for link in wikicode.filter_wikilinks():
|
||||
title = str(link.title).strip()
|
||||
# text is None if no pipe, otherwise the display text
|
||||
display_text = str(link.text).strip() if link.text else None
|
||||
result.append({"title": title, "text": display_text})
|
||||
return result
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def extract_templates(self, text: str | None) -> list[dict] | None:
    """Extract every template as ``{"name": ..., "params": {name: value}}``.

    Parameter names and values are stringified and whitespace-stripped;
    duplicate parameter names keep the last occurrence. Returns None for
    missing input or on parse failure.
    """
    if text is None:
        return None
    try:
        extracted = []
        for tpl in self._get_wikicode(text).filter_templates():
            params = {
                str(param.name).strip(): str(param.value).strip()
                for param in tpl.params
            }
            extracted.append({"name": str(tpl.name).strip(), "params": params})
        return extracted
    except Exception:
        return None
|
||||
|
||||
def extract_headings(self, text: str | None) -> list[dict] | None:
    """Extract every section heading as ``{"level": int, "text": str}``.

    ``level`` is the heading depth (number of ``=`` signs). Returns None
    for missing input or on parse failure.
    """
    if text is None:
        return None
    try:
        return [
            {"level": node.level, "text": str(node.title).strip()}
            for node in self._get_wikicode(text).filter_headings()
        ]
    except Exception:
        return None
|
||||
@@ -1,110 +1,61 @@
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tracemalloc
|
||||
from io import StringIO
|
||||
from typing import Final, Union
|
||||
import pytest
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.json as pj
|
||||
import pytest
|
||||
from pandas import DataFrame
|
||||
from pandas.testing import assert_frame_equal, assert_series_equal
|
||||
|
||||
# Make references to files and wikiq relative to this file, not to the current working directory.
|
||||
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
|
||||
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR,".."), "src/wikiq/__init__.py")
|
||||
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
|
||||
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
|
||||
|
||||
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
|
||||
SAILORMOON: Final[str] = "sailormoon"
|
||||
TWINPEAKS: Final[str] = "twinpeaks"
|
||||
REGEXTEST: Final[str] = "regextest"
|
||||
from wikiq import build_table, build_schema
|
||||
from wikiq_test_utils import (
|
||||
BASELINE_DIR,
|
||||
IKWIKI,
|
||||
REGEXTEST,
|
||||
SAILORMOON,
|
||||
TEST_DIR,
|
||||
TEST_OUTPUT_DIR,
|
||||
TWINPEAKS,
|
||||
WIKIQ,
|
||||
WikiqTester,
|
||||
)
|
||||
|
||||
|
||||
def setup():
    """One-time session setup: start allocation tracing and ensure the
    shared test-output directory exists."""
    tracemalloc.start()

    # Runs once at import time rather than per test, so the directory is
    # created (if absent) exactly once for the whole session.
    if not os.path.exists(TEST_OUTPUT_DIR):
        os.mkdir(TEST_OUTPUT_DIR)
|
||||
|
||||
|
||||
# Always run setup, even if this is executed via "python -m unittest" rather
|
||||
# than as __main__.
|
||||
setup()
|
||||
|
||||
|
||||
def read_jsonl_with_schema(filepath: str, **schema_kwargs) -> pd.DataFrame:
    """Read a JSONL file into a DataFrame using wikiq's explicit schema.

    The schema is derived from wikiq's own table definition so columns
    get the types wikiq intended rather than what pandas would infer.
    """
    table, _ = build_table(**schema_kwargs)
    explicit = build_schema(table, **schema_kwargs)
    options = pj.ParseOptions(explicit_schema=explicit)
    return pj.read_json(filepath, parse_options=options).to_pandas()


class WikiqTester:
    """Helper that wires up input dump, output path, and baseline file for
    a single wikiq invocation, and removes any stale output beforehand."""

    def __init__(
        self,
        wiki: str,
        case_name: str,
        suffix: Union[str, None] = None,
        in_compression: str = "bz2",
        baseline_format: str = "tsv",
        out_format: str = "tsv",
    ):
        self.input_file = os.path.join(
            TEST_DIR, "dumps", f"{wiki}.xml.{in_compression}"
        )

        basename = f"{case_name}_{wiki}"
        if suffix:
            basename = f"{basename}_{suffix}"

        self.output = os.path.join(TEST_OUTPUT_DIR, f"{basename}.{out_format}")

        # Clear leftovers from a previous run: a file for flat formats,
        # a directory tree for partitioned parquet output.
        if os.path.exists(self.output):
            if os.path.isfile(self.output):
                os.remove(self.output)
            else:
                shutil.rmtree(self.output)

        if out_format == "parquet":
            os.makedirs(self.output, exist_ok=True)

        if suffix is None:
            self.wikiq_baseline_name = f"{wiki}.{baseline_format}"
            self.wikiq_out_name = f"{wiki}.{out_format}"
        else:
            self.wikiq_baseline_name = f"{wiki}_{suffix}.{baseline_format}"
            self.wikiq_out_name = f"{wiki}_{suffix}.{out_format}"

        # Without a case_name there is no baseline to compare against.
        if case_name is not None:
            self.baseline_file = os.path.join(
                BASELINE_DIR, f"{case_name}_{self.wikiq_baseline_name}"
            )

    def call_wikiq(self, *args: str, out: bool = True):
        """
        Run wikiq on this tester's input file with the given extra arguments.

        :param args: Additional command line arguments for wikiq.
        :param out: Whether to pass the output-path argument to wikiq.
        :return: wikiq's captured stdout.
        """
        parts = [WIKIQ, self.input_file]
        if out:
            parts += ["-o", self.output]
        parts += ["--batch-size", "10", *args]

        command = " ".join(parts)
        print(command)
        # shell=True is required here because the pre-joined command string
        # comes from trusted test constants only.
        return subprocess.check_output(command, stderr=subprocess.PIPE, shell=True)
|
||||
|
||||
# with / without pwr DONE
|
||||
# with / without url encode DONE
|
||||
# with / without collapse user DONE
|
||||
# with output to stdout DONE
|
||||
# note that the persistence radius is 7 by default
|
||||
# reading various file formats including
|
||||
# 7z, gz, bz2, xml DONE
|
||||
# wikia and wikipedia data DONE
|
||||
# malformed xmls DONE
|
||||
# with / without pwr DONE
|
||||
# with / without url encode DONE
|
||||
# with / without collapse user DONE
|
||||
# with output to stdout DONE
|
||||
# note that the persistence radius is 7 by default
|
||||
# reading various file formats including
|
||||
# 7z, gz, bz2, xml DONE
|
||||
# wikia and wikipedia data DONE
|
||||
# malformed xmls DONE
|
||||
|
||||
def test_WP_noargs():
|
||||
tester = WikiqTester(IKWIKI, "noargs")
|
||||
@@ -186,7 +137,62 @@ def test_noargs():
|
||||
test = pd.read_table(tester.output)
|
||||
baseline = pd.read_table(tester.baseline_file)
|
||||
assert_frame_equal(test, baseline, check_like=True)
|
||||
|
||||
|
||||
|
||||
def test_jsonl_noargs():
    """JSONL output with default arguments should match the JSONL baseline."""
    tester = WikiqTester(
        SAILORMOON, "noargs", in_compression="7z",
        out_format="jsonl", baseline_format="jsonl",
    )

    try:
        tester.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    produced = read_jsonl_with_schema(tester.output)
    expected = read_jsonl_with_schema(tester.baseline_file)
    assert_frame_equal(produced, expected, check_like=True)
|
||||
|
||||
|
||||
def test_jsonl_tsv_equivalence():
    """The same run emitted as TSV and as JSONL must carry equivalent data."""
    tsv_run = WikiqTester(SAILORMOON, "equiv_tsv", in_compression="7z", out_format="tsv")
    jsonl_run = WikiqTester(SAILORMOON, "equiv_jsonl", in_compression="7z", out_format="jsonl")

    try:
        tsv_run.call_wikiq()
        jsonl_run.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    tsv_df = pd.read_table(tsv_run.output)
    jsonl_df = read_jsonl_with_schema(jsonl_run.output)

    # Same rows, same columns.
    assert len(tsv_df) == len(jsonl_df), \
        f"Row count mismatch: TSV={len(tsv_df)}, JSONL={len(jsonl_df)}"
    assert set(tsv_df.columns) == set(jsonl_df.columns), \
        f"Column mismatch: TSV={set(tsv_df.columns)}, JSONL={set(jsonl_df.columns)}"

    # Align row order before elementwise comparison.
    tsv_df = tsv_df.sort_values("revid").reset_index(drop=True)
    jsonl_df = jsonl_df.sort_values("revid").reset_index(drop=True)

    # TSV nulls read back as NaN; normalize JSONL's None to match.
    jsonl_df = jsonl_df.replace({None: np.nan})

    for col in tsv_df.columns:
        if col == "date_time":
            # TSV parses as strings while the JSONL schema yields datetimes;
            # compare both rendered as canonical ISO-8601 strings.
            lhs = pd.to_datetime(tsv_df[col]).dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            rhs = jsonl_df[col].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
            assert_series_equal(lhs, rhs, check_names=False)
        else:
            # dtype may legitimately differ (inferred int64 vs schema int32).
            assert_series_equal(tsv_df[col], jsonl_df[col], check_names=False, check_dtype=False)
|
||||
|
||||
|
||||
def test_collapse_user():
|
||||
tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z")
|
||||
|
||||
@@ -199,19 +205,6 @@ def test_collapse_user():
|
||||
baseline = pd.read_table(tester.baseline_file)
|
||||
assert_frame_equal(test, baseline, check_like=True)
|
||||
|
||||
def test_partition_namespaces():
    """--partition-namespaces should write per-namespace parquet matching the baseline."""
    tester = WikiqTester(
        SAILORMOON, "collapse-user", in_compression="7z",
        out_format='parquet', baseline_format='parquet',
    )

    try:
        tester.call_wikiq("--collapse-user", "--fandom-2020", "--partition-namespaces")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    # Only the template namespace (10) partition is baselined.
    partition_path = os.path.join(tester.output, "namespace=10/sailormoon.parquet")
    produced = pd.read_parquet(partition_path)
    expected = pd.read_parquet(tester.baseline_file)
    assert_frame_equal(produced, expected, check_like=True)
|
||||
|
||||
|
||||
def test_pwr_wikidiff2():
|
||||
tester = WikiqTester(SAILORMOON, "persistence_wikidiff2", in_compression="7z")
|
||||
|
||||
@@ -263,46 +256,43 @@ def test_pwr():
|
||||
assert_frame_equal(test, baseline, check_like=True)
|
||||
|
||||
def test_diff():
    """--diff should emit diff and diff_timeout columns in JSONL output."""
    tester = WikiqTester(SAILORMOON, "diff", in_compression="7z", out_format='jsonl')

    try:
        tester.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)
    assert "diff" in result.columns, "diff column should exist"
    assert "diff_timeout" in result.columns, "diff_timeout column should exist"
    assert len(result) > 0, "Should have output rows"
|
||||
|
||||
def test_diff_plus_pwr():
    """--diff combined with wikidiff2 persistence should emit both column sets."""
    tester = WikiqTester(SAILORMOON, "diff_pwr", in_compression="7z", out_format='jsonl')

    try:
        tester.call_wikiq("--diff --persistence wikidiff2", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)
    assert "diff" in result.columns, "diff column should exist"
    assert "token_revs" in result.columns, "token_revs column should exist"
    assert len(result) > 0, "Should have output rows"
|
||||
|
||||
def test_text():
    """--text together with --diff should emit both text and diff columns."""
    tester = WikiqTester(SAILORMOON, "text", in_compression="7z", out_format='jsonl')

    try:
        tester.call_wikiq("--diff", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)
    assert "text" in result.columns, "text column should exist"
    assert "diff" in result.columns, "diff column should exist"
    assert len(result) > 0, "Should have output rows"
|
||||
|
||||
def test_malformed_noargs():
|
||||
tester = WikiqTester(wiki=TWINPEAKS, case_name="noargs", in_compression="7z")
|
||||
@@ -401,41 +391,370 @@ def test_capturegroup_regex():
|
||||
baseline = pd.read_table(tester.baseline_file)
|
||||
assert_frame_equal(test, baseline, check_like=True)
|
||||
|
||||
def test_external_links_only():
    """--external-links alone should add only the external_links column."""
    import mwparserfromhell

    tester = WikiqTester(SAILORMOON, "external_links_only", in_compression="7z", out_format="jsonl")

    try:
        # --text is included so extraction can be checked against the wikitext.
        tester.call_wikiq("--external-links", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)

    assert "external_links" in result.columns, "external_links column should exist"
    assert "citations" not in result.columns, \
        "citations column should NOT exist when only --external-links is used"

    # Each cell must be None or something list-like.
    assert result["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
        "external_links should be a list/array type or None"

    # Every extracted URL must look like a URI: a short scheme prefix or
    # a protocol-relative // form.
    all_urls = []
    for links in result["external_links"]:
        if links is not None and len(links) > 0:
            all_urls.extend(links)

    for url in all_urls:
        has_scheme = ":" in url and url.index(":") < 10
        is_protocol_relative = url.startswith("//")
        assert has_scheme or is_protocol_relative, \
            f"External link should be a valid URI, got: {url}"

    # Spot-check up to five rows against mwparserfromhell directly.
    rows_with_links = result[result["external_links"].apply(lambda x: x is not None and len(x) > 0)]
    for idx, row in rows_with_links.head(5).iterrows():
        text = row["text"]
        if not text:
            continue
        wikicode = mwparserfromhell.parse(text)
        expected_links = [str(link.url) for link in wikicode.filter_external_links()]
        actual_links = list(row["external_links"])
        assert actual_links == expected_links, \
            f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"

    print(f"External links only test passed! {len(result)} rows, {len(all_urls)} total URLs extracted")
|
||||
|
||||
|
||||
def test_citations_only():
    """--citations alone should add only the citations column."""
    import mwparserfromhell
    from wikiq.wikitext_parser import WikitextParser

    tester = WikiqTester(SAILORMOON, "citations_only", in_compression="7z", out_format="jsonl")

    try:
        # --text is included so extraction can be checked against the wikitext.
        tester.call_wikiq("--citations", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)

    assert "citations" in result.columns, "citations column should exist"
    assert "external_links" not in result.columns, \
        "external_links column should NOT exist when only --citations is used"

    # Each cell must be None or something list-like.
    assert result["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
        "citations should be a list/array type or None"

    # Every citation must carry one of the two expected prefixes.
    all_citations = []
    for citations in result["citations"]:
        if citations is not None and len(citations) > 0:
            all_citations.extend(citations)

    for citation in all_citations:
        assert citation.startswith("ref:") or citation.startswith("template:"), \
            f"Citation should start with 'ref:' or 'template:', got: {citation}"

    # Spot-check up to five rows against WikitextParser directly.
    rows_with_citations = result[result["citations"].apply(lambda x: x is not None and len(x) > 0)]
    if len(rows_with_citations) > 0:
        parser = WikitextParser()
        for idx, row in rows_with_citations.head(5).iterrows():
            text = row["text"]
            if not text:
                continue
            expected_citations = parser.extract_citations(text)
            actual_citations = list(row["citations"])
            assert actual_citations == expected_citations, \
                f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"

    print(f"Citations only test passed! {len(result)} rows, {len(all_citations)} total citations extracted")
|
||||
|
||||
|
||||
def test_external_links_and_citations():
    """--external-links and --citations together must both work (shared parser)."""
    import mwparserfromhell
    from wikiq.wikitext_parser import WikitextParser

    tester = WikiqTester(SAILORMOON, "external_links_and_citations", in_compression="7z", out_format="jsonl")

    try:
        # --text is included so extraction can be checked against the wikitext.
        tester.call_wikiq("--external-links", "--citations", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)

    assert "external_links" in result.columns, "external_links column should exist"
    assert "citations" in result.columns, "citations column should exist"

    def is_listlike_or_none(cell):
        # Cells are None or list-like (pandas may yield arrays or lists).
        return cell is None or hasattr(cell, '__len__')

    assert result["external_links"].apply(is_listlike_or_none).all(), \
        "external_links should be a list/array type or None"
    assert result["citations"].apply(is_listlike_or_none).all(), \
        "citations should be a list/array type or None"

    def flatten(series):
        # Collect all items across non-empty cells of a column.
        items = []
        for cell in series:
            if cell is not None and len(cell) > 0:
                items.extend(cell)
        return items

    # Every URL must look like a URI: short scheme or protocol-relative //.
    all_urls = flatten(result["external_links"])
    for url in all_urls:
        has_scheme = ":" in url and url.index(":") < 10
        is_protocol_relative = url.startswith("//")
        assert has_scheme or is_protocol_relative, \
            f"External link should be a valid URI, got: {url}"

    # Every citation must carry one of the two expected prefixes.
    all_citations = flatten(result["citations"])
    for citation in all_citations:
        assert citation.startswith("ref:") or citation.startswith("template:"), \
            f"Citation should start with 'ref:' or 'template:', got: {citation}"

    # Cross-check a sample of rows against direct parsing; this exercises
    # the shared-parser optimization path.
    parser = WikitextParser()
    has_content = (
        result["external_links"].apply(lambda x: x is not None and len(x) > 0)
        | result["citations"].apply(lambda x: x is not None and len(x) > 0)
    )
    for idx, row in result[has_content].head(5).iterrows():
        text = row["text"]
        if not text:
            continue

        wikicode = mwparserfromhell.parse(text)
        expected_links = [str(link.url) for link in wikicode.filter_external_links()]
        actual_links = list(row["external_links"]) if row["external_links"] is not None else []
        assert actual_links == expected_links, \
            f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"

        expected_citations = parser.extract_citations(text)
        actual_citations = list(row["citations"]) if row["citations"] is not None else []
        assert actual_citations == expected_citations, \
            f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"

    print(f"External links and citations test passed! {len(result)} rows, {len(all_urls)} URLs, {len(all_citations)} citations")
|
||||
|
||||
|
||||
def test_no_wikitext_columns():
    """Without the wikitext flags neither extraction column may appear."""
    tester = WikiqTester(SAILORMOON, "no_wikitext_columns", in_compression="7z", out_format="jsonl")

    try:
        tester.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)

    assert "external_links" not in result.columns, \
        "external_links column should NOT exist without --external-links flag"
    assert "citations" not in result.columns, \
        "citations column should NOT exist without --citations flag"

    print(f"No wikitext columns test passed! {len(result)} rows processed")
|
||||
|
||||
|
||||
def test_wikilinks():
    """--wikilinks should extract internal links as {"title", "text"} records."""
    import mwparserfromhell

    tester = WikiqTester(SAILORMOON, "wikilinks", in_compression="7z", out_format="jsonl")

    try:
        tester.call_wikiq("--wikilinks", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)

    assert "wikilinks" in result.columns, "wikilinks column should exist"
    assert result["wikilinks"].apply(lambda x: x is None or hasattr(x, '__len__')).all()

    # Spot-check up to five rows against mwparserfromhell directly.
    rows_with_links = result[result["wikilinks"].apply(lambda x: x is not None and len(x) > 0)]
    for idx, row in rows_with_links.head(5).iterrows():
        text = row["text"]
        if not text:
            continue
        expected = []
        for link in mwparserfromhell.parse(text).filter_wikilinks():
            display_text = str(link.text).strip() if link.text else None
            expected.append({"title": str(link.title).strip(), "text": display_text})

        # Normalize to plain dicts before comparing (pandas may yield
        # dicts or struct-like records).
        actual_dicts = [{"title": item["title"], "text": item["text"]} for item in row["wikilinks"]]
        assert actual_dicts == expected, f"Row {idx}: wikilinks mismatch"

    print(f"Wikilinks test passed! {len(result)} rows processed")
|
||||
|
||||
|
||||
def test_templates():
    """--templates should extract templates with their name and parameters."""
    import mwparserfromhell

    tester = WikiqTester(SAILORMOON, "templates", in_compression="7z", out_format="jsonl")

    try:
        tester.call_wikiq("--templates", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)

    assert "templates" in result.columns, "templates column should exist"
    assert result["templates"].apply(lambda x: x is None or hasattr(x, '__len__')).all()

    # Spot-check up to five rows against mwparserfromhell directly.
    rows_with_templates = result[result["templates"].apply(lambda x: x is not None and len(x) > 0)]
    for idx, row in rows_with_templates.head(5).iterrows():
        text = row["text"]
        if not text:
            continue
        expected = []
        for template in mwparserfromhell.parse(text).filter_templates():
            params = {
                str(param.name).strip(): str(param.value).strip()
                for param in template.params
            }
            expected.append({"name": str(template.name).strip(), "params": params})

        # Normalize to plain dicts before comparing.
        actual_list = [
            {"name": item["name"], "params": dict(item["params"]) if item["params"] else {}}
            for item in row["templates"]
        ]
        assert actual_list == expected, f"Row {idx}: templates mismatch"

    print(f"Templates test passed! {len(result)} rows processed")
|
||||
|
||||
|
||||
def test_headings():
    """--headings should extract section headings with their levels."""
    import mwparserfromhell

    tester = WikiqTester(SAILORMOON, "headings", in_compression="7z", out_format="jsonl")

    try:
        tester.call_wikiq("--headings", "--text", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    result = pd.read_json(tester.output, lines=True)

    assert "headings" in result.columns, "headings column should exist"
    assert result["headings"].apply(lambda x: x is None or hasattr(x, '__len__')).all()

    # Spot-check up to five rows against mwparserfromhell directly.
    rows_with_headings = result[result["headings"].apply(lambda x: x is not None and len(x) > 0)]
    for idx, row in rows_with_headings.head(5).iterrows():
        text = row["text"]
        if not text:
            continue
        expected = [
            {"level": heading.level, "text": str(heading.title).strip()}
            for heading in mwparserfromhell.parse(text).filter_headings()
        ]
        # Normalize to plain dicts before comparing.
        actual_list = [{"level": item["level"], "text": item["text"]} for item in row["headings"]]
        assert actual_list == expected, f"Row {idx}: headings mismatch"

    print(f"Headings test passed! {len(result)} rows processed")
|
||||
|
||||
|
||||
def test_parquet_output():
    """Parquet output should be readable and agree with JSONL on row count."""
    tester = WikiqTester(SAILORMOON, "parquet_output", in_compression="7z", out_format="parquet")

    try:
        tester.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    assert os.path.exists(tester.output), f"Parquet output file should exist at {tester.output}"

    parquet_df = pd.read_parquet(tester.output)

    # Core revision columns must be present.
    for expected_col in ("revid", "articleid", "title", "namespace"):
        assert expected_col in parquet_df.columns

    # Re-run the same invocation emitting JSONL; both must agree on rows.
    tester_jsonl = WikiqTester(SAILORMOON, "parquet_compare", in_compression="7z", out_format="jsonl")
    try:
        tester_jsonl.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    jsonl_df = pd.read_json(tester_jsonl.output, lines=True)
    assert len(parquet_df) == len(jsonl_df), \
        f"Parquet and JSONL should have same row count: {len(parquet_df)} vs {len(jsonl_df)}"

    print(f"Parquet output test passed! {len(parquet_df)} rows")
|
||||
|
||||
for col in baseline.columns:
|
||||
try:
|
||||
assert_series_equal(
|
||||
test[col], baseline[col], check_like=True, check_dtype=False
|
||||
)
|
||||
except ValueError as exc:
|
||||
print(f"Error comparing column {col}")
|
||||
pytest.fail(exc)
|
||||
|
||||
# assert_frame_equal(test, baseline, check_like=True, check_dtype=False)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
5
test/conftest.py
Normal file
5
test/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
||||
import os
import sys

# Make the test directory importable so shared test utilities (e.g.
# wikiq_test_utils) resolve regardless of where pytest is invoked from.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
808
test/test_resume.py
Normal file
808
test/test_resume.py
Normal file
@@ -0,0 +1,808 @@
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from wikiq.resume import (
|
||||
get_checkpoint_path,
|
||||
read_checkpoint,
|
||||
)
|
||||
from wikiq_test_utils import (
|
||||
SAILORMOON,
|
||||
TEST_DIR,
|
||||
TEST_OUTPUT_DIR,
|
||||
WIKIQ,
|
||||
WikiqTester,
|
||||
)
|
||||
|
||||
|
||||
def read_jsonl(filepath):
    """Read a JSONL file and return its records as a list of dicts.

    Blank lines are skipped so files with trailing newlines parse cleanly.
    """
    rows = []
    # Explicit UTF-8: wikiq output is UTF-8 (stderr is decoded as utf8
    # throughout this suite), and the platform default encoding may differ.
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                rows.append(json.loads(line))
    return rows
|
||||
|
||||
|
||||
def test_resume():
    """A run resumed from a mid-stream checkpoint must equal an uninterrupted run."""
    import pandas as pd
    from pandas.testing import assert_frame_equal

    # Baseline: process the whole dump in one go.
    baseline = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="jsonl")
    try:
        baseline.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    baseline_rows = read_jsonl(baseline.output)

    # Simulate an interrupted run: the first half of the rows plus a
    # checkpoint pointing at the last row that was written.
    halfway = len(baseline_rows) // 2
    last_row = baseline_rows[halfway]

    partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as sink:
        sink.writelines(json.dumps(r) + "\n" for r in baseline_rows[:halfway + 1])
    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": last_row["articleid"], "revid": last_row["revid"]}, sink)

    # Resume; the stitched-together output must match the baseline exactly.
    try:
        partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial.output)
    assert_frame_equal(pd.DataFrame(baseline_rows), pd.DataFrame(resumed_rows))
|
||||
|
||||
|
||||
def test_resume_with_diff():
    """Diff values must be identical between a full run and a resumed run.

    Diffs are computed against prev_text, so this exercises restoration of
    the previous-revision text state when resuming partway through.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    baseline = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="jsonl")
    try:
        baseline.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    baseline_rows = read_jsonl(baseline.output)

    # Cut a third of the way in and fabricate the partial output + checkpoint.
    cut = len(baseline_rows) // 3
    last_row = baseline_rows[cut]

    partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as sink:
        sink.writelines(json.dumps(r) + "\n" for r in baseline_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": last_row["articleid"], "revid": last_row["revid"]}, sink)

    try:
        partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    df_full = pd.DataFrame(baseline_rows)
    df_resumed = pd.DataFrame(read_jsonl(partial.output))

    # The diff-specific columns must be present in both outputs.
    for col in ["text_chars", "diff", "diff_timeout"]:
        assert col in df_full.columns, f"Diff column {col} should exist in full output"
        assert col in df_resumed.columns, f"Diff column {col} should exist in resumed output"

    assert_frame_equal(df_full, df_resumed)
|
||||
|
||||
|
||||
def test_resume_file_not_found():
    """--resume with no existing output should simply start a fresh run."""
    tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="jsonl")

    # Make sure there is genuinely nothing to resume from.
    if os.path.exists(tester.output):
        os.remove(tester.output)

    # Expected to succeed by falling back to a fresh start.
    tester.call_wikiq("--fandom-2020", "--resume")

    # The fresh run must have produced a non-empty output file.
    assert os.path.exists(tester.output), "Output file should be created when starting fresh"
    assert len(read_jsonl(tester.output)) > 0, "Output should have data"

    print("Resume file not found test passed - started fresh!")
|
||||
|
||||
|
||||
def test_resume_simple():
    """Resume must also work in the default mode (without --fandom-2020)."""
    import pandas as pd
    from pandas.testing import assert_frame_equal

    baseline = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="jsonl")
    try:
        baseline.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    baseline_rows = read_jsonl(baseline.output)

    # Fabricate an interrupted run a third of the way through.
    cut = len(baseline_rows) // 3
    last_row = baseline_rows[cut]

    partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as sink:
        sink.writelines(json.dumps(r) + "\n" for r in baseline_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": last_row["articleid"], "revid": last_row["revid"]}, sink)

    try:
        partial.call_wikiq("--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial.output)
    assert_frame_equal(pd.DataFrame(baseline_rows), pd.DataFrame(resumed_rows))
|
||||
|
||||
|
||||
def test_checkpoint_read():
    """read_checkpoint returns (pageid, revid) for valid files, else None."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # A well-formed checkpoint yields its (pageid, revid) pair.
        checkpoint_path = os.path.join(tmpdir, "test.jsonl.checkpoint")
        with open(checkpoint_path, 'w') as f:
            f.write(json.dumps({"pageid": 100, "revid": 200}))
        result = read_checkpoint(checkpoint_path)
        assert result == (100, 200), f"Expected (100, 200), got {result}"

        # Missing file -> None.
        result = read_checkpoint(os.path.join(tmpdir, "nonexistent.checkpoint"))
        assert result is None, f"Expected None for non-existent file, got {result}"

        # Empty JSON object -> None.
        empty_path = os.path.join(tmpdir, "empty.checkpoint")
        with open(empty_path, 'w') as f:
            f.write("{}")
        result = read_checkpoint(empty_path)
        assert result is None, f"Expected None for empty checkpoint, got {result}"

        # Unparseable contents -> None (never an exception).
        corrupt_path = os.path.join(tmpdir, "corrupt.checkpoint")
        with open(corrupt_path, 'w') as f:
            f.write("not valid json")
        result = read_checkpoint(corrupt_path)
        assert result is None, f"Expected None for corrupted checkpoint, got {result}"

    print("Checkpoint read test passed!")
|
||||
|
||||
|
||||
def test_resume_with_interruption():
    """End-to-end: interrupt a live wikiq run with SIGUSR1, then resume it.

    The interrupted output plus the resumed output must equal the output of
    an uninterrupted run. If the process finishes before we can interrupt it
    (small test dump), we just verify the completed output instead.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    output_dir = os.path.join(TEST_OUTPUT_DIR, "resume_interrupt")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")

    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    output_file = os.path.join(output_dir, f"{SAILORMOON}.jsonl")

    # First, run to completion so we know the expected output.
    cmd_full = f"{WIKIQ} {input_file} -o {output_file} --fandom-2020"
    try:
        subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(output_file)

    # Clean up for the interrupted run.
    if os.path.exists(output_file):
        os.remove(output_file)
    checkpoint_path = get_checkpoint_path(output_file)
    if os.path.exists(checkpoint_path):
        os.remove(checkpoint_path)

    # Start wikiq and interrupt it partway through.
    cmd_partial = [
        sys.executable, WIKIQ, input_file,
        "-o", output_file,
        "--batch-size", "10",
        "--fandom-2020"
    ]

    proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)

    interrupt_delay = 3
    time.sleep(interrupt_delay)

    if proc.poll() is not None:
        # Process completed before we could interrupt. Drain stderr so the
        # pipe is closed, then verify the completed output.
        proc.communicate()
        interrupted_rows = read_jsonl(output_file)
        assert_frame_equal(pd.DataFrame(full_rows), pd.DataFrame(interrupted_rows))
        return

    proc.send_signal(signal.SIGUSR1)

    # BUG FIX: the original called proc.wait() while stderr was a PIPE that
    # nothing was reading; a chatty child can fill the OS pipe buffer and
    # deadlock the wait. communicate() drains the pipe while waiting.
    try:
        proc.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        proc.send_signal(signal.SIGTERM)
        proc.communicate(timeout=30)

    interrupted_rows = read_jsonl(output_file)

    if len(interrupted_rows) >= len(full_rows):
        # The interrupt arrived too late; the run completed anyway.
        assert_frame_equal(pd.DataFrame(full_rows), pd.DataFrame(interrupted_rows))
        return

    # Now resume from where the interrupted run stopped.
    cmd_resume = f"{WIKIQ} {input_file} -o {output_file} --batch-size 10 --fandom-2020 --resume"
    try:
        subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(output_file)
    assert_frame_equal(pd.DataFrame(full_rows), pd.DataFrame(resumed_rows))
|
||||
|
||||
|
||||
def test_resume_parquet():
    """Resume must also work when the output format is Parquet."""
    import pandas as pd
    from pandas.testing import assert_frame_equal
    import pyarrow.parquet as pq

    baseline = WikiqTester(SAILORMOON, "resume_parquet_full", in_compression="7z", out_format="parquet")
    try:
        baseline.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_table = pq.read_table(baseline.output)

    # Derive both the slice and the checkpoint from the same unsorted row
    # index so the partial file and checkpoint stay consistent.
    cut = len(full_table) // 3
    resume_revid = int(full_table.column("revid")[cut].as_py())
    resume_pageid = int(full_table.column("articleid")[cut].as_py())

    partial = WikiqTester(SAILORMOON, "resume_parquet_partial", in_compression="7z", out_format="parquet")

    # Slice rather than rebuild so the partial file shares the full file's schema.
    pq.write_table(full_table.slice(0, cut + 1), partial.output)

    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, sink)

    try:
        partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    assert_frame_equal(full_table.to_pandas(), pd.read_parquet(partial.output))
|
||||
|
||||
|
||||
def test_resume_tsv_error():
    """--resume is unsupported for TSV output and must fail with a clear message."""
    tester = WikiqTester(SAILORMOON, "resume_tsv_error", in_compression="7z", out_format="tsv")

    try:
        tester.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        # The failure must carry the dedicated error message, not a traceback.
        stderr = exc.stderr.decode("utf8")
        assert "Error: --resume only works with JSONL or Parquet" in stderr, \
            f"Expected proper error message, got: {stderr}"
    else:
        pytest.fail("Expected error for --resume with TSV output")

    print("TSV resume error test passed!")
|
||||
|
||||
|
||||
def test_resume_data_equivalence():
    """A resumed run must produce data exactly equivalent to a full run.

    The revert detector state is maintained during the skip phase, so revert
    detection should be identical to a full run.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    baseline = WikiqTester(SAILORMOON, "resume_equiv_full", in_compression="7z", out_format="jsonl")
    try:
        baseline.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    baseline_rows = read_jsonl(baseline.output)

    # Fabricate an interrupted run a third of the way through.
    cut = len(baseline_rows) // 3
    last_row = baseline_rows[cut]

    partial = WikiqTester(SAILORMOON, "resume_equiv_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as sink:
        sink.writelines(json.dumps(r) + "\n" for r in baseline_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": last_row["articleid"], "revid": last_row["revid"]}, sink)

    try:
        partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial.output)
    assert_frame_equal(pd.DataFrame(baseline_rows), pd.DataFrame(resumed_rows))
|
||||
|
||||
|
||||
def test_resume_with_persistence():
    """Persistence (PWR) values must be identical after a resume.

    Persistence depends on token state carried across revisions, so this
    verifies that token_revs matches a full run when resuming partway in.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    baseline = WikiqTester(SAILORMOON, "resume_persist_full", in_compression="7z", out_format="jsonl")
    try:
        baseline.call_wikiq("--persistence wikidiff2", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    baseline_rows = read_jsonl(baseline.output)

    # Fabricate an interrupted run a quarter of the way through.
    cut = len(baseline_rows) // 4
    last_row = baseline_rows[cut]

    partial = WikiqTester(SAILORMOON, "resume_persist_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as sink:
        sink.writelines(json.dumps(r) + "\n" for r in baseline_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": last_row["articleid"], "revid": last_row["revid"]}, sink)

    try:
        partial.call_wikiq("--persistence wikidiff2", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    df_full = pd.DataFrame(baseline_rows)
    df_resumed = pd.DataFrame(read_jsonl(partial.output))

    # The persistence column must be present in both outputs.
    assert "token_revs" in df_full.columns, "token_revs should exist in full output"
    assert "token_revs" in df_resumed.columns, "token_revs should exist in resumed output"

    assert_frame_equal(df_full, df_resumed)
|
||||
|
||||
|
||||
def test_resume_corrupted_jsonl_last_line():
    """JSONL resume must recover from an incomplete (crash-truncated) last line.

    With no checkpoint file present, resume should:
    1. Find the resume point from the last valid line in the file
    2. Truncate the corrupted trailing data
    3. Append new data, yielding valid JSONL equivalent to a full run
    """
    tester_full = WikiqTester(SAILORMOON, "resume_corrupt_full", in_compression="7z", out_format="jsonl")

    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(tester_full.output)

    # Create a partial file whose final line is cut off mid-record, as if
    # the previous run crashed during a write.
    tester_corrupt = WikiqTester(SAILORMOON, "resume_corrupt_test", in_compression="7z", out_format="jsonl")
    corrupt_output_path = tester_corrupt.output

    resume_idx = len(full_rows) // 2

    with open(corrupt_output_path, 'w') as f:
        for row in full_rows[:resume_idx]:
            f.write(json.dumps(row) + "\n")
        # Incomplete JSON simulates a crash mid-write.
        f.write('{"revid": 999, "articleid": 123, "incomplet')

    # (An unused size_before = os.path.getsize(...) snapshot was removed here.)

    # NO checkpoint file - JSONL resume works from the last valid line.
    checkpoint_path = get_checkpoint_path(corrupt_output_path)
    assert not os.path.exists(checkpoint_path), "Test setup error: checkpoint should not exist"

    # Resume should detect the corrupted line, truncate it, then append.
    try:
        tester_corrupt.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(f"Resume failed unexpectedly: {exc.stderr.decode('utf8')}")

    # The file must now parse cleanly (read_jsonl raises on corrupt lines).
    resumed_rows = read_jsonl(corrupt_output_path)

    # Full data equivalence against the uninterrupted run.
    import pandas as pd
    from pandas.testing import assert_frame_equal

    assert_frame_equal(pd.DataFrame(full_rows), pd.DataFrame(resumed_rows))
|
||||
|
||||
|
||||
def test_resume_diff_persistence_combined():
    """Diff and persistence state must both survive a resume when combined."""
    import pandas as pd
    from pandas.testing import assert_frame_equal

    baseline = WikiqTester(SAILORMOON, "resume_combined_full", in_compression="7z", out_format="jsonl")
    try:
        baseline.call_wikiq("--diff", "--persistence wikidiff2", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    baseline_rows = read_jsonl(baseline.output)

    # Fabricate an interrupted run a third of the way through.
    cut = len(baseline_rows) // 3
    last_row = baseline_rows[cut]

    partial = WikiqTester(SAILORMOON, "resume_combined_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as sink:
        sink.writelines(json.dumps(r) + "\n" for r in baseline_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": last_row["articleid"], "revid": last_row["revid"]}, sink)

    try:
        partial.call_wikiq("--diff", "--persistence wikidiff2", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    df_full = pd.DataFrame(baseline_rows)
    df_resumed = pd.DataFrame(read_jsonl(partial.output))

    # Both stateful features must have produced their columns.
    assert "diff" in df_full.columns
    assert "token_revs" in df_full.columns

    assert_frame_equal(df_full, df_resumed)
|
||||
|
||||
|
||||
def test_resume_mid_page():
    """Resume from the middle of a page with many revisions.

    Exercises state restoration when the checkpoint falls partway through a
    single page's revision history.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    baseline = WikiqTester(SAILORMOON, "resume_midpage_full", in_compression="7z", out_format="jsonl")
    try:
        baseline.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(baseline.output)
    df_full = pd.DataFrame(full_rows)

    # Pick a page with a deep history (>= 10 revisions if one exists,
    # otherwise the deepest available).
    page_counts = df_full.groupby("articleid").size()
    large_page_id = page_counts[page_counts >= 10].index[0] if any(page_counts >= 10) else page_counts.idxmax()
    page_revs = df_full[df_full["articleid"] == large_page_id].sort_values("revid")

    # Checkpoint at the midpoint of that page's history.
    midpoint = page_revs.iloc[len(page_revs) // 2]
    resume_revid = int(midpoint["revid"])
    resume_pageid = int(midpoint["articleid"])

    # Translate back to a position in the full output stream.
    global_idx = df_full[df_full["revid"] == resume_revid].index[0]

    partial = WikiqTester(SAILORMOON, "resume_midpage_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as sink:
        sink.writelines(json.dumps(full_rows[i]) + "\n" for i in range(global_idx + 1))
    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, sink)

    try:
        partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    assert_frame_equal(df_full, pd.DataFrame(read_jsonl(partial.output)))
|
||||
|
||||
|
||||
def test_resume_page_boundary():
    """Resume exactly at a page boundary (the last revision of a page).

    Guards against off-by-one errors where the checkpoint row is the final
    revision of one page and the next output row starts a new page.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_boundary_full", in_compression="7z", out_format="jsonl")

    try:
        tester_full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(tester_full.output)
    df_full = pd.DataFrame(full_rows)

    # Find the last revision of a page that is not the final row overall.
    page_last_revs = df_full.groupby("articleid")["revid"].max()
    boundary = None  # (pageid, revid, row index in df_full)
    for page_id in page_last_revs.index[:-1]:
        last_rev_of_page = page_last_revs[page_id]
        row_idx = df_full[df_full["revid"] == last_rev_of_page].index[0]
        if row_idx < len(df_full) - 1:
            boundary = (page_id, last_rev_of_page, row_idx)
            break

    # BUG FIX: the original fell through with stale loop variables when no
    # boundary qualified, and raised NameError on a single-page dump
    # (page_last_revs.index[:-1] empty). Skip explicitly instead.
    if boundary is None:
        pytest.skip("No suitable page boundary found in test data")

    page_id, last_rev_of_page, row_idx = boundary
    resume_revid = int(last_rev_of_page)
    resume_pageid = int(page_id)

    tester_partial = WikiqTester(SAILORMOON, "resume_boundary_partial", in_compression="7z", out_format="jsonl")
    partial_output_path = tester_partial.output

    # Write everything up to and including the boundary row.
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:row_idx + 1]:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    assert_frame_equal(df_full, pd.DataFrame(resumed_rows))
|
||||
|
||||
|
||||
def test_jsonl_dir_output():
    """.jsonl.d directory output must name files after their input dump.

    Each input file should produce <input-stem>.jsonl inside the output
    directory (e.g. sailormoon.jsonl), not a generic data.jsonl.
    """
    # NOTE: the original imported pandas/assert_frame_equal here but never
    # used them; the dead imports have been removed.
    output_dir = os.path.join(TEST_OUTPUT_DIR, "jsonl_dir_test.jsonl.d")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")

    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # Run wikiq with .jsonl.d output
    cmd = f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10"
    try:
        subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    # Verify output file is named after input, not "data.jsonl"
    expected_output = os.path.join(output_dir, f"{SAILORMOON}.jsonl")
    wrong_output = os.path.join(output_dir, "data.jsonl")

    assert os.path.exists(expected_output), \
        f"Expected {expected_output} to exist, but it doesn't. Directory contents: {os.listdir(output_dir)}"
    assert not os.path.exists(wrong_output), \
        f"Expected {wrong_output} NOT to exist (should be named after input file)"

    # Verify output has data
    rows = read_jsonl(expected_output)
    assert len(rows) > 0, "Output file should have data"
|
||||
|
||||
|
||||
def test_jsonl_dir_resume():
    """Resume must derive the same per-input filename as the writer for .jsonl.d output."""
    import pandas as pd
    from pandas.testing import assert_frame_equal

    output_dir = os.path.join(TEST_OUTPUT_DIR, "jsonl_dir_resume.jsonl.d")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")

    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # Complete first pass.
    cmd_full = f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10"
    try:
        subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    expected_output = os.path.join(output_dir, f"{SAILORMOON}.jsonl")
    full_rows = read_jsonl(expected_output)

    # Truncate the per-input file to its first half to simulate interruption.
    with open(expected_output, 'w') as sink:
        sink.writelines(json.dumps(r) + "\n" for r in full_rows[:len(full_rows) // 2])

    # Resume; it must append to the same derived filename.
    cmd_resume = f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10 --resume"
    try:
        subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    assert_frame_equal(pd.DataFrame(full_rows), pd.DataFrame(read_jsonl(expected_output)))
|
||||
|
||||
|
||||
def test_resume_revert_detection():
    """Revert detection must be unaffected by resuming just before a revert.

    The revert detector's window is rebuilt during the skip phase; resuming
    one row before a known revert must still flag that revert.
    """
    import pandas as pd
    from pandas.testing import assert_series_equal

    baseline = WikiqTester(SAILORMOON, "resume_revert_full", in_compression="7z", out_format="jsonl")
    try:
        baseline.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(baseline.output)
    df_full = pd.DataFrame(full_rows)

    # We need at least one revert, and not right at the start of the data.
    revert_rows = df_full[df_full["revert"] == True]
    if len(revert_rows) == 0:
        pytest.skip("No reverts found in test data")
    first_revert_idx = revert_rows.index[0]
    if first_revert_idx < 2:
        pytest.skip("First revert too early in dataset")

    # Checkpoint immediately before the first known revert.
    cut = first_revert_idx - 1
    cut_row = full_rows[cut]

    partial = WikiqTester(SAILORMOON, "resume_revert_partial", in_compression="7z", out_format="jsonl")
    with open(partial.output, 'w') as sink:
        sink.writelines(json.dumps(r) + "\n" for r in full_rows[:cut + 1])
    with open(get_checkpoint_path(partial.output), 'w') as sink:
        json.dump({"pageid": cut_row["articleid"], "revid": cut_row["revid"]}, sink)

    try:
        partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    df_resumed = pd.DataFrame(read_jsonl(partial.output))

    # Revert flags and reverted-revision lists must match the full run exactly.
    assert_series_equal(df_full["revert"], df_resumed["revert"])
    assert_series_equal(df_full["reverteds"], df_resumed["reverteds"])
|
||||
87
test/wikiq_test_utils.py
Normal file
87
test/wikiq_test_utils.py
Normal file
@@ -0,0 +1,87 @@
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Final, Union
|
||||
|
||||
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
|
||||
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR, ".."), "src/wikiq/__init__.py")
|
||||
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
|
||||
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
|
||||
|
||||
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
|
||||
SAILORMOON: Final[str] = "sailormoon"
|
||||
TWINPEAKS: Final[str] = "twinpeaks"
|
||||
REGEXTEST: Final[str] = "regextest"
|
||||
|
||||
|
||||
class WikiqTester:
    """Test harness for a single wikiq invocation.

    Resolves the input dump, output, and baseline file paths for a test
    case, cleans up stale output (including resume artifacts) from prior
    runs, and provides :meth:`call_wikiq` to run the wikiq CLI.
    """

    def __init__(
        self,
        wiki: str,
        case_name: str,
        suffix: Union[str, None] = None,
        in_compression: str = "bz2",
        baseline_format: str = "tsv",
        out_format: str = "tsv",
    ):
        """
        :param wiki: Basename of the bundled test dump (e.g. "sailormoon").
        :param case_name: Test-case name, used in output/baseline filenames.
        :param suffix: Optional extra filename component.
        :param in_compression: Compression extension of the input dump.
        :param baseline_format: Extension of the baseline file compared against.
        :param out_format: Extension/format of the wikiq output.
        """
        self.input_file = os.path.join(
            TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
        )

        basename = "{0}_{1}".format(case_name, wiki)
        if suffix:
            basename = "{0}_{1}".format(basename, suffix)

        self.output = os.path.join(
            TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
        )

        # Remove stale output from a previous run, plus any resume-related
        # artifacts wikiq may have left next to it.
        self._remove_path(self.output)
        for temp_suffix in (".resume_temp", ".checkpoint", ".merged"):
            self._remove_path(self.output + temp_suffix)

        # For JSONL and Parquet, self.output is a file path. Create its
        # parent directory if needed so wikiq can write to it.
        if out_format in ("jsonl", "parquet"):
            parent_dir = os.path.dirname(self.output)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

        if suffix is None:
            self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
            self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
        else:
            self.wikiq_baseline_name = "{0}_{1}.{2}".format(
                wiki, suffix, baseline_format
            )
            self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)

        if case_name is not None:
            self.baseline_file = os.path.join(
                BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
            )

    @staticmethod
    def _remove_path(path: str) -> None:
        """Delete *path* whether it is a file or a directory; no-op if absent."""
        if os.path.exists(path):
            if os.path.isfile(path):
                os.remove(path)
            else:
                shutil.rmtree(path)

    def call_wikiq(self, *args: str, out: bool = True):
        """
        Calls wikiq with the passed arguments on the input file relevant to the test.

        :param args: The command line arguments to pass to wikiq.
        :param out: Whether to pass an output argument to wikiq.
        :return: The output of the wikiq call.
        :raises subprocess.CalledProcessError: if wikiq exits non-zero;
            stderr is captured on the exception.
        """
        argv = [WIKIQ, self.input_file]
        if out:
            argv += ["-o", self.output]
        argv += ["--batch-size", "10", *args]
        # NOTE(review): shell=True with a space-joined string re-parses the
        # arguments through the shell; some callers may rely on that for
        # quoted regex arguments, so the invocation style is kept as-is.
        call = " ".join(argv)
        print(call)
        return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
|
||||
Reference in New Issue
Block a user