Revert changes related to row-buffering to just "increase cache size."
This reverts commit 1f08c01cf1.
This commit is contained in:
parent
1f08c01cf1
commit
77f367d95e
@ -422,7 +422,6 @@ class WikiqParser:
|
||||
}
|
||||
for path in ns_paths.values():
|
||||
Path(path).parent.mkdir(exist_ok=True, parents=True)
|
||||
output_buffers = {ns: [] for ns, path in ns_paths.values()}
|
||||
pq_writers = {
|
||||
ns: pq.ParquetWriter(
|
||||
path, schema, flavor="spark", sorting_columns=sorting_cols
|
||||
@ -431,7 +430,6 @@ class WikiqParser:
|
||||
}
|
||||
|
||||
else:
|
||||
output_buffer = []
|
||||
writer = pacsv.CSVWriter(
|
||||
self.output_file,
|
||||
schema,
|
||||
@ -679,10 +677,7 @@ class WikiqParser:
|
||||
del row_buffer["text"]
|
||||
|
||||
if self.partition_namespaces is True:
|
||||
output_buffer = output_buffers[page.mwpage.namespace]
|
||||
writer = pq_writers[page.mwpage.namespace]
|
||||
output_buffer += row_buffer
|
||||
if(len(output_buffer) >
|
||||
writer.write(pa.record_batch(row_buffer, schema=schema))
|
||||
gc.collect()
|
||||
page_count += 1
|
||||
@ -880,7 +875,7 @@ def main():
|
||||
parser.add_argument(
|
||||
"--batch-size",
|
||||
dest="batch_size",
|
||||
default=16000,
|
||||
default=1500,
|
||||
type=int,
|
||||
help="How many revisions to process in each batch. This ends up being the Parquet row group size",
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user