fix jsonl.d output.
This commit is contained in:
@@ -527,6 +527,7 @@ class WikiqParser:
|
||||
headings: bool = False,
|
||||
time_limit_seconds: Union[float, None] = None,
|
||||
max_revisions_per_file: int = 0,
|
||||
input_filename: Union[str, None] = None,
|
||||
):
|
||||
"""
|
||||
Parameters:
|
||||
@@ -535,8 +536,10 @@ class WikiqParser:
|
||||
or a dict mapping namespace -> (pageid, revid) for partitioned output.
|
||||
For single-file: skip all revisions up to and including this point.
|
||||
max_revisions_per_file : if > 0, close and rotate output files after this many revisions
|
||||
input_filename : original input filename (needed for .jsonl.d output to derive output filename)
|
||||
"""
|
||||
self.input_file = input_file
|
||||
self.input_filename = input_filename
|
||||
|
||||
self.collapse_user: bool = collapse_user
|
||||
self.persist: int = persist
|
||||
@@ -845,13 +848,12 @@ class WikiqParser:
|
||||
if self.output_jsonl_dir:
|
||||
# Create directory for JSONL output
|
||||
Path(self.output_file).mkdir(parents=True, exist_ok=True)
|
||||
part_num = 0
|
||||
if self.resume_point is not None and len(self.resume_point) > 2:
|
||||
part_num = self.resume_point[2]
|
||||
part_numbers[None] = part_num
|
||||
jsonl_path = self._get_part_path(
|
||||
Path(self.output_file) / "data.jsonl", part_num
|
||||
)
|
||||
# Derive JSONL filename from input filename
|
||||
if self.input_filename:
|
||||
jsonl_basename = os.path.basename(get_output_filename(self.input_filename, 'jsonl'))
|
||||
else:
|
||||
jsonl_basename = "data.jsonl"
|
||||
jsonl_path = Path(self.output_file) / jsonl_basename
|
||||
writer = JSONLWriter(str(jsonl_path), schema, append=append_mode)
|
||||
else:
|
||||
writer = JSONLWriter(self.output_file, schema, append=append_mode)
|
||||
@@ -1527,6 +1529,7 @@ def main():
|
||||
headings=args.headings,
|
||||
time_limit_seconds=time_limit_seconds,
|
||||
max_revisions_per_file=args.max_revisions_per_file,
|
||||
input_filename=filename,
|
||||
)
|
||||
|
||||
# Register signal handlers for graceful shutdown (CLI only)
|
||||
|
||||
@@ -668,6 +668,88 @@ def test_resume_page_boundary():
|
||||
assert_frame_equal(df_full, df_resumed)
|
||||
|
||||
|
||||
def test_jsonl_dir_output():
    """Test that .jsonl.d output creates files named after input files.

    When output is a .jsonl.d directory, each input file should write to
    a separate JSONL file named after the input (e.g., sailormoon.jsonl),
    not a generic data.jsonl.
    """
    output_dir = os.path.join(TEST_OUTPUT_DIR, "jsonl_dir_test.jsonl.d")
    input_file = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")

    # Start from a clean slate so stale files can't mask a regression.
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

    # Run wikiq with .jsonl.d output
    cmd = f"{WIKIQ} {input_file} -o {output_dir} --fandom-2020 --batch-size 10"
    try:
        subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True)
    except subprocess.CalledProcessError as exc:
        pytest.fail(exc.stderr.decode("utf8"))

    # Verify output file is named after input, not "data.jsonl"
    expected_output = os.path.join(output_dir, f"{SAILORMOON}.jsonl")
    wrong_output = os.path.join(output_dir, "data.jsonl")

    assert os.path.exists(expected_output), \
        f"Expected {expected_output} to exist, but it doesn't. Directory contents: {os.listdir(output_dir)}"
    assert not os.path.exists(wrong_output), \
        f"Expected {wrong_output} NOT to exist (should be named after input file)"

    # Verify output has data
    rows = read_jsonl(expected_output)
    assert len(rows) > 0, "Output file should have data"
|
||||
|
||||
|
||||
def test_jsonl_dir_resume():
    """Test that resume works correctly with .jsonl.d directory output.

    The resume logic must derive the same filename from the input file
    as the write logic does.
    """
    import pandas as pd
    from pandas.testing import assert_frame_equal

    out_dir = os.path.join(TEST_OUTPUT_DIR, "jsonl_dir_resume.jsonl.d")
    dump_path = os.path.join(TEST_DIR, "dumps", f"{SAILORMOON}.xml.7z")

    if os.path.exists(out_dir):
        shutil.rmtree(out_dir)

    def run_wikiq(extra=""):
        # Invoke the wikiq CLI; surface stderr via pytest.fail on failure.
        command = f"{WIKIQ} {dump_path} -o {out_dir} --fandom-2020 --batch-size 10{extra}"
        try:
            subprocess.check_output(command, stderr=subprocess.PIPE, shell=True)
        except subprocess.CalledProcessError as exc:
            pytest.fail(exc.stderr.decode("utf8"))

    # First run: complete
    run_wikiq()

    result_path = os.path.join(out_dir, f"{SAILORMOON}.jsonl")
    full_rows = read_jsonl(result_path)

    # Truncate the output to half its rows to simulate an interrupted run.
    keep = len(full_rows) // 2
    with open(result_path, 'w') as fh:
        fh.writelines(json.dumps(record) + "\n" for record in full_rows[:keep])

    # Resume; the run must target the same per-input filename.
    run_wikiq(" --resume")

    resumed_rows = read_jsonl(result_path)

    # The resumed output must be identical to the uninterrupted run.
    df_full = pd.DataFrame(full_rows)
    df_resumed = pd.DataFrame(resumed_rows)
    assert_frame_equal(df_full, df_resumed)
|
||||
|
||||
|
||||
def test_resume_revert_detection():
|
||||
"""Test that revert detection works correctly after resume.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user