# Integration tests for WikiDiffMatcher against a local wikidiff2 PHP endpoint.
# start the server
import asyncio
import subprocess
from functools import partial
from typing import List

import pytest
import pytest_asyncio
from deltas import Delete, Equal, Insert, wikitext_split
from mwpersistence import Token

from wiki_diff_matcher import WikiDiffMatcher


@pytest_asyncio.fixture(scope="module", autouse=True)
async def start_stop_server():
    """Serve wikidiff2_api.php via ``php -S`` for the duration of the module.

    Yields the server process; on teardown terminates it and echoes its
    captured stdout/stderr for debugging.
    """
    print("starting server")
    proc = await asyncio.create_subprocess_exec(
        "php", "-S", "127.0.0.1:8000", "wikidiff2_api.php",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # php needs a moment to actually start
    await asyncio.sleep(0.1)
    yield proc
    print("stopping server")
    proc.terminate()
    stdout, stderr = await proc.communicate()
    print(stdout.decode())
    print(stderr.decode())


def _read(path: str) -> str:
    """Read a test fixture file, closing the handle promptly."""
    with open(path) as f:
        return f.read()


def assert_equal_enough(tokens: List[Token], rev: str):
    """Assert that the concatenated tokens match ``rev`` up to whitespace.

    The tokens exclude newlines, and extra whitespace at the beginning or
    end, as well as blank lines, are allowed on either side.
    """
    token_doc = ''.join(str(t) for t in tokens).strip()
    while '\n\n' in token_doc:
        token_doc = token_doc.replace('\n\n', '\n')
    # BUGFIX: strip unconditionally. Previously ``rev`` was only stripped
    # inside the blank-line-collapsing loop, so a ``rev`` without any blank
    # line kept its trailing newline while token_doc was always stripped.
    rev = rev.strip()
    while '\n\n' in rev:
        rev = rev.replace('\n\n', '\n')
    # Debug artifacts for inspecting a failing comparison; ``with`` avoids
    # leaking the file handles the old ``file=open(...)`` form left open.
    with open('token', 'w') as f:
        print(token_doc, file=f)
    with open('rev', 'w') as f:
        print(rev, file=f)
    assert token_doc == rev


def assert_correct_equal_section(ops, expected_equal_lines, expected_equal_tokens):
    """Check that ``ops`` begins with a contiguous run of Equal line ops.

    Verifies both the number of leading Equal ops and that their token
    ranges are gap-free and cover exactly ``expected_equal_tokens`` tokens.
    Returns the token offset one past the end of the equal section.
    """
    n_equal_lines = 0
    last_b2 = max(ops[0].b1, 0)
    initial_equal_tokens = 0
    first_unequal_token = None
    for op in ops:
        if not isinstance(op, Equal):
            # The first non-equal op marks the end of the equal section.
            if isinstance(op, Insert):
                first_unequal_token = op.b1
            else:
                first_unequal_token = op.a1
            break
        n_equal_lines += 1
        initial_equal_tokens += op.b2 - last_b2
        last_b2 = op.b2
        if expected_equal_lines == 1:
            # NOTE(review): presumably accounts for the newline token that
            # follows a single equal line — confirm against the diff format.
            first_unequal_token = op.b2 + 1
    # if the last line is an equal
    if first_unequal_token is None:
        first_unequal_token = ops[-1].b2
    assert n_equal_lines == expected_equal_lines
    # check that there are no gaps and the number is as expected
    assert initial_equal_tokens == last_b2 - ops[0].b1 \
        == first_unequal_token - ops[0].b1 == expected_equal_tokens
    return last_b2


def _assert_diff_roundtrip(from_path: str, to_path: str):
    """Diff two fixture revisions and check that both texts reconstruct.

    Shared body of the highlight/complex/unicode round-trip tests below.
    """
    rev1 = _read(from_path)
    rev2 = _read(to_path)
    matcher = WikiDiffMatcher([rev1, rev2])
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


def test_equality():
    rev1 = _read("test/test_diff_revisions/1285792388")
    # whitespace is added because exact identity reverts do not result in diffs.
    matcher = WikiDiffMatcher([rev1, rev1 + " "])
    diff_processor = matcher.processor()
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev1 + " ")
    assert len(ops) == 258
    for op in ops[:-2]:
        print(op)
        assert isinstance(op, Equal)
    # note that the whitespace token does not result in a token according to wikitext_split
    # compare the tokens based on the diffs to the baseline
    # whitespace differences are allowed
    assert_equal_enough(b, rev1)


def test_highlight_range_3():
    _assert_diff_roundtrip("test/test_diff_revisions/test_highlight_3_from",
                           "test/test_diff_revisions/test_highlight_3_to")


def test_highlight_range_4():
    _assert_diff_roundtrip("test/test_diff_revisions/test_highlight_4_from",
                           "test/test_diff_revisions/test_highlight_4_to")


def test_complex_diff():
    _assert_diff_roundtrip("test/test_diff_revisions/test_complex_from",
                           "test/test_diff_revisions/test_complex_to")


def test_highlight_range_unicode():
    _assert_diff_roundtrip("test/test_diff_revisions/test_unicode_highlight_from",
                           "test/test_diff_revisions/test_unicode_highlight_to")


def test_highlight_range():
    _assert_diff_roundtrip("test/test_diff_revisions/1295229484_rangeedit0",
                           "test/test_diff_revisions/1295229484_rangeedit1")


