"""Integration tests for wikiq's --resume checkpoint/restart behavior.

Each test runs wikiq to completion to capture the expected output, then
simulates an interrupted run (truncated output file plus a checkpoint file)
and verifies that resuming reproduces the full run exactly.
"""

import json
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time

import pytest

from wikiq.resume import (
    get_checkpoint_path,
    read_checkpoint,
)
from wikiq_test_utils import (
    SAILORMOON,
    TEST_DIR,
    TEST_OUTPUT_DIR,
    WIKIQ,
    WikiqTester,
)


def read_jsonl(filepath):
    """Read JSONL file and return list of dicts."""
    rows = []
    with open(filepath, 'r') as f:
        for line in f:
            # Skip blank lines so a trailing newline doesn't break parsing.
            if line.strip():
                rows.append(json.loads(line))
    return rows


def test_resume():
    """Test that --resume properly resumes processing from the last checkpoint."""
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)

    # Simulate a partial run: keep the first half of the rows and write a
    # checkpoint pointing at the last kept revision.
    middle_idx = len(full_rows) // 2
    resume_revid = full_rows[middle_idx]["revid"]

    tester_partial = WikiqTester(SAILORMOON, "resume_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:middle_idx + 1]:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[middle_idx]["articleid"],
                   "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)


def test_resume_with_diff():
    """Test that --resume correctly computes diff values after resume.

    The diff computation depends on having the correct prev_text state.
    This test verifies that diff values (text_chars, added_chars, etc.)
    are identical between a full run and a resumed run.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_diff_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)

    resume_idx = len(full_rows) // 3
    resume_revid = full_rows[resume_idx]["revid"]

    tester_partial = WikiqTester(SAILORMOON, "resume_diff_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:resume_idx + 1]:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[resume_idx]["articleid"],
                   "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)

    # Verify diff columns are present
    diff_columns = ["text_chars", "diff", "diff_timeout"]
    for col in diff_columns:
        assert col in df_full.columns, f"Diff column {col} should exist in full output"
        assert col in df_resumed.columns, f"Diff column {col} should exist in resumed output"

    assert_frame_equal(df_full, df_resumed)


def test_resume_file_not_found():
    """Test that --resume starts fresh when output file doesn't exist."""
    tester = WikiqTester(SAILORMOON, "resume_not_found", in_compression="7z",
                         out_format="jsonl")
    expected_output = tester.output
    if os.path.exists(expected_output):
        os.remove(expected_output)

    # Should succeed by starting fresh.  Wrapped like the other tests so a
    # failure reports wikiq's stderr instead of a bare CalledProcessError.
    try:
        tester.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    # Verify output was created
    assert os.path.exists(expected_output), "Output file should be created when starting fresh"
    rows = read_jsonl(expected_output)
    assert len(rows) > 0, "Output should have data"
    print("Resume file not found test passed - started fresh!")


def test_resume_simple():
    """Test that --resume works without --fandom-2020."""
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_simple_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq()
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)

    resume_idx = len(full_rows) // 3
    resume_revid = full_rows[resume_idx]["revid"]

    tester_partial = WikiqTester(SAILORMOON, "resume_simple_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:resume_idx + 1]:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[resume_idx]["articleid"],
                   "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)


def test_checkpoint_read():
    """Test that read_checkpoint correctly reads checkpoint files."""
    with tempfile.TemporaryDirectory() as tmpdir:
        checkpoint_path = os.path.join(tmpdir, "test.jsonl.checkpoint")

        # Test reading valid checkpoint
        with open(checkpoint_path, 'w') as f:
            json.dump({"pageid": 100, "revid": 200}, f)
        result = read_checkpoint(checkpoint_path)
        assert result == (100, 200), f"Expected (100, 200), got {result}"

        # Test reading non-existent checkpoint
        result = read_checkpoint(os.path.join(tmpdir, "nonexistent.checkpoint"))
        assert result is None, f"Expected None for non-existent file, got {result}"

        # Test reading empty checkpoint
        empty_path = os.path.join(tmpdir, "empty.checkpoint")
        with open(empty_path, 'w') as f:
            f.write("{}")
        result = read_checkpoint(empty_path)
        assert result is None, f"Expected None for empty checkpoint, got {result}"

        # Test reading corrupted checkpoint
        corrupt_path = os.path.join(tmpdir, "corrupt.checkpoint")
        with open(corrupt_path, 'w') as f:
            f.write("not valid json")
        result = read_checkpoint(corrupt_path)
        assert result is None, f"Expected None for corrupted checkpoint, got {result}"

    print("Checkpoint read test passed!")


def test_resume_with_interruption():
    """Test that resume works correctly after interruption."""
    import pandas as pd
    from pandas.testing import assert_frame_equal

    output_dir = os.path.join(TEST_OUTPUT_DIR, "resume_interrupt")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)
    output_file = os.path.join(output_dir, f"{SAILORMOON}.jsonl")

    # First, run to completion to know expected output
    cmd_full = f"{WIKIQ} {input_file} -o {output_file} --fandom-2020"
    try:
        subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))
    full_rows = read_jsonl(output_file)

    # Clean up for interrupted run
    if os.path.exists(output_file):
        os.remove(output_file)
    checkpoint_path = get_checkpoint_path(output_file)
    if os.path.exists(checkpoint_path):
        os.remove(checkpoint_path)

    # Start wikiq and interrupt it.  stderr is sent to DEVNULL rather than
    # PIPE: nothing drains the pipe while the child runs, so a chatty child
    # could fill the OS pipe buffer and deadlock before we signal it.
    cmd_partial = [
        sys.executable, WIKIQ, input_file, "-o", output_file,
        "--batch-size", "10", "--fandom-2020"
    ]
    proc = subprocess.Popen(cmd_partial, stderr=subprocess.DEVNULL)
    interrupt_delay = 3
    time.sleep(interrupt_delay)

    if proc.poll() is not None:
        # Process completed before we could interrupt
        interrupted_rows = read_jsonl(output_file)
        df_full = pd.DataFrame(full_rows)
        df_interrupted = pd.DataFrame(interrupted_rows)
        assert_frame_equal(df_full, df_interrupted)
        return

    proc.send_signal(signal.SIGUSR1)
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # Graceful-checkpoint signal didn't stop it in time; force termination.
        proc.send_signal(signal.SIGTERM)
        proc.wait(timeout=30)

    interrupted_rows = read_jsonl(output_file)
    if len(interrupted_rows) >= len(full_rows):
        # Process completed before interrupt
        df_full = pd.DataFrame(full_rows)
        df_interrupted = pd.DataFrame(interrupted_rows)
        assert_frame_equal(df_full, df_interrupted)
        return

    # Now resume
    cmd_resume = f"{WIKIQ} {input_file} -o {output_file} --batch-size 10 --fandom-2020 --resume"
    try:
        subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(output_file)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)


