use the wikidiff2 diff timeout instead of async.

Nathan TeBlunthuis 2025-12-13 14:29:16 -08:00
parent 5d1a246898
commit 2c54425726


@@ -10,6 +10,7 @@ import os.path
 import re
 import signal
 import sys
+import threading
 import time
 from collections import deque
 from hashlib import sha1
@@ -25,7 +26,6 @@ import pywikidiff2
 from deltas.tokenizers import wikitext_split
 from more_itertools import ichunked
 from mwxml import Dump
-import asyncio
 import wikiq.tables as tables
 from wikiq.tables import RevisionTable
 from wikiq.wiki_diff_matcher import WikiDiffMatcher
@@ -40,7 +40,7 @@ from wikiq.resume import (
 TO_ENCODE = ("title", "editor")
 PERSISTENCE_RADIUS = 7
-DIFF_TIMEOUT = 60
+DIFF_TIMEOUT_MS = 60000
 
 from pathlib import Path
 import pyarrow as pa
@@ -57,17 +57,10 @@ class PersistMethod:
     wikidiff2 = 4
 
 
-async def diff_async(differ, last_text, text):
-    """Returns (result, timed_out) tuple."""
-    try:
-        loop = asyncio.get_running_loop()
-        result = await asyncio.wait_for(
-            asyncio.to_thread(differ.inline_json_diff, last_text, text),
-            timeout=DIFF_TIMEOUT
-        )
-        return result, False
-    except TimeoutError as e:
-        return None, True
+def diff_with_timeout(differ, last_text, text):
+    """Returns (result, timed_out) tuple using native pywikidiff2 timeout."""
+    result = differ.inline_json_diff(last_text, text, timeout_ms=DIFF_TIMEOUT_MS)
+    return result, differ.timed_out()
 
 
 def calculate_persistence(tokens_added):
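
For reference, a minimal self-contained sketch of the new call pattern as it is used at the call site later in this diff. The StubDiffer below is only a stand-in so the example runs on its own; the real pywikidiff2 constructor is not shown in this commit, and only inline_json_diff(..., timeout_ms=...) and timed_out() are taken from the changed code:

DIFF_TIMEOUT_MS = 60000


class StubDiffer:
    """Stand-in for a pywikidiff2 differ so this sketch is runnable."""

    def __init__(self, times_out=False):
        self._times_out = times_out

    def inline_json_diff(self, last_text, text, timeout_ms=None):
        # A real differ would return wikidiff2's JSON diff of the two revisions.
        return "[]"

    def timed_out(self):
        return self._times_out


def diff_with_timeout(differ, last_text, text):
    """Returns (result, timed_out) tuple using the differ's native timeout."""
    result = differ.inline_json_diff(last_text, text, timeout_ms=DIFF_TIMEOUT_MS)
    return result, differ.timed_out()


differ = StubDiffer(times_out=True)   # bounded differ that "times out"
fast_differ = StubDiffer()            # fallback differ with default limits
diff, timed_out = diff_with_timeout(differ, "old text", "new text")
if timed_out:
    # Mirrors the fallback in process(): redo the diff with default limits.
    diff = fast_differ.inline_json_diff("old text", "new text")
print(diff, timed_out)

The point of the change is that the timeout is enforced inside wikidiff2 itself, so no event loop or worker thread is needed around each diff call.
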
@@ -286,7 +279,6 @@ class WikiqParser:
         self.headings = headings
         self.shutdown_requested = False
         self.time_limit_seconds = time_limit_seconds
-        self.start_time = None
         if namespaces is not None:
             self.namespace_filter = set(namespaces)
         else:
@@ -323,15 +315,25 @@ class WikiqParser:
         """Request graceful shutdown. The process() method will exit after completing the current batch."""
         self.shutdown_requested = True
 
-    def _check_time_limit(self):
-        """Check if the time limit has been exceeded. If so, request shutdown."""
-        if self.time_limit_seconds is None or self.start_time is None:
-            return
-        elapsed = time.time() - self.start_time
-        if elapsed >= self.time_limit_seconds:
-            hours = elapsed / 3600
-            print(f"Time limit of {self.time_limit_seconds/3600:.2f} hours reached ({hours:.2f} hours elapsed), requesting shutdown...", file=sys.stderr)
-            self.request_shutdown()
+    def _time_limit_expired(self):
+        """Timer callback when time limit is reached."""
+        hours = self.time_limit_seconds / 3600
+        print(f"Time limit of {hours:.2f} hours reached, requesting shutdown...", file=sys.stderr)
+        self.request_shutdown()
+
+    def _start_time_limit_timer(self):
+        """Start a background timer to trigger shutdown when time limit is reached."""
+        if self.time_limit_seconds is None:
+            return None
+        timer = threading.Timer(self.time_limit_seconds, self._time_limit_expired)
+        timer.daemon = True
+        timer.start()
+        return timer
+
+    def _cancel_time_limit_timer(self, timer):
+        """Cancel the time limit timer if it's still running."""
+        if timer is not None:
+            timer.cancel()
 
     def _open_checkpoint(self, output_file):
         """Open checkpoint file for writing. Keeps file open for performance."""
@@ -438,8 +440,8 @@ class WikiqParser:
         return default_ns
 
     def process(self):
-        # Record start time for time limit checking
-        self.start_time = time.time()
+        # Start time limit timer if configured
+        time_limit_timer = self._start_time_limit_timer()
 
         # Track whether we've passed the resume point
         # For partitioned output, this is a dict mapping namespace -> bool
@@ -753,9 +755,6 @@ class WikiqParser:
                 if self.shutdown_requested:
                     break
 
-            # Check time limit after each batch
-            self._check_time_limit()
-
         # If shutdown requested, skip all remaining processing and close writers
         if self.shutdown_requested:
             print("Shutdown requested, closing writers...", file=sys.stderr)
@@ -876,7 +875,7 @@ class WikiqParser:
                     new_diffs = []
                     diff_timeouts = []
                     for i, text in enumerate(row_buffer["text"]):
-                        diff, timed_out = asyncio.run(diff_async(differ, last_text, text))
+                        diff, timed_out = diff_with_timeout(differ, last_text, text)
                         if timed_out:
                             print(f"WARNING! wikidiff2 timeout for rev: {row_buffer['revid'][i]}. Falling back to default limits.", file=sys.stderr)
                             diff = fast_differ.inline_json_diff(last_text, text)
@@ -940,6 +939,9 @@ class WikiqParser:
                 break
            page_count += 1
 
+        # Cancel time limit timer
+        self._cancel_time_limit_timer(time_limit_timer)
+
        print(
            "Done: %s revisions and %s pages." % (rev_count, page_count),
            file=sys.stderr,