save work after a time limit.

This commit is contained in:
Nathan TeBlunthuis 2025-12-11 08:30:32 -08:00
parent 1001c780fa
commit 70a10db228

View File

@ -10,6 +10,7 @@ import os.path
import re import re
import signal import signal
import sys import sys
import time
from collections import deque from collections import deque
from hashlib import sha1 from hashlib import sha1
from io import TextIOWrapper from io import TextIOWrapper
@ -258,6 +259,7 @@ class WikiqParser:
wikilinks: bool = False, wikilinks: bool = False,
templates: bool = False, templates: bool = False,
headings: bool = False, headings: bool = False,
time_limit_seconds: Union[float, None] = None,
): ):
""" """
Parameters: Parameters:
@ -283,6 +285,8 @@ class WikiqParser:
self.templates = templates self.templates = templates
self.headings = headings self.headings = headings
self.shutdown_requested = False self.shutdown_requested = False
self.time_limit_seconds = time_limit_seconds
self.start_time = None
if namespaces is not None: if namespaces is not None:
self.namespace_filter = set(namespaces) self.namespace_filter = set(namespaces)
else: else:
@ -319,6 +323,16 @@ class WikiqParser:
"""Request graceful shutdown. The process() method will exit after completing the current batch.""" """Request graceful shutdown. The process() method will exit after completing the current batch."""
self.shutdown_requested = True self.shutdown_requested = True
def _check_time_limit(self):
"""Check if the time limit has been exceeded. If so, request shutdown."""
if self.time_limit_seconds is None or self.start_time is None:
return
elapsed = time.time() - self.start_time
if elapsed >= self.time_limit_seconds:
hours = elapsed / 3600
print(f"Time limit of {self.time_limit_seconds/3600:.2f} hours reached ({hours:.2f} hours elapsed), requesting shutdown...", file=sys.stderr)
self.request_shutdown()
def _open_checkpoint(self, output_file): def _open_checkpoint(self, output_file):
"""Open checkpoint file for writing. Keeps file open for performance.""" """Open checkpoint file for writing. Keeps file open for performance."""
if not self.output_parquet or output_file == sys.stdout.buffer: if not self.output_parquet or output_file == sys.stdout.buffer:
@ -424,10 +438,8 @@ class WikiqParser:
return default_ns return default_ns
def process(self): def process(self):
# create a regex that creates the output filename # Record start time for time limit checking
# output_filename = re.sub(r'^.*/(enwiki\-\d+)\-.*p(\d+)p.*$', self.start_time = time.time()
# r'output/wikiq-\1-\2.tsv',
# input_filename)
# Track whether we've passed the resume point # Track whether we've passed the resume point
# For partitioned output, this is a dict mapping namespace -> bool # For partitioned output, this is a dict mapping namespace -> bool
@ -741,6 +753,9 @@ class WikiqParser:
if self.shutdown_requested: if self.shutdown_requested:
break break
# Check time limit after each batch
self._check_time_limit()
# If shutdown requested, skip all remaining processing and close writers # If shutdown requested, skip all remaining processing and close writers
if self.shutdown_requested: if self.shutdown_requested:
print("Shutdown requested, closing writers...", file=sys.stderr) print("Shutdown requested, closing writers...", file=sys.stderr)
@ -1185,6 +1200,14 @@ def main():
help="Resume processing from the last successfully written revision in the output file.", help="Resume processing from the last successfully written revision in the output file.",
) )
parser.add_argument(
"--time-limit",
dest="time_limit",
type=float,
default=0,
help="Time limit in hours before graceful shutdown. Set to 0 to disable (default).",
)
args = parser.parse_args() args = parser.parse_args()
# set persistence method # set persistence method
@ -1281,6 +1304,8 @@ def main():
print("Processing file: %s" % filename, file=sys.stderr) print("Processing file: %s" % filename, file=sys.stderr)
input_file = open_input_file(filename, args.fandom_2020) input_file = open_input_file(filename, args.fandom_2020)
time_limit_seconds = args.time_limit * 3600 if args.time_limit > 0 else None
wikiq = WikiqParser( wikiq = WikiqParser(
input_file, input_file,
output_file, output_file,
@ -1303,6 +1328,7 @@ def main():
wikilinks=args.wikilinks, wikilinks=args.wikilinks,
templates=args.templates, templates=args.templates,
headings=args.headings, headings=args.headings,
time_limit_seconds=time_limit_seconds,
) )
# Register signal handlers for graceful shutdown (CLI only) # Register signal handlers for graceful shutdown (CLI only)
@ -1314,6 +1340,7 @@ def main():
original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown) original_sigterm = signal.signal(signal.SIGTERM, handle_shutdown)
original_sigint = signal.signal(signal.SIGINT, handle_shutdown) original_sigint = signal.signal(signal.SIGINT, handle_shutdown)
original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown) original_sigusr1 = signal.signal(signal.SIGUSR1, handle_shutdown)
original_sigusr2 = signal.signal(signal.SIGUSR2, handle_shutdown)
try: try:
wikiq.process() wikiq.process()
@ -1322,6 +1349,7 @@ def main():
signal.signal(signal.SIGTERM, original_sigterm) signal.signal(signal.SIGTERM, original_sigterm)
signal.signal(signal.SIGINT, original_sigint) signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGUSR1, original_sigusr1) signal.signal(signal.SIGUSR1, original_sigusr1)
signal.signal(signal.SIGUSR2, original_sigusr2)
# close things # close things
input_file.close() input_file.close()
@ -1330,6 +1358,8 @@ def main():
if args.resume: if args.resume:
print("Warning: --resume cannot be used with stdin/stdout", file=sys.stderr) print("Warning: --resume cannot be used with stdin/stdout", file=sys.stderr)
time_limit_seconds = args.time_limit * 3600 if args.time_limit > 0 else None
wikiq = WikiqParser( wikiq = WikiqParser(
sys.stdin, sys.stdin,
sys.stdout, sys.stdout,
@ -1351,6 +1381,7 @@ def main():
wikilinks=args.wikilinks, wikilinks=args.wikilinks,
templates=args.templates, templates=args.templates,
headings=args.headings, headings=args.headings,
time_limit_seconds=time_limit_seconds,
) )
# Register signal handlers for graceful shutdown (CLI only) # Register signal handlers for graceful shutdown (CLI only)