"""Tests for WikiDiffMatcher: verify that token streams reconstructed from
diffs match the underlying revision texts, modulo whitespace normalization."""

from itertools import chain
from functools import partial
import re
import pytest
import pytest_asyncio
from typing import List
from deltas import Delete, Equal, Insert, wikitext_split
from mwpersistence import Token
from wikiq.wiki_diff_matcher import WikiDiffMatcher


def _read(path):
    """Read a fixture file, closing the handle promptly (no leaked fds)."""
    with open(path) as f:
        return f.read()


def _replace_whitespace(match):
    """Collapse a run of spaces/newlines/tabs to a single such character.

    Used as a re.sub callback; group 1 is spaces, group 2 newlines, group 3 tabs.
    """
    if match.group(1):  # If spaces matched (e.g., ' ')
        return ' '
    elif match.group(2):  # If newlines matched (e.g., '\n\n')
        return '\n'
    elif match.group(3):  # If tabs matched (e.g., '\t\t')
        return '\t'
    return ''  # Should not be reached if pattern is comprehensive


def assert_equal_enough(tokens: List[Token], rev):
    """Assert the joined token stream equals *rev* up to whitespace runs.

    The tokens exclude newlines, and we allow extra whitespace at the
    beginning or end, so both sides are normalized before comparison.
    """
    token_doc = ''.join(str(t) for t in tokens)
    # Debug dumps for manual inspection; use context managers so the
    # handles are closed (and flushed) deterministically.
    with open('token', 'w') as f:
        print(token_doc, file=f)
    with open('rev', 'w') as f:
        print(rev, file=f)
    token_doc = re.sub(r'( +)|(\n+)|(\t+)', _replace_whitespace, token_doc).strip()
    rev = re.sub(r'( +)|(\n+)|(\t+)', _replace_whitespace, rev).strip()
    assert token_doc == rev


def assert_correct_equal_section(ops, expected_equal_lines, expected_equal_tokens):
    """Check the leading run of Equal ops covers exactly the expected lines/tokens.

    Returns the b2 offset of the last Equal op in the leading run.
    """
    n_equal_lines = 0
    last_b2 = max(ops[0].b1, 0)
    initial_equal_tokens = 0
    first_unequal_token = None
    for op in ops:
        if not isinstance(op, Equal):
            if isinstance(op, Insert):
                first_unequal_token = op.b1
            else:
                first_unequal_token = op.a1
            break
        n_equal_lines += 1
        initial_equal_tokens += op.b2 - last_b2
        last_b2 = op.b2
        if expected_equal_lines == 1:
            first_unequal_token = op.b2 + 1
    # if the last line is an equal
    if first_unequal_token is None:
        first_unequal_token = ops[-1].b2
    assert n_equal_lines == expected_equal_lines
    # check that there are no gaps and the number is as expected
    assert initial_equal_tokens == last_b2 - ops[0].b1 == first_unequal_token - ops[0].b1 == expected_equal_tokens
    return last_b2


def test_equality():
    rev1 = _read("test/test_diff_revisions/1285792388")
    # whitespace is added because exact identity reverts do not result in diffs.
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev1 + " ")
    assert len(ops) == 257
    for op in ops[:-2]:
        assert isinstance(op, Equal)
    # note that the whitespace token does not result in a token according to wikitext_split
    # compare the tokens based on the diffs to the baseline
    # whitespace differences are allowed
    assert_equal_enough(b, rev1)


