Begin move to columnar types

This will allow making columns optional, as desired, and make
adding new columns straightforward without impacting existing
behavior.

Signed-off-by: Will Beason <willbeason@gmail.com>
This commit is contained in:
Will Beason 2025-06-03 08:52:57 -05:00
parent f916af9836
commit 8b0f775610
2 changed files with 155 additions and 16 deletions

136
tables.py Normal file
View File

@ -0,0 +1,136 @@
from abc import abstractmethod, ABC
from datetime import datetime, timezone
from hashlib import sha1
from typing import Generic, TypeVar
import mwtypes
import mwxml
import pyarrow as pa
T = TypeVar('T')
class RevisionField(ABC, Generic[T]):
"""
Abstract type which represents a field in a table of page revisions.
"""
def __init__(self, field: pa.Field):
self.field = field
@abstractmethod
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> T:
"""
:param page: The page for this set of revisions.
:param revisions: The set of revisions to compute the field from.
Revisions are passed in chronological order, so use revisions[-1] to
access the most recent revision in the set.
"""
pass
class RevisionTableColumn(Generic[T]):
def __init__(self, field: RevisionField[T]):
self.field: RevisionField = field
self.data: list[T] = []
def add(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> None:
self.data.append(self.field.extract(page, revisions))
def pop_column(self) -> list[T]:
data = self.data
self.data = []
return data
class RevisionTable:
columns: list[RevisionTableColumn]
def add_revision_set(self, page: mwtypes.Page, revisions: list[mwxml.Revision]):
for column in self.columns:
column.add(page, revisions)
class RevisionId(RevisionField[int]):
def extract(self, _: mwtypes.Page, revisions: list[mwxml.Revision]) -> int:
revision = revisions[-1]
return revision.id
class RevisionTimestamp(RevisionField[datetime]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> datetime:
revision = revisions[-1]
return revision.timestamp
class RevisionEditorId(RevisionField[int | None]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> int | None:
revision = revisions[-1]
if revision.deleted.user or revision.user.id is None:
return None
return revision.user.id
class RevisionAnon(RevisionField[bool | None]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> bool | None:
revision = revisions[-1]
if revision.deleted.user:
return None
return revision.user.id is None
class RevisionEditorText(RevisionField[str | None]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str | None:
revision = revisions[-1]
if revision.deleted.user:
return None
return revision.user.text
class RevisionPageTitle(RevisionField[str]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str:
return page.title
class RevisionDeleted(RevisionField[bool]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> bool:
revision = revisions[-1]
return revision.deleted.text
class RevisionNamespace(RevisionField[int]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> int:
return page.namespace
class RevisionSha1(RevisionField[str]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str:
revision = revisions[-1]
if revision.sha1:
return revision.sha1
return sha1(revision.sha1).hexdigest()
class RevisionTextChars(RevisionField[int]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> int:
revision = revisions[-1]
return len(revision.text)
class RevisionMinor(RevisionField[bool]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> bool:
revision = revisions[-1]
return revision.minor
class RevisionCollapse(RevisionField[int]):
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> int:
return len(revisions)

35
wikiq
View File

@ -15,16 +15,16 @@ from itertools import groupby
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from collections import deque from collections import deque
from hashlib import sha1 from hashlib import sha1
from typing import Any, IO, TextIO, Final from typing import Any, IO, TextIO, Final, Generator
import mwxml
from mwxml import Dump from mwxml import Dump
from deltas.tokenizers import wikitext_split from deltas.tokenizers import wikitext_split
import mwpersistence import mwpersistence
import mwreverts import mwreverts
from pyarrow import Array, Table, Schema, DataType from pyarrow import Schema
from pyarrow.parquet import ParquetWriter
TO_ENCODE = ('title', 'editor') TO_ENCODE = ('title', 'editor')
PERSISTENCE_RADIUS = 7 PERSISTENCE_RADIUS = 7
@ -57,7 +57,7 @@ class WikiqIterator:
self.mwiterator = Dump.from_file(self.fh) self.mwiterator = Dump.from_file(self.fh)
self.namespace_map = {ns.id: ns.name for ns in self.namespace_map = {ns.id: ns.name for ns in
self.mwiterator.site_info.namespaces} self.mwiterator.site_info.namespaces}
self.__pages = self.load_pages() self.__pages: Generator[WikiqPage] = self.load_pages()
def load_pages(self): def load_pages(self):
for page in self.mwiterator: for page in self.mwiterator:
@ -92,7 +92,7 @@ class WikiqPage:
self.restrictions = page.restrictions self.restrictions = page.restrictions
self.collapse_user = collapse_user self.collapse_user = collapse_user
self.mwpage = page self.mwpage = page
self.__revisions = self.rev_list() self.__revisions: Generator[list[mwxml.Revision]] = self.rev_list()
@staticmethod @staticmethod
def user_text(rev) -> str | None: def user_text(rev) -> str | None:
@ -116,7 +116,7 @@ class WikiqPage:
for _, revs in groupby(self.mwpage, self.user_text): for _, revs in groupby(self.mwpage, self.user_text):
# All revisions are either from the same user, or this is a single # All revisions are either from the same user, or this is a single
# revision where the user is missing. # revision where the user is missing.
yield revs yield list(revs)
def __iter__(self): def __iter__(self):
return self.__revisions return self.__revisions
@ -152,7 +152,7 @@ class RegexPair(object):
def _make_key(self, cap_group): def _make_key(self, cap_group):
return "{}_{}".format(self.label, cap_group) return "{}_{}".format(self.label, cap_group)
def matchmake(self, content, rev_data): def matchmake(self, content: str, rev_data):
temp_dict = {} temp_dict = {}
# if there are named capture groups in the regex # if there are named capture groups in the regex
@ -202,7 +202,7 @@ class RegexPair(object):
def pa_schema() -> pa.Schema: def pa_schema() -> pa.Schema:
fields = [ fields: list[pa.Field] = [
pa.field("revid", pa.int64()), pa.field("revid", pa.int64()),
pa.field("date_time", pa.timestamp('s')), pa.field("date_time", pa.timestamp('s')),
pa.field("articleid", pa.int64()), pa.field("articleid", pa.int64()),
@ -222,6 +222,8 @@ def pa_schema() -> pa.Schema:
] ]
return pa.schema(fields) return pa.schema(fields)
""" """
We used to use a dictionary to collect fields for the output. We used to use a dictionary to collect fields for the output.
@ -430,18 +432,19 @@ class WikiqParser:
else: else:
sys.exit('Each regular expression *must* come with a corresponding label and vice versa.') sys.exit('Each regular expression *must* come with a corresponding label and vice versa.')
def matchmake_revision(self, rev, rev_data): def matchmake_revision(self, rev: mwxml.Revision, rev_data: Revision):
rev_data = self.matchmake_text(rev.text, rev_data) rev_data = self.matchmake_text(rev.text, rev_data)
rev_data = self.matchmake_comment(rev.comment, rev_data) rev_data = self.matchmake_comment(rev.comment, rev_data)
return rev_data return rev_data
def matchmake_text(self, text, rev_data): def matchmake_text(self, text: str, rev_data: Revision):
return self.matchmake_pairs(text, rev_data, self.regex_revision_pairs) return self.matchmake_pairs(text, rev_data, self.regex_revision_pairs)
def matchmake_comment(self, comment, rev_data): def matchmake_comment(self, comment: str, rev_data: Revision):
return self.matchmake_pairs(comment, rev_data, self.regex_comment_pairs) return self.matchmake_pairs(comment, rev_data, self.regex_comment_pairs)
def matchmake_pairs(self, text, rev_data, pairs): @staticmethod
def matchmake_pairs(text, rev_data, pairs):
for pair in pairs: for pair in pairs:
rev_data = pair.matchmake(text, rev_data) rev_data = pair.matchmake(text, rev_data)
return rev_data return rev_data
@ -485,11 +488,12 @@ class WikiqParser:
# Iterate through pages # Iterate through pages
for page in dump: for page in dump:
namespace = page.namespace if page.namespace is not None else self.__get_namespace_from_title(page.title) if page.namespace is None:
page.namespace = self.__get_namespace_from_title(page.title)
# skip namespaces not in the filter # skip namespaces not in the filter
if self.namespace_filter is not None: if self.namespace_filter is not None:
if namespace not in self.namespace_filter: if page.namespace not in self.namespace_filter:
continue continue
# Disable detecting reverts if radius is 0. # Disable detecting reverts if radius is 0.
@ -517,7 +521,6 @@ class WikiqParser:
# Iterate through a page's revisions # Iterate through a page's revisions
prev_text_chars = 0 prev_text_chars = 0
for revs in page: for revs in page:
revs = list(revs)
rev = revs[-1] rev = revs[-1]
editorid = None if rev.deleted.user or rev.user.id is None else rev.user.id editorid = None if rev.deleted.user or rev.user.id is None else rev.user.id
@ -529,7 +532,7 @@ class WikiqParser:
editorid=editorid, editorid=editorid,
title=page.title, title=page.title,
deleted=rev.deleted.text, deleted=rev.deleted.text,
namespace=namespace namespace=page.namespace
) )
rev_data = self.matchmake_revision(rev, rev_data) rev_data = self.matchmake_revision(rev, rev_data)