From 70a10db228a4a1c7281e0b2c13cab61dc7585030 Mon Sep 17 00:00:00 2001 From: Nathan TeBlunthuis Date: Thu, 11 Dec 2025 08:30:32 -0800 Subject: [PATCH] save work after a time limit. --- src/wikiq/__init__.py | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py index 98ef4dc..776ff75 100755 --- a/src/wikiq/__init__.py +++ b/src/wikiq/__init__.py @@ -10,6 +10,7 @@ import os.path import re import signal import sys +import time from collections import deque from hashlib import sha1 from io import TextIOWrapper @@ -258,6 +259,7 @@ class WikiqParser: wikilinks: bool = False, templates: bool = False, headings: bool = False, + time_limit_seconds: Union[float, None] = None, ): """ Parameters: @@ -283,6 +285,8 @@ class WikiqParser: self.templates = templates self.headings = headings self.shutdown_requested = False + self.time_limit_seconds = time_limit_seconds + self.start_time = None if namespaces is not None: self.namespace_filter = set(namespaces) else: @@ -319,6 +323,16 @@ class WikiqParser: """Request graceful shutdown. The process() method will exit after completing the current batch.""" self.shutdown_requested = True + def _check_time_limit(self): + """Check if the time limit has been exceeded. If so, request shutdown.""" + if self.time_limit_seconds is None or self.start_time is None: + return + elapsed = time.time() - self.start_time + if elapsed >= self.time_limit_seconds: + hours = elapsed / 3600 + print(f"Time limit of {self.time_limit_seconds/3600:.2f} hours reached ({hours:.2f} hours elapsed), requesting shutdown...", file=sys.stderr) + self.request_shutdown() + def _open_checkpoint(self, output_file): """Open checkpoint file for writing. Keeps file open for performance.""" if not self.output_parquet or output_file == sys.stdout.buffer: @@ -424,10 +438,8 @@ class WikiqParser: return default_ns def process(self): - # create a regex that creates the output filename - # output_filename = re.sub(r'^.*/(enwiki\-\d+)\-.*p(\d+)p.*$', - # r'output/wikiq-\1-\2.tsv', - # input_filename) + # Record start time for time limit checking + self.start_time = time.time() # Track whether we've passed the resume point # For partitioned output, this is a dict mapping namespace -> bool @@ -741,6 +753,9 @@ class WikiqParser: if self.shutdown_requested: break + # Check time limit after each batch + self._check_time_limit() + # If shutdown requested, skip all remaining processing and close writers if self.shutdown_requested: print("Shutdown requested, closing writers...", file=sys.stderr) @@ -1185,6 +1200,14 @@ def main(): help="Resume processing from the last successfully written revision in the output file.", ) + parser.add_argument( + "--time-limit", + dest="time_limit", + type=float, + default=0, + help="Time limit in hours before graceful shutdown. Set to 0 to disable (default).", + ) + args = parser.parse_args() # set persistence method @@ -1281,6 +1304,8 @@ def main(): print("Processing file: %s" % filename, file=sys.stderr) input_file = open_input_file(filename, args.fandom_2020) + time_limit_seconds = args.time_limit * 3600 if args.time_limit > 0 else None + wikiq = WikiqParser( input_file, output_file, @@ -1303,6 +1328,7 @@ def main(): wikilinks=args.wikilinks, templates=args.templates, headings=args.headings, + time_limit_seconds=time_limit_seconds, ) # Register signal handlers for graceful shutdown (CLI only) @@ -1314,6 +1340,7 @@ def main(): original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown) original_sigint = signal.signal(signal.SIGINT, handle_shutdown) original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown) + original_sigusr2 = signal.signal(signal.SIGUSR2, handle_shutdown) try: wikiq.process() @@ -1322,6 +1349,7 @@ def main(): signal.signal(signal.SIGTERM, original_sigterm) signal.signal(signal.SIGINT, original_sigint) signal.signal(signal.SIGUSR1, original_sigusr1) + signal.signal(signal.SIGUSR2, original_sigusr2) # close things input_file.close() @@ -1330,6 +1358,8 @@ def main(): if args.resume: print("Warning: --resume cannot be used with stdin/stdout", file=sys.stderr) + time_limit_seconds = args.time_limit * 3600 if args.time_limit > 0 else None + wikiq = WikiqParser( sys.stdin, sys.stdout, @@ -1351,6 +1381,7 @@ def main(): wikilinks=args.wikilinks, templates=args.templates, headings=args.headings, + time_limit_seconds=time_limit_seconds, ) # Register signal handlers for graceful shutdown (CLI only)