make wikiq memory efficient again via batch processing.

This commit is contained in:
Nathan TeBlunthuis 2025-07-15 15:20:17 -07:00
parent 76d54ae597
commit c40506137b
6 changed files with 1809 additions and 1605 deletions

File diff suppressed because it is too large Load Diff

View File

@ -63,7 +63,7 @@ class RevisionTable:
return pa.schema([c.field for c in self.columns])
def pop(self) -> dict:
data = {}
data = dict()
for column in self.columns:
data[column.field.name] = column.pop()
@ -174,15 +174,6 @@ class RevisionTextChars(RevisionField[Union[int, None]]):
return None
class RevisionText(RevisionField[str]):
field = pa.field("text", pa.string())
def extract(self, page: mwtypes.Page, revisions: list[mwxml.Revision]) -> str:
revision = revisions[-1]
return revision.text
class RevisionIsMinor(RevisionField[bool]):
field = pa.field("minor", pa.bool_())

View File

@ -96,48 +96,6 @@ class DiffToOperationMap:
del self.from_par_move_dict[rkey]
break
# if len(self.from_par_move_dict) > 0 or len(self.to_par_move_dict) > 0:
# print("Couldn't find exact matches for all parmoves!")
# # we couldn't find all the matches via exact match
# # let's try matching based on line number instead
# lkeys_to_remove = []
# for lkey, from_diff in self.from_par_move_dict.items():
# from_linenum = from_diff["moveInfo"]["linkId"].split("_")[2]
# rkey_to_remove = None
# for rkey, to_diff in self.to_par_move_dict.items():
# to_linenum = rkey.split("_")[2]
# if from_linenum == to_linenum:
# print("Matching on line number")
# yield from self.doParMove(from_diff, to_diff)
# rkey_to_remove = rkey
# lkeys_to_remove.append(lkey)
# break
# if rkey_to_remove is not None:
# del self.to_par_move_dict[rkey_to_remove]
# for lkey in lkeys_to_remove:
# del self.from_par_move_dict[lkey]
# if len(self.from_par_move_dict) > 0 or len(self.to_par_move_dict) > 0:
# print("Couldn't find exact matches for all parmoves!")
# # we couldn't find all the matches via exact match or line number
# # let's try matching based on opIndex instead
# lkeys_to_remove = []
# for lkey, from_diff in self.from_par_move_dict.items():
# rkey_to_remove = None
# from_idx = from_diff["moveInfo"]["linkId"].split("_")[1]
# for rkey, to_diff in self.to_par_move_dict.items():
# to_idx = rkey.split("_")[1]
# print(from_idx)
# print(to_idx)
# if from_idx == to_idx:
# yield from self.doParMove(from_diff, to_diff)
# rkey_to_remove = rkey
# lkeys_to_remove.append(lkey)
# if rkey_to_remove is not None:
# del self.to_par_move_dict[rkey_to_remove]
# for lkey in lkeys_to_remove:
# del self.from_par_move_dict[lkey]
# we couldn't find matches. treat type 4 as removal and type 5 as highlight.
for from_diff in self.from_par_move_dict.values():
yield from self.doDelete(from_diff)
@ -368,22 +326,21 @@ class DiffToOperationMap:
class WikiDiffMatcher:
def __init__(
self,
texts: list[str] = None,
tokenizer: Optional[RegexTokenizer] = None,
):
differ = pywikidiff2.pywikidiff2(
numContextLines=1000000, moved_paragraph_detection_cutoff=200000
)
# Pre-compute diffs to reduce traffic overhead.
self.diffs = [json.loads(diff) for diff in differ.inline_json_diff_sequence(list(texts))]
self.tokenizer = tokenizer or TOKENIZER
class Processor(DiffEngine.Processor):
def __init__(self, texts, tokenizer=None):
self.diffs = iter(texts)
def __init__(self, tokenizer=None):
self.tokenizer = tokenizer or TOKENIZER
self.last_tokens = []
self.previous_text = ""
self.differ = pywikidiff2.pywikidiff2(
numContextLines=1000000, moved_paragraph_detection_cutoff=200000
)
self.last_diff = None
def update(self, last_tokens):
self.last_tokens = last_tokens
@ -391,7 +348,8 @@ class WikiDiffMatcher:
def process(self, text, token_class=None):
# The diff has already been computed, but we need to incrementally
# retrieve it to recreate the behavior DiffState expects.
diff = next(self.diffs)
diff = json.loads(self.differ.inline_json_diff(self.previous_text, text))
self.last_diff = diff
diffToOperationsMapper = DiffToOperationMap(diff, self.tokenizer)
diffops = list(diffToOperationsMapper.to_operations())
@ -444,7 +402,7 @@ class WikiDiffMatcher:
return border_ops, self.last_tokens, tokens
def processor(self, *args, **kwargs):
return self.Processor(self.diffs, self.tokenizer)
return self.Processor(self.tokenizer)
def process(self):
# DiffState checks for this method even though it is not called.

View File

