try fixing the memory problem.

Nathan TeBlunthuis 2025-07-14 18:58:27 -07:00
parent 76d54ae597
commit e53e7ada5d
2 changed files with 113 additions and 97 deletions

pyproject.toml

@@ -7,6 +7,7 @@ requires-python = ">=3.9"
 dependencies = [
     "deltas>=0.7.0",
     "mediawiki-utilities>=0.4.18",
+    "more-itertools>=10.7.0",
     "mwpersistence>=0.2.4",
     "mwreverts>=0.1.5",
     "mwtypes>=0.4.0",

wikiq

@@ -11,6 +11,7 @@ import sys
 from collections import deque
 from hashlib import sha1
 from io import TextIOWrapper
+from more_itertools import chunked
 from itertools import groupby
 from subprocess import PIPE, Popen
 from typing import IO, Any, Generator, TextIO, Union
@@ -215,7 +216,7 @@ class WikiqParser:
         namespaces: Union[list[int], None] = None,
         revert_radius: int = 15,
         output_parquet: bool = True,
-        parquet_buffer_size: int = 2000,
+        buffer_size: int = 200,
         partition_namespaces: bool = False,
     ):
@@ -243,12 +244,13 @@ class WikiqParser:
         self.regex_comment_pairs: list[RegexPair] = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
 
         # here we initialize the variables we need for output.
+        self.buffer_size = buffer_size
         if output_parquet is True:
             self.output_parquet = True
             self.pq_writer = None
             self.output_file = output_file
             self.parquet_buffer = []
-            self.parquet_buffer_size = parquet_buffer_size
         else:
             self.print_header = True
             if output_file == sys.stdout.buffer:
@@ -400,9 +402,8 @@ class WikiqParser:
         regex_matches = {}
 
-        # Iterate through pages
+        # Iterate through pages; note that collapse_revs has already been applied.
         for page in dump:
-            revision_texts = []
 
             # skip namespaces not in the filter
             if self.namespace_filter is not None:
@@ -415,21 +416,26 @@ class WikiqParser:
             else:
                 reverts_column.rev_detector = None
 
-            # Iterate through a page's revisions
-            for revs in page:
-
-                # Revisions may or may not be grouped into lists of contiguous revisions by the
-                # same user. We call these "edit sessions". Otherwise revs is a list containing
-                # exactly one revision.
-                revs = list(revs)
-                revs = fix_hex_digests(revs)
-                table.add(page.mwpage, revs)
+            # Iterate through a page in batches of edit sessions.
+            batches = chunked(page, self.buffer_size)
+            last_rev_text = None
+            for batch in batches:
+                output_buffer = None
+                revision_texts = [] if last_rev_text is None else [last_rev_text]
+                for revs in batch:
+                    revs = fix_hex_digests(list(revs))
+
+                    # Revisions may or may not be grouped into lists of contiguous revisions by the
+                    # same user. We call these "edit sessions". Otherwise revs is a list containing
+                    # exactly one revision.
+                    # This is very broken; we can't bring all revisions into memory at once.
+                    # reverts_column.reset()
+                    table.add(page.mwpage, revs)
 
                 # if re.match(r'^#redirect \[\[.*\]\]', rev.text, re.I):
                 #     redirect = True
                 # else:
                 #     redirect = False
 
                 # TODO missing: additions_size deletions_size
 
                 rev_count += 1
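The hunk above is the heart of the fix: instead of holding every revision of a page in memory at once, the page is consumed in batches of self.buffer_size edit sessions, and last_rev_text carries the final text of one batch into the next so that the first diff of a new batch still has a predecessor to compare against. A standalone sketch of that carry-over pattern (function and variable names are illustrative, not wikiq's API):

    from more_itertools import chunked

    def pairwise_in_batches(texts, batch_size):
        """Yield (previous, current) text pairs without loading all texts at once."""
        last_text = None
        for batch in chunked(texts, batch_size):
            # Seed the window with the carried-over text so the first item
            # of this batch can still be compared against its predecessor.
            window = ([] if last_text is None else [last_text]) + list(batch)
            for prev, cur in zip(window, window[1:]):
                yield prev, cur  # stand-in for a real diff computation
            last_text = window[-1]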
@@ -443,6 +449,7 @@ class WikiqParser:
                         regex_matches[k].append(v)
 
                     revision_texts.append(rev.text)
+                    last_rev_text = rev.text
 
                 wikidiff_matcher = None
                 if self.diff or self.persist == PersistMethod.wikidiff2:
@@ -454,22 +461,23 @@ class WikiqParser:
                 row_buffer = table.pop()
 
                 if self.diff:
-                    row_buffer['diff'] = [[entry for entry in wikidiff_matcher.diffs[i]['diff'] if entry['type'] != 0] for i in range(len(revision_texts))]
+                    row_buffer['diff'] = [[entry for entry in wikidiff_matcher.diffs[i]['diff'] if entry['type'] != 0] for i in range(len(revision_texts) - (1 if last_rev_text is not None else 0))]
 
                 is_revert_column: list[Union[bool, None]] = []
+                print("row_buffer:deleted" + str(row_buffer['deleted']), file=open("debug", 'w'))
+                print("row_buffer:reverteds" + str(row_buffer['reverteds']), file=open("debug", 'a'))
                 for r, d in zip(row_buffer['reverteds'], row_buffer['deleted']):
                     if self.revert_radius == 0 or d:
                         is_revert_column.append(None)
                     else:
                         is_revert_column.append(r is not None)
 
                 row_buffer['revert'] = is_revert_column
+                print("row_buffer:revert" + str(row_buffer['revert']), file=open("debug", 'a'))
 
                 for k, v in regex_matches.items():
                     row_buffer[k] = v
                 regex_matches = {}
 
                 if self.persist != PersistMethod.none:
                     window = deque(maxlen=PERSISTENCE_RADIUS)
@@ -485,15 +493,15 @@ class WikiqParser:
                         state = mwpersistence.DiffState(SegmentMatcher(tokenizer=wikitext_split),
                                                         revert_radius=PERSISTENCE_RADIUS)
                     elif self.persist == PersistMethod.wikidiff2:
                         state = mwpersistence.DiffState(wikidiff_matcher,
                                                         revert_radius=PERSISTENCE_RADIUS)
                     else:
                         from mw.lib import persistence
                         state = persistence.State()
 
-                    for idx, text in enumerate(row_buffer['text']):
-                        rev_id = row_buffer['revid'][idx]
+                    for idx, text in enumerate(revision_texts[(0 if last_rev_text is not None else 1):]):
+                        rb_idx = idx - (0 if last_rev_text is not None else 1)
+                        rev_id = row_buffer['revid'][rb_idx]
 
                         if self.persist != PersistMethod.legacy:
                             _, tokens_added, tokens_removed = state.update(text, rev_id)
                         else:
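The two hunks above share one piece of bookkeeping: when a batch was seeded with the previous batch's last text, revision_texts holds one more entry than row_buffer has rows, so any index into revision_texts must be shifted before it can index row_buffer. A toy illustration of that alignment, assuming the first element is the carried-over text (all values are hypothetical):

    last_rev_text = "v2"                 # carried over from the previous batch
    revision_texts = ["v2", "v3", "v4"]  # seeded, so one longer than the rows
    row_buffer = {"revid": [3, 4]}       # rows only for this batch's revisions

    offset = 1 if last_rev_text is not None else 0
    for idx, text in enumerate(revision_texts[offset:]):
        rev_id = row_buffer["revid"][idx]  # idx now lines up with row_buffer
        print(rev_id, text)                # prints: 3 v3, then 4 v4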
@@ -530,8 +538,15 @@ class WikiqParser:
                 if self.partition_namespaces is True:
                     writer = pq_writers[page.mwpage.namespace]
 
-                writer.write(pa.table(row_buffer, schema=schema))
+                print("output_buffer:" + str(output_buffer), file=open('debug', 'a'))
+                if output_buffer is None:
+                    output_buffer = row_buffer
+                else:
+                    [output_buffer[k].extend(row_buffer[k]) for k in output_buffer.keys()]
+                print("output_buffer:" + str(output_buffer), file=open('debug', 'a'))
+                record_batch = pa.record_batch(output_buffer, schema=schema)
+                writer.write_batch(record_batch)
 
             page_count += 1
 
         print("Done: %s revisions and %s pages." % (rev_count, page_count),