def test_resume_parquet():
    """Test that --resume works correctly with Parquet output format."""
    import pandas as pd
    from pandas.testing import assert_frame_equal
    import pyarrow.parquet as pq

    tester_full = WikiqTester(SAILORMOON, "resume_parquet_full", in_compression="7z",
                              out_format="parquet")
    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = tester_full.output
    full_table = pq.read_table(full_output_path)

    # Use unsorted indices consistently - slice the table and get checkpoint
    # from same position
    resume_idx = len(full_table) // 3
    resume_revid = int(full_table.column("revid")[resume_idx].as_py())
    resume_pageid = int(full_table.column("articleid")[resume_idx].as_py())

    tester_partial = WikiqTester(SAILORMOON, "resume_parquet_partial", in_compression="7z",
                                 out_format="parquet")
    partial_output_path = tester_partial.output

    # Write partial Parquet file using the SAME schema as the full file
    partial_table = full_table.slice(0, resume_idx + 1)
    pq.write_table(partial_table, partial_output_path)

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    df_full = full_table.to_pandas()
    df_resumed = pd.read_parquet(partial_output_path)
    assert_frame_equal(df_full, df_resumed)


def test_resume_tsv_error():
    """Test that --resume with TSV output produces a proper error message."""
    tester = WikiqTester(SAILORMOON, "resume_tsv_error", in_compression="7z",
                         out_format="tsv")
    try:
        tester.call_wikiq("--fandom-2020", "--resume")
        pytest.fail("Expected error for --resume with TSV output")
    except subprocess.CalledProcessError as exc:
        stderr = exc.stderr.decode("utf8")
        assert "Error: --resume only works with JSONL or Parquet" in stderr, \
            f"Expected proper error message, got: {stderr}"
    print("TSV resume error test passed!")


