diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py
index b007357..ea6bb76 100755
--- a/src/wikiq/__init__.py
+++ b/src/wikiq/__init__.py
@@ -422,6 +422,7 @@ class WikiqParser:
             }
             for path in ns_paths.values():
                 Path(path).parent.mkdir(exist_ok=True, parents=True)
+            output_buffers = {ns: [] for ns in ns_paths}
             pq_writers = {
                 ns: pq.ParquetWriter(
                     path, schema, flavor="spark", sorting_columns=sorting_cols
@@ -430,6 +431,7 @@ class WikiqParser:
             }
 
         else:
+            output_buffer = []
             writer = pacsv.CSVWriter(
                 self.output_file,
                 schema,
@@ -493,9 +495,9 @@ class WikiqParser:
                 num_context_lines=1000000,
                 max_word_level_diff_complexity=-1,
                 moved_paragraph_detection_cutoff=-1,
-                words_cache_capacity=500,
-                diff_cache_capacity=500,
-                stats_cache_capacity=500,
+                words_cache_capacity=10000,
+                diff_cache_capacity=10000,
+                stats_cache_capacity=10000,
             )
 
         while not on_last_batch:
@@ -677,7 +679,10 @@ class WikiqParser:
             del row_buffer["text"]
 
             if self.partition_namespaces is True:
+                output_buffer = output_buffers[page.mwpage.namespace]
                 writer = pq_writers[page.mwpage.namespace]
+            output_buffer += row_buffer
+            if(len(output_buffer) >
             writer.write(pa.record_batch(row_buffer, schema=schema))
             gc.collect()
             page_count += 1
@@ -875,7 +880,7 @@ def main():
     parser.add_argument(
         "--batch-size",
         dest="batch_size",
-        default=1500,
+        default=16000,
         type=int,
         help="How many revisions to process in each batch. This ends up being the Parquet row group size",
     )
diff --git a/src/wikiq/wiki_diff_matcher.py b/src/wikiq/wiki_diff_matcher.py
index 8470f05..a4fdc0a 100644
--- a/src/wikiq/wiki_diff_matcher.py
+++ b/src/wikiq/wiki_diff_matcher.py
@@ -339,9 +339,9 @@ class WikiDiffMatcher:
             num_context_lines=1000000,
             max_word_level_diff_complexity=-1,
             moved_paragraph_detection_cutoff=-1,
-            words_cache_capacity=5000,
-            diff_cache_capacity=5000,
-            stats_cache_capacity=5000,
+            words_cache_capacity=10000,
+            diff_cache_capacity=10000,
+            stats_cache_capacity=10000,
         )
         self.last_diff = None
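
Note on the @@ -677,7 +679,10 @@ hunk: it introduces output_buffers/output_buffer but leaves the flush condition unfinished (the added "if(len(output_buffer) >" line has no comparison value or body), and row_buffer is still written out directly. Below is a minimal sketch of how the buffered write could be completed. The FLUSH_ROW_COUNT threshold, the flush() and buffered_write() helpers, and the use of pa.Table.from_batches are assumptions made for illustration; they are not part of this patch.

    # Sketch only: accumulate per-page row_buffer dicts and write them as one
    # large row group once enough rows have piled up. Names and the threshold
    # below are assumptions, not values taken from the patch.
    import pyarrow as pa
    import pyarrow.parquet as pq

    FLUSH_ROW_COUNT = 16000  # assumed threshold, chosen to mirror the new --batch-size default

    def flush(writer, buffer: list[dict], schema: pa.Schema) -> None:
        # Convert each buffered column dict into a record batch and write them
        # as a single table, producing one large row group per flush.
        if not buffer:
            return
        batches = [pa.record_batch(columns, schema=schema) for columns in buffer]
        writer.write(pa.Table.from_batches(batches, schema=schema))
        buffer.clear()

    def buffered_write(writer, buffer: list[dict], row_buffer: dict, schema: pa.Schema) -> None:
        # row_buffer maps column name -> list of values; count buffered rows by
        # the length of the first column of each buffered dict.
        buffer.append(row_buffer)
        buffered_rows = sum(len(next(iter(columns.values()), [])) for columns in buffer)
        if buffered_rows >= FLUSH_ROW_COUNT:
            flush(writer, buffer, schema)

In the revision loop, buffered_write(writer, output_buffer, row_buffer, schema) would replace the unconditional writer.write(pa.record_batch(row_buffer, schema=schema)) call for both the partitioned and single-file paths (pacsv.CSVWriter.write also accepts a record batch or table), with every remaining buffer flushed once the last batch has been processed.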