From 2c5442572648209a8d549f969eb57705620a91e7 Mon Sep 17 00:00:00 2001 From: Nathan TeBlunthuis Date: Sat, 13 Dec 2025 14:29:16 -0800 Subject: [PATCH] use the wikidiff2 diff timeout instead of async. --- src/wikiq/__init__.py | 60 ++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py index 776ff75..3bfbcf3 100755 --- a/src/wikiq/__init__.py +++ b/src/wikiq/__init__.py @@ -10,6 +10,7 @@ import os.path import re import signal import sys +import threading import time from collections import deque from hashlib import sha1 @@ -25,7 +26,6 @@ import pywikidiff2 from deltas.tokenizers import wikitext_split from more_itertools import ichunked from mwxml import Dump -import asyncio import wikiq.tables as tables from wikiq.tables import RevisionTable from wikiq.wiki_diff_matcher import WikiDiffMatcher @@ -40,7 +40,7 @@ from wikiq.resume import ( TO_ENCODE = ("title", "editor") PERSISTENCE_RADIUS = 7 -DIFF_TIMEOUT = 60 +DIFF_TIMEOUT_MS = 60000 from pathlib import Path import pyarrow as pa @@ -57,17 +57,10 @@ class PersistMethod: wikidiff2 = 4 -async def diff_async(differ, last_text, text): - """Returns (result, timed_out) tuple.""" - try: - loop = asyncio.get_running_loop() - result = await asyncio.wait_for( - asyncio.to_thread(differ.inline_json_diff, last_text, text), - timeout=DIFF_TIMEOUT - ) - return result, False - except TimeoutError as e: - return None, True +def diff_with_timeout(differ, last_text, text): + """Returns (result, timed_out) tuple using native pywikidiff2 timeout.""" + result = differ.inline_json_diff(last_text, text, timeout_ms=DIFF_TIMEOUT_MS) + return result, differ.timed_out() def calculate_persistence(tokens_added): @@ -286,7 +279,6 @@ class WikiqParser: self.headings = headings self.shutdown_requested = False self.time_limit_seconds = time_limit_seconds - self.start_time = None if namespaces is not None: self.namespace_filter = set(namespaces) else: @@ -323,15 +315,25 @@ class WikiqParser: """Request graceful shutdown. The process() method will exit after completing the current batch.""" self.shutdown_requested = True - def _check_time_limit(self): - """Check if the time limit has been exceeded. If so, request shutdown.""" - if self.time_limit_seconds is None or self.start_time is None: - return - elapsed = time.time() - self.start_time - if elapsed >= self.time_limit_seconds: - hours = elapsed / 3600 - print(f"Time limit of {self.time_limit_seconds/3600:.2f} hours reached ({hours:.2f} hours elapsed), requesting shutdown...", file=sys.stderr) - self.request_shutdown() + def _time_limit_expired(self): + """Timer callback when time limit is reached.""" + hours = self.time_limit_seconds / 3600 + print(f"Time limit of {hours:.2f} hours reached, requesting shutdown...", file=sys.stderr) + self.request_shutdown() + + def _start_time_limit_timer(self): + """Start a background timer to trigger shutdown when time limit is reached.""" + if self.time_limit_seconds is None: + return None + timer = threading.Timer(self.time_limit_seconds, self._time_limit_expired) + timer.daemon = True + timer.start() + return timer + + def _cancel_time_limit_timer(self, timer): + """Cancel the time limit timer if it's still running.""" + if timer is not None: + timer.cancel() def _open_checkpoint(self, output_file): """Open checkpoint file for writing. Keeps file open for performance.""" @@ -438,8 +440,8 @@ class WikiqParser: return default_ns def process(self): - # Record start time for time limit checking - self.start_time = time.time() + # Start time limit timer if configured + time_limit_timer = self._start_time_limit_timer() # Track whether we've passed the resume point # For partitioned output, this is a dict mapping namespace -> bool @@ -753,9 +755,6 @@ class WikiqParser: if self.shutdown_requested: break - # Check time limit after each batch - self._check_time_limit() - # If shutdown requested, skip all remaining processing and close writers if self.shutdown_requested: print("Shutdown requested, closing writers...", file=sys.stderr) @@ -876,7 +875,7 @@ class WikiqParser: new_diffs = [] diff_timeouts = [] for i, text in enumerate(row_buffer["text"]): - diff, timed_out = asyncio.run(diff_async(differ, last_text, text)) + diff, timed_out = diff_with_timeout(differ, last_text, text) if timed_out: print(f"WARNING! wikidiff2 timeout for rev: {row_buffer['revid'][i]}. Falling back to default limits.", file=sys.stderr) diff = fast_differ.inline_json_diff(last_text, text) @@ -940,6 +939,9 @@ class WikiqParser: break page_count += 1 + # Cancel time limit timer + self._cancel_time_limit_timer(time_limit_timer) + print( "Done: %s revisions and %s pages." % (rev_count, page_count), file=sys.stderr,