diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py index fa4449d..5a69b6a 100755 --- a/src/wikiq/__init__.py +++ b/src/wikiq/__init__.py @@ -5,23 +5,21 @@ # additions_size deletions_size import argparse -import sys import os.path import re -from io import TextIOWrapper -from itertools import groupby - -from subprocess import Popen, PIPE +import sys from collections import deque from hashlib import sha1 -from typing import Any, IO, TextIO, Generator, Union +from io import TextIOWrapper +from itertools import groupby +from subprocess import PIPE, Popen +from typing import IO, Any, Generator, TextIO, Union -import mwxml -from mwxml import Dump - -from deltas.tokenizers import wikitext_split import mwpersistence import mwreverts +import mwxml +from deltas.tokenizers import wikitext_split +from mwxml import Dump import wikiq.tables as tables from wikiq.tables import RevisionTable @@ -29,11 +27,13 @@ from wikiq.wiki_diff_matcher import WikiDiffMatcher TO_ENCODE = ('title', 'editor') PERSISTENCE_RADIUS = 7 -from deltas import SequenceMatcher, SegmentMatcher +from pathlib import Path import pyarrow as pa -import pyarrow.parquet as pq import pyarrow.csv as pacsv +import pyarrow.parquet as pq +from deltas import SegmentMatcher, SequenceMatcher + class PersistMethod: none = 0 @@ -215,7 +215,8 @@ class WikiqParser: namespaces: Union[list[int], None] = None, revert_radius: int = 15, output_parquet: bool = True, - parquet_buffer_size: int = 2000 + parquet_buffer_size: int = 2000, + partition_namespaces: bool = False, ): """ @@ -230,6 +231,7 @@ class WikiqParser: self.revert_radius = revert_radius self.diff = diff self.text = text + self.partition_namespaces = partition_namespaces if namespaces is not None: self.namespace_filter = set(namespaces) else: @@ -374,7 +376,25 @@ class WikiqParser: if self.output_parquet: pageid_sortingcol = pq.SortingColumn(schema.get_field_index('pageid')) revid_sortingcol = pq.SortingColumn(schema.get_field_index('pageid')) - writer = pq.ParquetWriter(self.output_file, schema, flavor='spark', sorting_columns=[pageid_sortingcol, revid_sortingcol]) + sorting_cols = [pageid_sortingcol, revid_sortingcol] + + if self.partition_namespaces is False: + writer = pq.ParquetWriter(self.output_file, schema, flavor='spark', sorting_columns=sorting_cols) + else: + output_path = Path(self.output_file) + if self.namespace_filter is not None: + namespaces = self.namespace_filter + else: + namespaces = self.namespaces.values() + ns_paths = {ns: (output_path.parent / f"namespace={ns}") / output_path.name for ns in namespaces} + for path in ns_paths.values(): + Path(path).parent.mkdir(exist_ok=True, parents=True) + pq_writers = {ns: + pq.ParquetWriter(path, + schema, + flavor='spark', + sorting_columns=sorting_cols) for ns, path in ns_paths.items()} + else: writer = pacsv.CSVWriter(self.output_file, schema, write_options=pacsv.WriteOptions(delimiter='\t')) @@ -507,6 +527,9 @@ class WikiqParser: if not self.text: del row_buffer['text'] + if self.partition_namespaces is True: + writer = pq_writers[page.mwpage.namespace] + writer.write(pa.table(row_buffer, schema=schema)) page_count += 1 @@ -609,6 +632,10 @@ def main(): action='store_true', help="Output the text of the revision.") + parser.add_argument('-PNS', '--partition-namespaces', dest="partition_namespaces", default=False, + action='store_true', + help="Partition parquet files by namespace.") + parser.add_argument('--fandom-2020', dest="fandom_2020", action='store_true', help="Whether the archive is from the fandom 2020 dumps by Wikiteam. These dumps can have multiple .xml files in their archives.") @@ -670,6 +697,7 @@ def main(): text=args.text, diff=args.diff, output_parquet=output_parquet, + partition_namespaces=args.partition_namespaces ) wikiq.process() diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py index 8191860..a29fab1 100644 --- a/test/Wikiq_Unit_Test.py +++ b/test/Wikiq_Unit_Test.py @@ -199,6 +199,19 @@ def test_collapse_user(): baseline = pd.read_table(tester.baseline_file) assert_frame_equal(test, baseline, check_like=True) +def test_partition_namespaces(): + tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z", out_format='parquet', baseline_format='parquet') + + try: + tester.call_wikiq("--collapse-user", "--fandom-2020", "--partition-namespaces") + except subprocess.CalledProcessError as exc: + pytest.fail(exc.stderr.decode("utf8")) + + test = pd.read_parquet(os.path.join(tester.output,"namespace=10/sailormoon.parquet")) + baseline = pd.read_parquet(tester.baseline_file) + assert_frame_equal(test, baseline, check_like=True) + + def test_pwr_wikidiff2(): tester = WikiqTester(SAILORMOON, "persistence_wikidiff2", in_compression="7z") diff --git a/test/baseline_output/collapse-user_sailormoon.parquet b/test/baseline_output/collapse-user_sailormoon.parquet new file mode 100644 index 0000000..0cb1db1 Binary files /dev/null and b/test/baseline_output/collapse-user_sailormoon.parquet differ