Support partitioning the output Parquet files by namespace.

This commit is contained in:
Nathan TeBlunthuis 2025-07-07 20:58:43 -07:00
parent c9fb94ccc0
commit 76d54ae597
3 changed files with 55 additions and 14 deletions

View File

@ -5,23 +5,21 @@
# additions_size deletions_size # additions_size deletions_size
import argparse import argparse
import sys
import os.path import os.path
import re import re
from io import TextIOWrapper import sys
from itertools import groupby
from subprocess import Popen, PIPE
from collections import deque from collections import deque
from hashlib import sha1 from hashlib import sha1
from typing import Any, IO, TextIO, Generator, Union from io import TextIOWrapper
from itertools import groupby
from subprocess import PIPE, Popen
from typing import IO, Any, Generator, TextIO, Union
import mwxml
from mwxml import Dump
from deltas.tokenizers import wikitext_split
import mwpersistence import mwpersistence
import mwreverts import mwreverts
import mwxml
from deltas.tokenizers import wikitext_split
from mwxml import Dump
import wikiq.tables as tables import wikiq.tables as tables
from wikiq.tables import RevisionTable from wikiq.tables import RevisionTable
@ -29,11 +27,13 @@ from wikiq.wiki_diff_matcher import WikiDiffMatcher
TO_ENCODE = ('title', 'editor') TO_ENCODE = ('title', 'editor')
PERSISTENCE_RADIUS = 7 PERSISTENCE_RADIUS = 7
from deltas import SequenceMatcher, SegmentMatcher from pathlib import Path
import pyarrow as pa import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as pacsv import pyarrow.csv as pacsv
import pyarrow.parquet as pq
from deltas import SegmentMatcher, SequenceMatcher
class PersistMethod: class PersistMethod:
none = 0 none = 0
@ -215,7 +215,8 @@ class WikiqParser:
namespaces: Union[list[int], None] = None, namespaces: Union[list[int], None] = None,
revert_radius: int = 15, revert_radius: int = 15,
output_parquet: bool = True, output_parquet: bool = True,
parquet_buffer_size: int = 2000 parquet_buffer_size: int = 2000,
partition_namespaces: bool = False,
): ):
""" """
@ -230,6 +231,7 @@ class WikiqParser:
self.revert_radius = revert_radius self.revert_radius = revert_radius
self.diff = diff self.diff = diff
self.text = text self.text = text
self.partition_namespaces = partition_namespaces
if namespaces is not None: if namespaces is not None:
self.namespace_filter = set(namespaces) self.namespace_filter = set(namespaces)
else: else:
@ -374,7 +376,25 @@ class WikiqParser:
if self.output_parquet: if self.output_parquet:
pageid_sortingcol = pq.SortingColumn(schema.get_field_index('pageid')) pageid_sortingcol = pq.SortingColumn(schema.get_field_index('pageid'))
revid_sortingcol = pq.SortingColumn(schema.get_field_index('pageid')) revid_sortingcol = pq.SortingColumn(schema.get_field_index('pageid'))
writer = pq.ParquetWriter(self.output_file, schema, flavor='spark', sorting_columns=[pageid_sortingcol, revid_sortingcol]) sorting_cols = [pageid_sortingcol, revid_sortingcol]
if self.partition_namespaces is False:
writer = pq.ParquetWriter(self.output_file, schema, flavor='spark', sorting_columns=sorting_cols)
else:
output_path = Path(self.output_file)
if self.namespace_filter is not None:
namespaces = self.namespace_filter
else:
namespaces = self.namespaces.values()
ns_paths = {ns: (output_path.parent / f"namespace={ns}") / output_path.name for ns in namespaces}
for path in ns_paths.values():
Path(path).parent.mkdir(exist_ok=True, parents=True)
pq_writers = {ns:
pq.ParquetWriter(path,
schema,
flavor='spark',
sorting_columns=sorting_cols) for ns, path in ns_paths.items()}
else: else:
writer = pacsv.CSVWriter(self.output_file, schema, write_options=pacsv.WriteOptions(delimiter='\t')) writer = pacsv.CSVWriter(self.output_file, schema, write_options=pacsv.WriteOptions(delimiter='\t'))
@ -507,6 +527,9 @@ class WikiqParser:
if not self.text: if not self.text:
del row_buffer['text'] del row_buffer['text']
if self.partition_namespaces is True:
writer = pq_writers[page.mwpage.namespace]
writer.write(pa.table(row_buffer, schema=schema)) writer.write(pa.table(row_buffer, schema=schema))
page_count += 1 page_count += 1
@ -609,6 +632,10 @@ def main():
action='store_true', action='store_true',
help="Output the text of the revision.") help="Output the text of the revision.")
parser.add_argument('-PNS', '--partition-namespaces', dest="partition_namespaces", default=False,
action='store_true',
help="Partition parquet files by namespace.")
parser.add_argument('--fandom-2020', dest="fandom_2020", parser.add_argument('--fandom-2020', dest="fandom_2020",
action='store_true', action='store_true',
help="Whether the archive is from the fandom 2020 dumps by Wikiteam. These dumps can have multiple .xml files in their archives.") help="Whether the archive is from the fandom 2020 dumps by Wikiteam. These dumps can have multiple .xml files in their archives.")
@ -670,6 +697,7 @@ def main():
text=args.text, text=args.text,
diff=args.diff, diff=args.diff,
output_parquet=output_parquet, output_parquet=output_parquet,
partition_namespaces=args.partition_namespaces
) )
wikiq.process() wikiq.process()

View File

@ -199,6 +199,19 @@ def test_collapse_user():
baseline = pd.read_table(tester.baseline_file) baseline = pd.read_table(tester.baseline_file)
assert_frame_equal(test, baseline, check_like=True) assert_frame_equal(test, baseline, check_like=True)
def test_partition_namespaces():
    """End-to-end test of the new --partition-namespaces flag.

    Runs wikiq over the SAILORMOON fixture with --collapse-user,
    --fandom-2020 and --partition-namespaces, then compares the
    namespace=10 output partition against the stored baseline.
    """
    # NOTE(review): the suffix "collapse-user" points this test at the
    # existing collapse-user baseline rather than a dedicated
    # "partition-namespaces" baseline — confirm that baseline holds only
    # namespace-10 rows, otherwise this comparison cannot match.
    tester = WikiqTester(SAILORMOON, "collapse-user", in_compression="7z", out_format='parquet', baseline_format='parquet')
    try:
        tester.call_wikiq("--collapse-user", "--fandom-2020", "--partition-namespaces")
    except subprocess.CalledProcessError as exc:
        # Surface wikiq's stderr in the pytest failure message.
        pytest.fail(exc.stderr.decode("utf8"))
    # Partitioned output is written as namespace=<ns>/<original filename>.
    test = pd.read_parquet(os.path.join(tester.output,"namespace=10/sailormoon.parquet"))
    baseline = pd.read_parquet(tester.baseline_file)
    # check_like=True ignores column ordering in the comparison.
    assert_frame_equal(test, baseline, check_like=True)
def test_pwr_wikidiff2(): def test_pwr_wikidiff2():
tester = WikiqTester(SAILORMOON, "persistence_wikidiff2", in_compression="7z") tester = WikiqTester(SAILORMOON, "persistence_wikidiff2", in_compression="7z")

Binary file not shown.