def test_resume_data_equivalence():
    """Test that resumed output produces exactly equivalent data to a full run.

    The revert detector state is maintained during the skip phase, so
    revert detection should be identical to a full run.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_equiv_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)

    resume_idx = len(full_rows) // 3
    resume_revid = full_rows[resume_idx]["revid"]

    tester_partial = WikiqTester(SAILORMOON, "resume_equiv_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:resume_idx + 1]:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[resume_idx]["articleid"],
                   "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)


def test_resume_with_persistence():
    """Test that --resume correctly handles persistence state after resume.

    Persistence (PWR) depends on maintaining token state across revisions.
    This test verifies that persistence values (token_revs) are identical
    between a full run and a resumed run.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_persist_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--persistence wikidiff2", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)

    resume_idx = len(full_rows) // 4
    resume_revid = full_rows[resume_idx]["revid"]

    tester_partial = WikiqTester(SAILORMOON, "resume_persist_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:resume_idx + 1]:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[resume_idx]["articleid"],
                   "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--persistence wikidiff2", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)

    # Check persistence columns are present
    assert "token_revs" in df_full.columns, "token_revs should exist in full output"
    assert "token_revs" in df_resumed.columns, "token_revs should exist in resumed output"

    assert_frame_equal(df_full, df_resumed)


def test_resume_corrupted_jsonl_last_line():
    """Test that JSONL resume correctly handles corrupted/incomplete last line.

    When the previous run was interrupted mid-write leaving an incomplete
    JSON line, the resume should:
    1. Find the resume point from the last valid line (no checkpoint file needed)
    2. Truncate the corrupted trailing data
    3. Append new data, resulting in valid JSONL
    """
    tester_full = WikiqTester(SAILORMOON, "resume_corrupt_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(tester_full.output)

    # Create a partial file with a corrupted last line
    tester_corrupt = WikiqTester(SAILORMOON, "resume_corrupt_test", in_compression="7z",
                                 out_format="jsonl")
    corrupt_output_path = tester_corrupt.output
    resume_idx = len(full_rows) // 2
    with open(corrupt_output_path, 'w') as f:
        for row in full_rows[:resume_idx]:
            f.write(json.dumps(row) + "\n")
        # Write incomplete JSON (simulates crash mid-write)
        f.write('{"revid": 999, "articleid": 123, "incomplet')

    # NO checkpoint file - JSONL resume works from last valid line in the file
    checkpoint_path = get_checkpoint_path(corrupt_output_path)
    assert not os.path.exists(checkpoint_path), "Test setup error: checkpoint should not exist"

    # Resume should detect corrupted line, truncate it, then append new data
    try:
        tester_corrupt.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(f"Resume failed unexpectedly: {exc.stderr.decode('utf8')}")

    # Verify the file is valid JSONL and readable (no corrupted lines)
    resumed_rows = read_jsonl(corrupt_output_path)

    # Full data equivalence check
    import pandas as pd
    from pandas.testing import assert_frame_equal
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)


def test_resume_diff_persistence_combined():
    """Test that --resume correctly handles both diff and persistence state together.

    This tests that multiple stateful features work correctly when combined.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_combined_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--diff", "--persistence wikidiff2", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_output_path = tester_full.output
    full_rows = read_jsonl(full_output_path)

    resume_idx = len(full_rows) // 3
    resume_revid = full_rows[resume_idx]["revid"]

    tester_partial = WikiqTester(SAILORMOON, "resume_combined_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:resume_idx + 1]:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": full_rows[resume_idx]["articleid"],
                   "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--diff", "--persistence wikidiff2",
                                  "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)

    # Verify both diff and persistence columns exist
    assert "diff" in df_full.columns
    assert "token_revs" in df_full.columns

    assert_frame_equal(df_full, df_resumed)


def test_resume_mid_page():
    """Test resume from the middle of a page with many revisions.

    This specifically tests that state restoration works when resuming
    partway through a page's revision history.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_midpage_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(tester_full.output)
    df_full = pd.DataFrame(full_rows)

    # Find a page with many revisions
    page_counts = df_full.groupby("articleid").size()
    large_page_id = (page_counts[page_counts >= 10].index[0]
                     if any(page_counts >= 10) else page_counts.idxmax())
    page_revs = df_full[df_full["articleid"] == large_page_id].sort_values("revid")

    # Resume from middle of this page
    mid_idx = len(page_revs) // 2
    resume_rev = page_revs.iloc[mid_idx]
    resume_revid = int(resume_rev["revid"])
    resume_pageid = int(resume_rev["articleid"])

    # Find global index for checkpoint
    global_idx = df_full[df_full["revid"] == resume_revid].index[0]

    tester_partial = WikiqTester(SAILORMOON, "resume_midpage_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output

    # Write all rows up to and including the resume point
    rows_to_write = [full_rows[i] for i in range(global_idx + 1)]
    with open(partial_output_path, 'w') as f:
        for row in rows_to_write:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)


