mediawiki_dump_tools/test/Wikiq_Unit_Test.py
Nathan TeBlunthuis 329341efb6 improve tests.
2025-12-02 13:52:12 -08:00

834 lines
33 KiB
Python

import os
import shutil
import subprocess
import tracemalloc
from io import StringIO
from typing import Final, Union
import pytest
import numpy as np
import pandas as pd
import pyarrow as pa
from pandas import DataFrame
from pandas.testing import assert_frame_equal, assert_series_equal
# Make references to files and wikiq relative to this file, not to the current working directory.
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR,".."), "src/wikiq/__init__.py")
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
SAILORMOON: Final[str] = "sailormoon"
TWINPEAKS: Final[str] = "twinpeaks"
REGEXTEST: Final[str] = "regextest"
def setup():
tracemalloc.start()
# Perform directory check and reset here as this is a one-time setup step as opposed to per-test setup.
if not os.path.exists(TEST_OUTPUT_DIR):
os.mkdir(TEST_OUTPUT_DIR)
# Always run setup, even if this is executed via "python -m unittest" rather
# than as __main__.
setup()
class WikiqTester:
def __init__(
self,
wiki: str,
case_name: str,
suffix: Union[str, None] = None,
in_compression: str = "bz2",
baseline_format: str = "tsv",
out_format: str = "tsv",
):
self.input_file = os.path.join(
TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
)
basename = "{0}_{1}".format(case_name, wiki)
if suffix:
basename = "{0}_{1}".format(basename, suffix)
self.output = os.path.join(
TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
)
if os.path.exists(self.output):
if os.path.isfile(self.output):
os.remove(self.output)
else:
shutil.rmtree(self.output)
if out_format == "parquet":
os.makedirs(self.output, exist_ok=True)
if suffix is None:
self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
else:
self.wikiq_baseline_name = "{0}_{1}.{2}".format(
wiki, suffix, baseline_format
)
self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
# If case_name is unset, there are no relevant baseline or test files.
if case_name is not None:
self.baseline_file = os.path.join(
BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
)
def call_wikiq(self, *args: str, out: bool = True):
"""
Calls wikiq with the passed arguments on the input file relevant to the test.
:param args: The command line arguments to pass to wikiq.
:param out: Whether to pass an output argument to wikiq.
:return: The output of the wikiq call.
"""
if out:
call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
else:
call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
print(call)
return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
# 7z, gz, bz2, xml DONE
# wikia and wikipedia data DONE
# malformed xmls DONE
def test_WP_noargs():
tester = WikiqTester(IKWIKI, "noargs")
try:
tester.call_wikiq()
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_WP_namespaces():
tester = WikiqTester(IKWIKI, "namespaces")
try:
tester.call_wikiq("-n 0", "-n 1")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# as a test let's make sure that we get equal data frames
test = pd.read_table(tester.output)
num_wrong_ns = sum(~test.namespace.isin({0, 1}))
assert num_wrong_ns == 0
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_WP_revert_radius():
tester = WikiqTester(IKWIKI, "revert_radius")
try:
tester.call_wikiq("-n 0", "-n 1", "-rr 1")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# as a test let's make sure that we get equal data frames
test = pd.read_table(tester.output)
num_wrong_ns = sum(~test.namespace.isin({0, 1}))
assert num_wrong_ns == 0
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_WP_no_revert_radius():
tester = WikiqTester(IKWIKI, "no_revert_radius")
try:
tester.call_wikiq("-rr 0")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# as a test let's make sure that we get equal data frames
test = pd.read_table(tester.output)
num_reverted = sum(i is None for i in test.revert)
assert num_reverted == 0
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_WP_collapse_user():
tester = WikiqTester(IKWIKI, "collapse_user")
try:
tester.call_wikiq("--collapse-user")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_noargs():
tester = WikiqTester(SAILORMOON, "noargs", in_compression="7z")
try:
tester.call_wikiq()
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_collapse_user():
tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z")
try:
tester.call_wikiq("--collapse-user", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_partition_namespaces():
tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z", out_format='parquet', baseline_format='parquet')
try:
tester.call_wikiq("--collapse-user", "--fandom-2020", "--partition-namespaces")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(os.path.join(tester.output,"namespace=10/sailormoon.parquet"))
baseline = pd.read_parquet(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_pwr_wikidiff2():
tester = WikiqTester(SAILORMOON, "persistence_wikidiff2", in_compression="7z")
try:
tester.call_wikiq("--persistence wikidiff2", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_pwr_segment():
tester = WikiqTester(SAILORMOON, "persistence_segment", in_compression="7z")
try:
tester.call_wikiq("--persistence segment", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_pwr_legacy():
tester = WikiqTester(SAILORMOON, "persistence_legacy", in_compression="7z")
try:
tester.call_wikiq("--persistence legacy", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_pwr():
tester = WikiqTester(SAILORMOON, "persistence", in_compression="7z")
try:
tester.call_wikiq("--persistence", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
def test_diff():
tester = WikiqTester(SAILORMOON, "diff", in_compression="7z", out_format='parquet', baseline_format='parquet')
try:
tester.call_wikiq("--diff", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
baseline = pd.read_parquet(tester.baseline_file)
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
def test_diff_plus_pwr():
tester = WikiqTester(SAILORMOON, "diff_pwr", in_compression="7z", out_format='parquet', baseline_format='parquet')
try:
tester.call_wikiq("--diff --persistence wikidiff2", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
baseline = pd.read_parquet(tester.baseline_file)
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
def test_text():
tester = WikiqTester(SAILORMOON, "text", in_compression="7z", out_format='parquet', baseline_format='parquet')
try:
tester.call_wikiq("--diff", "--text","--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
baseline = pd.read_parquet(tester.baseline_file)
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
def test_malformed_noargs():
tester = WikiqTester(wiki=TWINPEAKS, case_name="noargs", in_compression="7z")
want_exception = (
"xml.etree.ElementTree.ParseError: no element found: line 1369, column 0"
)
try:
tester.call_wikiq()
except subprocess.CalledProcessError as exc:
errlines = exc.stderr.decode("utf8").splitlines()
assert errlines[-1] == want_exception
else:
pytest.fail("No exception raised, want: {}".format(want_exception))
def test_stdout_noargs():
tester = WikiqTester(wiki=SAILORMOON, case_name="noargs", in_compression="7z")
try:
outs = tester.call_wikiq("--stdout", "--fandom-2020", out=False).decode(
"utf8"
)
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(StringIO(outs))
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_bad_regex():
tester = WikiqTester(wiki=REGEXTEST, case_name="bad_regex")
# sample arguments for checking that bad arguments get terminated / test_regex_arguments
bad_arguments_list = [
# label is missing
"-RP '\\b\\d+\\b'",
# number of reg and number of labels do not match
"-RP 'NPO V' -RP THE -RPl testlabel",
# cp but rp label
"-CP '(Tamil|Li)' -RPl testlabel",
# regex is missing
"-CPl testlabel",
"-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'",
]
for arguments in bad_arguments_list:
try:
tester.call_wikiq("--stdout", arguments, out=False)
except subprocess.CalledProcessError as exc:
# we want to check that the bad arguments were caught and sys.exit is stopping the code
print(exc.stderr.decode("utf-8"))
else:
pytest.fail("No exception raised, want Exception")
def test_good_regex():
# sample arguments for checking the outcomes of good arguments / test_basic_regex
good_arguments_list = [
"-RP '\\b\\d{3}\\b' -RPl threedigits",
"-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
"-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
"-CP 'WP:EVADE' -CPl wp_evade",
]
for i, arguments in enumerate(good_arguments_list):
tester = WikiqTester(wiki=REGEXTEST, case_name="basic", suffix=str(i))
try:
tester.call_wikiq(arguments)
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
print(i)
def test_capturegroup_regex():
cap_arguments_list = [
"-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
"-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov",
]
for i, arguments in enumerate(cap_arguments_list):
tester = WikiqTester(
wiki=REGEXTEST, case_name="capturegroup", suffix=str(i)
)
try:
tester.call_wikiq(arguments)
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_table(tester.output)
baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True)
def test_parquet():
tester = WikiqTester(IKWIKI, "noargs", out_format="parquet")
try:
tester.call_wikiq()
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# as a test let's make sure that we get equal data frames
test: DataFrame = pd.read_parquet(tester.output)
# test = test.drop(['reverteds'], axis=1)
baseline: DataFrame = pd.read_table(tester.baseline_file)
# Pandas does not read timestamps as the desired datetime type.
baseline["date_time"] = pd.to_datetime(baseline["date_time"])
# Split strings to the arrays of reverted IDs so they can be compared.
baseline["revert"] = baseline["revert"].replace(np.nan, None)
baseline["reverteds"] = baseline["reverteds"].replace(np.nan, None)
# baseline['reverteds'] = [None if i is np.nan else [int(j) for j in str(i).split(",")] for i in baseline['reverteds']]
baseline["sha1"] = baseline["sha1"].replace(np.nan, None)
baseline["editor"] = baseline["editor"].replace(np.nan, None)
baseline["anon"] = baseline["anon"].replace(np.nan, None)
for index, row in baseline.iterrows():
if row["revert"] != test["revert"][index]:
print(row["revid"], ":", row["revert"], "!=", test["revert"][index])
for col in baseline.columns:
try:
assert_series_equal(
test[col], baseline[col], check_like=True, check_dtype=False
)
except ValueError as exc:
print(f"Error comparing column {col}")
pytest.fail(exc)
# assert_frame_equal(test, baseline, check_like=True, check_dtype=False)
def test_resume():
"""Test that --resume properly resumes processing from the last written revid."""
import pyarrow.parquet as pq
# First, create a complete baseline output
tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq("--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the full output
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
full_table = pq.read_table(full_output_path)
# Get the middle revid to use as the resume point
middle_idx = len(full_table) // 2
resume_revid = full_table.column("revid")[middle_idx].as_py()
print(f"Total revisions: {len(full_table)}, Resume point: {middle_idx}, Resume revid: {resume_revid}")
# Create a partial output by copying row groups to preserve the exact schema
tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="parquet")
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
# Create partial output by filtering the table and writing with the same schema
partial_table = full_table.slice(0, middle_idx + 1)
pq.write_table(partial_table, partial_output_path)
# Now resume from the partial output
try:
tester_partial.call_wikiq("--fandom-2020", "--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the resumed output
resumed_table = pq.read_table(partial_output_path)
# The resumed output should match the full output
# Convert to dataframes for comparison, sorting by revid
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
# Compare the dataframes
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_with_diff():
"""Test that --resume works correctly with diff computation."""
import pyarrow.parquet as pq
# First, create a complete baseline output with diff
tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq("--diff", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the full output
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
full_table = pq.read_table(full_output_path)
# Get a revid about 1/3 through to use as the resume point
resume_idx = len(full_table) // 3
resume_revid = full_table.column("revid")[resume_idx].as_py()
print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
# Create a partial output by filtering the table to preserve the exact schema
tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="parquet")
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
# Create partial output by slicing the table
partial_table = full_table.slice(0, resume_idx + 1)
pq.write_table(partial_table, partial_output_path)
# Now resume from the partial output
try:
tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the resumed output
resumed_table = pq.read_table(partial_output_path)
# Convert to dataframes for comparison, sorting by revid
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
# Compare the dataframes
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
def test_resume_with_partition_namespaces():
"""Test that --resume works correctly with --partition-namespaces."""
import pyarrow.parquet as pq
# First, create a complete baseline output with partition-namespaces
tester_full = WikiqTester(SAILORMOON, "resume_partition_full", in_compression="7z", out_format="parquet")
try:
tester_full.call_wikiq("--partition-namespaces", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the full output from the partitioned directory
full_output_dir = tester_full.output
namespace_dirs = [d for d in os.listdir(full_output_dir) if d.startswith('namespace=')]
if not namespace_dirs:
pytest.fail("No namespace directories found in output")
# Collect all revisions from all namespaces
full_revids = []
for ns_dir in sorted(namespace_dirs):
parquet_files = [f for f in os.listdir(os.path.join(full_output_dir, ns_dir)) if f.endswith('.parquet')]
if parquet_files:
ns_parquet_path = os.path.join(full_output_dir, ns_dir, parquet_files[0])
pf = pq.ParquetFile(ns_parquet_path)
table = pf.read(columns=['revid'])
revids = table.column('revid').to_pylist()
full_revids.extend(revids)
full_revids_sorted = sorted(set(full_revids))
total_revisions = len(full_revids_sorted)
# Get a revid about 1/3 through to use as the resume point
resume_idx = total_revisions // 3
resume_revid = full_revids_sorted[resume_idx]
print(f"Total revisions: {total_revisions}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
# Create a partial output by manually creating the partitioned structure
tester_partial = WikiqTester(SAILORMOON, "resume_partition_partial", in_compression="7z", out_format="parquet")
partial_output_dir = tester_partial.output
# Copy the full partitioned output to the partial directory
for ns_dir in namespace_dirs:
src_ns_path = os.path.join(full_output_dir, ns_dir)
dst_ns_path = os.path.join(partial_output_dir, ns_dir)
shutil.copytree(src_ns_path, dst_ns_path)
# Now filter each namespace file to only include revisions up to resume_idx
revised_data_count = 0
for ns_dir in namespace_dirs:
parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')]
if parquet_files:
ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0])
pf = pq.ParquetFile(ns_parquet_path)
table = pf.read()
# Filter to only rows up to the resume point
revids = table.column('revid').to_pylist()
mask = pa.array([revid <= resume_revid for revid in revids], type=pa.bool_())
partial_table = table.filter(mask)
revised_data_count += len(partial_table)
# Write back the filtered data
pq.write_table(partial_table, ns_parquet_path)
print(f"Created partial output with {revised_data_count} revisions (up to revid {resume_revid})")
# Now resume from the partial output
try:
tester_partial.call_wikiq("--partition-namespaces", "--fandom-2020", "--resume")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
# Read the resumed output and collect revids
resumed_revids = []
for ns_dir in namespace_dirs:
parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')]
if parquet_files:
ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0])
pf = pq.ParquetFile(ns_parquet_path)
table = pf.read(columns=['revid'])
revids = table.column('revid').to_pylist()
resumed_revids.extend(revids)
resumed_revids_sorted = sorted(set(resumed_revids))
# Compare the revids
assert resumed_revids_sorted == full_revids_sorted, f"Resumed revids mismatch: {len(resumed_revids_sorted)} vs {len(full_revids_sorted)}"
print(f"Resume with partition-namespaces test passed! Original: {len(full_revids_sorted)} revisions, Resumed: {len(resumed_revids_sorted)} revisions")
def test_external_links_only():
"""Test that --external-links extracts external links correctly."""
import mwparserfromhell
tester = WikiqTester(SAILORMOON, "external_links_only", in_compression="7z", out_format="parquet")
try:
# Also include --text so we can verify extraction against actual wikitext
tester.call_wikiq("--external-links", "--text", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
# Verify external_links column exists
assert "external_links" in test.columns, "external_links column should exist"
# Verify citations column does NOT exist
assert "citations" not in test.columns, "citations column should NOT exist when only --external-links is used"
# Verify column has list/array type (pandas reads parquet lists as numpy arrays)
assert test["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
"external_links should be a list/array type or None"
# Verify that extracted URLs look like valid URIs (have a scheme or are protocol-relative)
all_urls = []
for links in test["external_links"]:
if links is not None and len(links) > 0:
all_urls.extend(links)
for url in all_urls:
# External links can be http, https, mailto, ftp, etc. or protocol-relative (//)
has_scheme = ":" in url and url.index(":") < 10 # scheme:... with short scheme
is_protocol_relative = url.startswith("//")
assert has_scheme or is_protocol_relative, \
f"External link should be a valid URI, got: {url}"
# Verify extraction matches mwparserfromhell for a sample of rows with text
rows_with_links = test[test["external_links"].apply(lambda x: x is not None and len(x) > 0)]
if len(rows_with_links) > 0:
# Test up to 5 rows
sample = rows_with_links.head(5)
for idx, row in sample.iterrows():
text = row["text"]
if text:
wikicode = mwparserfromhell.parse(text)
expected_links = [str(link.url) for link in wikicode.filter_external_links()]
actual_links = list(row["external_links"])
assert actual_links == expected_links, \
f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"
print(f"External links only test passed! {len(test)} rows, {len(all_urls)} total URLs extracted")
def test_citations_only():
"""Test that --citations extracts citations correctly."""
import mwparserfromhell
from wikiq.wikitext_parser import WikitextParser
tester = WikiqTester(SAILORMOON, "citations_only", in_compression="7z", out_format="parquet")
try:
# Also include --text so we can verify extraction against actual wikitext
tester.call_wikiq("--citations", "--text", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
# Verify citations column exists
assert "citations" in test.columns, "citations column should exist"
# Verify external_links column does NOT exist
assert "external_links" not in test.columns, "external_links column should NOT exist when only --citations is used"
# Verify column has list/array type (pandas reads parquet lists as numpy arrays)
assert test["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
"citations should be a list/array type or None"
# Verify that extracted citations have correct prefixes (ref: or template:)
all_citations = []
for citations in test["citations"]:
if citations is not None and len(citations) > 0:
all_citations.extend(citations)
for citation in all_citations:
assert citation.startswith("ref:") or citation.startswith("template:"), \
f"Citation should start with 'ref:' or 'template:', got: {citation}"
# Verify extraction matches WikitextParser for a sample of rows with text
rows_with_citations = test[test["citations"].apply(lambda x: x is not None and len(x) > 0)]
if len(rows_with_citations) > 0:
parser = WikitextParser()
# Test up to 5 rows
sample = rows_with_citations.head(5)
for idx, row in sample.iterrows():
text = row["text"]
if text:
expected_citations = parser.extract_citations(text)
actual_citations = list(row["citations"])
assert actual_citations == expected_citations, \
f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"
print(f"Citations only test passed! {len(test)} rows, {len(all_citations)} total citations extracted")
def test_external_links_and_citations():
"""Test that both --external-links and --citations work together (shared parser)."""
import mwparserfromhell
from wikiq.wikitext_parser import WikitextParser
tester = WikiqTester(SAILORMOON, "external_links_and_citations", in_compression="7z", out_format="parquet")
try:
# Also include --text so we can verify extraction against actual wikitext
tester.call_wikiq("--external-links", "--citations", "--text", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
# Verify both columns exist
assert "external_links" in test.columns, "external_links column should exist"
assert "citations" in test.columns, "citations column should exist"
# Verify both columns have list/array types (pandas reads parquet lists as numpy arrays)
assert test["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
"external_links should be a list/array type or None"
assert test["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
"citations should be a list/array type or None"
# Verify URLs look like valid URIs (have a scheme or are protocol-relative)
all_urls = []
for links in test["external_links"]:
if links is not None and len(links) > 0:
all_urls.extend(links)
for url in all_urls:
# External links can be http, https, mailto, ftp, etc. or protocol-relative (//)
has_scheme = ":" in url and url.index(":") < 10 # scheme:... with short scheme
is_protocol_relative = url.startswith("//")
assert has_scheme or is_protocol_relative, \
f"External link should be a valid URI, got: {url}"
# Verify citations have correct prefixes
all_citations = []
for citations in test["citations"]:
if citations is not None and len(citations) > 0:
all_citations.extend(citations)
for citation in all_citations:
assert citation.startswith("ref:") or citation.startswith("template:"), \
f"Citation should start with 'ref:' or 'template:', got: {citation}"
# Verify extraction matches WikitextParser for a sample of rows with text
# This tests that the shared parser optimization works correctly
parser = WikitextParser()
rows_with_content = test[
(test["external_links"].apply(lambda x: x is not None and len(x) > 0)) |
(test["citations"].apply(lambda x: x is not None and len(x) > 0))
]
if len(rows_with_content) > 0:
# Test up to 5 rows
sample = rows_with_content.head(5)
for idx, row in sample.iterrows():
text = row["text"]
if text:
# Verify external links
wikicode = mwparserfromhell.parse(text)
expected_links = [str(link.url) for link in wikicode.filter_external_links()]
actual_links = list(row["external_links"]) if row["external_links"] is not None else []
assert actual_links == expected_links, \
f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"
# Verify citations
expected_citations = parser.extract_citations(text)
actual_citations = list(row["citations"]) if row["citations"] is not None else []
assert actual_citations == expected_citations, \
f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"
print(f"External links and citations test passed! {len(test)} rows, {len(all_urls)} URLs, {len(all_citations)} citations")
def test_no_wikitext_columns():
"""Test that neither external_links nor citations columns exist without flags."""
tester = WikiqTester(SAILORMOON, "no_wikitext_columns", in_compression="7z", out_format="parquet")
try:
tester.call_wikiq("--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
# Verify neither column exists
assert "external_links" not in test.columns, "external_links column should NOT exist without --external-links flag"
assert "citations" not in test.columns, "citations column should NOT exist without --citations flag"
print(f"No wikitext columns test passed! {len(test)} rows processed")