fix bug by truncating corrupted jsonl lines.
This commit is contained in:
@@ -175,15 +175,32 @@ def get_jsonl_resume_point(output_file, input_file=None):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(output_file) as f:
|
# Track positions of last two valid lines for potential truncation
|
||||||
# Stream through file, keeping only last 2 lines in memory
|
valid_lines = deque(maxlen=2) # (end_position, record)
|
||||||
for line in reversed(deque(f, maxlen=2)):
|
with open(output_file, 'rb') as f:
|
||||||
|
while True:
|
||||||
|
line = f.readline()
|
||||||
|
if not line:
|
||||||
|
break
|
||||||
try:
|
try:
|
||||||
record = json.loads(line)
|
record = json.loads(line.decode('utf-8'))
|
||||||
return (record['articleid'], record['revid'])
|
valid_lines.append((f.tell(), record))
|
||||||
except (json.JSONDecodeError, KeyError):
|
except (json.JSONDecodeError, KeyError, UnicodeDecodeError):
|
||||||
continue
|
pass
|
||||||
|
|
||||||
|
if not valid_lines:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
last_valid_pos, last_valid_record = valid_lines[-1]
|
||||||
|
|
||||||
|
# Truncate if file extends past last valid line (corrupted trailing data)
|
||||||
|
file_size = os.path.getsize(output_file)
|
||||||
|
if last_valid_pos < file_size:
|
||||||
|
print(f"Truncating corrupted data from {output_file} ({file_size - last_valid_pos} bytes)", file=sys.stderr)
|
||||||
|
with open(output_file, 'r+b') as f:
|
||||||
|
f.truncate(last_valid_pos)
|
||||||
|
|
||||||
|
return (last_valid_record['articleid'], last_valid_record['revid'])
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
print(f"Warning: Could not read {output_file}: {e}", file=sys.stderr)
|
print(f"Warning: Could not read {output_file}: {e}", file=sys.stderr)
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -458,7 +458,10 @@ def test_resume_corrupted_jsonl_last_line():
|
|||||||
"""Test that JSONL resume correctly handles corrupted/incomplete last line.
|
"""Test that JSONL resume correctly handles corrupted/incomplete last line.
|
||||||
|
|
||||||
When the previous run was interrupted mid-write leaving an incomplete JSON
|
When the previous run was interrupted mid-write leaving an incomplete JSON
|
||||||
line, the resume should detect and remove the corrupted line before appending.
|
line, the resume should:
|
||||||
|
1. Find the resume point from the last valid line (no checkpoint file needed)
|
||||||
|
2. Truncate the corrupted trailing data
|
||||||
|
3. Append new data, resulting in valid JSONL
|
||||||
"""
|
"""
|
||||||
tester_full = WikiqTester(SAILORMOON, "resume_corrupt_full", in_compression="7z", out_format="jsonl")
|
tester_full = WikiqTester(SAILORMOON, "resume_corrupt_full", in_compression="7z", out_format="jsonl")
|
||||||
|
|
||||||
@@ -481,19 +484,20 @@ def test_resume_corrupted_jsonl_last_line():
|
|||||||
# Write incomplete JSON (simulates crash mid-write)
|
# Write incomplete JSON (simulates crash mid-write)
|
||||||
f.write('{"revid": 999, "articleid": 123, "incomplet')
|
f.write('{"revid": 999, "articleid": 123, "incomplet')
|
||||||
|
|
||||||
# Write checkpoint pointing to a valid revision (last complete row)
|
# Record file size before resume
|
||||||
checkpoint_path = get_checkpoint_path(corrupt_output_path)
|
size_before = os.path.getsize(corrupt_output_path)
|
||||||
with open(checkpoint_path, 'w') as f:
|
|
||||||
json.dump({"pageid": full_rows[resume_idx - 1]["articleid"],
|
|
||||||
"revid": full_rows[resume_idx - 1]["revid"]}, f)
|
|
||||||
|
|
||||||
# Resume should detect and remove the corrupted line, then append new data
|
# NO checkpoint file - JSONL resume works from last valid line in the file
|
||||||
|
checkpoint_path = get_checkpoint_path(corrupt_output_path)
|
||||||
|
assert not os.path.exists(checkpoint_path), "Test setup error: checkpoint should not exist"
|
||||||
|
|
||||||
|
# Resume should detect corrupted line, truncate it, then append new data
|
||||||
try:
|
try:
|
||||||
tester_corrupt.call_wikiq("--fandom-2020", "--resume")
|
tester_corrupt.call_wikiq("--fandom-2020", "--resume")
|
||||||
except subprocess.CalledProcessError as exc:
|
except subprocess.CalledProcessError as exc:
|
||||||
pytest.fail(f"Resume failed unexpectedly: {exc.stderr.decode('utf8')}")
|
pytest.fail(f"Resume failed unexpectedly: {exc.stderr.decode('utf8')}")
|
||||||
|
|
||||||
# Verify the file is valid JSONL and readable
|
# Verify the file is valid JSONL and readable (no corrupted lines)
|
||||||
resumed_rows = read_jsonl(corrupt_output_path)
|
resumed_rows = read_jsonl(corrupt_output_path)
|
||||||
|
|
||||||
# Full data equivalence check
|
# Full data equivalence check
|
||||||
|
|||||||
Reference in New Issue
Block a user