def test_delete():
    """Deleting paragraphs yields Delete ops interleaved with Equal newlines."""
    rev1 = _read("test/test_diff_revisions/1295229484")
    rev2 = _read("test/test_diff_revisions/1295229484_delete")
    matcher = WikiDiffMatcher([rev1, rev2])
    diff_processor = matcher.processor()
    diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(b, rev2)
    assert_equal_enough(a, rev1)
    # The document opens with four unchanged lines covering 14 tokens.
    initial_equal_lines = 4
    initial_equal_tokens = 14
    last_b2 = assert_correct_equal_section(
        ops,
        expected_equal_lines=initial_equal_lines,
        expected_equal_tokens=initial_equal_tokens,
    )
    first_nondelete_token = None
    n_deletes = 0
    n_deleted_tokens = 0
    last_non_delete = False
    idx = 0
    for op in ops[initial_equal_lines:]:
        idx += 1
        # deletes are interleaved with Equal newlines.
        if not isinstance(op, Delete):
            if last_non_delete:
                # Two non-deletes in a row: the deleted section has ended.
                first_nondelete_token = op.a1
                break
            last_non_delete = True
        else:
            last_non_delete = False
        if last_non_delete:
            # Count a delete group when its trailing Equal newline arrives.
            n_deletes += 1
            n_deleted_tokens += op.a2 - last_b2
            last_b2 = op.a2
    assert n_deletes == 2
    assert n_deleted_tokens == last_b2 - initial_equal_tokens \
        == first_nondelete_token - initial_equal_tokens == 317
    # The remainder of the document is one long equal section.
    assert_correct_equal_section(
        ops[initial_equal_lines + idx:],
        expected_equal_lines=252,
        expected_equal_tokens=9765,
    )


# First test that we properly build the operations; then test whether the
# state tracking seems to work as intended.
def test_addition():
    """Added lines show up as Insert ops after the initial equal section."""
    with open("test/test_diff_revisions/1285792388") as f:
        rev1 = f.read()
    with open("test/test_diff_revisions/1295229484") as f:
        rev2 = f.read()
    matcher = WikiDiffMatcher([rev1, rev2])
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    # The very first revision arrives as strictly alternating Insert/Equal ops.
    even = True
    for op in ops:
        if even:
            assert isinstance(op, Insert)
            even = False
        else:
            assert isinstance(op, Equal)
            even = True
    assert_equal_enough(b, rev1)
    diff_processor.previous_text = rev1
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)
    ops = list(ops)
    # 256 unchanged lines covering 9487 tokens precede the insertions.
    initial_equal_lines = 256
    initial_equal_tokens = 9487
    assert_correct_equal_section(
        ops,
        expected_equal_lines=initial_equal_lines,
        expected_equal_tokens=initial_equal_tokens,
    )
    n_inserts = 0
    n_inserted_tokens = 0
    # Fallback in case no Insert is found; the assert below would then fail.
    last_insert_b2 = initial_equal_tokens
    for op in ops[initial_equal_lines:]:
        if isinstance(op, Insert):
            n_inserts += 1
            n_inserted_tokens += op.b2 - op.b1
            last_insert_b2 = op.b2
    # NOTE(review): the +1 presumably accounts for a newline token between
    # the two inserted spans — confirm against the diff format.
    assert n_inserted_tokens + 1 == last_insert_b2 - initial_equal_tokens == 293
    assert n_inserts == 2


def test_paragraph_move():
    with open("test/test_diff_revisions/1295229484") as f:
        rev1 = f.read()
    with open("test/test_diff_revisions/1295229484_parmove") as f:
        rev2 = f.read()
    matcher = WikiDiffMatcher([rev1, rev2])
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(b, rev2)
    assert_equal_enough(a, rev1)


def test_paragraph_move_and_change():
    with open("test/test_diff_revisions/1295229484") as f:
        rev1 = f.read()
    with open("test/test_diff_revisions/1295229484_parmove_and_change") as f:
        rev2 = f.read()
    matcher = WikiDiffMatcher([rev1, rev2])
    diff_processor = matcher.processor()
    # note that a and b are constructed from the diffs.
    # so they reflect the state of the text according to the diff processor
    ops, a, b = diff_processor.process(rev1)
    ops, a, b = diff_processor.process(rev2)
    assert_equal_enough(a, rev1)
    assert_equal_enough(b, rev2)


# slow test
def test_diff_consistency():
    """Replay a full dump, checking every revision reconstructs from its diff."""
    from mwxml import Dump
    # 7za streams the decompressed xml to stdout; mwxml parses it lazily.
    stream = subprocess.Popen(
        ["7za", "x", "-so", "test/dumps/sailormoon.xml.7z", "*.xml"],
        stdout=subprocess.PIPE,
    ).stdout
    dump = Dump.from_file(stream)
    for page in dump:
        revisions = [rev.text for rev in page if rev.text]
        matcher = WikiDiffMatcher(revisions)
        diff_processor = matcher.processor()
        last_rev = ""
        for rev in revisions:
            # Debug artifacts: keep the current pair on disk so a failure
            # can be replayed through the highlight fixtures.
            with open("test_unicode_highlight_to", 'w') as f:
                print(rev, file=f)
            with open("test_unicode_highlight_from", 'w') as f:
                print(last_rev, file=f)
            ops, a, b = diff_processor.process(rev)
            # NOTE(review): checking ``a`` against last_rev is disabled in
            # the original — presumably a known inconsistency; confirm
            # before re-enabling.
            # assert_equal_enough(a, last_rev)
            assert_equal_enough(b, rev)
            last_rev = rev