@ -89,9 +89,9 @@ class WikiqTester:
:return: The output of the wikiq call.
"""
if out:
call = " ".join([WIKIQ, self.input_file, "-o", self.output, *args])
call = " ".join([WIKIQ, self.input_file, "-o", self.output, "--batch-size", "10", *args])
else:
call = " ".join([WIKIQ, self.input_file, *args])
call = " ".join([WIKIQ, self.input_file, "--batch-size", "10", *args])
print(call)
return subprocess.check_output(call, stderr=subprocess.PIPE, shell=True)
@ -276,6 +276,20 @@ def test_diff():
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
def test_diff_plus_pwr():
tester = WikiqTester(SAILORMOON, "diff_pwr", in_compression="7z", out_format='parquet', baseline_format='parquet')
try:
tester.call_wikiq("--diff --persistence wikidiff2", "--fandom-2020")
except subprocess.CalledProcessError as exc:
pytest.fail(exc.stderr.decode("utf8"))
test = pd.read_parquet(tester.output + f"/{SAILORMOON}.parquet")
baseline = pd.read_parquet(tester.baseline_file)
test = test.reindex(columns=sorted(test.columns))
assert_frame_equal(test, baseline, check_like=True)
def test_text():
tester = WikiqTester(SAILORMOON, "text", in_compression="7z", out_format='parquet', baseline_format='parquet')

File diff suppressed because it is too large Load Diff

View File

@ -59,7 +59,7 @@ def assert_correct_equal_section(ops, expected_equal_lines, expected_equal_token
def test_equality():
rev1 = open("test/test_diff_revisions/1285792388").read()
# whitespace is added because exact identity reverts do not result in diffs.
matcher = WikiDiffMatcher([rev1,rev1 + " "])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
ops, a, b = diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev1 + " ")
@ -75,7 +75,7 @@ def test_equality():
def test_highlight_range_3():
rev1 = open("test/test_diff_revisions/test_highlight_3_from").read()
rev2 = open("test/test_diff_revisions/test_highlight_3_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev2)
@ -85,7 +85,7 @@ def test_highlight_range_3():
def test_highlight_range_4():
rev1 = open("test/test_diff_revisions/test_highlight_4_from").read()
rev2 = open("test/test_diff_revisions/test_highlight_4_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev2)
@ -95,7 +95,7 @@ def test_highlight_range_4():
def test_complex_diff():
rev1 = open("test/test_diff_revisions/test_complex_from").read()
rev2 = open("test/test_diff_revisions/test_complex_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev2)
@ -107,7 +107,7 @@ def test_complex_diff():
def test_highlight_range_unicode():
rev1 = open("test/test_diff_revisions/test_unicode_highlight_from").read()
rev2 = open("test/test_diff_revisions/test_unicode_highlight_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev2)
@ -118,7 +118,7 @@ def test_highlight_range_unicode():
def test_highlight_range():
rev1 = open("test/test_diff_revisions/1295229484_rangeedit0").read()
rev2 = open("test/test_diff_revisions/1295229484_rangeedit1").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev2)
@ -128,7 +128,7 @@ def test_highlight_range():
def test_unmatched_parmoves():
rev1 = open("test/test_diff_revisions/test_unmatched_parmoves_from").read()
rev2 = open("test/test_diff_revisions/test_unmatched_parmoves_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev2)
@ -138,7 +138,7 @@ def test_unmatched_parmoves():
def test_bug_4():
rev1 = open("test/test_diff_revisions/test_bug_4_from").read()
rev2 = open("test/test_diff_revisions/test_bug_4_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev2)
@ -151,7 +151,7 @@ def test_delete():
rev2 = open("test/test_diff_revisions/1295229484_delete").read()
# whitespace is added because exact identity reverts do not result in diffs.
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev2)
@ -207,7 +207,7 @@ def test_delete():
def test_addition():
rev1 = open("test/test_diff_revisions/1285792388").read()
rev2 = open("test/test_diff_revisions/1295229484").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
# note that a and b are constructed from the diffs.
@ -255,7 +255,7 @@ def test_addition():
def test_paragraph_move():
rev1 = open("test/test_diff_revisions/1295229484").read()
rev2 = open("test/test_diff_revisions/1295229484_parmove").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
# note that a and b are constructed from the diffs.
@ -268,7 +268,7 @@ def test_paragraph_move():
def test_paragraph_move_and_change():
rev1 = open("test/test_diff_revisions/1295229484").read()
rev2 = open("test/test_diff_revisions/1295229484_parmove_and_change").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
# note that a and b are constructed from the diffs.
@ -281,7 +281,7 @@ def test_paragraph_move_and_change():
def test_infobox():
rev1 = open("test/test_diff_revisions/test_infobox_from").read()
rev2 = open("test/test_diff_revisions/test_infobox_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
# note that a and b are constructed from the diffs.
@ -294,7 +294,7 @@ def test_infobox():
def test_leading_whitespace():
rev1 = open("test/test_diff_revisions/test_leading_ws_from").read()
rev2 = open("test/test_diff_revisions/test_leading_ws_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
# note that a and b are constructed from the diffs.
@ -307,7 +307,7 @@ def test_leading_whitespace():
def test_whitespace_bug():
rev1 = open("test/test_diff_revisions/test_whitespace_bug_from").read()
rev2 = open("test/test_diff_revisions/test_whitespace_bug_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
# note that a and b are constructed from the diffs.
@ -320,7 +320,7 @@ def test_whitespace_bug():
def test_bug_3():
rev1 = open("test/test_diff_revisions/test_bug_3_from").read()
rev2 = open("test/test_diff_revisions/test_bug_3_to").read()
matcher = WikiDiffMatcher([rev1,rev2])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
# note that a and b are constructed from the diffs.
@ -335,7 +335,7 @@ def test_bug_3():
def test_actually_equal():
rev1 = open("test/test_diff_revisions/1285792388").read()
# whitespace is added because exact identity reverts do not result in diffs.
matcher = WikiDiffMatcher([rev1,rev1])
matcher = WikiDiffMatcher()
diff_processor = matcher.processor()
ops, a, b = diff_processor.process(rev1)
ops, a, b = diff_processor.process(rev1)