def test_highlight_range_3():
    rev1 = _read("test/test_diff_revisions/test_highlight_3_from")
    rev2 = _read("test/test_diff_revisions/test_highlight_3_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_highlight_range_4():
    rev1 = _read("test/test_diff_revisions/test_highlight_4_from")
    rev2 = _read("test/test_diff_revisions/test_highlight_4_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_complex_diff():
    rev1 = _read("test/test_diff_revisions/test_complex_from")
    rev2 = _read("test/test_diff_revisions/test_complex_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_highlight_range_unicode():
    rev1 = _read("test/test_diff_revisions/test_unicode_highlight_from")
    rev2 = _read("test/test_diff_revisions/test_unicode_highlight_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_highlight_range():
    rev1 = _read("test/test_diff_revisions/1295229484_rangeedit0")
    rev2 = _read("test/test_diff_revisions/1295229484_rangeedit1")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_unmatched_parmoves():
    rev1 = _read("test/test_diff_revisions/test_unmatched_parmoves_from")
    rev2 = _read("test/test_diff_revisions/test_unmatched_parmoves_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_bug_4():
    rev1 = _read("test/test_diff_revisions/test_bug_4_from")
    rev2 = _read("test/test_diff_revisions/test_bug_4_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_delete():
    rev1 = _read("test/test_diff_revisions/1295229484")
    rev2 = _read("test/test_diff_revisions/1295229484_delete")
    # whitespace is added because exact identity reverts do not result in diffs.
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(b, rev2)
    assert_equal_enough(a, rev1)
    first_nondelete_token = None
    n_deletes = 0
    n_deleted_tokens = 0
    initial_equal_lines = 256
    initial_equal_tokens = 9911
    for i, op in enumerate(ops):
        if initial_equal_lines > 0:
            assert isinstance(op, Equal)
        else:
            break
        initial_equal_lines -= 1
    assert initial_equal_lines == 0
    assert ops[i - 1].a2 - ops[0].a1 == initial_equal_tokens
    first_noninsert_token = initial_equal_tokens
    last_delete = False
    last_insert = False
    idx = 0
    n_non_delete = 0
    last_delete_idx = 0
    for op in ops[initial_equal_lines:]:
        idx += 1
        if isinstance(op, Delete):
            n_deletes += 1
            n_deleted_tokens += op.a2 - op.a1
            last_delete = True
            last_delete_idx = idx
        # we need to add back a newline when we have a delete
        else:
            n_non_delete += 1
            if not last_delete and first_nondelete_token is None:
                first_nondelete_token = op.a1
            if n_non_delete:
                last_b2 = op.b2
    assert n_deletes == 4
    assert n_deleted_tokens == 320
    assert idx == len(ops)


# first lets test that we properly build the operations.
# then we can test if the state seems to work as intended.
def test_addition():
    rev1 = _read("test/test_diff_revisions/1285792388")
    rev2 = _read("test/test_diff_revisions/1295229484")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    for op in ops:
        assert isinstance(op, Insert)
    assert_equal_enough(b, rev1)
    diff_processor.previous_text = rev1
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)
    ops = list(ops)
    initial_equal_lines = 255
    initial_equal_tokens = 9614
    last_b2 = assert_correct_equal_section(ops,
                                           expected_equal_lines=initial_equal_lines,
                                           expected_equal_tokens=initial_equal_tokens)
    last_non_insert = False
    first_noninsert_token = None
    n_inserts = 0
    n_inserted_tokens = 0
    last_b2 = last_insert_b2 = initial_equal_tokens
    idx = 0
    last_insert = False
    for op in ops[initial_equal_lines:]:
        if isinstance(op, Insert):
            n_inserts += 1
            n_inserted_tokens += op.b2 - op.b1
            last_insert_b2 = op.b2
            last_insert = True
        elif last_insert:
            assert isinstance(op, Equal)
            last_b2 = op.b2
    assert n_inserted_tokens == last_insert_b2 - initial_equal_tokens == 296
    assert n_inserts == 4


def test_paragraph_move():
    rev1 = _read("test/test_diff_revisions/1295229484")
    rev2 = _read("test/test_diff_revisions/1295229484_parmove")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(b, rev2)
    assert_equal_enough(a, rev1)


def test_paragraph_move_and_change():
    rev1 = _read("test/test_diff_revisions/1295229484")
    rev2 = _read("test/test_diff_revisions/1295229484_parmove_and_change")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_infobox():
    rev1 = _read("test/test_diff_revisions/test_infobox_from")
    rev2 = _read("test/test_diff_revisions/test_infobox_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(b, rev2)
    assert_equal_enough(a, rev1)


def test_leading_whitespace():
    rev1 = _read("test/test_diff_revisions/test_leading_ws_from")
    rev2 = _read("test/test_diff_revisions/test_leading_ws_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(b, rev2)
    assert_equal_enough(a, rev1)


def test_whitespace_bug():
    rev1 = _read("test/test_diff_revisions/test_whitespace_bug_from")
    rev2 = _read("test/test_diff_revisions/test_whitespace_bug_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(b, rev2)
    assert_equal_enough(a, rev1)


def test_bug_3():
    rev1 = _read("test/test_diff_revisions/test_bug_3_from")
    rev2 = _read("test/test_diff_revisions/test_bug_3_to")
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(b, rev2)
    #assert_equal_enough(a, rev1)


def test_actually_equal():
    rev1 = _read("test/test_diff_revisions/1285792388")
    # whitespace is added because exact identity reverts do not result in diffs.
    matcher = WikiDiffMatcher()
    diff_processor = matcher.processor()
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev1)
    assert len(ops) == 1
    assert isinstance(ops[0], Equal)
    # note that the whitespace token does not result in a token according to wikitext_split
    # compare the tokens based on the diffs to the baseline
    # whitespace differences are allowed
    assert_equal_enough(b, rev1)
    assert_equal_enough(a, rev1)


# slow test. comment out the following line to enable it.
@pytest.mark.skip
def test_diff_consistency():
    from mwxml import Dump
    dump = Dump.from_file("test/dumps/ikwiki.xml")
    for page in dump:
        revisions = [rev.text for rev in page if rev.text]
        matcher = WikiDiffMatcher(revisions)
        diff_processor = matcher.processor()
        last_rev = ""
        for rev in revisions:
            # Debug dumps for reproducing failures as fixture files;
            # context managers ensure the handles are closed each iteration.
            with open("test_unicode_highlight_to", 'w') as f:
                print(rev, file=f)
            with open("test_unicode_highlight_from", 'w') as f:
                print(last_rev, file=f)
            ops, a, b = diff_processor.process(rev)
            assert_equal_enough(a, last_rev)
            assert_equal_enough(b, rev)
            last_rev = rev


@pytest.mark.skip
def test_benchmark_diff(benchmark):
    from mwxml import Dump
    dump = Dump.from_file("test/dumps/ikwiki.xml")
    revs = chain.from_iterable([rev.text for rev in page] for page in dump)

    def next_revs():
        return [next(revs), next(revs)], {}

    benchmark.pedantic(WikiDiffMatcher, setup=next_revs,
                       iterations=1, rounds=1000, warmup_rounds=1)