import json
import sys
from itertools import chain
from typing import Generator, List, Optional, Tuple

import requests
from deltas import (Delete, DiffEngine, Equal, Insert, Operation,
                    RegexTokenizer, Token, tokenizers)

TOKENIZER = tokenizers.wikitext_split


def compute_diffs(url: str, texts: list[str]) -> list:
    response = None
    try:
        response = requests.post(url, json=texts)
        response.raise_for_status()
        incremental_diffs = response.json()
    except requests.exceptions.ConnectionError as e:
        print(
            f"Connection Error: Could not connect to the server at {url}. Make sure your local server is running."
        )
        print(e)
        raise e
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error: {e}")
        if response is not None:
            print(f"Response Body: {response.text}")
        raise e
    except requests.exceptions.JSONDecodeError as e:
        # Must come before RequestException as JSONDecodeError is
        # a subclass.
        print(f"JSON Decode Error: {e}", file=sys.stderr)
        if response is not None:
            print(f"Response Body: {response.text}", file=sys.stderr)
        raise e
    except requests.exceptions.RequestException as e:
        print(f"An unexpected error occurred: {e}")
        raise e

    # for diff in incremental_diffs:
    #     for wikidiffop in json.loads(diff)["diff"][0:5]:
    #         print(wikidiffop)

    return incremental_diffs


class DiffToOperationMap:
    def __init__(self, from_text, to_text, diff, tokenizer):
        self.tokenizer = tokenizer
        self.diff = json.loads(diff)
        # the code below is designed to work in bytes because that's how
        # wikidiff2 indexes
        self.from_bytes = from_text.encode("utf-8")
        self.to_bytes = to_text.encode("utf-8")

        self.from_last_end_bytes = 0
        self.to_last_end_bytes = 0
        self.n_from_start_tokens = 0
        self.n_to_start_tokens = 0
        self.last_to_start_line = 0
        self.last_from_start_line = 0

    def tokenize(self, text_bytes):
        return self.tokenizer.tokenize(text_bytes.decode("utf-8"))

    def to_operations(self):
        # lookup move diffs based on their moveInfo id.
        parmove_from_dict = {}
        parmove_to_dict = {}

        for entry in self.diff["diff"]:
            offset = entry["offset"]
            linebytes = entry["text"].encode("utf-8")

            # ignore empty diffs. They don't have any tokens.
            if len(linebytes) == 0:
                continue

            # this is the first byte of the line in the 'from' revision.
            from_start_line = entry["offset"]["from"]
            # this is the first byte of the line in the 'to' revision.
            to_start_line = entry["offset"]["to"]

            # for deletions and equality we report the token indexes from the
            # 'from' revision.
            if entry["type"] == 0:
                yield from self.doEqual(linebytes, offset)
            # a line included in the 'to' revision, but not in the 'from' revision
            elif entry["type"] == 1:
                yield from self.doInsert(linebytes, offset)
            # a line included in the 'from' revision, but not in the 'to' revision
            elif entry["type"] == 2:
                yield from self.doDelete(linebytes, offset)
            elif entry["type"] == 3:
                yield from self.doHighlightRange(
                    linebytes, entry["highlightRanges"], offset
                )
            # a paragraph moved in the 'from' revision.
            elif entry["type"] == 4:
                parmove_from_dict[entry["moveInfo"]["id"]] = entry
            # for type 4 diffs (paragraph moved in the from revision) we need
            # to find a matching type 5 diff.
            elif entry["type"] == 5:
                parmove_to_dict[entry["moveInfo"]["id"]] = entry
            else:
                # The 'type' isn't one of the known values.
                raise ValueError(entry)

        # mwpersistence expects differences to be represented in order from the
        # result's perspective ("to"), not the previous text. Thus, if a line
        # is moved earlier then its insertion should appear before its deletion.
        # As a rule of thumb, the "to" segments should be non-overlapping and
        # strictly increasing, while the "from" segments should merely be
        # non-overlapping.

        # now we go through the parmoves, pairing each type 4 entry with the
        # type 5 entry it links to.
        for move_id, from_diff in parmove_from_dict.items():
            to_diff = parmove_to_dict[from_diff["moveInfo"]["linkId"]]

    def doEqual(self, equal_bytes, offset):
        tokens = self.tokenize(equal_bytes)
        n_tokens = len(tokens)
        self.n_from_end_tokens = self.n_from_start_tokens + n_tokens
        self.n_to_end_tokens = self.n_to_start_tokens + n_tokens
        yield (
            Equal(
                self.n_from_start_tokens,
                self.n_from_end_tokens,
                self.n_to_start_tokens,
                self.n_to_end_tokens,
            ),
            tokens,
            tokens,
        )
        # we need to keep track of the to and from last end bytes
        self.from_last_end_bytes = offset["from"] + len(equal_bytes)
        self.to_last_end_bytes = offset["to"] + len(equal_bytes)
        self.n_from_start_tokens += n_tokens
        self.n_to_start_tokens += n_tokens

    def doInsert(self, insert_bytes, offset):
        tokens = self.tokenize(insert_bytes)
        n_tokens = len(tokens)
        self.n_to_end_tokens = self.n_to_start_tokens + n_tokens
        yield (
            Insert(
                self.n_from_start_tokens,
                self.n_from_start_tokens,
                self.n_to_start_tokens,
                self.n_to_end_tokens,
            ),
            [],
            tokens,
        )
        # We have now used more of the "to" tokens.
        self.n_to_start_tokens += n_tokens
        self.to_last_end_bytes = offset["to"] + len(insert_bytes)

    def doDelete(self, delete_bytes, offset):
        tokens = self.tokenize(delete_bytes)
        n_tokens = len(tokens)
        self.n_from_end_tokens = self.n_from_start_tokens + n_tokens
        yield (
            Delete(
                self.n_from_start_tokens,
                self.n_from_end_tokens,
                self.n_to_start_tokens,
                self.n_to_start_tokens,
            ),
            tokens,
            [],
        )
        # We have now used more of the "from" tokens.
        self.n_from_start_tokens += n_tokens
        self.from_last_end_bytes = offset["from"] + len(delete_bytes)

    def doHighlightRange(self, highlight_bytes, highlightRanges, offset):
        # The text field is an overlapping mix of both the from and to,
        # so we need to handle it highlight-by-highlight.
        # There can be gaps between highlight segments, for instance when a
        # word is deleted from the middle of a line, so we need to track that.
        highlight_end = 0
        highlight_offset = offset
        # note that diffs are token-level, but the indexes are byte-level
        for highlightRange in highlightRanges:
            highlight_start = highlightRange["start"]

            # equal bytes in between highlights
            if highlight_start > highlight_end:
                equal_bytes = highlight_bytes[highlight_end:highlight_start]
                n_equal_bytes = len(equal_bytes)
                yield from self.doEqual(equal_bytes, highlight_offset)
                highlight_offset["from"] += n_equal_bytes
                highlight_offset["to"] += n_equal_bytes

            # handle highlighted insert / delete
            highlight_end = highlight_start + highlightRange["length"]
            range_bytes = highlight_bytes[highlight_start:highlight_end]
            n_range_bytes = len(range_bytes)
            if highlightRange["type"] == 0:
                yield from self.doInsert(range_bytes, highlight_offset)
                highlight_offset["to"] += n_range_bytes
            elif highlightRange["type"] == 1:
                yield from self.doDelete(range_bytes, highlight_offset)
                highlight_offset["from"] += n_range_bytes
            else:
                # unknown highlight range type
                raise ValueError(highlightRange)

        # handle the rest of the line, which is equal
        if highlight_end < len(highlight_bytes):
            range_bytes = highlight_bytes[highlight_end:]
            yield from self.doEqual(range_bytes, highlight_offset)


class WikiDiffMatcher:
    def __init__(
        self,
        texts: Optional[list[str]] = None,
        tokenizer: Optional[RegexTokenizer] = None,
        url: str = "http://127.0.0.1:8000",
    ):
        # Pre-compute diffs to reduce traffic overhead.
        self.diffs = compute_diffs(url, texts)
        self.tokenizer = tokenizer or TOKENIZER

    class Processor(DiffEngine.Processor):
        def __init__(self, diffs, tokenizer=None):
            self.diffs = iter(diffs)
            self.tokenizer = tokenizer or TOKENIZER
            self.last_tokens = []
            self.previous_text = ""

        def update(self, last_tokens):
            self.last_tokens = last_tokens

        def process(self, text, token_class=None):
            # The diff has already been computed, but we need to retrieve it
            # incrementally to recreate the behavior DiffState expects.
            diff = next(self.diffs)
            diff_to_operations_mapper = DiffToOperationMap(
                self.previous_text, text, diff, self.tokenizer
            )

            (
                operations,
                aseq,
                bseq,
            ) = list(zip(*diff_to_operations_mapper.to_operations()))

            self.last_tokens = list(chain.from_iterable(aseq))
            tokens = list(chain.from_iterable(bseq))
            self.previous_text = text
            return operations, self.last_tokens, tokens

    def processor(self, *args, **kwargs):
        return self.Processor(self.diffs, self.tokenizer)

    def process(self):
        # DiffState checks for this method even though it is not called.
        raise NotImplementedError("Unnecessary implementation")
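

# A minimal usage sketch, not part of the original module. It assumes a
# wikidiff2 HTTP service is listening at the default URL configured above and
# that the mwpersistence package is installed; the revision texts are made up
# for illustration. WikiDiffMatcher stands in for a deltas diff engine here
# because mwpersistence's DiffState only needs an object exposing processor().
# Check the installed mwpersistence version for the exact DiffState/update
# signatures.
if __name__ == "__main__":
    import mwpersistence

    revisions = [
        "Apples are red.",
        "Apples are red and green.",
        "Apples are green.",
    ]

    # Diffs between consecutive revisions are computed up front by the
    # constructor via compute_diffs().
    matcher = WikiDiffMatcher(revisions)
    state = mwpersistence.DiffState(matcher, revert_radius=15)

    # Feed the revisions in order; each update yields the diff operations and
    # the persistence-tracked token lists for that revision.
    for revision_text in revisions:
        operations, tokens_added, tokens_removed = state.update(revision_text)
        print(operations)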