support .jsonl.d

This commit is contained in:
Nathan TeBlunthuis
2025-12-22 20:13:04 -08:00
parent 618c343898
commit d822085698
2 changed files with 20 additions and 4 deletions

View File

@@ -515,6 +515,7 @@ class WikiqParser:
namespaces: Union[list[int], None] = None, namespaces: Union[list[int], None] = None,
revert_radius: int = 15, revert_radius: int = 15,
output_jsonl: bool = False, output_jsonl: bool = False,
output_jsonl_dir: bool = False,
output_parquet: bool = False, output_parquet: bool = False,
batch_size: int = 1024, batch_size: int = 1024,
resume_point: Union[tuple, dict, None] = None, resume_point: Union[tuple, dict, None] = None,
@@ -569,6 +570,7 @@ class WikiqParser:
# Initialize output # Initialize output
self.batch_size = batch_size self.batch_size = batch_size
self.output_jsonl = output_jsonl self.output_jsonl = output_jsonl
self.output_jsonl_dir = output_jsonl_dir
self.output_parquet = output_parquet self.output_parquet = output_parquet
self.output_file = output_file self.output_file = output_file
@@ -836,6 +838,18 @@ class WikiqParser:
part_numbers[ns] = 0 part_numbers[ns] = 0
elif self.output_jsonl: elif self.output_jsonl:
append_mode = self.resume_point is not None append_mode = self.resume_point is not None
if self.output_jsonl_dir:
# Create directory for JSONL output
Path(self.output_file).mkdir(parents=True, exist_ok=True)
part_num = 0
if self.resume_point is not None and len(self.resume_point) > 2:
part_num = self.resume_point[2]
part_numbers[None] = part_num
jsonl_path = self._get_part_path(
Path(self.output_file) / "data.jsonl", part_num
)
writer = JSONLWriter(str(jsonl_path), schema, append=append_mode)
else:
writer = JSONLWriter(self.output_file, schema, append=append_mode) writer = JSONLWriter(self.output_file, schema, append=append_mode)
else: else:
writer = pacsv.CSVWriter( writer = pacsv.CSVWriter(
@@ -1437,7 +1451,8 @@ def main():
output = "." output = "."
# Detect output format from extension # Detect output format from extension
output_jsonl = output.endswith(".jsonl") or output.endswith(".jsonl.d") output_jsonl_dir = output.endswith(".jsonl.d")
output_jsonl = output.endswith(".jsonl") or output_jsonl_dir
output_parquet = output.endswith(".parquet") output_parquet = output.endswith(".parquet")
partition_namespaces = args.partition_namespaces and output_parquet partition_namespaces = args.partition_namespaces and output_parquet
@@ -1495,6 +1510,7 @@ def main():
text=args.text, text=args.text,
diff=args.diff, diff=args.diff,
output_jsonl=output_jsonl, output_jsonl=output_jsonl,
output_jsonl_dir=output_jsonl_dir,
output_parquet=output_parquet, output_parquet=output_parquet,
partition_namespaces=partition_namespaces, partition_namespaces=partition_namespaces,
batch_size=args.batch_size, batch_size=args.batch_size,

View File

@@ -43,8 +43,8 @@ class WikiqTester:
shutil.rmtree(self.output) shutil.rmtree(self.output)
# Also clean up resume-related files # Also clean up resume-related files
for suffix in [".resume_temp", ".checkpoint", ".merged"]: for temp_suffix in [".resume_temp", ".checkpoint", ".merged"]:
temp_path = self.output + suffix temp_path = self.output + temp_suffix
if os.path.exists(temp_path): if os.path.exists(temp_path):
if os.path.isfile(temp_path): if os.path.isfile(temp_path):
os.remove(temp_path) os.remove(temp_path)