Compare commits
27 Commits
compute-di
...
6a4bf81e1a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a4bf81e1a | ||
|
|
38dabd0547 | ||
|
|
006feb795c | ||
|
|
d7f5abef2d | ||
|
|
2c54425726 | ||
|
|
5d1a246898 | ||
|
|
70a10db228 | ||
|
|
1001c780fa | ||
|
|
6b4f3939a5 | ||
|
|
c3d31b4ab5 | ||
|
|
f4a9491ff2 | ||
|
|
c6e96c2f54 | ||
|
|
f427291fd8 | ||
|
|
d1fc094c96 | ||
|
|
783f5fd8bc | ||
|
|
577ddc87f5 | ||
|
|
d69d8b0df2 | ||
|
|
5ce9808b50 | ||
|
|
d3517ed5ca | ||
|
|
329341efb6 | ||
|
|
76626a2785 | ||
|
|
b46f98a875 | ||
|
|
3c26185739 | ||
|
|
95b33123e3 | ||
|
|
5c4fc6d5a0 | ||
|
|
77c7d2ba97 | ||
|
|
c40930d7d2 |
@@ -8,6 +8,7 @@ dependencies = [
|
|||||||
"deltas>=0.7.0",
|
"deltas>=0.7.0",
|
||||||
"mediawiki-utilities>=0.4.18",
|
"mediawiki-utilities>=0.4.18",
|
||||||
"more-itertools>=10.7.0",
|
"more-itertools>=10.7.0",
|
||||||
|
"mwparserfromhell>=0.6.0",
|
||||||
"mwpersistence>=0.2.4",
|
"mwpersistence>=0.2.4",
|
||||||
"mwreverts>=0.1.5",
|
"mwreverts>=0.1.5",
|
||||||
"mwtypes>=0.4.0",
|
"mwtypes>=0.4.0",
|
||||||
@@ -33,7 +34,7 @@ packages = ["src/wikiq"]
|
|||||||
yamlconf = { git = "https://github.com/groceryheist/yamlconf" }
|
yamlconf = { git = "https://github.com/groceryheist/yamlconf" }
|
||||||
mwxml = { git = "https://github.com/groceryheist/python-mwxml" }
|
mwxml = { git = "https://github.com/groceryheist/python-mwxml" }
|
||||||
deltas = { git = "https://github.com/groceryheist/deltas" }
|
deltas = { git = "https://github.com/groceryheist/deltas" }
|
||||||
pywikidiff2 = { git = "https://gitea.communitydata.science/groceryheist/pywikidiff2@1cd3b13aea89a8abb0cee38dfd7cc6ab4d0980f9" }
|
pywikidiff2 = { git = "ssh://gitea@gitea.communitydata.science:2200/groceryheist/pywikidiff2.git"}
|
||||||
|
|
||||||
[dependency-groups]
|
[dependency-groups]
|
||||||
dev = [
|
dev = [
|
||||||
|
|||||||
@@ -8,7 +8,10 @@ import gc
|
|||||||
import json
|
import json
|
||||||
import os.path
|
import os.path
|
||||||
import re
|
import re
|
||||||
|
import signal
|
||||||
import sys
|
import sys
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
from io import TextIOWrapper
|
from io import TextIOWrapper
|
||||||
@@ -23,14 +26,21 @@ import pywikidiff2
|
|||||||
from deltas.tokenizers import wikitext_split
|
from deltas.tokenizers import wikitext_split
|
||||||
from more_itertools import ichunked
|
from more_itertools import ichunked
|
||||||
from mwxml import Dump
|
from mwxml import Dump
|
||||||
import asyncio
|
|
||||||
import wikiq.tables as tables
|
import wikiq.tables as tables
|
||||||
from wikiq.tables import RevisionTable
|
from wikiq.tables import RevisionTable
|
||||||
from wikiq.wiki_diff_matcher import WikiDiffMatcher
|
from wikiq.wiki_diff_matcher import WikiDiffMatcher
|
||||||
|
from wikiq.wikitext_parser import WikitextParser
|
||||||
|
from wikiq.resume import (
|
||||||
|
get_resume_point,
|
||||||
|
setup_resume_temp_output,
|
||||||
|
finalize_resume_merge,
|
||||||
|
get_checkpoint_path,
|
||||||
|
cleanup_interrupted_resume,
|
||||||
|
)
|
||||||
|
|
||||||
TO_ENCODE = ("title", "editor")
|
TO_ENCODE = ("title", "editor")
|
||||||
PERSISTENCE_RADIUS = 7
|
PERSISTENCE_RADIUS = 7
|
||||||
DIFF_TIMEOUT = 60
|
DIFF_TIMEOUT_MS = 60000
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import pyarrow as pa
|
import pyarrow as pa
|
||||||
@@ -47,15 +57,10 @@ class PersistMethod:
|
|||||||
wikidiff2 = 4
|
wikidiff2 = 4
|
||||||
|
|
||||||
|
|
||||||
async def diff_async(differ, last_text, text):
|
def diff_with_timeout(differ, last_text, text):
|
||||||
try:
|
"""Returns (result, timed_out) tuple using native pywikidiff2 timeout."""
|
||||||
loop = asyncio.get_running_loop()
|
result = differ.inline_json_diff(last_text, text, timeout_ms=DIFF_TIMEOUT_MS)
|
||||||
return await asyncio.wait_for(
|
return result, differ.timed_out()
|
||||||
asyncio.to_thread(differ.inline_json_diff, last_text, text),
|
|
||||||
timeout=DIFF_TIMEOUT
|
|
||||||
)
|
|
||||||
except TimeoutError as e:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def calculate_persistence(tokens_added):
|
def calculate_persistence(tokens_added):
|
||||||
@@ -241,10 +246,21 @@ class WikiqParser:
|
|||||||
output_parquet: bool = True,
|
output_parquet: bool = True,
|
||||||
batch_size: int = 1024,
|
batch_size: int = 1024,
|
||||||
partition_namespaces: bool = False,
|
partition_namespaces: bool = False,
|
||||||
|
resume_point: Union[tuple, dict, None] = None,
|
||||||
|
external_links: bool = False,
|
||||||
|
citations: bool = False,
|
||||||
|
wikilinks: bool = False,
|
||||||
|
templates: bool = False,
|
||||||
|
headings: bool = False,
|
||||||
|
time_limit_seconds: Union[float, None] = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Parameters:
|
Parameters:
|
||||||
persist : what persistence method to use. Takes a PersistMethod value
|
persist : what persistence method to use. Takes a PersistMethod value
|
||||||
|
resume_point : if set, either a (pageid, revid) tuple for single-file output,
|
||||||
|
or a dict mapping namespace -> (pageid, revid) for partitioned output.
|
||||||
|
For single-file: skip all revisions up to
|
||||||
|
and including this point
|
||||||
"""
|
"""
|
||||||
self.input_file = input_file
|
self.input_file = input_file
|
||||||
|
|
||||||
@@ -255,6 +271,14 @@ class WikiqParser:
|
|||||||
self.diff = diff
|
self.diff = diff
|
||||||
self.text = text
|
self.text = text
|
||||||
self.partition_namespaces = partition_namespaces
|
self.partition_namespaces = partition_namespaces
|
||||||
|
self.resume_point = resume_point
|
||||||
|
self.external_links = external_links
|
||||||
|
self.citations = citations
|
||||||
|
self.wikilinks = wikilinks
|
||||||
|
self.templates = templates
|
||||||
|
self.headings = headings
|
||||||
|
self.shutdown_requested = False
|
||||||
|
self.time_limit_seconds = time_limit_seconds
|
||||||
if namespaces is not None:
|
if namespaces is not None:
|
||||||
self.namespace_filter = set(namespaces)
|
self.namespace_filter = set(namespaces)
|
||||||
else:
|
else:
|
||||||
@@ -283,6 +307,86 @@ class WikiqParser:
|
|||||||
else:
|
else:
|
||||||
self.output_file = open(output_file, "wb")
|
self.output_file = open(output_file, "wb")
|
||||||
|
|
||||||
|
# Checkpoint file for tracking resume point
|
||||||
|
self.checkpoint_file = None
|
||||||
|
self.checkpoint_state = {} # namespace -> (pageid, revid) or None -> (pageid, revid)
|
||||||
|
|
||||||
|
def request_shutdown(self):
|
||||||
|
"""Request graceful shutdown. The process() method will exit after completing the current batch."""
|
||||||
|
self.shutdown_requested = True
|
||||||
|
|
||||||
|
def _time_limit_expired(self):
|
||||||
|
"""Timer callback when time limit is reached."""
|
||||||
|
hours = self.time_limit_seconds / 3600
|
||||||
|
print(f"Time limit of {hours:.2f} hours reached, requesting shutdown...", file=sys.stderr)
|
||||||
|
self.request_shutdown()
|
||||||
|
|
||||||
|
def _start_time_limit_timer(self):
|
||||||
|
"""Start a background timer to trigger shutdown when time limit is reached."""
|
||||||
|
if self.time_limit_seconds is None:
|
||||||
|
return None
|
||||||
|
timer = threading.Timer(self.time_limit_seconds, self._time_limit_expired)
|
||||||
|
timer.daemon = True
|
||||||
|
timer.start()
|
||||||
|
return timer
|
||||||
|
|
||||||
|
def _cancel_time_limit_timer(self, timer):
|
||||||
|
"""Cancel the time limit timer if it's still running."""
|
||||||
|
if timer is not None:
|
||||||
|
timer.cancel()
|
||||||
|
|
||||||
|
def _open_checkpoint(self, output_file):
|
||||||
|
"""Open checkpoint file for writing. Keeps file open for performance."""
|
||||||
|
if not self.output_parquet or output_file == sys.stdout.buffer:
|
||||||
|
return
|
||||||
|
checkpoint_path = get_checkpoint_path(output_file, self.partition_namespaces)
|
||||||
|
Path(checkpoint_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
self.checkpoint_file = open(checkpoint_path, 'w')
|
||||||
|
print(f"Checkpoint file opened: {checkpoint_path}", file=sys.stderr)
|
||||||
|
|
||||||
|
def _update_checkpoint(self, pageid, revid, namespace=None):
|
||||||
|
"""Update checkpoint state and write to file."""
|
||||||
|
if self.checkpoint_file is None:
|
||||||
|
return
|
||||||
|
if self.partition_namespaces:
|
||||||
|
self.checkpoint_state[namespace] = {"pageid": pageid, "revid": revid}
|
||||||
|
else:
|
||||||
|
self.checkpoint_state = {"pageid": pageid, "revid": revid}
|
||||||
|
self.checkpoint_file.seek(0)
|
||||||
|
self.checkpoint_file.truncate()
|
||||||
|
json.dump(self.checkpoint_state, self.checkpoint_file)
|
||||||
|
self.checkpoint_file.flush()
|
||||||
|
|
||||||
|
def _close_checkpoint(self, delete=False):
|
||||||
|
"""Close checkpoint file, optionally deleting it."""
|
||||||
|
if self.checkpoint_file is None:
|
||||||
|
return
|
||||||
|
checkpoint_path = self.checkpoint_file.name
|
||||||
|
self.checkpoint_file.close()
|
||||||
|
self.checkpoint_file = None
|
||||||
|
if delete and os.path.exists(checkpoint_path):
|
||||||
|
os.remove(checkpoint_path)
|
||||||
|
print(f"Checkpoint file deleted (processing complete): {checkpoint_path}", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
print(f"Checkpoint file preserved for resume: {checkpoint_path}", file=sys.stderr)
|
||||||
|
|
||||||
|
def _write_batch(self, row_buffer, schema, writer, pq_writers, ns_paths, sorting_cols, namespace=None):
|
||||||
|
"""Write a batch of rows to the appropriate writer.
|
||||||
|
|
||||||
|
For partitioned output, creates writer lazily if needed.
|
||||||
|
Returns the writer used (for non-partitioned output, same as input).
|
||||||
|
"""
|
||||||
|
if self.partition_namespaces and namespace is not None:
|
||||||
|
if namespace not in pq_writers:
|
||||||
|
ns_path = ns_paths[namespace]
|
||||||
|
Path(ns_path).parent.mkdir(exist_ok=True, parents=True)
|
||||||
|
pq_writers[namespace] = pq.ParquetWriter(
|
||||||
|
ns_path, schema, flavor="spark", sorting_columns=sorting_cols
|
||||||
|
)
|
||||||
|
writer = pq_writers[namespace]
|
||||||
|
writer.write(pa.record_batch(row_buffer, schema=schema))
|
||||||
|
return writer
|
||||||
|
|
||||||
def make_matchmake_pairs(self, patterns, labels) -> list[RegexPair]:
|
def make_matchmake_pairs(self, patterns, labels) -> list[RegexPair]:
|
||||||
if (patterns is not None and labels is not None) and (
|
if (patterns is not None and labels is not None) and (
|
||||||
len(patterns) == len(labels)
|
len(patterns) == len(labels)
|
||||||
@@ -336,10 +440,32 @@ class WikiqParser:
|
|||||||
return default_ns
|
return default_ns
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
# create a regex that creates the output filename
|
# Start time limit timer if configured
|
||||||
# output_filename = re.sub(r'^.*/(enwiki\-\d+)\-.*p(\d+)p.*$',
|
time_limit_timer = self._start_time_limit_timer()
|
||||||
# r'output/wikiq-\1-\2.tsv',
|
|
||||||
# input_filename)
|
# Track whether we've passed the resume point
|
||||||
|
# For partitioned output, this is a dict mapping namespace -> bool
|
||||||
|
if self.resume_point is None:
|
||||||
|
found_resume_point = True
|
||||||
|
elif self.partition_namespaces:
|
||||||
|
found_resume_point = {}
|
||||||
|
else:
|
||||||
|
found_resume_point = False
|
||||||
|
|
||||||
|
# When resuming with parquet, write new data to temp file/directory and merge at the end
|
||||||
|
original_output_file = None
|
||||||
|
temp_output_file = None
|
||||||
|
original_partition_dir = None
|
||||||
|
if self.resume_point is not None and self.output_parquet:
|
||||||
|
original_output_file, temp_output_file, original_partition_dir = \
|
||||||
|
setup_resume_temp_output(self.output_file, self.partition_namespaces)
|
||||||
|
if temp_output_file is not None:
|
||||||
|
self.output_file = temp_output_file
|
||||||
|
|
||||||
|
# Open checkpoint file for tracking resume point
|
||||||
|
# Use original_output_file if resuming, otherwise self.output_file
|
||||||
|
checkpoint_output = original_output_file if original_output_file else self.output_file
|
||||||
|
self._open_checkpoint(checkpoint_output)
|
||||||
|
|
||||||
# Construct dump file iterator
|
# Construct dump file iterator
|
||||||
dump = WikiqIterator(self.input_file, collapse_user=self.collapse_user)
|
dump = WikiqIterator(self.input_file, collapse_user=self.collapse_user)
|
||||||
@@ -371,6 +497,29 @@ class WikiqParser:
|
|||||||
if self.collapse_user:
|
if self.collapse_user:
|
||||||
table.columns.append(tables.RevisionCollapsed())
|
table.columns.append(tables.RevisionCollapsed())
|
||||||
|
|
||||||
|
# Create shared parser if any wikitext feature is enabled
|
||||||
|
if self.external_links or self.citations or self.wikilinks or self.templates or self.headings:
|
||||||
|
wikitext_parser = WikitextParser()
|
||||||
|
|
||||||
|
if self.external_links:
|
||||||
|
table.columns.append(tables.RevisionExternalLinks(wikitext_parser))
|
||||||
|
|
||||||
|
if self.citations:
|
||||||
|
table.columns.append(tables.RevisionCitations(wikitext_parser))
|
||||||
|
|
||||||
|
if self.wikilinks:
|
||||||
|
table.columns.append(tables.RevisionWikilinks(wikitext_parser))
|
||||||
|
|
||||||
|
if self.templates:
|
||||||
|
table.columns.append(tables.RevisionTemplates(wikitext_parser))
|
||||||
|
|
||||||
|
if self.headings:
|
||||||
|
table.columns.append(tables.RevisionHeadings(wikitext_parser))
|
||||||
|
|
||||||
|
# Add parser timeout tracking if any wikitext feature is enabled
|
||||||
|
if self.external_links or self.citations or self.wikilinks or self.templates or self.headings:
|
||||||
|
table.columns.append(tables.RevisionParserTimeout(wikitext_parser))
|
||||||
|
|
||||||
# extract list of namespaces
|
# extract list of namespaces
|
||||||
self.namespaces = {
|
self.namespaces = {
|
||||||
ns.name: ns.id for ns in dump.mwiterator.site_info.namespaces
|
ns.name: ns.id for ns in dump.mwiterator.site_info.namespaces
|
||||||
@@ -388,6 +537,7 @@ class WikiqParser:
|
|||||||
from wikiq.diff_pyarrow_schema import diff_field
|
from wikiq.diff_pyarrow_schema import diff_field
|
||||||
|
|
||||||
schema = schema.append(diff_field)
|
schema = schema.append(diff_field)
|
||||||
|
schema = schema.append(pa.field("diff_timeout", pa.bool_()))
|
||||||
|
|
||||||
if self.diff and self.persist == PersistMethod.none:
|
if self.diff and self.persist == PersistMethod.none:
|
||||||
table.columns.append(tables.RevisionText())
|
table.columns.append(tables.RevisionText())
|
||||||
@@ -422,6 +572,8 @@ class WikiqParser:
|
|||||||
flavor="spark",
|
flavor="spark",
|
||||||
sorting_columns=sorting_cols,
|
sorting_columns=sorting_cols,
|
||||||
)
|
)
|
||||||
|
ns_paths = {}
|
||||||
|
pq_writers = {}
|
||||||
else:
|
else:
|
||||||
output_path = Path(self.output_file)
|
output_path = Path(self.output_file)
|
||||||
if self.namespace_filter is not None:
|
if self.namespace_filter is not None:
|
||||||
@@ -432,14 +584,9 @@ class WikiqParser:
|
|||||||
ns: (output_path.parent / f"namespace={ns}") / output_path.name
|
ns: (output_path.parent / f"namespace={ns}") / output_path.name
|
||||||
for ns in namespaces
|
for ns in namespaces
|
||||||
}
|
}
|
||||||
for path in ns_paths.values():
|
# Writers are created lazily when first needed to avoid empty files on early exit
|
||||||
Path(path).parent.mkdir(exist_ok=True, parents=True)
|
pq_writers = {}
|
||||||
pq_writers = {
|
writer = None # Not used for partitioned output
|
||||||
ns: pq.ParquetWriter(
|
|
||||||
path, schema, flavor="spark", sorting_columns=sorting_cols
|
|
||||||
)
|
|
||||||
for ns, path in ns_paths.items()
|
|
||||||
}
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
writer = pacsv.CSVWriter(
|
writer = pacsv.CSVWriter(
|
||||||
@@ -447,6 +594,9 @@ class WikiqParser:
|
|||||||
schema,
|
schema,
|
||||||
write_options=pacsv.WriteOptions(delimiter="\t"),
|
write_options=pacsv.WriteOptions(delimiter="\t"),
|
||||||
)
|
)
|
||||||
|
ns_paths = {}
|
||||||
|
pq_writers = {}
|
||||||
|
sorting_cols = None
|
||||||
|
|
||||||
regex_matches = {}
|
regex_matches = {}
|
||||||
|
|
||||||
@@ -459,6 +609,42 @@ class WikiqParser:
|
|||||||
if page.mwpage.namespace not in self.namespace_filter:
|
if page.mwpage.namespace not in self.namespace_filter:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Resume logic: skip pages that come before the resume point.
|
||||||
|
# For partitioned output, each namespace has its own resume point.
|
||||||
|
is_resume_page = False
|
||||||
|
page_resume_point = None
|
||||||
|
if self.resume_point is not None:
|
||||||
|
page_id = page.mwpage.id
|
||||||
|
page_ns = page.mwpage.namespace
|
||||||
|
|
||||||
|
if self.partition_namespaces:
|
||||||
|
# Per-namespace resume: check if we've passed this namespace's resume point
|
||||||
|
if found_resume_point.get(page_ns, False):
|
||||||
|
pass # Already past resume point for this namespace
|
||||||
|
elif page_ns not in self.resume_point:
|
||||||
|
# No resume point for this namespace, process normally
|
||||||
|
found_resume_point[page_ns] = True
|
||||||
|
else:
|
||||||
|
resume_pageid, resume_revid = self.resume_point[page_ns]
|
||||||
|
if page_id < resume_pageid:
|
||||||
|
continue
|
||||||
|
elif page_id == resume_pageid:
|
||||||
|
is_resume_page = True
|
||||||
|
page_resume_point = (resume_pageid, resume_revid)
|
||||||
|
else:
|
||||||
|
found_resume_point[page_ns] = True
|
||||||
|
else:
|
||||||
|
# Single-file resume: global resume point
|
||||||
|
if not found_resume_point:
|
||||||
|
resume_pageid, resume_revid = self.resume_point
|
||||||
|
if page_id < resume_pageid:
|
||||||
|
continue
|
||||||
|
elif page_id == resume_pageid:
|
||||||
|
is_resume_page = True
|
||||||
|
page_resume_point = (resume_pageid, resume_revid)
|
||||||
|
else:
|
||||||
|
found_resume_point = True
|
||||||
|
|
||||||
# Disable detecting reverts if radius is 0.
|
# Disable detecting reverts if radius is 0.
|
||||||
if self.revert_radius > 0:
|
if self.revert_radius > 0:
|
||||||
reverts_column.rev_detector = mwreverts.Detector(
|
reverts_column.rev_detector = mwreverts.Detector(
|
||||||
@@ -565,6 +751,15 @@ class WikiqParser:
|
|||||||
regex_matches[k] = []
|
regex_matches[k] = []
|
||||||
regex_matches[k].append(v)
|
regex_matches[k].append(v)
|
||||||
|
|
||||||
|
# Check for shutdown after each revision
|
||||||
|
if self.shutdown_requested:
|
||||||
|
break
|
||||||
|
|
||||||
|
# If shutdown requested, skip all remaining processing and close writers
|
||||||
|
if self.shutdown_requested:
|
||||||
|
print("Shutdown requested, closing writers...", file=sys.stderr)
|
||||||
|
break
|
||||||
|
|
||||||
# Collect the set of revisions currently buffered in the table so we can run multi-revision functions on them.
|
# Collect the set of revisions currently buffered in the table so we can run multi-revision functions on them.
|
||||||
batch_row_buffer = table.pop()
|
batch_row_buffer = table.pop()
|
||||||
if self.persist != PersistMethod.none:
|
if self.persist != PersistMethod.none:
|
||||||
@@ -678,13 +873,20 @@ class WikiqParser:
|
|||||||
if self.diff:
|
if self.diff:
|
||||||
last_text = last_rev_text
|
last_text = last_rev_text
|
||||||
new_diffs = []
|
new_diffs = []
|
||||||
|
diff_timeouts = []
|
||||||
for i, text in enumerate(row_buffer["text"]):
|
for i, text in enumerate(row_buffer["text"]):
|
||||||
diff = asyncio.run(diff_async(differ, last_text, text))
|
if self.shutdown_requested:
|
||||||
if diff is None:
|
break
|
||||||
|
diff, timed_out = diff_with_timeout(differ, last_text, text)
|
||||||
|
if timed_out:
|
||||||
print(f"WARNING! wikidiff2 timeout for rev: {row_buffer['revid'][i]}. Falling back to default limits.", file=sys.stderr)
|
print(f"WARNING! wikidiff2 timeout for rev: {row_buffer['revid'][i]}. Falling back to default limits.", file=sys.stderr)
|
||||||
diff = fast_differ.inline_json_diff(last_text, text)
|
diff = fast_differ.inline_json_diff(last_text, text)
|
||||||
new_diffs.append(diff)
|
new_diffs.append(diff)
|
||||||
|
diff_timeouts.append(timed_out)
|
||||||
last_text = text
|
last_text = text
|
||||||
|
if self.shutdown_requested:
|
||||||
|
print("Shutdown requested, closing writers...", file=sys.stderr)
|
||||||
|
break
|
||||||
row_buffer["diff"] = [
|
row_buffer["diff"] = [
|
||||||
[
|
[
|
||||||
entry
|
entry
|
||||||
@@ -693,6 +895,7 @@ class WikiqParser:
|
|||||||
]
|
]
|
||||||
for diff in new_diffs
|
for diff in new_diffs
|
||||||
]
|
]
|
||||||
|
row_buffer["diff_timeout"] = diff_timeouts
|
||||||
|
|
||||||
# end persistence logic
|
# end persistence logic
|
||||||
if self.diff or self.persist != PersistMethod.none:
|
if self.diff or self.persist != PersistMethod.none:
|
||||||
@@ -702,12 +905,48 @@ class WikiqParser:
|
|||||||
if not self.text and self.persist != PersistMethod.none:
|
if not self.text and self.persist != PersistMethod.none:
|
||||||
del row_buffer["text"]
|
del row_buffer["text"]
|
||||||
|
|
||||||
if self.partition_namespaces is True:
|
# Filter for resume logic if on resume page
|
||||||
writer = pq_writers[page.mwpage.namespace]
|
should_write = True
|
||||||
writer.write(pa.record_batch(row_buffer, schema=schema))
|
if is_resume_page:
|
||||||
|
_, resume_revid = page_resume_point
|
||||||
|
revids = row_buffer["revid"]
|
||||||
|
resume_idx = next((i for i, r in enumerate(revids) if r == resume_revid), None)
|
||||||
|
|
||||||
|
if resume_idx is not None:
|
||||||
|
# Mark resume point as found
|
||||||
|
if self.partition_namespaces:
|
||||||
|
found_resume_point[page.mwpage.namespace] = True
|
||||||
|
else:
|
||||||
|
found_resume_point = True
|
||||||
|
is_resume_page = False
|
||||||
|
|
||||||
|
# Only write revisions after the resume point
|
||||||
|
if resume_idx + 1 < len(revids):
|
||||||
|
row_buffer = {k: v[resume_idx + 1:] for k, v in row_buffer.items()}
|
||||||
|
print(f"Resuming output starting at revid {row_buffer['revid'][0]}", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
should_write = False
|
||||||
|
else:
|
||||||
|
should_write = False
|
||||||
|
|
||||||
|
# Write batch if there are rows
|
||||||
|
if should_write and len(row_buffer.get("revid", [])) > 0:
|
||||||
|
namespace = page.mwpage.namespace if self.partition_namespaces else None
|
||||||
|
self._write_batch(row_buffer, schema, writer, pq_writers, ns_paths, sorting_cols, namespace)
|
||||||
|
# Update checkpoint with last written position
|
||||||
|
last_pageid = row_buffer["articleid"][-1]
|
||||||
|
last_revid = row_buffer["revid"][-1]
|
||||||
|
self._update_checkpoint(last_pageid, last_revid, namespace)
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
|
# If shutdown was requested, break from page loop
|
||||||
|
if self.shutdown_requested:
|
||||||
|
break
|
||||||
page_count += 1
|
page_count += 1
|
||||||
|
|
||||||
|
# Cancel time limit timer
|
||||||
|
self._cancel_time_limit_timer(time_limit_timer)
|
||||||
|
|
||||||
print(
|
print(
|
||||||
"Done: %s revisions and %s pages." % (rev_count, page_count),
|
"Done: %s revisions and %s pages." % (rev_count, page_count),
|
||||||
file=sys.stderr,
|
file=sys.stderr,
|
||||||
@@ -718,6 +957,17 @@ class WikiqParser:
|
|||||||
else:
|
else:
|
||||||
writer.close()
|
writer.close()
|
||||||
|
|
||||||
|
# If we were resuming, merge the original file with the new temp file
|
||||||
|
if original_output_file is not None and temp_output_file is not None:
|
||||||
|
finalize_resume_merge(
|
||||||
|
original_output_file,
|
||||||
|
temp_output_file,
|
||||||
|
self.partition_namespaces,
|
||||||
|
original_partition_dir
|
||||||
|
)
|
||||||
|
|
||||||
|
# Close checkpoint file; delete it only if we completed without interruption
|
||||||
|
self._close_checkpoint(delete=not self.shutdown_requested)
|
||||||
|
|
||||||
def match_archive_suffix(input_filename):
|
def match_archive_suffix(input_filename):
|
||||||
if re.match(r".*\.7z$", input_filename):
|
if re.match(r".*\.7z$", input_filename):
|
||||||
@@ -886,6 +1136,46 @@ def main():
|
|||||||
help="Output the text of the revision.",
|
help="Output the text of the revision.",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--external-links",
|
||||||
|
dest="external_links",
|
||||||
|
action="store_true",
|
||||||
|
default=False,
|
||||||
|
help="Extract external links from each revision using mwparserfromhell.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--citations",
|
||||||
|
dest="citations",
|
||||||
|
action="store_true",
|
||||||
|
default=False,
|
||||||
|
help="Extract citations (ref tags and cite templates) from each revision.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--wikilinks",
|
||||||
|
dest="wikilinks",
|
||||||
|
action="store_true",
|
||||||
|
default=False,
|
||||||
|
help="Extract internal wikilinks from each revision.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--templates",
|
||||||
|
dest="templates",
|
||||||
|
action="store_true",
|
||||||
|
default=False,
|
||||||
|
help="Extract templates with their parameters from each revision.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--headings",
|
||||||
|
dest="headings",
|
||||||
|
action="store_true",
|
||||||
|
default=False,
|
||||||
|
help="Extract section headings from each revision.",
|
||||||
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"-PNS",
|
"-PNS",
|
||||||
"--partition-namespaces",
|
"--partition-namespaces",
|
||||||
@@ -910,6 +1200,21 @@ def main():
|
|||||||
help="How many revisions to process in each batch. This ends up being the Parquet row group size",
|
help="How many revisions to process in each batch. This ends up being the Parquet row group size",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--resume",
|
||||||
|
dest="resume",
|
||||||
|
action="store_true",
|
||||||
|
help="Resume processing from the last successfully written revision in the output file.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--time-limit",
|
||||||
|
dest="time_limit",
|
||||||
|
type=float,
|
||||||
|
default=0,
|
||||||
|
help="Time limit in hours before graceful shutdown. Set to 0 to disable (default).",
|
||||||
|
)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
# set persistence method
|
# set persistence method
|
||||||
@@ -933,9 +1238,7 @@ def main():
|
|||||||
print(args, file=sys.stderr)
|
print(args, file=sys.stderr)
|
||||||
if len(args.dumpfiles) > 0:
|
if len(args.dumpfiles) > 0:
|
||||||
for filename in args.dumpfiles:
|
for filename in args.dumpfiles:
|
||||||
input_file = open_input_file(filename, args.fandom_2020)
|
# Determine output file path before opening input (so resume errors are caught early)
|
||||||
|
|
||||||
# open directory for output
|
|
||||||
if args.output:
|
if args.output:
|
||||||
output = args.output[0]
|
output = args.output[0]
|
||||||
else:
|
else:
|
||||||
@@ -943,17 +1246,69 @@ def main():
|
|||||||
|
|
||||||
output_parquet = output.endswith(".parquet")
|
output_parquet = output.endswith(".parquet")
|
||||||
|
|
||||||
print("Processing file: %s" % filename, file=sys.stderr)
|
|
||||||
|
|
||||||
if args.stdout:
|
if args.stdout:
|
||||||
# Parquet libraries need a binary output, so just sys.stdout doesn't work.
|
|
||||||
output_file = sys.stdout.buffer
|
output_file = sys.stdout.buffer
|
||||||
elif os.path.isdir(output) or output_parquet:
|
elif os.path.isdir(output) or output_parquet:
|
||||||
filename = os.path.join(output, os.path.basename(filename))
|
output_filename = os.path.join(output, os.path.basename(filename))
|
||||||
output_file = get_output_filename(filename, parquet=output_parquet)
|
output_file = get_output_filename(output_filename, parquet=output_parquet)
|
||||||
else:
|
else:
|
||||||
output_file = output
|
output_file = output
|
||||||
|
|
||||||
|
# Handle resume functionality before opening input file
|
||||||
|
resume_point = None
|
||||||
|
start_fresh = False
|
||||||
|
if args.resume:
|
||||||
|
if output_parquet and not args.stdout:
|
||||||
|
# First, merge any leftover temp files from a previous interrupted run
|
||||||
|
cleanup_result = cleanup_interrupted_resume(output_file, args.partition_namespaces)
|
||||||
|
if cleanup_result == "start_fresh":
|
||||||
|
# All data was corrupted, start from beginning
|
||||||
|
start_fresh = True
|
||||||
|
print("Starting fresh due to data corruption.", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
resume_point = get_resume_point(output_file, args.partition_namespaces)
|
||||||
|
if resume_point is not None:
|
||||||
|
if args.partition_namespaces:
|
||||||
|
ns_list = sorted(resume_point.keys())
|
||||||
|
print(f"Resuming with per-namespace resume points for {len(ns_list)} namespaces", file=sys.stderr)
|
||||||
|
for ns in ns_list:
|
||||||
|
pageid, revid = resume_point[ns]
|
||||||
|
print(f" namespace={ns}: pageid={pageid}, revid={revid}", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
pageid, revid = resume_point
|
||||||
|
print(f"Resuming from last written point: pageid={pageid}, revid={revid}", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
# resume_point is None - check if file exists but is corrupt
|
||||||
|
if args.partition_namespaces:
|
||||||
|
partition_dir = os.path.dirname(output_file)
|
||||||
|
output_filename = os.path.basename(output_file)
|
||||||
|
corrupt_files = []
|
||||||
|
if os.path.isdir(partition_dir):
|
||||||
|
for d in os.listdir(partition_dir):
|
||||||
|
if d.startswith('namespace='):
|
||||||
|
filepath = os.path.join(partition_dir, d, output_filename)
|
||||||
|
if os.path.exists(filepath):
|
||||||
|
corrupt_files.append(filepath)
|
||||||
|
if corrupt_files:
|
||||||
|
print("Output files exist but are corrupt, deleting and starting fresh.", file=sys.stderr)
|
||||||
|
for filepath in corrupt_files:
|
||||||
|
os.remove(filepath)
|
||||||
|
start_fresh = True
|
||||||
|
else:
|
||||||
|
if os.path.exists(output_file):
|
||||||
|
# File exists but is corrupt - start fresh
|
||||||
|
print(f"Output file {output_file} exists but is corrupt, starting fresh.", file=sys.stderr)
|
||||||
|
os.remove(output_file)
|
||||||
|
start_fresh = True
|
||||||
|
else:
|
||||||
|
sys.exit("Error: --resume only works with parquet output (not stdout or TSV)")
|
||||||
|
|
||||||
|
# Now open the input file
|
||||||
|
print("Processing file: %s" % filename, file=sys.stderr)
|
||||||
|
input_file = open_input_file(filename, args.fandom_2020)
|
||||||
|
|
||||||
|
time_limit_seconds = args.time_limit * 3600 if args.time_limit > 0 else None
|
||||||
|
|
||||||
wikiq = WikiqParser(
|
wikiq = WikiqParser(
|
||||||
input_file,
|
input_file,
|
||||||
output_file,
|
output_file,
|
||||||
@@ -970,14 +1325,44 @@ def main():
|
|||||||
output_parquet=output_parquet,
|
output_parquet=output_parquet,
|
||||||
partition_namespaces=args.partition_namespaces,
|
partition_namespaces=args.partition_namespaces,
|
||||||
batch_size=args.batch_size,
|
batch_size=args.batch_size,
|
||||||
|
resume_point=resume_point,
|
||||||
|
external_links=args.external_links,
|
||||||
|
citations=args.citations,
|
||||||
|
wikilinks=args.wikilinks,
|
||||||
|
templates=args.templates,
|
||||||
|
headings=args.headings,
|
||||||
|
time_limit_seconds=time_limit_seconds,
|
||||||
)
|
)
|
||||||
|
|
||||||
wikiq.process()
|
# Register signal handlers for graceful shutdown (CLI only)
|
||||||
|
def handle_shutdown(signum, frame):
|
||||||
|
sig_name = signal.Signals(signum).name
|
||||||
|
print(f"\nReceived {sig_name}, requesting graceful shutdown...", file=sys.stderr)
|
||||||
|
wikiq.request_shutdown()
|
||||||
|
|
||||||
|
original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown)
|
||||||
|
original_sigint = signal.signal(signal.SIGINT, handle_shutdown)
|
||||||
|
original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown)
|
||||||
|
original_sigusr2 = signal.signal(signal.SIGUSR2, handle_shutdown)
|
||||||
|
|
||||||
|
try:
|
||||||
|
wikiq.process()
|
||||||
|
finally:
|
||||||
|
# Restore original signal handlers
|
||||||
|
signal.signal(signal.SIGTERM, original_sigterm)
|
||||||
|
signal.signal(signal.SIGINT, original_sigint)
|
||||||
|
signal.signal(signal.SIGUSR1, original_sigusr1)
|
||||||
|
signal.signal(signal.SIGUSR2, original_sigusr2)
|
||||||
|
|
||||||
# close things
|
# close things
|
||||||
input_file.close()
|
input_file.close()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
if args.resume:
|
||||||
|
print("Warning: --resume cannot be used with stdin/stdout", file=sys.stderr)
|
||||||
|
|
||||||
|
time_limit_seconds = args.time_limit * 3600 if args.time_limit > 0 else None
|
||||||
|
|
||||||
wikiq = WikiqParser(
|
wikiq = WikiqParser(
|
||||||
sys.stdin,
|
sys.stdin,
|
||||||
sys.stdout,
|
sys.stdout,
|
||||||
@@ -993,9 +1378,32 @@ def main():
|
|||||||
diff=args.diff,
|
diff=args.diff,
|
||||||
text=args.text,
|
text=args.text,
|
||||||
batch_size=args.batch_size,
|
batch_size=args.batch_size,
|
||||||
|
resume_point=None,
|
||||||
|
external_links=args.external_links,
|
||||||
|
citations=args.citations,
|
||||||
|
wikilinks=args.wikilinks,
|
||||||
|
templates=args.templates,
|
||||||
|
headings=args.headings,
|
||||||
|
time_limit_seconds=time_limit_seconds,
|
||||||
)
|
)
|
||||||
|
|
||||||
wikiq.process()
|
# Register signal handlers for graceful shutdown (CLI only)
|
||||||
|
def handle_shutdown(signum, frame):
|
||||||
|
sig_name = signal.Signals(signum).name
|
||||||
|
print(f"\nReceived {sig_name}, requesting graceful shutdown...", file=sys.stderr)
|
||||||
|
wikiq.request_shutdown()
|
||||||
|
|
||||||
|
original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown)
|
||||||
|
original_sigint = signal.signal(signal.SIGINT, handle_shutdown)
|
||||||
|
original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown)
|
||||||
|
|
||||||
|
try:
|
||||||
|
wikiq.process()
|
||||||
|
finally:
|
||||||
|
# Restore original signal handlers
|
||||||
|
signal.signal(signal.SIGTERM, original_sigterm)
|
||||||
|
signal.signal(signal.SIGINT, original_sigint)
|
||||||
|
signal.signal(signal.SIGUSR1, original_sigusr1)
|
||||||
|
|
||||||
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
|
# stop_words = "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your"
|
||||||
# stop_words = stop_words.split(",")
|
# stop_words = stop_words.split(",")
|
||||||
|
|||||||
542
src/wikiq/resume.py
Normal file
542
src/wikiq/resume.py
Normal file
@@ -0,0 +1,542 @@
|
|||||||
|
"""
|
||||||
|
Checkpoint and resume functionality for wikiq parquet output.
|
||||||
|
|
||||||
|
This module handles:
|
||||||
|
- Finding resume points in existing parquet output
|
||||||
|
- Merging resumed data with existing output (streaming, memory-efficient)
|
||||||
|
- Checkpoint file management for fast resume point lookup
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import pyarrow.parquet as pq
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_interrupted_resume(output_file, partition_namespaces):
|
||||||
|
"""
|
||||||
|
Merge any leftover .resume_temp files from a previous interrupted run.
|
||||||
|
|
||||||
|
This should be called BEFORE get_resume_point() so the resume point
|
||||||
|
is calculated from the merged data.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None - no temp files found or normal merge completed
|
||||||
|
"start_fresh" - both original and temp were corrupted and deleted
|
||||||
|
"""
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
if partition_namespaces:
|
||||||
|
partition_dir = os.path.dirname(output_file)
|
||||||
|
output_filename = os.path.basename(output_file)
|
||||||
|
temp_suffix = ".resume_temp"
|
||||||
|
|
||||||
|
if not os.path.isdir(partition_dir):
|
||||||
|
return
|
||||||
|
|
||||||
|
has_old_temp_files = False
|
||||||
|
for ns_dir in os.listdir(partition_dir):
|
||||||
|
if ns_dir.startswith('namespace='):
|
||||||
|
temp_path = os.path.join(partition_dir, ns_dir, output_filename + temp_suffix)
|
||||||
|
if os.path.exists(temp_path):
|
||||||
|
has_old_temp_files = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if has_old_temp_files:
|
||||||
|
print(f"Found leftover temp files in {partition_dir} from previous interrupted partitioned run, merging first...", file=sys.stderr)
|
||||||
|
had_corruption = merge_partitioned_namespaces(partition_dir, temp_suffix, output_filename)
|
||||||
|
|
||||||
|
# Check if any valid data remains after merge
|
||||||
|
has_valid_data = False
|
||||||
|
for ns_dir in os.listdir(partition_dir):
|
||||||
|
if ns_dir.startswith('namespace='):
|
||||||
|
ns_path = os.path.join(partition_dir, ns_dir)
|
||||||
|
parquet_files = [f for f in os.listdir(ns_path) if f.endswith('.parquet') and not f.endswith('.resume_temp')]
|
||||||
|
if parquet_files:
|
||||||
|
has_valid_data = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if had_corruption and not has_valid_data:
|
||||||
|
# All data was corrupted, remove checkpoint and start fresh
|
||||||
|
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
|
||||||
|
if os.path.exists(checkpoint_path):
|
||||||
|
os.remove(checkpoint_path)
|
||||||
|
print("All partitioned files were corrupted, will start fresh.", file=sys.stderr)
|
||||||
|
return "start_fresh"
|
||||||
|
|
||||||
|
print("Previous temp files merged successfully.", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
temp_output_file = output_file + ".resume_temp"
|
||||||
|
if os.path.exists(temp_output_file) and not os.path.isdir(temp_output_file):
|
||||||
|
print(f"Found leftover temp file {temp_output_file} from previous interrupted run, merging first...", file=sys.stderr)
|
||||||
|
merged_path = output_file + ".merged"
|
||||||
|
merged = merge_parquet_files(output_file, temp_output_file, merged_path)
|
||||||
|
if merged == "original_only":
|
||||||
|
# Temp file was invalid, just remove it
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
elif merged == "temp_only":
|
||||||
|
# Original was corrupted or missing, use temp as new base
|
||||||
|
if os.path.exists(output_file):
|
||||||
|
os.remove(output_file)
|
||||||
|
os.rename(temp_output_file, output_file)
|
||||||
|
print("Recovered from temp file (original was corrupted or missing).", file=sys.stderr)
|
||||||
|
elif merged == "both_invalid":
|
||||||
|
# Both files corrupted or missing, remove both and start fresh
|
||||||
|
if os.path.exists(output_file):
|
||||||
|
os.remove(output_file)
|
||||||
|
if os.path.exists(temp_output_file):
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
# Also remove stale checkpoint file
|
||||||
|
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
|
||||||
|
if os.path.exists(checkpoint_path):
|
||||||
|
os.remove(checkpoint_path)
|
||||||
|
print("Both files were corrupted, will start fresh.", file=sys.stderr)
|
||||||
|
return "start_fresh"
|
||||||
|
elif merged == "merged":
|
||||||
|
os.remove(output_file)
|
||||||
|
os.rename(merged_path, output_file)
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
print("Previous temp file merged successfully.", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
# Both empty - unusual
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
|
||||||
|
|
||||||
|
def get_checkpoint_path(output_file, partition_namespaces=False):
|
||||||
|
"""Get the path to the checkpoint file for a given output file.
|
||||||
|
|
||||||
|
For partitioned output, the checkpoint is placed outside the partition directory
|
||||||
|
to avoid pyarrow trying to read it as a parquet file. The filename includes
|
||||||
|
the output filename to keep it unique per input file (for parallel jobs).
|
||||||
|
"""
|
||||||
|
if partition_namespaces:
|
||||||
|
# output_file is like partition_dir/output.parquet
|
||||||
|
# checkpoint should be at parent level: parent/output.parquet.checkpoint
|
||||||
|
partition_dir = os.path.dirname(output_file)
|
||||||
|
output_filename = os.path.basename(output_file)
|
||||||
|
parent_dir = os.path.dirname(partition_dir)
|
||||||
|
return os.path.join(parent_dir, output_filename + ".checkpoint")
|
||||||
|
return str(output_file) + ".checkpoint"
|
||||||
|
|
||||||
|
|
||||||
|
def read_checkpoint(output_file, partition_namespaces=False):
|
||||||
|
"""
|
||||||
|
Read resume point from checkpoint file if it exists.
|
||||||
|
|
||||||
|
Checkpoint format:
|
||||||
|
Single file: {"pageid": 54, "revid": 325}
|
||||||
|
Partitioned: {"0": {"pageid": 54, "revid": 325}, "1": {"pageid": 123, "revid": 456}}
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
For single files: A tuple (pageid, revid), or None if not found.
|
||||||
|
For partitioned: A dict mapping namespace -> (pageid, revid), or None.
|
||||||
|
"""
|
||||||
|
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
|
||||||
|
if not os.path.exists(checkpoint_path):
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(checkpoint_path, 'r') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Single-file format: {"pageid": ..., "revid": ...}
|
||||||
|
if "pageid" in data and "revid" in data:
|
||||||
|
return (data["pageid"], data["revid"])
|
||||||
|
|
||||||
|
# Partitioned format: {"0": {"pageid": ..., "revid": ...}, ...}
|
||||||
|
result = {}
|
||||||
|
for key, value in data.items():
|
||||||
|
result[int(key)] = (value["pageid"], value["revid"])
|
||||||
|
|
||||||
|
return result if result else None
|
||||||
|
|
||||||
|
except (json.JSONDecodeError, IOError, KeyError, TypeError) as e:
|
||||||
|
print(f"Warning: Could not read checkpoint file {checkpoint_path}: {e}", file=sys.stderr)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_resume_point(output_file, partition_namespaces=False):
|
||||||
|
"""
|
||||||
|
Find the resume point(s) from existing parquet output.
|
||||||
|
|
||||||
|
First checks for a checkpoint file (fast), then falls back to scanning
|
||||||
|
the parquet output (slow, for backwards compatibility).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_file: Path to the output file. For single files, this is the parquet file path.
|
||||||
|
For partitioned namespaces, this is the path like dir/dump.parquet where
|
||||||
|
namespace=* subdirectories are in the parent dir.
|
||||||
|
partition_namespaces: Whether the output uses namespace partitioning.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
For single files: A tuple (pageid, revid) for the row with the highest pageid,
|
||||||
|
or None if not found.
|
||||||
|
For partitioned: A dict mapping namespace -> (pageid, revid) for each partition,
|
||||||
|
or None if no partitions exist.
|
||||||
|
"""
|
||||||
|
# First try checkpoint file (fast)
|
||||||
|
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces)
|
||||||
|
checkpoint_result = read_checkpoint(output_file, partition_namespaces)
|
||||||
|
if checkpoint_result is not None:
|
||||||
|
print(f"Resume point found in checkpoint file {checkpoint_path}", file=sys.stderr)
|
||||||
|
return checkpoint_result
|
||||||
|
|
||||||
|
# Fall back to scanning parquet (slow, for backwards compatibility)
|
||||||
|
print(f"No checkpoint file found at {checkpoint_path}, scanning parquet output...", file=sys.stderr)
|
||||||
|
try:
|
||||||
|
if partition_namespaces:
|
||||||
|
return _get_resume_point_partitioned(output_file)
|
||||||
|
else:
|
||||||
|
return _get_resume_point_single_file(output_file)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error reading resume point from {output_file}: {e}", file=sys.stderr)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_last_row_resume_point(pq_path):
|
||||||
|
"""Get resume point by reading only the last row group of a parquet file.
|
||||||
|
|
||||||
|
Since data is written in page/revision order, the last row group contains
|
||||||
|
the highest pageid/revid, and the last row in that group is the resume point.
|
||||||
|
"""
|
||||||
|
pf = pq.ParquetFile(pq_path)
|
||||||
|
if pf.metadata.num_row_groups == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
last_rg_idx = pf.metadata.num_row_groups - 1
|
||||||
|
table = pf.read_row_group(last_rg_idx, columns=['articleid', 'revid'])
|
||||||
|
if table.num_rows == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
max_pageid = table['articleid'][-1].as_py()
|
||||||
|
max_revid = table['revid'][-1].as_py()
|
||||||
|
return (max_pageid, max_revid)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_resume_point_partitioned(output_file):
|
||||||
|
"""Find per-namespace resume points from partitioned output.
|
||||||
|
|
||||||
|
Only looks for the specific output file in each namespace directory.
|
||||||
|
Returns a dict mapping namespace -> (max_pageid, max_revid) for each partition
|
||||||
|
where the output file exists.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_file: Path like 'dir/output.parquet' where namespace=* subdirectories
|
||||||
|
contain files named 'output.parquet'.
|
||||||
|
"""
|
||||||
|
partition_dir = os.path.dirname(output_file)
|
||||||
|
output_filename = os.path.basename(output_file)
|
||||||
|
|
||||||
|
if not os.path.exists(partition_dir) or not os.path.isdir(partition_dir):
|
||||||
|
return None
|
||||||
|
|
||||||
|
namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
|
||||||
|
if not namespace_dirs:
|
||||||
|
return None
|
||||||
|
|
||||||
|
resume_points = {}
|
||||||
|
for ns_dir in namespace_dirs:
|
||||||
|
ns = int(ns_dir.split('=')[1])
|
||||||
|
pq_path = os.path.join(partition_dir, ns_dir, output_filename)
|
||||||
|
|
||||||
|
if not os.path.exists(pq_path):
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = _get_last_row_resume_point(pq_path)
|
||||||
|
if result is not None:
|
||||||
|
resume_points[ns] = result
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: Could not read {pq_path}: {e}", file=sys.stderr)
|
||||||
|
continue
|
||||||
|
|
||||||
|
return resume_points if resume_points else None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_resume_point_single_file(output_file):
|
||||||
|
"""Find resume point from a single parquet file."""
|
||||||
|
if not os.path.exists(output_file):
|
||||||
|
return None
|
||||||
|
|
||||||
|
if os.path.isdir(output_file):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return _get_last_row_resume_point(output_file)
|
||||||
|
|
||||||
|
|
||||||
|
def merge_parquet_files(original_path, temp_path, merged_path):
|
||||||
|
"""
|
||||||
|
Merge two parquet files by streaming row groups from original and temp into merged.
|
||||||
|
|
||||||
|
This is memory-efficient: only one row group is loaded at a time.
|
||||||
|
Returns:
|
||||||
|
"merged" - merged file was created from both sources
|
||||||
|
"original_only" - temp was invalid, keep original unchanged
|
||||||
|
"temp_only" - original was corrupted but temp is valid, use temp
|
||||||
|
"both_invalid" - both files invalid, delete both and start fresh
|
||||||
|
False - both files were valid but empty
|
||||||
|
"""
|
||||||
|
original_valid = False
|
||||||
|
temp_valid = False
|
||||||
|
original_pq = None
|
||||||
|
temp_pq = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
original_pq = pq.ParquetFile(original_path)
|
||||||
|
original_valid = True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: Original file {original_path} is corrupted or invalid: {e}", file=sys.stderr)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if not os.path.exists(temp_path):
|
||||||
|
print(f"Note: Temp file {temp_path} does not exist (namespace had no records after resume point)", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
temp_pq = pq.ParquetFile(temp_path)
|
||||||
|
temp_valid = True
|
||||||
|
except Exception:
|
||||||
|
print(f"Note: No new data in temp file {temp_path} (file exists but is invalid)", file=sys.stderr)
|
||||||
|
|
||||||
|
if not original_valid and not temp_valid:
|
||||||
|
print(f"Both original and temp files are invalid, will start fresh", file=sys.stderr)
|
||||||
|
return "both_invalid"
|
||||||
|
|
||||||
|
if not original_valid and temp_valid:
|
||||||
|
print(f"Original file corrupted but temp file is valid, recovering from temp", file=sys.stderr)
|
||||||
|
return "temp_only"
|
||||||
|
|
||||||
|
if original_valid and not temp_valid:
|
||||||
|
return "original_only"
|
||||||
|
|
||||||
|
merged_writer = None
|
||||||
|
|
||||||
|
# Copy all row groups from the original file
|
||||||
|
for i in range(original_pq.num_row_groups):
|
||||||
|
row_group = original_pq.read_row_group(i)
|
||||||
|
if merged_writer is None:
|
||||||
|
merged_writer = pq.ParquetWriter(
|
||||||
|
merged_path,
|
||||||
|
row_group.schema,
|
||||||
|
flavor="spark"
|
||||||
|
)
|
||||||
|
merged_writer.write_table(row_group)
|
||||||
|
|
||||||
|
# Append all row groups from the temp file
|
||||||
|
for i in range(temp_pq.num_row_groups):
|
||||||
|
row_group = temp_pq.read_row_group(i)
|
||||||
|
if merged_writer is None:
|
||||||
|
merged_writer = pq.ParquetWriter(
|
||||||
|
merged_path,
|
||||||
|
row_group.schema,
|
||||||
|
flavor="spark"
|
||||||
|
)
|
||||||
|
merged_writer.write_table(row_group)
|
||||||
|
|
||||||
|
# Close the writer
|
||||||
|
if merged_writer is not None:
|
||||||
|
merged_writer.close()
|
||||||
|
return "merged"
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def merge_partitioned_namespaces(partition_dir, temp_suffix, file_filter):
|
||||||
|
"""
|
||||||
|
Merge partitioned namespace directories after resume.
|
||||||
|
|
||||||
|
For partitioned namespaces, temp files are written alongside the original files
|
||||||
|
in each namespace directory with the temp suffix appended to the filename.
|
||||||
|
E.g., original: namespace=0/file.parquet, temp: namespace=0/file.parquet.resume_temp
|
||||||
|
|
||||||
|
Args:
|
||||||
|
partition_dir: The partition directory containing namespace=* subdirs
|
||||||
|
temp_suffix: The suffix appended to temp files (e.g., '.resume_temp')
|
||||||
|
file_filter: Only process temp files matching this base name
|
||||||
|
(e.g., 'enwiki-20250123-pages-meta-history24-p53238682p53445302.parquet')
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if at least one namespace has valid data after merge
|
||||||
|
False if all namespaces ended up with corrupted/deleted data
|
||||||
|
"""
|
||||||
|
namespace_dirs = [d for d in os.listdir(partition_dir) if d.startswith('namespace=')]
|
||||||
|
had_corruption = False
|
||||||
|
expected_temp = file_filter + temp_suffix
|
||||||
|
|
||||||
|
for ns_dir in namespace_dirs:
|
||||||
|
ns_path = os.path.join(partition_dir, ns_dir)
|
||||||
|
temp_path = os.path.join(ns_path, expected_temp)
|
||||||
|
|
||||||
|
if not os.path.exists(temp_path):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Original file is the temp file without the suffix
|
||||||
|
original_file = file_filter
|
||||||
|
original_path = os.path.join(ns_path, original_file)
|
||||||
|
|
||||||
|
if os.path.exists(original_path):
|
||||||
|
# Merge the files
|
||||||
|
merged_path = original_path + ".merged"
|
||||||
|
merged = merge_parquet_files(original_path, temp_path, merged_path)
|
||||||
|
|
||||||
|
if merged == "original_only":
|
||||||
|
# Temp file was invalid (no new data), keep original unchanged
|
||||||
|
if os.path.exists(temp_path):
|
||||||
|
os.remove(temp_path)
|
||||||
|
elif merged == "temp_only":
|
||||||
|
# Original was corrupted, use temp as new base
|
||||||
|
os.remove(original_path)
|
||||||
|
os.rename(temp_path, original_path)
|
||||||
|
elif merged == "both_invalid":
|
||||||
|
# Both files corrupted, remove both
|
||||||
|
if os.path.exists(original_path):
|
||||||
|
os.remove(original_path)
|
||||||
|
if os.path.exists(temp_path):
|
||||||
|
os.remove(temp_path)
|
||||||
|
had_corruption = True
|
||||||
|
elif merged == "merged":
|
||||||
|
# Replace the original file with the merged file
|
||||||
|
os.remove(original_path)
|
||||||
|
os.rename(merged_path, original_path)
|
||||||
|
if os.path.exists(temp_path):
|
||||||
|
os.remove(temp_path)
|
||||||
|
else:
|
||||||
|
# Both files were empty (False), just remove them
|
||||||
|
if os.path.exists(original_path):
|
||||||
|
os.remove(original_path)
|
||||||
|
if os.path.exists(temp_path):
|
||||||
|
os.remove(temp_path)
|
||||||
|
else:
|
||||||
|
# No original file, rename temp to original only if valid
|
||||||
|
try:
|
||||||
|
pq.ParquetFile(temp_path)
|
||||||
|
os.rename(temp_path, original_path)
|
||||||
|
except Exception:
|
||||||
|
# Temp file invalid or missing, just remove it if it exists
|
||||||
|
if os.path.exists(temp_path):
|
||||||
|
os.remove(temp_path)
|
||||||
|
had_corruption = True
|
||||||
|
|
||||||
|
return had_corruption
|
||||||
|
|
||||||
|
|
||||||
|
def finalize_resume_merge(
|
||||||
|
original_output_file,
|
||||||
|
temp_output_file,
|
||||||
|
partition_namespaces,
|
||||||
|
original_partition_dir
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Finalize the resume by merging temp output with original output.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
original_output_file: Path to the original output file
|
||||||
|
temp_output_file: Path to the temp output file written during resume
|
||||||
|
partition_namespaces: Whether using partitioned namespace output
|
||||||
|
original_partition_dir: The partition directory (for partitioned output)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: If merge fails (temp file is preserved for recovery)
|
||||||
|
"""
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
print("Merging resumed data with existing output...", file=sys.stderr)
|
||||||
|
try:
|
||||||
|
if partition_namespaces and original_partition_dir is not None:
|
||||||
|
# For partitioned namespaces, temp files are written alongside originals
|
||||||
|
# with '.resume_temp' suffix in each namespace directory.
|
||||||
|
# Only merge temp files for the current dump file, not other concurrent jobs.
|
||||||
|
file_filter = os.path.basename(original_output_file)
|
||||||
|
merge_partitioned_namespaces(original_partition_dir, ".resume_temp", file_filter)
|
||||||
|
# Clean up the empty temp directory we created
|
||||||
|
if os.path.exists(temp_output_file) and os.path.isdir(temp_output_file):
|
||||||
|
shutil.rmtree(temp_output_file)
|
||||||
|
else:
|
||||||
|
# Merge single parquet files
|
||||||
|
merged_output_file = original_output_file + ".merged"
|
||||||
|
merged = merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
|
||||||
|
|
||||||
|
if merged == "original_only":
|
||||||
|
# Temp file was invalid (no new data), keep original unchanged
|
||||||
|
if os.path.exists(temp_output_file):
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
elif merged == "temp_only":
|
||||||
|
# Original was corrupted, use temp as new base
|
||||||
|
os.remove(original_output_file)
|
||||||
|
os.rename(temp_output_file, original_output_file)
|
||||||
|
elif merged == "both_invalid":
|
||||||
|
# Both files corrupted, remove both
|
||||||
|
os.remove(original_output_file)
|
||||||
|
if os.path.exists(temp_output_file):
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
elif merged == "merged":
|
||||||
|
# Replace the original file with the merged file
|
||||||
|
os.remove(original_output_file)
|
||||||
|
os.rename(merged_output_file, original_output_file)
|
||||||
|
if os.path.exists(temp_output_file):
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
else:
|
||||||
|
# Both files were empty (False) - unusual, but clean up
|
||||||
|
os.remove(original_output_file)
|
||||||
|
if os.path.exists(temp_output_file):
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
|
||||||
|
print("Merge complete.", file=sys.stderr)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error merging resume data for {original_output_file}: {e}", file=sys.stderr)
|
||||||
|
print(f"New data saved in: {temp_output_file}", file=sys.stderr)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def setup_resume_temp_output(output_file, partition_namespaces):
|
||||||
|
"""
|
||||||
|
Set up temp output for resume mode.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_file: The original output file path
|
||||||
|
partition_namespaces: Whether using partitioned namespace output
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (original_output_file, temp_output_file, original_partition_dir)
|
||||||
|
or (None, None, None) if no existing output to resume from.
|
||||||
|
"""
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
original_output_file = None
|
||||||
|
temp_output_file = None
|
||||||
|
original_partition_dir = None
|
||||||
|
|
||||||
|
# For partitioned namespaces, check if the specific output file exists in any namespace
|
||||||
|
if partition_namespaces:
|
||||||
|
partition_dir = os.path.dirname(output_file)
|
||||||
|
output_filename = os.path.basename(output_file)
|
||||||
|
output_exists = False
|
||||||
|
if os.path.isdir(partition_dir):
|
||||||
|
for d in os.listdir(partition_dir):
|
||||||
|
if d.startswith('namespace='):
|
||||||
|
if os.path.exists(os.path.join(partition_dir, d, output_filename)):
|
||||||
|
output_exists = True
|
||||||
|
break
|
||||||
|
if output_exists:
|
||||||
|
original_partition_dir = partition_dir
|
||||||
|
else:
|
||||||
|
output_exists = isinstance(output_file, str) and os.path.exists(output_file)
|
||||||
|
|
||||||
|
if output_exists:
|
||||||
|
original_output_file = output_file
|
||||||
|
temp_output_file = output_file + ".resume_temp"
|
||||||
|
|
||||||
|
# Note: cleanup_interrupted_resume() should have been called before this
|
||||||
|
# to merge any leftover temp files from a previous interrupted run.
|
||||||
|
# Here we just clean up any remaining temp directory markers.
|
||||||
|
if os.path.exists(temp_output_file):
|
||||||
|
if os.path.isdir(temp_output_file):
|
||||||
|
shutil.rmtree(temp_output_file)
|
||||||
|
else:
|
||||||
|
os.remove(temp_output_file)
|
||||||
|
|
||||||
|
if partition_namespaces:
|
||||||
|
os.makedirs(temp_output_file, exist_ok=True)
|
||||||
|
|
||||||
|
return original_output_file, temp_output_file, original_partition_dir
|
||||||
@@ -2,7 +2,7 @@ import sys
|
|||||||
from abc import abstractmethod, ABC
|
from abc import abstractmethod, ABC
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
from typing import Generic, TypeVar, Union
|
from typing import Generic, TypeVar, Union, TYPE_CHECKING
|
||||||
|
|
||||||
import mwreverts
|
import mwreverts
|
||||||
import mwtypes
|
import mwtypes
|
||||||
@@ -10,6 +10,9 @@ import mwxml
|
|||||||
|
|
||||||
import pyarrow as pa
|
import pyarrow as pa
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from wikiq.wikitext_parser import WikitextParser
|
||||||
|
|
||||||
T = TypeVar('T')
|
T = TypeVar('T')
|
||||||
|
|
||||||
|
|
||||||
@@ -217,3 +220,108 @@ class RevisionText(RevisionField[str]):
|
|||||||
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str:
|
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str:
|
||||||
revision = revisions[-1]
|
revision = revisions[-1]
|
||||||
return revision.text
|
return revision.text
|
||||||
|
|
||||||
|
|
||||||
|
class RevisionExternalLinks(RevisionField[Union[list[str], None]]):
|
||||||
|
"""Extract all external links from revision text."""
|
||||||
|
|
||||||
|
field = pa.field("external_links", pa.list_(pa.string()), nullable=True)
|
||||||
|
|
||||||
|
def __init__(self, wikitext_parser: "WikitextParser"):
|
||||||
|
super().__init__()
|
||||||
|
self.wikitext_parser = wikitext_parser
|
||||||
|
|
||||||
|
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[str], None]:
|
||||||
|
revision = revisions[-1]
|
||||||
|
if revision.deleted.text:
|
||||||
|
return None
|
||||||
|
return self.wikitext_parser.extract_external_links(revision.text)
|
||||||
|
|
||||||
|
|
||||||
|
class RevisionCitations(RevisionField[Union[list[str], None]]):
|
||||||
|
"""Extract citations from ref tags and cite templates."""
|
||||||
|
|
||||||
|
field = pa.field("citations", pa.list_(pa.string()), nullable=True)
|
||||||
|
|
||||||
|
def __init__(self, wikitext_parser: "WikitextParser"):
|
||||||
|
super().__init__()
|
||||||
|
self.wikitext_parser = wikitext_parser
|
||||||
|
|
||||||
|
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[str], None]:
|
||||||
|
revision = revisions[-1]
|
||||||
|
if revision.deleted.text:
|
||||||
|
return None
|
||||||
|
return self.wikitext_parser.extract_citations(revision.text)
|
||||||
|
|
||||||
|
|
||||||
|
class RevisionWikilinks(RevisionField[Union[list[dict], None]]):
|
||||||
|
"""Extract all internal wikilinks from revision text."""
|
||||||
|
|
||||||
|
# Struct type with title and optional display text
|
||||||
|
field = pa.field("wikilinks", pa.list_(pa.struct([
|
||||||
|
pa.field("title", pa.string()),
|
||||||
|
pa.field("text", pa.string(), nullable=True),
|
||||||
|
])), nullable=True)
|
||||||
|
|
||||||
|
def __init__(self, wikitext_parser: "WikitextParser"):
|
||||||
|
super().__init__()
|
||||||
|
self.wikitext_parser = wikitext_parser
|
||||||
|
|
||||||
|
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
|
||||||
|
revision = revisions[-1]
|
||||||
|
if revision.deleted.text:
|
||||||
|
return None
|
||||||
|
return self.wikitext_parser.extract_wikilinks(revision.text)
|
||||||
|
|
||||||
|
|
||||||
|
class RevisionTemplates(RevisionField[Union[list[dict], None]]):
|
||||||
|
"""Extract all templates from revision text."""
|
||||||
|
|
||||||
|
# Struct type with name and params map
|
||||||
|
field = pa.field("templates", pa.list_(pa.struct([
|
||||||
|
pa.field("name", pa.string()),
|
||||||
|
pa.field("params", pa.map_(pa.string(), pa.string())),
|
||||||
|
])), nullable=True)
|
||||||
|
|
||||||
|
def __init__(self, wikitext_parser: "WikitextParser"):
|
||||||
|
super().__init__()
|
||||||
|
self.wikitext_parser = wikitext_parser
|
||||||
|
|
||||||
|
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
|
||||||
|
revision = revisions[-1]
|
||||||
|
if revision.deleted.text:
|
||||||
|
return None
|
||||||
|
return self.wikitext_parser.extract_templates(revision.text)
|
||||||
|
|
||||||
|
|
||||||
|
class RevisionHeadings(RevisionField[Union[list[dict], None]]):
|
||||||
|
"""Extract all section headings from revision text."""
|
||||||
|
|
||||||
|
# Struct type with level and text
|
||||||
|
field = pa.field("headings", pa.list_(pa.struct([
|
||||||
|
pa.field("level", pa.int8()),
|
||||||
|
pa.field("text", pa.string()),
|
||||||
|
])), nullable=True)
|
||||||
|
|
||||||
|
def __init__(self, wikitext_parser: "WikitextParser"):
|
||||||
|
super().__init__()
|
||||||
|
self.wikitext_parser = wikitext_parser
|
||||||
|
|
||||||
|
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> Union[list[dict], None]:
|
||||||
|
revision = revisions[-1]
|
||||||
|
if revision.deleted.text:
|
||||||
|
return None
|
||||||
|
return self.wikitext_parser.extract_headings(revision.text)
|
||||||
|
|
||||||
|
|
||||||
|
class RevisionParserTimeout(RevisionField[bool]):
|
||||||
|
"""Track whether the wikitext parser timed out for this revision."""
|
||||||
|
|
||||||
|
field = pa.field("parser_timeout", pa.bool_())
|
||||||
|
|
||||||
|
def __init__(self, wikitext_parser: "WikitextParser"):
|
||||||
|
super().__init__()
|
||||||
|
self.wikitext_parser = wikitext_parser
|
||||||
|
|
||||||
|
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> bool:
|
||||||
|
return self.wikitext_parser.last_parse_timed_out
|
||||||
|
|||||||
126
src/wikiq/wikitext_parser.py
Normal file
126
src/wikiq/wikitext_parser.py
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
"""Shared wikitext parser with caching to avoid duplicate parsing."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import mwparserfromhell
|
||||||
|
|
||||||
|
PARSER_TIMEOUT = 60 # seconds
|
||||||
|
|
||||||
|
|
||||||
|
class WikitextParser:
|
||||||
|
"""Caches parsed wikicode to avoid re-parsing the same text."""
|
||||||
|
|
||||||
|
CITE_TEMPLATES = {
|
||||||
|
'cite web', 'cite book', 'cite journal', 'cite news',
|
||||||
|
'cite magazine', 'cite conference', 'cite encyclopedia',
|
||||||
|
'cite report', 'cite thesis', 'cite press release',
|
||||||
|
'citation', 'sfn', 'harvnb', 'harv'
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._cached_text: str | None = None
|
||||||
|
self._cached_wikicode = None
|
||||||
|
self.last_parse_timed_out: bool = False
|
||||||
|
|
||||||
|
async def _parse_async(self, text: str):
|
||||||
|
"""Parse wikitext with timeout protection."""
|
||||||
|
try:
|
||||||
|
result = await asyncio.wait_for(
|
||||||
|
asyncio.to_thread(mwparserfromhell.parse, text),
|
||||||
|
timeout=PARSER_TIMEOUT
|
||||||
|
)
|
||||||
|
return result, False
|
||||||
|
except TimeoutError:
|
||||||
|
return None, True
|
||||||
|
|
||||||
|
def _get_wikicode(self, text: str):
|
||||||
|
"""Parse text and cache result. Returns cached result if text unchanged."""
|
||||||
|
if text != self._cached_text:
|
||||||
|
self._cached_text = text
|
||||||
|
self._cached_wikicode, self.last_parse_timed_out = asyncio.run(self._parse_async(text))
|
||||||
|
return self._cached_wikicode
|
||||||
|
|
||||||
|
def extract_external_links(self, text: str | None) -> list[str] | None:
|
||||||
|
"""Extract all external link URLs from wikitext."""
|
||||||
|
if text is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
wikicode = self._get_wikicode(text)
|
||||||
|
return [str(link.url) for link in wikicode.filter_external_links()]
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def extract_citations(self, text: str | None) -> list[str] | None:
|
||||||
|
"""Extract citations from ref tags and cite templates."""
|
||||||
|
if text is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
wikicode = self._get_wikicode(text)
|
||||||
|
citations = []
|
||||||
|
|
||||||
|
# Extract ref tag contents
|
||||||
|
for tag in wikicode.filter_tags():
|
||||||
|
if tag.tag.lower() == 'ref':
|
||||||
|
content = str(tag.contents).strip()
|
||||||
|
if content:
|
||||||
|
citations.append(f"ref:{content}")
|
||||||
|
|
||||||
|
# Extract cite templates
|
||||||
|
for template in wikicode.filter_templates():
|
||||||
|
template_name = str(template.name).strip().lower()
|
||||||
|
if any(template_name.startswith(cite) for cite in self.CITE_TEMPLATES):
|
||||||
|
citations.append(f"template:{str(template)}")
|
||||||
|
|
||||||
|
return citations
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def extract_wikilinks(self, text: str | None) -> list[dict[str, str | None]] | None:
|
||||||
|
"""Extract all internal wikilinks with title and display text."""
|
||||||
|
if text is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
wikicode = self._get_wikicode(text)
|
||||||
|
result = []
|
||||||
|
for link in wikicode.filter_wikilinks():
|
||||||
|
title = str(link.title).strip()
|
||||||
|
# text is None if no pipe, otherwise the display text
|
||||||
|
display_text = str(link.text).strip() if link.text else None
|
||||||
|
result.append({"title": title, "text": display_text})
|
||||||
|
return result
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def extract_templates(self, text: str | None) -> list[dict] | None:
|
||||||
|
"""Extract all templates with their names and parameters."""
|
||||||
|
if text is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
wikicode = self._get_wikicode(text)
|
||||||
|
result = []
|
||||||
|
for template in wikicode.filter_templates():
|
||||||
|
name = str(template.name).strip()
|
||||||
|
params = {}
|
||||||
|
for param in template.params:
|
||||||
|
param_name = str(param.name).strip()
|
||||||
|
param_value = str(param.value).strip()
|
||||||
|
params[param_name] = param_value
|
||||||
|
result.append({"name": name, "params": params})
|
||||||
|
return result
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def extract_headings(self, text: str | None) -> list[dict] | None:
|
||||||
|
"""Extract all section headings with their levels."""
|
||||||
|
if text is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
wikicode = self._get_wikicode(text)
|
||||||
|
result = []
|
||||||
|
for heading in wikicode.filter_headings():
|
||||||
|
level = heading.level
|
||||||
|
heading_text = str(heading.title).strip()
|
||||||
|
result.append({"level": level, "text": heading_text})
|
||||||
|
return result
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
@@ -1,110 +1,48 @@
|
|||||||
import os
|
import os
|
||||||
import shutil
|
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import sys
|
||||||
import tracemalloc
|
import tracemalloc
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
from typing import Final, Union
|
|
||||||
import pytest
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
import pyarrow as pa
|
||||||
|
import pytest
|
||||||
from pandas import DataFrame
|
from pandas import DataFrame
|
||||||
from pandas.testing import assert_frame_equal, assert_series_equal
|
from pandas.testing import assert_frame_equal, assert_series_equal
|
||||||
|
|
||||||
# Make references to files and wikiq relative to this file, not to the current working directory.
|
from wikiq_test_utils import (
|
||||||
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
|
BASELINE_DIR,
|
||||||
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR,".."), "src/wikiq/__init__.py")
|
IKWIKI,
|
||||||
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
|
REGEXTEST,
|
||||||
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
|
SAILORMOON,
|
||||||
|
TEST_DIR,
|
||||||
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
|
TEST_OUTPUT_DIR,
|
||||||
SAILORMOON: Final[str] = "sailormoon"
|
TWINPEAKS,
|
||||||
TWINPEAKS: Final[str] = "twinpeaks"
|
WIKIQ,
|
||||||
REGEXTEST: Final[str] = "regextest"
|
WikiqTester,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def setup():
|
def setup():
|
||||||
tracemalloc.start()
|
tracemalloc.start()
|
||||||
|
|
||||||
# Perform directory check and reset here as this is a one-time setup step as opposed to per-test setup.
|
|
||||||
if not os.path.exists(TEST_OUTPUT_DIR):
|
if not os.path.exists(TEST_OUTPUT_DIR):
|
||||||
os.mkdir(TEST_OUTPUT_DIR)
|
os.mkdir(TEST_OUTPUT_DIR)
|
||||||
|
|
||||||
|
|
||||||
# Always run setup, even if this is executed via "python -m unittest" rather
|
|
||||||
# than as __main__.
|
|
||||||
setup()
|
setup()
|
||||||
|
|
||||||
|
|
||||||
class WikiqTester:
|
# with / without pwr DONE
|
||||||
def __init__(
|
# with / without url encode DONE
|
||||||
self,
|
# with / without collapse user DONE
|
||||||
wiki: str,
|
# with output to stdout DONE
|
||||||
case_name: str,
|
# note that the persistence radius is 7 by default
|
||||||
suffix: Union[str, None] = None,
|
# reading various file formats including
|
||||||
in_compression: str = "bz2",
|
# 7z, gz, bz2, xml DONE
|
||||||
baseline_format: str = "tsv",
|
# wikia and wikipedia data DONE
|
||||||
out_format: str = "tsv",
|
# malformed xmls DONE
|
||||||
):
|
|
||||||
self.input_file = os.path.join(
|
|
||||||
TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
|
|
||||||
)
|
|
||||||
|
|
||||||
basename = "{0}_{1}".format(case_name, wiki)
|
|
||||||
if suffix:
|
|
||||||
basename = "{0}_{1}".format(basename, suffix)
|
|
||||||
|
|
||||||
self.output = os.path.join(
|
|
||||||
TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
|
|
||||||
)
|
|
||||||
|
|
||||||
if os.path.exists(self.output):
|
|
||||||
if os.path.isfile(self.output):
|
|
||||||
os.remove(self.output)
|
|
||||||
else:
|
|
||||||
shutil.rmtree(self.output)
|
|
||||||
|
|
||||||
if out_format == "parquet":
|
|
||||||
os.makedirs(self.output, exist_ok=True)
|
|
||||||
|
|
||||||
if suffix is None:
|
|
||||||
self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
|
|
||||||
self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
|
|
||||||
else:
|
|
||||||
self.wikiq_baseline_name = "{0}_{1}.{2}".format(
|
|
||||||
wiki, suffix, baseline_format
|
|
||||||
)
|
|
||||||
self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
|
|
||||||
|
|
||||||
# If case_name is unset, there are no relevant baseline or test files.
|
|
||||||
if case_name is not None:
|
|
||||||
self.baseline_file = os.path.join(
|
|
||||||
BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
|
|
||||||
)
|
|
||||||
|
|
||||||
def call_wikiq(self, *args: str, out: bool = True):
|
|
||||||
"""
|
|
||||||
Calls wikiq with the passed arguments on the input file relevant to the test.
|
|
||||||
:param args: The command line arguments to pass to wikiq.
|
|
||||||
:param out: Whether to pass an output argument to wikiq.
|
|
||||||
:return: The output of the wikiq call.
|
|
||||||
"""
|
|
||||||
if out:
|
|
||||||
call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
|
|
||||||
else:
|
|
||||||
call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
|
|
||||||
|
|
||||||
print(call)
|
|
||||||
return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
|
|
||||||
|
|
||||||
# with / without pwr DONE
|
|
||||||
# with / without url encode DONE
|
|
||||||
# with / without collapse user DONE
|
|
||||||
# with output to stdout DONE
|
|
||||||
# note that the persistence radius is 7 by default
|
|
||||||
# reading various file formats including
|
|
||||||
# 7z, gz, bz2, xml DONE
|
|
||||||
# wikia and wikipedia data DONE
|
|
||||||
# malformed xmls DONE
|
|
||||||
|
|
||||||
def test_WP_noargs():
|
def test_WP_noargs():
|
||||||
tester = WikiqTester(IKWIKI, "noargs")
|
tester = WikiqTester(IKWIKI, "noargs")
|
||||||
@@ -439,3 +377,338 @@ def test_parquet():
|
|||||||
pytest.fail(exc)
|
pytest.fail(exc)
|
||||||
|
|
||||||
# assert_frame_equal(test, baseline, check_like=True, check_dtype=False)
|
# assert_frame_equal(test, baseline, check_like=True, check_dtype=False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_external_links_only():
|
||||||
|
"""Test that --external-links extracts external links correctly."""
|
||||||
|
import mwparserfromhell
|
||||||
|
|
||||||
|
tester = WikiqTester(SAILORMOON, "external_links_only", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Also include --text so we can verify extraction against actual wikitext
|
||||||
|
tester.call_wikiq("--external-links", "--text", "--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
# Verify external_links column exists
|
||||||
|
assert "external_links" in test.columns, "external_links column should exist"
|
||||||
|
|
||||||
|
# Verify citations column does NOT exist
|
||||||
|
assert "citations" not in test.columns, "citations column should NOT exist when only --external-links is used"
|
||||||
|
|
||||||
|
# Verify column has list/array type (pandas reads parquet lists as numpy arrays)
|
||||||
|
assert test["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
|
||||||
|
"external_links should be a list/array type or None"
|
||||||
|
|
||||||
|
# Verify that extracted URLs look like valid URIs (have a scheme or are protocol-relative)
|
||||||
|
all_urls = []
|
||||||
|
for links in test["external_links"]:
|
||||||
|
if links is not None and len(links) > 0:
|
||||||
|
all_urls.extend(links)
|
||||||
|
|
||||||
|
for url in all_urls:
|
||||||
|
# External links can be http, https, mailto, ftp, etc. or protocol-relative (//)
|
||||||
|
has_scheme = ":" in url and url.index(":") < 10 # scheme:... with short scheme
|
||||||
|
is_protocol_relative = url.startswith("//")
|
||||||
|
assert has_scheme or is_protocol_relative, \
|
||||||
|
f"External link should be a valid URI, got: {url}"
|
||||||
|
|
||||||
|
# Verify extraction matches mwparserfromhell for a sample of rows with text
|
||||||
|
rows_with_links = test[test["external_links"].apply(lambda x: x is not None and len(x) > 0)]
|
||||||
|
if len(rows_with_links) > 0:
|
||||||
|
# Test up to 5 rows
|
||||||
|
sample = rows_with_links.head(5)
|
||||||
|
for idx, row in sample.iterrows():
|
||||||
|
text = row["text"]
|
||||||
|
if text:
|
||||||
|
wikicode = mwparserfromhell.parse(text)
|
||||||
|
expected_links = [str(link.url) for link in wikicode.filter_external_links()]
|
||||||
|
actual_links = list(row["external_links"])
|
||||||
|
assert actual_links == expected_links, \
|
||||||
|
f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"
|
||||||
|
|
||||||
|
print(f"External links only test passed! {len(test)} rows, {len(all_urls)} total URLs extracted")
|
||||||
|
|
||||||
|
|
||||||
|
def test_citations_only():
|
||||||
|
"""Test that --citations extracts citations correctly."""
|
||||||
|
import mwparserfromhell
|
||||||
|
from wikiq.wikitext_parser import WikitextParser
|
||||||
|
|
||||||
|
tester = WikiqTester(SAILORMOON, "citations_only", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Also include --text so we can verify extraction against actual wikitext
|
||||||
|
tester.call_wikiq("--citations", "--text", "--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
# Verify citations column exists
|
||||||
|
assert "citations" in test.columns, "citations column should exist"
|
||||||
|
|
||||||
|
# Verify external_links column does NOT exist
|
||||||
|
assert "external_links" not in test.columns, "external_links column should NOT exist when only --citations is used"
|
||||||
|
|
||||||
|
# Verify column has list/array type (pandas reads parquet lists as numpy arrays)
|
||||||
|
assert test["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
|
||||||
|
"citations should be a list/array type or None"
|
||||||
|
|
||||||
|
# Verify that extracted citations have correct prefixes (ref: or template:)
|
||||||
|
all_citations = []
|
||||||
|
for citations in test["citations"]:
|
||||||
|
if citations is not None and len(citations) > 0:
|
||||||
|
all_citations.extend(citations)
|
||||||
|
|
||||||
|
for citation in all_citations:
|
||||||
|
assert citation.startswith("ref:") or citation.startswith("template:"), \
|
||||||
|
f"Citation should start with 'ref:' or 'template:', got: {citation}"
|
||||||
|
|
||||||
|
# Verify extraction matches WikitextParser for a sample of rows with text
|
||||||
|
rows_with_citations = test[test["citations"].apply(lambda x: x is not None and len(x) > 0)]
|
||||||
|
if len(rows_with_citations) > 0:
|
||||||
|
parser = WikitextParser()
|
||||||
|
# Test up to 5 rows
|
||||||
|
sample = rows_with_citations.head(5)
|
||||||
|
for idx, row in sample.iterrows():
|
||||||
|
text = row["text"]
|
||||||
|
if text:
|
||||||
|
expected_citations = parser.extract_citations(text)
|
||||||
|
actual_citations = list(row["citations"])
|
||||||
|
assert actual_citations == expected_citations, \
|
||||||
|
f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"
|
||||||
|
|
||||||
|
print(f"Citations only test passed! {len(test)} rows, {len(all_citations)} total citations extracted")
|
||||||
|
|
||||||
|
|
||||||
|
def test_external_links_and_citations():
|
||||||
|
"""Test that both --external-links and --citations work together (shared parser)."""
|
||||||
|
import mwparserfromhell
|
||||||
|
from wikiq.wikitext_parser import WikitextParser
|
||||||
|
|
||||||
|
tester = WikiqTester(SAILORMOON, "external_links_and_citations", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Also include --text so we can verify extraction against actual wikitext
|
||||||
|
tester.call_wikiq("--external-links", "--citations", "--text", "--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
# Verify both columns exist
|
||||||
|
assert "external_links" in test.columns, "external_links column should exist"
|
||||||
|
assert "citations" in test.columns, "citations column should exist"
|
||||||
|
|
||||||
|
# Verify both columns have list/array types (pandas reads parquet lists as numpy arrays)
|
||||||
|
assert test["external_links"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
|
||||||
|
"external_links should be a list/array type or None"
|
||||||
|
assert test["citations"].apply(lambda x: x is None or hasattr(x, '__len__')).all(), \
|
||||||
|
"citations should be a list/array type or None"
|
||||||
|
|
||||||
|
# Verify URLs look like valid URIs (have a scheme or are protocol-relative)
|
||||||
|
all_urls = []
|
||||||
|
for links in test["external_links"]:
|
||||||
|
if links is not None and len(links) > 0:
|
||||||
|
all_urls.extend(links)
|
||||||
|
|
||||||
|
for url in all_urls:
|
||||||
|
# External links can be http, https, mailto, ftp, etc. or protocol-relative (//)
|
||||||
|
has_scheme = ":" in url and url.index(":") < 10 # scheme:... with short scheme
|
||||||
|
is_protocol_relative = url.startswith("//")
|
||||||
|
assert has_scheme or is_protocol_relative, \
|
||||||
|
f"External link should be a valid URI, got: {url}"
|
||||||
|
|
||||||
|
# Verify citations have correct prefixes
|
||||||
|
all_citations = []
|
||||||
|
for citations in test["citations"]:
|
||||||
|
if citations is not None and len(citations) > 0:
|
||||||
|
all_citations.extend(citations)
|
||||||
|
|
||||||
|
for citation in all_citations:
|
||||||
|
assert citation.startswith("ref:") or citation.startswith("template:"), \
|
||||||
|
f"Citation should start with 'ref:' or 'template:', got: {citation}"
|
||||||
|
|
||||||
|
# Verify extraction matches WikitextParser for a sample of rows with text
|
||||||
|
# This tests that the shared parser optimization works correctly
|
||||||
|
parser = WikitextParser()
|
||||||
|
rows_with_content = test[
|
||||||
|
(test["external_links"].apply(lambda x: x is not None and len(x) > 0)) |
|
||||||
|
(test["citations"].apply(lambda x: x is not None and len(x) > 0))
|
||||||
|
]
|
||||||
|
if len(rows_with_content) > 0:
|
||||||
|
# Test up to 5 rows
|
||||||
|
sample = rows_with_content.head(5)
|
||||||
|
for idx, row in sample.iterrows():
|
||||||
|
text = row["text"]
|
||||||
|
if text:
|
||||||
|
# Verify external links
|
||||||
|
wikicode = mwparserfromhell.parse(text)
|
||||||
|
expected_links = [str(link.url) for link in wikicode.filter_external_links()]
|
||||||
|
actual_links = list(row["external_links"]) if row["external_links"] is not None else []
|
||||||
|
assert actual_links == expected_links, \
|
||||||
|
f"Row {idx}: external_links mismatch. Expected {expected_links}, got {actual_links}"
|
||||||
|
|
||||||
|
# Verify citations
|
||||||
|
expected_citations = parser.extract_citations(text)
|
||||||
|
actual_citations = list(row["citations"]) if row["citations"] is not None else []
|
||||||
|
assert actual_citations == expected_citations, \
|
||||||
|
f"Row {idx}: citations mismatch. Expected {expected_citations}, got {actual_citations}"
|
||||||
|
|
||||||
|
print(f"External links and citations test passed! {len(test)} rows, {len(all_urls)} URLs, {len(all_citations)} citations")
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_wikitext_columns():
|
||||||
|
"""Test that neither external_links nor citations columns exist without flags."""
|
||||||
|
tester = WikiqTester(SAILORMOON, "no_wikitext_columns", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester.call_wikiq("--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
# Verify neither column exists
|
||||||
|
assert "external_links" not in test.columns, "external_links column should NOT exist without --external-links flag"
|
||||||
|
assert "citations" not in test.columns, "citations column should NOT exist without --citations flag"
|
||||||
|
|
||||||
|
print(f"No wikitext columns test passed! {len(test)} rows processed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_wikilinks():
|
||||||
|
"""Test that --wikilinks extracts internal wikilinks correctly."""
|
||||||
|
import mwparserfromhell
|
||||||
|
|
||||||
|
tester = WikiqTester(SAILORMOON, "wikilinks", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester.call_wikiq("--wikilinks", "--text", "--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
# Verify wikilinks column exists
|
||||||
|
assert "wikilinks" in test.columns, "wikilinks column should exist"
|
||||||
|
|
||||||
|
# Verify column has list/array type
|
||||||
|
assert test["wikilinks"].apply(lambda x: x is None or hasattr(x, '__len__')).all()
|
||||||
|
|
||||||
|
# Verify extraction matches mwparserfromhell for sample rows
|
||||||
|
rows_with_links = test[test["wikilinks"].apply(lambda x: x is not None and len(x) > 0)]
|
||||||
|
if len(rows_with_links) > 0:
|
||||||
|
sample = rows_with_links.head(5)
|
||||||
|
for idx, row in sample.iterrows():
|
||||||
|
text = row["text"]
|
||||||
|
if text:
|
||||||
|
wikicode = mwparserfromhell.parse(text)
|
||||||
|
expected = []
|
||||||
|
for link in wikicode.filter_wikilinks():
|
||||||
|
title = str(link.title).strip()
|
||||||
|
display_text = str(link.text).strip() if link.text else None
|
||||||
|
expected.append({"title": title, "text": display_text})
|
||||||
|
|
||||||
|
actual = list(row["wikilinks"])
|
||||||
|
# Convert to comparable format (pandas may read as dicts or named tuples)
|
||||||
|
actual_dicts = [{"title": item["title"], "text": item["text"]} for item in actual]
|
||||||
|
assert actual_dicts == expected, f"Row {idx}: wikilinks mismatch"
|
||||||
|
|
||||||
|
print(f"Wikilinks test passed! {len(test)} rows processed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_templates():
|
||||||
|
"""Test that --templates extracts templates correctly."""
|
||||||
|
import mwparserfromhell
|
||||||
|
|
||||||
|
tester = WikiqTester(SAILORMOON, "templates", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester.call_wikiq("--templates", "--text", "--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
# Verify templates column exists
|
||||||
|
assert "templates" in test.columns, "templates column should exist"
|
||||||
|
|
||||||
|
# Verify column has list/array type
|
||||||
|
assert test["templates"].apply(lambda x: x is None or hasattr(x, '__len__')).all()
|
||||||
|
|
||||||
|
# Verify extraction matches mwparserfromhell for sample rows
|
||||||
|
rows_with_templates = test[test["templates"].apply(lambda x: x is not None and len(x) > 0)]
|
||||||
|
if len(rows_with_templates) > 0:
|
||||||
|
sample = rows_with_templates.head(5)
|
||||||
|
for idx, row in sample.iterrows():
|
||||||
|
text = row["text"]
|
||||||
|
if text:
|
||||||
|
wikicode = mwparserfromhell.parse(text)
|
||||||
|
expected = []
|
||||||
|
for template in wikicode.filter_templates():
|
||||||
|
name = str(template.name).strip()
|
||||||
|
params = {}
|
||||||
|
for param in template.params:
|
||||||
|
param_name = str(param.name).strip()
|
||||||
|
param_value = str(param.value).strip()
|
||||||
|
params[param_name] = param_value
|
||||||
|
expected.append({"name": name, "params": params})
|
||||||
|
|
||||||
|
actual = list(row["templates"])
|
||||||
|
# Convert to comparable format
|
||||||
|
actual_list = []
|
||||||
|
for item in actual:
|
||||||
|
actual_list.append({
|
||||||
|
"name": item["name"],
|
||||||
|
"params": dict(item["params"]) if item["params"] else {}
|
||||||
|
})
|
||||||
|
assert actual_list == expected, f"Row {idx}: templates mismatch"
|
||||||
|
|
||||||
|
print(f"Templates test passed! {len(test)} rows processed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_headings():
|
||||||
|
"""Test that --headings extracts section headings correctly."""
|
||||||
|
import mwparserfromhell
|
||||||
|
|
||||||
|
tester = WikiqTester(SAILORMOON, "headings", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester.call_wikiq("--headings", "--text", "--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
# Verify headings column exists
|
||||||
|
assert "headings" in test.columns, "headings column should exist"
|
||||||
|
|
||||||
|
# Verify column has list/array type
|
||||||
|
assert test["headings"].apply(lambda x: x is None or hasattr(x, '__len__')).all()
|
||||||
|
|
||||||
|
# Verify extraction matches mwparserfromhell for sample rows
|
||||||
|
rows_with_headings = test[test["headings"].apply(lambda x: x is not None and len(x) > 0)]
|
||||||
|
if len(rows_with_headings) > 0:
|
||||||
|
sample = rows_with_headings.head(5)
|
||||||
|
for idx, row in sample.iterrows():
|
||||||
|
text = row["text"]
|
||||||
|
if text:
|
||||||
|
wikicode = mwparserfromhell.parse(text)
|
||||||
|
expected = []
|
||||||
|
for heading in wikicode.filter_headings():
|
||||||
|
level = heading.level
|
||||||
|
heading_text = str(heading.title).strip()
|
||||||
|
expected.append({"level": level, "text": heading_text})
|
||||||
|
|
||||||
|
actual = list(row["headings"])
|
||||||
|
# Convert to comparable format
|
||||||
|
actual_list = [{"level": item["level"], "text": item["text"]} for item in actual]
|
||||||
|
assert actual_list == expected, f"Row {idx}: headings mismatch"
|
||||||
|
|
||||||
|
print(f"Headings test passed! {len(test)} rows processed")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Binary file not shown.
Binary file not shown.
5
test/conftest.py
Normal file
5
test/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# Add the test directory to Python path so test utilities can be imported
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||||
466
test/test_resume.py
Normal file
466
test/test_resume.py
Normal file
@@ -0,0 +1,466 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pyarrow as pa
|
||||||
|
import pyarrow.dataset as ds
|
||||||
|
import pyarrow.parquet as pq
|
||||||
|
import pytest
|
||||||
|
from pandas.testing import assert_frame_equal
|
||||||
|
|
||||||
|
from wikiq.resume import (
|
||||||
|
cleanup_interrupted_resume,
|
||||||
|
get_checkpoint_path,
|
||||||
|
get_resume_point,
|
||||||
|
merge_parquet_files,
|
||||||
|
)
|
||||||
|
from wikiq_test_utils import (
|
||||||
|
SAILORMOON,
|
||||||
|
TEST_DIR,
|
||||||
|
TEST_OUTPUT_DIR,
|
||||||
|
WIKIQ,
|
||||||
|
WikiqTester,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_resume():
|
||||||
|
"""Test that --resume properly resumes processing from the last written revid."""
|
||||||
|
tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester_full.call_wikiq("--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
|
||||||
|
full_table = pq.read_table(full_output_path)
|
||||||
|
|
||||||
|
middle_idx = len(full_table) // 2
|
||||||
|
resume_revid = full_table.column("revid")[middle_idx].as_py()
|
||||||
|
|
||||||
|
print(f"Total revisions: {len(full_table)}, Resume point: {middle_idx}, Resume revid: {resume_revid}")
|
||||||
|
|
||||||
|
tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z", out_format="parquet")
|
||||||
|
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
partial_table = full_table.slice(0, middle_idx + 1)
|
||||||
|
pq.write_table(partial_table, partial_output_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester_partial.call_wikiq("--fandom-2020", "--resume")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
resumed_table = pq.read_table(partial_output_path)
|
||||||
|
|
||||||
|
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
|
||||||
|
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
|
||||||
|
|
||||||
|
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
|
||||||
|
|
||||||
|
print(f"Resume test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
|
||||||
|
|
||||||
|
|
||||||
|
def test_resume_with_diff():
|
||||||
|
"""Test that --resume works correctly with diff computation."""
|
||||||
|
tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester_full.call_wikiq("--diff", "--fandom-2020")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
|
||||||
|
full_table = pq.read_table(full_output_path)
|
||||||
|
|
||||||
|
resume_idx = len(full_table) // 3
|
||||||
|
resume_revid = full_table.column("revid")[resume_idx].as_py()
|
||||||
|
|
||||||
|
print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
|
||||||
|
|
||||||
|
tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z", out_format="parquet")
|
||||||
|
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
partial_table = full_table.slice(0, resume_idx + 1)
|
||||||
|
pq.write_table(partial_table, partial_output_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
resumed_table = pq.read_table(partial_output_path)
|
||||||
|
|
||||||
|
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
|
||||||
|
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
|
||||||
|
|
||||||
|
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
|
||||||
|
|
||||||
|
print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
|
||||||
|
|
||||||
|
|
||||||
|
def test_resume_with_partition_namespaces():
|
||||||
|
"""Test that --resume works correctly with --partition-namespaces.
|
||||||
|
|
||||||
|
Interrupts wikiq partway through processing, then resumes and verifies
|
||||||
|
the result matches an uninterrupted run. Uses --flush-per-batch to ensure
|
||||||
|
data is written to disk after each batch, making interruption deterministic.
|
||||||
|
"""
|
||||||
|
full_dir = os.path.join(TEST_OUTPUT_DIR, "resume_full")
|
||||||
|
partial_dir = os.path.join(TEST_OUTPUT_DIR, "resume_partial")
|
||||||
|
input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
|
||||||
|
|
||||||
|
for output_dir in [full_dir, partial_dir]:
|
||||||
|
if os.path.exists(output_dir):
|
||||||
|
shutil.rmtree(output_dir)
|
||||||
|
os.makedirs(output_dir)
|
||||||
|
|
||||||
|
full_output = os.path.join(full_dir, f"{SAILORMOON}.parquet")
|
||||||
|
partial_output = os.path.join(partial_dir, f"{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
cmd_full = f"{WIKIQ} {input_file} -o {full_output} --batch-size 10 --partition-namespaces"
|
||||||
|
try:
|
||||||
|
subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
full_dataset = ds.dataset(full_output, format="parquet", partitioning="hive")
|
||||||
|
full_df = full_dataset.to_table().to_pandas()
|
||||||
|
total_rows = len(full_df)
|
||||||
|
print(f"Full run produced {total_rows} rows")
|
||||||
|
|
||||||
|
batch_size = 10
|
||||||
|
cmd_partial = [
|
||||||
|
sys.executable, WIKIQ, input_file,
|
||||||
|
"-o", partial_output,
|
||||||
|
"--batch-size", str(batch_size),
|
||||||
|
"--partition-namespaces"
|
||||||
|
]
|
||||||
|
print(f"Starting: {' '.join(cmd_partial)}")
|
||||||
|
|
||||||
|
proc = subprocess.Popen(cmd_partial, stderr=subprocess.PIPE)
|
||||||
|
|
||||||
|
interrupt_delay = 5
|
||||||
|
time.sleep(interrupt_delay)
|
||||||
|
|
||||||
|
if proc.poll() is not None:
|
||||||
|
pytest.fail(f"wikiq completed in {interrupt_delay}s before we could interrupt")
|
||||||
|
|
||||||
|
print(f"Sending SIGUSR1 after {interrupt_delay}s")
|
||||||
|
proc.send_signal(signal.SIGUSR1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
proc.wait(timeout=5)
|
||||||
|
print("Process exited gracefully after SIGUSR1")
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
print("Sending SIGTERM after SIGUSR1 timeout")
|
||||||
|
proc.send_signal(signal.SIGTERM)
|
||||||
|
proc.wait(timeout=30)
|
||||||
|
|
||||||
|
interrupted_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
|
||||||
|
interrupted_rows = interrupted_dataset.count_rows()
|
||||||
|
print(f"Interrupted run wrote {interrupted_rows} rows")
|
||||||
|
|
||||||
|
assert interrupted_rows < total_rows, \
|
||||||
|
f"Process wrote all {interrupted_rows} rows before being killed"
|
||||||
|
|
||||||
|
cmd_resume = f"{WIKIQ} {input_file} -o {partial_output} --batch-size {batch_size} --partition-namespaces --resume"
|
||||||
|
try:
|
||||||
|
subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
resumed_dataset = ds.dataset(partial_output, format="parquet", partitioning="hive")
|
||||||
|
resumed_df = resumed_dataset.to_table().to_pandas()
|
||||||
|
|
||||||
|
full_revids = set(full_df['revid'])
|
||||||
|
resumed_revids = set(resumed_df['revid'])
|
||||||
|
missing_revids = full_revids - resumed_revids
|
||||||
|
extra_revids = resumed_revids - full_revids
|
||||||
|
assert missing_revids == set() and extra_revids == set(), \
|
||||||
|
f"Revision ID mismatch: {len(missing_revids)} missing, {len(extra_revids)} extra. Missing: {sorted(missing_revids)[:10]}"
|
||||||
|
assert len(resumed_df) == len(full_df), \
|
||||||
|
f"Row count mismatch: {len(resumed_df)} vs {len(full_df)}"
|
||||||
|
|
||||||
|
print(f"Resume test passed! Full: {len(full_df)}, Interrupted: {interrupted_rows}, Resumed: {len(resumed_df)}")
|
||||||
|
|
||||||
|
|
||||||
|
def test_resume_file_not_found():
|
||||||
|
"""Test that --resume starts fresh when output file doesn't exist."""
|
||||||
|
tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
expected_output = os.path.join(tester.output, f"{SAILORMOON}.parquet")
|
||||||
|
if os.path.exists(expected_output):
|
||||||
|
os.remove(expected_output)
|
||||||
|
|
||||||
|
# Should succeed by starting fresh
|
||||||
|
tester.call_wikiq("--resume")
|
||||||
|
|
||||||
|
# Verify output was created
|
||||||
|
assert os.path.exists(expected_output), "Output file should be created when starting fresh"
|
||||||
|
table = pq.read_table(expected_output)
|
||||||
|
assert table.num_rows > 0, "Output should have data"
|
||||||
|
|
||||||
|
print("Resume file not found test passed - started fresh!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_resume_simple():
|
||||||
|
"""Test that --resume works without --fandom-2020 and --partition-namespaces."""
|
||||||
|
tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z", out_format="parquet")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester_full.call_wikiq()
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
full_output_path = os.path.join(tester_full.output, f"{SAILORMOON}.parquet")
|
||||||
|
full_table = pq.read_table(full_output_path)
|
||||||
|
|
||||||
|
resume_idx = len(full_table) // 3
|
||||||
|
resume_revid = full_table.column("revid")[resume_idx].as_py()
|
||||||
|
|
||||||
|
print(f"Total revisions: {len(full_table)}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
|
||||||
|
|
||||||
|
tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z", out_format="parquet")
|
||||||
|
partial_output_path = os.path.join(tester_partial.output, f"{SAILORMOON}.parquet")
|
||||||
|
|
||||||
|
partial_table = full_table.slice(0, resume_idx + 1)
|
||||||
|
pq.write_table(partial_table, partial_output_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
tester_partial.call_wikiq("--resume")
|
||||||
|
except subprocess.CalledProcessError as exc:
|
||||||
|
pytest.fail(exc.stderr.decode("utf8"))
|
||||||
|
|
||||||
|
resumed_table = pq.read_table(partial_output_path)
|
||||||
|
|
||||||
|
resumed_df = resumed_table.to_pandas().sort_values("revid").reset_index(drop=True)
|
||||||
|
full_df = full_table.to_pandas().sort_values("revid").reset_index(drop=True)
|
||||||
|
|
||||||
|
assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
|
||||||
|
|
||||||
|
print(f"Resume simple test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
|
||||||
|
|
||||||
|
|
||||||
|
def test_resume_merge_with_invalid_temp_file():
|
||||||
|
"""Test that resume handles invalid/empty temp files gracefully.
|
||||||
|
|
||||||
|
This can happen when a namespace has no records after the resume point,
|
||||||
|
resulting in a temp file that was created but never written to.
|
||||||
|
"""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
original_path = os.path.join(tmpdir, "original.parquet")
|
||||||
|
temp_path = os.path.join(tmpdir, "temp.parquet")
|
||||||
|
merged_path = os.path.join(tmpdir, "merged.parquet")
|
||||||
|
|
||||||
|
table = pa.table({"articleid": [1, 2, 3], "revid": [10, 20, 30]})
|
||||||
|
pq.write_table(table, original_path)
|
||||||
|
|
||||||
|
with open(temp_path, 'w') as f:
|
||||||
|
f.write("")
|
||||||
|
|
||||||
|
result = merge_parquet_files(original_path, temp_path, merged_path)
|
||||||
|
assert result == "original_only", f"Expected 'original_only' when temp file is invalid, got {result}"
|
||||||
|
|
||||||
|
assert os.path.exists(original_path), "Original file should still exist"
|
||||||
|
original_table = pq.read_table(original_path)
|
||||||
|
assert len(original_table) == 3, "Original file should be unchanged"
|
||||||
|
|
||||||
|
assert not os.path.exists(merged_path), "Merged file should not be created"
|
||||||
|
|
||||||
|
print("Resume merge with invalid temp file test passed!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_resume_merge_with_corrupted_original():
|
||||||
|
"""Test that resume recovers from a corrupted original file if temp is valid.
|
||||||
|
|
||||||
|
This can happen if the original file was being written when the process
|
||||||
|
was killed, leaving it in a corrupted state.
|
||||||
|
"""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
original_path = os.path.join(tmpdir, "original.parquet")
|
||||||
|
temp_path = os.path.join(tmpdir, "temp.parquet")
|
||||||
|
merged_path = os.path.join(tmpdir, "merged.parquet")
|
||||||
|
|
||||||
|
with open(original_path, 'w') as f:
|
||||||
|
f.write("corrupted data")
|
||||||
|
|
||||||
|
table = pa.table({"articleid": [4, 5, 6], "revid": [40, 50, 60]})
|
||||||
|
pq.write_table(table, temp_path)
|
||||||
|
|
||||||
|
result = merge_parquet_files(original_path, temp_path, merged_path)
|
||||||
|
assert result == "temp_only", f"Expected 'temp_only' when original is corrupted, got {result}"
|
||||||
|
|
||||||
|
assert not os.path.exists(merged_path), "Merged file should not be created for temp_only case"
|
||||||
|
|
||||||
|
print("Resume merge with corrupted original test passed!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_resume_merge_both_invalid():
|
||||||
|
"""Test that resume handles both files being invalid."""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
original_path = os.path.join(tmpdir, "original.parquet")
|
||||||
|
temp_path = os.path.join(tmpdir, "temp.parquet")
|
||||||
|
merged_path = os.path.join(tmpdir, "merged.parquet")
|
||||||
|
|
||||||
|
with open(original_path, 'w') as f:
|
||||||
|
f.write("corrupted original")
|
||||||
|
|
||||||
|
with open(temp_path, 'w') as f:
|
||||||
|
f.write("corrupted temp")
|
||||||
|
|
||||||
|
result = merge_parquet_files(original_path, temp_path, merged_path)
|
||||||
|
assert result == "both_invalid", f"Expected 'both_invalid' when both files corrupted, got {result}"
|
||||||
|
|
||||||
|
print("Resume merge with both invalid test passed!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_interrupted_resume_both_corrupted():
|
||||||
|
"""Test that cleanup_interrupted_resume returns 'start_fresh' when both files are corrupted."""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
output_file = os.path.join(tmpdir, "output.parquet")
|
||||||
|
temp_file = output_file + ".resume_temp"
|
||||||
|
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)
|
||||||
|
|
||||||
|
with open(output_file, 'w') as f:
|
||||||
|
f.write("corrupted original")
|
||||||
|
|
||||||
|
with open(temp_file, 'w') as f:
|
||||||
|
f.write("corrupted temp")
|
||||||
|
|
||||||
|
with open(checkpoint_path, 'w') as f:
|
||||||
|
json.dump({"pageid": 100, "revid": 200}, f)
|
||||||
|
|
||||||
|
result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
|
||||||
|
assert result == "start_fresh", f"Expected 'start_fresh', got {result}"
|
||||||
|
|
||||||
|
assert not os.path.exists(output_file), "Corrupted original should be deleted"
|
||||||
|
assert not os.path.exists(temp_file), "Corrupted temp should be deleted"
|
||||||
|
assert not os.path.exists(checkpoint_path), "Stale checkpoint should be deleted"
|
||||||
|
|
||||||
|
print("Cleanup interrupted resume with both corrupted test passed!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_interrupted_resume_original_corrupted_temp_valid():
|
||||||
|
"""Test that cleanup recovers from temp when original is corrupted."""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
output_file = os.path.join(tmpdir, "output.parquet")
|
||||||
|
temp_file = output_file + ".resume_temp"
|
||||||
|
|
||||||
|
with open(output_file, 'w') as f:
|
||||||
|
f.write("corrupted original")
|
||||||
|
|
||||||
|
table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]})
|
||||||
|
pq.write_table(table, temp_file)
|
||||||
|
|
||||||
|
result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
|
||||||
|
assert result is None, f"Expected None (normal recovery), got {result}"
|
||||||
|
|
||||||
|
assert os.path.exists(output_file), "Output file should exist after recovery"
|
||||||
|
assert not os.path.exists(temp_file), "Temp file should be renamed to output"
|
||||||
|
|
||||||
|
recovered_table = pq.read_table(output_file)
|
||||||
|
assert len(recovered_table) == 3, "Recovered file should have 3 rows"
|
||||||
|
|
||||||
|
resume_point = get_resume_point(output_file, partition_namespaces=False)
|
||||||
|
assert resume_point is not None, "Should find resume point from recovered file"
|
||||||
|
assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
|
||||||
|
|
||||||
|
print("Cleanup with original corrupted, temp valid test passed!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_cleanup_original_missing_temp_valid_no_checkpoint():
|
||||||
|
"""Test recovery when original is missing, temp is valid, and no checkpoint exists."""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
output_file = os.path.join(tmpdir, "output.parquet")
|
||||||
|
temp_file = output_file + ".resume_temp"
|
||||||
|
checkpoint_path = get_checkpoint_path(output_file, partition_namespaces=False)
|
||||||
|
|
||||||
|
assert not os.path.exists(output_file)
|
||||||
|
|
||||||
|
table = pa.table({"articleid": [10, 20, 30], "revid": [100, 200, 300]})
|
||||||
|
pq.write_table(table, temp_file)
|
||||||
|
|
||||||
|
assert not os.path.exists(checkpoint_path)
|
||||||
|
|
||||||
|
result = cleanup_interrupted_resume(output_file, partition_namespaces=False)
|
||||||
|
assert result is None, f"Expected None (normal recovery), got {result}"
|
||||||
|
|
||||||
|
assert os.path.exists(output_file), "Output file should exist after recovery"
|
||||||
|
assert not os.path.exists(temp_file), "Temp file should be renamed to output"
|
||||||
|
|
||||||
|
resume_point = get_resume_point(output_file, partition_namespaces=False)
|
||||||
|
assert resume_point is not None, "Should find resume point from recovered file"
|
||||||
|
assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
|
||||||
|
|
||||||
|
print("Original missing, temp valid, no checkpoint test passed!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_concurrent_jobs_different_input_files():
|
||||||
|
"""Test that merge only processes temp files for the current input file.
|
||||||
|
|
||||||
|
When multiple wikiq processes write to the same partitioned output directory
|
||||||
|
with different input files, each process should only merge its own temp files.
|
||||||
|
"""
|
||||||
|
from wikiq.resume import merge_partitioned_namespaces
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
# Create partitioned output structure
|
||||||
|
ns0_dir = os.path.join(tmpdir, "namespace=0")
|
||||||
|
ns1_dir = os.path.join(tmpdir, "namespace=1")
|
||||||
|
os.makedirs(ns0_dir)
|
||||||
|
os.makedirs(ns1_dir)
|
||||||
|
|
||||||
|
# Simulate two different input files producing output
|
||||||
|
file1 = "enwiki-20250123-pages-meta-history24-p1p100.parquet"
|
||||||
|
file2 = "enwiki-20250123-pages-meta-history24-p101p200.parquet"
|
||||||
|
|
||||||
|
# Create original and temp files for file1
|
||||||
|
table1_orig = pa.table({"articleid": [1, 2], "revid": [10, 20]})
|
||||||
|
table1_temp = pa.table({"articleid": [3, 4], "revid": [30, 40]})
|
||||||
|
pq.write_table(table1_orig, os.path.join(ns0_dir, file1))
|
||||||
|
pq.write_table(table1_temp, os.path.join(ns0_dir, file1 + ".resume_temp"))
|
||||||
|
pq.write_table(table1_orig, os.path.join(ns1_dir, file1))
|
||||||
|
pq.write_table(table1_temp, os.path.join(ns1_dir, file1 + ".resume_temp"))
|
||||||
|
|
||||||
|
# Create original and temp files for file2 (simulating another concurrent job)
|
||||||
|
table2_orig = pa.table({"articleid": [100, 200], "revid": [1000, 2000]})
|
||||||
|
table2_temp = pa.table({"articleid": [300, 400], "revid": [3000, 4000]})
|
||||||
|
pq.write_table(table2_orig, os.path.join(ns0_dir, file2))
|
||||||
|
pq.write_table(table2_temp, os.path.join(ns0_dir, file2 + ".resume_temp"))
|
||||||
|
pq.write_table(table2_orig, os.path.join(ns1_dir, file2))
|
||||||
|
pq.write_table(table2_temp, os.path.join(ns1_dir, file2 + ".resume_temp"))
|
||||||
|
|
||||||
|
# Merge only file1's temp files
|
||||||
|
merge_partitioned_namespaces(tmpdir, ".resume_temp", file1)
|
||||||
|
|
||||||
|
# Verify file1's temp files were merged and removed
|
||||||
|
assert not os.path.exists(os.path.join(ns0_dir, file1 + ".resume_temp")), \
|
||||||
|
"file1 temp should be merged in ns0"
|
||||||
|
assert not os.path.exists(os.path.join(ns1_dir, file1 + ".resume_temp")), \
|
||||||
|
"file1 temp should be merged in ns1"
|
||||||
|
|
||||||
|
# Verify file1's original now has merged data
|
||||||
|
merged1_ns0 = pq.read_table(os.path.join(ns0_dir, file1))
|
||||||
|
merged1_ns1 = pq.read_table(os.path.join(ns1_dir, file1))
|
||||||
|
assert merged1_ns0.num_rows == 4, f"file1 ns0 should have 4 rows after merge, got {merged1_ns0.num_rows}"
|
||||||
|
assert merged1_ns1.num_rows == 4, f"file1 ns1 should have 4 rows after merge, got {merged1_ns1.num_rows}"
|
||||||
|
|
||||||
|
# Verify file2's temp files are UNTOUCHED (still exist)
|
||||||
|
assert os.path.exists(os.path.join(ns0_dir, file2 + ".resume_temp")), \
|
||||||
|
"file2 temp should NOT be touched in ns0"
|
||||||
|
assert os.path.exists(os.path.join(ns1_dir, file2 + ".resume_temp")), \
|
||||||
|
"file2 temp should NOT be touched in ns1"
|
||||||
|
|
||||||
|
# Verify file2's original is unchanged
|
||||||
|
orig2_ns0 = pq.read_table(os.path.join(ns0_dir, file2))
|
||||||
|
orig2_ns1 = pq.read_table(os.path.join(ns1_dir, file2))
|
||||||
|
assert orig2_ns0.num_rows == 2, "file2 ns0 should still have 2 rows"
|
||||||
|
assert orig2_ns1.num_rows == 2, "file2 ns1 should still have 2 rows"
|
||||||
|
|
||||||
|
print("Concurrent jobs with different input files test passed!")
|
||||||
75
test/wikiq_test_utils.py
Normal file
75
test/wikiq_test_utils.py
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
from typing import Final, Union
|
||||||
|
|
||||||
|
TEST_DIR: Final[str] = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
WIKIQ: Final[str] = os.path.join(os.path.join(TEST_DIR, ".."), "src/wikiq/__init__.py")
|
||||||
|
TEST_OUTPUT_DIR: Final[str] = os.path.join(TEST_DIR, "test_output")
|
||||||
|
BASELINE_DIR: Final[str] = os.path.join(TEST_DIR, "baseline_output")
|
||||||
|
|
||||||
|
IKWIKI: Final[str] = "ikwiki-20180301-pages-meta-history"
|
||||||
|
SAILORMOON: Final[str] = "sailormoon"
|
||||||
|
TWINPEAKS: Final[str] = "twinpeaks"
|
||||||
|
REGEXTEST: Final[str] = "regextest"
|
||||||
|
|
||||||
|
|
||||||
|
class WikiqTester:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
wiki: str,
|
||||||
|
case_name: str,
|
||||||
|
suffix: Union[str, None] = None,
|
||||||
|
in_compression: str = "bz2",
|
||||||
|
baseline_format: str = "tsv",
|
||||||
|
out_format: str = "tsv",
|
||||||
|
):
|
||||||
|
self.input_file = os.path.join(
|
||||||
|
TEST_DIR, "dumps", "{0}.xml.{1}".format(wiki, in_compression)
|
||||||
|
)
|
||||||
|
|
||||||
|
basename = "{0}_{1}".format(case_name, wiki)
|
||||||
|
if suffix:
|
||||||
|
basename = "{0}_{1}".format(basename, suffix)
|
||||||
|
|
||||||
|
self.output = os.path.join(
|
||||||
|
TEST_OUTPUT_DIR, "{0}.{1}".format(basename, out_format)
|
||||||
|
)
|
||||||
|
|
||||||
|
if os.path.exists(self.output):
|
||||||
|
if os.path.isfile(self.output):
|
||||||
|
os.remove(self.output)
|
||||||
|
else:
|
||||||
|
shutil.rmtree(self.output)
|
||||||
|
|
||||||
|
if out_format == "parquet":
|
||||||
|
os.makedirs(self.output, exist_ok=True)
|
||||||
|
|
||||||
|
if suffix is None:
|
||||||
|
self.wikiq_baseline_name = "{0}.{1}".format(wiki, baseline_format)
|
||||||
|
self.wikiq_out_name = "{0}.{1}".format(wiki, out_format)
|
||||||
|
else:
|
||||||
|
self.wikiq_baseline_name = "{0}_{1}.{2}".format(
|
||||||
|
wiki, suffix, baseline_format
|
||||||
|
)
|
||||||
|
self.wikiq_out_name = "{0}_{1}.{2}".format(wiki, suffix, out_format)
|
||||||
|
|
||||||
|
if case_name is not None:
|
||||||
|
self.baseline_file = os.path.join(
|
||||||
|
BASELINE_DIR, "{0}_{1}".format(case_name, self.wikiq_baseline_name)
|
||||||
|
)
|
||||||
|
|
||||||
|
def call_wikiq(self, *args: str, out: bool = True):
|
||||||
|
"""
|
||||||
|
Calls wikiq with the passed arguments on the input file relevant to the test.
|
||||||
|
:param args: The command line arguments to pass to wikiq.
|
||||||
|
:param out: Whether to pass an output argument to wikiq.
|
||||||
|
:return: The output of the wikiq call.
|
||||||
|
"""
|
||||||
|
if out:
|
||||||
|
call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
|
||||||
|
else:
|
||||||
|
call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
|
||||||
|
|
||||||
|
print(call)
|
||||||
|
return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
|
||||||
Reference in New Issue
Block a user