def test_resume_page_boundary():
    """Test resume at the exact start of a new page.

    This tests for off-by-one errors at page boundaries.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    tester_full = WikiqTester(SAILORMOON, "resume_boundary_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--diff", "--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(tester_full.output)
    df_full = pd.DataFrame(full_rows)

    # Find a page boundary - last revision of one page
    page_last_revs = df_full.groupby("articleid")["revid"].max()

    # Pick a page that's not the very last one
    for page_id in page_last_revs.index[:-1]:
        last_rev_of_page = page_last_revs[page_id]
        row_idx = df_full[df_full["revid"] == last_rev_of_page].index[0]
        if row_idx < len(df_full) - 1:
            break

    resume_revid = int(last_rev_of_page)
    resume_pageid = int(page_id)

    tester_partial = WikiqTester(SAILORMOON, "resume_boundary_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output

    rows_to_write = [full_rows[i] for i in range(row_idx + 1)]
    with open(partial_output_path, 'w') as f:
        for row in rows_to_write:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--diff", "--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)


def test_jsonl_dir_output():
    """Test that .jsonl.d output creates files named after input files.

    When output is a .jsonl.d directory, each input file should write to
    a separate JSONL file named after the input (e.g., sailormoon.jsonl),
    not a generic data.jsonl.
    """
    output_dir = os.path.join(TEST_OUTPUT_DIR, "jsonl_dir_test.jsonl.d")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # Run wikiq with .jsonl.d output
    cmd = f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10"
    try:
        subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    # Verify output file is named after input, not "data.jsonl"
    expected_output = os.path.join(output_dir, f"{SAILORMOON}.jsonl")
    wrong_output = os.path.join(output_dir, "data.jsonl")
    assert os.path.exists(expected_output), \
        f"Expected {expected_output} to exist, but it doesn't. Directory contents: {os.listdir(output_dir)}"
    assert not os.path.exists(wrong_output), \
        f"Expected {wrong_output} NOT to exist (should be named after input file)"

    # Verify output has data
    rows = read_jsonl(expected_output)
    assert len(rows) > 0, "Output file should have data"


def test_jsonl_dir_resume():
    """Test that resume works correctly with .jsonl.d directory output.

    The resume logic must derive the same filename from the input file
    as the write logic does.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    output_dir = os.path.join(TEST_OUTPUT_DIR, "jsonl_dir_resume.jsonl.d")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # First run: complete
    cmd_full = f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10"
    try:
        subprocess.check_output(cmd_full, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    expected_output = os.path.join(output_dir, f"{SAILORMOON}.jsonl")
    full_rows = read_jsonl(expected_output)

    # Truncate to partial
    partial_idx = len(full_rows) // 2
    with open(expected_output, 'w') as f:
        for row in full_rows[:partial_idx]:
            f.write(json.dumps(row) + "\n")

    # Resume
    cmd_resume = f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10 --resume"
    try:
        subprocess.check_output(cmd_resume, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(expected_output)
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)


def test_resume_revert_detection():
    """Test that revert detection works correctly after resume.

    Verifies that the revert detector state is properly maintained during
    the skip phase so that reverts are correctly detected after resume.
    """
    import pandas as pd
    from pandas.testing import assert_series_equal

    tester_full = WikiqTester(SAILORMOON, "resume_revert_full", in_compression="7z",
                              out_format="jsonl")
    try:
        tester_full.call_wikiq("--fandom-2020")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    full_rows = read_jsonl(tester_full.output)
    df_full = pd.DataFrame(full_rows)

    # Find rows with reverts
    revert_rows = df_full[df_full["revert"] == True]
    if len(revert_rows) == 0:
        pytest.skip("No reverts found in test data")

    # Resume from before a known revert so we can verify it's detected
    first_revert_idx = revert_rows.index[0]
    if first_revert_idx < 2:
        pytest.skip("First revert too early in dataset")

    resume_idx = first_revert_idx - 1
    resume_revid = full_rows[resume_idx]["revid"]
    resume_pageid = full_rows[resume_idx]["articleid"]

    tester_partial = WikiqTester(SAILORMOON, "resume_revert_partial", in_compression="7z",
                                 out_format="jsonl")
    partial_output_path = tester_partial.output
    with open(partial_output_path, 'w') as f:
        for row in full_rows[:resume_idx + 1]:
            f.write(json.dumps(row) + "\n")

    checkpoint_path = get_checkpoint_path(partial_output_path)
    with open(checkpoint_path, 'w') as f:
        json.dump({"pageid": resume_pageid, "revid": resume_revid}, f)

    try:
        tester_partial.call_wikiq("--fandom-2020", "--resume")
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    resumed_rows = read_jsonl(partial_output_path)
    df_resumed = pd.DataFrame(resumed_rows)

    # Verify revert column matches exactly
    assert_series_equal(df_full["revert"], df_resumed["revert"])
    assert_series_equal(df_full["reverteds"], df_resumed["reverteds"])