Refactor revision parsing logic to be columnar #1

Merged
beason merged 27 commits from test-parquet into parquet_support 2025-06-17 18:22:26 +00:00
Showing only changes of commit 3e8ae205e8 - Show all commits

68
wikiq
View File

@ -47,6 +47,18 @@ def calculate_persistence(tokens_added):
len(tokens_added))
def fix_hex_digests(revs: list[mwxml.Revision]) -> list[mwxml.Revision]:
i = 0
for rev in revs:
if rev.text is None:
rev.text = ""
if not rev.sha1 and not rev.deleted.text:
rev.sha1 = sha1(bytes(rev.text, "utf8")).hexdigest()
revs[i] = rev
i+=1
return revs
class WikiqIterator:
def __init__(self, fh, collapse_user=False):
self.fh = fh
@ -370,16 +382,11 @@ class WikiqParser:
beason marked this conversation as resolved Outdated

This logic is to 1. replace None with "" and then fix some edits from Fandom that didn't come with sha1s. Could we move this to a function so it looks like revs = repair_revs(revs). I'd like the mutation of revs to be super clear.

This logic is to 1. replace `None` with "" and then fix some edits from Fandom that didn't come with sha1s. Could we move this to a function so it looks like `revs = repair_revs(revs)`. I'd like the mutation of revs to be super clear.

Done!

Done!
# Iterate through a page's revisions
for revs in page:
beason marked this conversation as resolved Outdated

Don't think it's necessary to call list(revs) here.

Don't think it's necessary to call `list(revs)` here.
# Revisions may or may not be grouped into lists of contiguous revisions by the
# same user. We call these "edit sessions". Otherwise revs is a list containing
# exactly one revision.
revs = list(revs)
rev = revs[-1]
if rev.text is None:
rev.text = ""
if not rev.sha1 and not rev.deleted.text:
rev.sha1 = sha1(bytes(rev.text, "utf8")).hexdigest()
revs[-1] = rev
revs = fix_hex_digests(revs)
table.add(page.mwpage, list(revs))
@ -392,34 +399,37 @@ class WikiqParser:
rev_count += 1
beason marked this conversation as resolved Outdated

can we give buffer a more descriptive name?

can we give `buffer` a more descriptive name?

Done!

Done!
# Get the last revision in the edit session.
rev = revs[-1]
regex_dict = self.matchmake_revision(rev)
for k, v in regex_dict.items():
if regex_matches.get(k) is None:
regex_matches[k] = []
regex_matches[k].append(v)
buffer = table.pop()
# Collect the set of pages currently buffered in the table so we can run multi-page functions on them.
row_buffer = table.pop()
is_revert_column: list[bool | None] = []
for r, d in zip(buffer['reverteds'], buffer['deleted']):
for r, d in zip(row_buffer['reverteds'], row_buffer['deleted']):
if self.revert_radius == 0 or d:
is_revert_column.append(None)
else:
is_revert_column.append(r is not None)
buffer['revert'] = is_revert_column
row_buffer['revert'] = is_revert_column
for k, v in regex_matches.items():
buffer[k] = v
row_buffer[k] = v
regex_matches = {}
if self.persist != PersistMethod.none:
window = deque(maxlen=PERSISTENCE_RADIUS)
buffer['token_revs'] = []
buffer['tokens_added'] = []
buffer['tokens_removed'] = []
buffer['tokens_window'] = []
row_buffer['token_revs'] = []
row_buffer['tokens_added'] = []
row_buffer['tokens_removed'] = []
row_buffer['tokens_window'] = []
if self.persist == PersistMethod.sequence:
state = mwpersistence.DiffState(SequenceMatcher(tokenizer=wikitext_split),
@ -431,8 +441,8 @@ class WikiqParser:
from mw.lib import persistence
state = persistence.State()
for idx, text in enumerate(buffer['text']):
rev_id = buffer['revid'][idx]
for idx, text in enumerate(row_buffer['text']):
rev_id = row_buffer['revid'][idx]
if self.persist != PersistMethod.legacy:
_, tokens_added, tokens_removed = state.update(text, rev_id)
else:
@ -444,12 +454,12 @@ class WikiqParser:
old_rev_id, old_tokens_added, old_tokens_removed = window.popleft()
num_token_revs, num_tokens = calculate_persistence(old_tokens_added)
buffer['token_revs'].append(num_token_revs)
buffer['tokens_added'].append(num_tokens)
buffer['tokens_removed'].append(len(old_tokens_removed))
buffer['tokens_window'].append(PERSISTENCE_RADIUS - 1)
row_buffer['token_revs'].append(num_token_revs)
row_buffer['tokens_added'].append(num_tokens)
row_buffer['tokens_removed'].append(len(old_tokens_removed))
row_buffer['tokens_window'].append(PERSISTENCE_RADIUS - 1)
del buffer['text']
del row_buffer['text']
# print out metadata for the last RADIUS revisions
for i, item in enumerate(window):
@ -460,12 +470,12 @@ class WikiqParser:
rev_id, tokens_added, tokens_removed = item
num_token_revs, num_tokens = calculate_persistence(tokens_added)
buffer['token_revs'].append(num_token_revs)
buffer['tokens_added'].append(num_tokens)
buffer['tokens_removed'].append(len(tokens_removed))
buffer['tokens_window'].append(len(window) - (i + 1))
row_buffer['token_revs'].append(num_token_revs)
row_buffer['tokens_added'].append(num_tokens)
row_buffer['tokens_removed'].append(len(tokens_removed))
row_buffer['tokens_window'].append(len(window) - (i + 1))
writer.write(pa.table(buffer, schema=schema))
writer.write(pa.table(row_buffer, schema=schema))
page_count += 1