diff --git a/pyproject.toml b/pyproject.toml
index b312298..60f2110 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,6 +7,7 @@ requires-python = ">=3.9"
 dependencies = [
     "deltas>=0.7.0",
     "mediawiki-utilities>=0.4.18",
+    "more-itertools>=10.7.0",
     "mwpersistence>=0.2.4",
     "mwreverts>=0.1.5",
     "mwtypes>=0.4.0",
diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py
index 5a69b6a..61522af 100755
--- a/src/wikiq/__init__.py
+++ b/src/wikiq/__init__.py
@@ -11,6 +11,7 @@ import sys
 from collections import deque
 from hashlib import sha1
 from io import TextIOWrapper
+from more_itertools import chunked
 from itertools import groupby
 from subprocess import PIPE, Popen
 from typing import IO, Any, Generator, TextIO, Union
@@ -215,7 +216,7 @@ class WikiqParser:
         namespaces: Union[list[int], None] = None,
         revert_radius: int = 15,
         output_parquet: bool = True,
-        parquet_buffer_size: int = 2000,
+        buffer_size: int = 200,
         partition_namespaces: bool = False,
     ):
@@ -243,12 +244,13 @@ class WikiqParser:
         self.regex_comment_pairs: list[RegexPair] = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
 
         # here we initialize the variables we need for output.
+        self.buffer_size = buffer_size
+
         if output_parquet is True:
             self.output_parquet = True
             self.pq_writer = None
             self.output_file = output_file
             self.parquet_buffer = []
-            self.parquet_buffer_size = parquet_buffer_size
         else:
             self.print_header = True
             if output_file == sys.stdout.buffer:
@@ -400,9 +402,8 @@ class WikiqParser:
 
         regex_matches = {}
 
-        # Iterate through pages
+        # Iterate through pages; note that collapse_revs has already been applied.
         for page in dump:
-            revision_texts = []
 
             # skip namespaces not in the filter
             if self.namespace_filter is not None:
@@ -415,123 +416,137 @@
             else:
                 reverts_column.rev_detector = None
 
-            # Iterate through a page's revisions
-            for revs in page:
-                # Revisions may or may not be grouped into lists of contiguous revisions by the
-                # same user. We call these "edit sessions". Otherwise revs is a list containing
-                # exactly one revision.
-                revs = list(revs)
-                revs = fix_hex_digests(revs)
-
-                table.add(page.mwpage, revs)
-
-                # if re.match(r'^#redirect \[\[.*\]\]', rev.text, re.I):
-                #     redirect = True
-                # else:
-                #     redirect = False
-
-                # TODO missing: additions_size deletions_size
-
-                rev_count += 1
-
-                # Get the last revision in the edit session.
-                rev = revs[-1]
-                regex_dict = self.matchmake_revision(rev)
-                for k, v in regex_dict.items():
-                    if regex_matches.get(k) is None:
-                        regex_matches[k] = []
-                    regex_matches[k].append(v)
-
-                revision_texts.append(rev.text)
+            # Iterate through a page in batches of edit sessions.
+            batches = chunked(page, self.buffer_size)
+            last_rev_text = None
+            for batch in batches:
+                output_buffer = None
+                # Carry the final text of the previous batch over so the first
+                # edit session of this batch still has its diff context.
+                carried_over = last_rev_text is not None
+                revision_texts = [last_rev_text] if carried_over else []
+
+                for revs in batch:
+                    # Revisions may or may not be grouped into lists of contiguous revisions by the
+                    # same user. We call these "edit sessions". Otherwise revs is a list containing
+                    # exactly one revision.
+                    revs = fix_hex_digests(list(revs))
+                    table.add(page.mwpage, revs)
+
+                    # if re.match(r'^#redirect \[\[.*\]\]', rev.text, re.I):
+                    #     redirect = True
+                    # else:
+                    #     redirect = False
+
+                    # TODO missing: additions_size deletions_size
+
+                    rev_count += 1
+
+                    # Get the last revision in the edit session.
+                    rev = revs[-1]
+                    regex_dict = self.matchmake_revision(rev)
+                    for k, v in regex_dict.items():
+                        if regex_matches.get(k) is None:
+                            regex_matches[k] = []
+                        regex_matches[k].append(v)
+
+                    revision_texts.append(rev.text)
+                    last_rev_text = rev.text
 
-            wikidiff_matcher = None
-            if self.diff or self.persist == PersistMethod.wikidiff2:
-                wikidiff_matcher = WikiDiffMatcher(revision_texts,
-                                                   tokenizer=wikitext_split,
-                                                   )
-
-            # Collect the set of revisions currently buffered in the table so we can run multi-revision functions on them.
-            row_buffer = table.pop()
-
-            if self.diff:
-                row_buffer['diff'] = [[entry for entry in wikidiff_matcher.diffs[i]['diff'] if entry['type'] != 0] for i in range(len(revision_texts))]
-
-            is_revert_column: list[Union[bool, None]] = []
-            for r, d in zip(row_buffer['reverteds'], row_buffer['deleted']):
-                if self.revert_radius == 0 or d:
-                    is_revert_column.append(None)
-                else:
-                    is_revert_column.append(r is not None)
-
-            row_buffer['revert'] = is_revert_column
-
-            for k, v in regex_matches.items():
-                row_buffer[k] = v
-            regex_matches = {}
+                wikidiff_matcher = None
+                if self.diff or self.persist == PersistMethod.wikidiff2:
+                    wikidiff_matcher = WikiDiffMatcher(revision_texts,
+                                                       tokenizer=wikitext_split,
+                                                       )
+
+                # Collect the set of revisions currently buffered in the table so we
+                # can run multi-revision functions on them.
+                row_buffer = table.pop()
+
+                # Rows in this batch start after the carried-over context text.
+                offset = 1 if carried_over else 0
+
+                if self.diff:
+                    row_buffer['diff'] = [[entry for entry in wikidiff_matcher.diffs[i + offset]['diff'] if entry['type'] != 0]
+                                          for i in range(len(revision_texts) - offset)]
+
+                is_revert_column: list[Union[bool, None]] = []
+                for r, d in zip(row_buffer['reverteds'], row_buffer['deleted']):
+                    if self.revert_radius == 0 or d:
+                        is_revert_column.append(None)
+                    else:
+                        is_revert_column.append(r is not None)
+
+                row_buffer['revert'] = is_revert_column
+
+                for k, v in regex_matches.items():
+                    row_buffer[k] = v
+                regex_matches = {}
 
-            if self.persist != PersistMethod.none:
-                window = deque(maxlen=PERSISTENCE_RADIUS)
-
-                row_buffer['token_revs'] = []
-                row_buffer['tokens_added'] = []
-                row_buffer['tokens_removed'] = []
-                row_buffer['tokens_window'] = []
-
-                if self.persist == PersistMethod.sequence:
-                    state = mwpersistence.DiffState(SequenceMatcher(tokenizer=wikitext_split),
-                                                    revert_radius=PERSISTENCE_RADIUS)
-                elif self.persist == PersistMethod.segment:
-                    state = mwpersistence.DiffState(SegmentMatcher(tokenizer=wikitext_split),
-                                                    revert_radius=PERSISTENCE_RADIUS)
-                elif self.persist == PersistMethod.wikidiff2:
-                    state = mwpersistence.DiffState(wikidiff_matcher,
-                                                    revert_radius=PERSISTENCE_RADIUS)
-                else:
-                    from mw.lib import persistence
-                    state = persistence.State()
-
-                for idx, text in enumerate(row_buffer['text']):
-                    rev_id = row_buffer['revid'][idx]
-                    if self.persist != PersistMethod.legacy:
-                        _, tokens_added, tokens_removed = state.update(text, rev_id)
-                    else:
-                        _, tokens_added, tokens_removed = state.process(text, rev_id)
-
-                    window.append((rev_id, tokens_added, tokens_removed))
-
-                    if len(window) == PERSISTENCE_RADIUS:
-                        old_rev_id, old_tokens_added, old_tokens_removed = window.popleft()
-                        num_token_revs, num_tokens = calculate_persistence(old_tokens_added)
-
-                        row_buffer['token_revs'].append(num_token_revs)
-                        row_buffer['tokens_added'].append(num_tokens)
-                        row_buffer['tokens_removed'].append(len(old_tokens_removed))
-                        row_buffer['tokens_window'].append(PERSISTENCE_RADIUS - 1)
-
-                # print out metadata for the last RADIUS revisions
-                for i, item in enumerate(window):
-                    # if the window was full, we've already printed item 0
-                    if len(window) == PERSISTENCE_RADIUS and i == 0:
-                        continue
-
-                    rev_id, tokens_added, tokens_removed = item
-                    num_token_revs, num_tokens = calculate_persistence(tokens_added)
-
-                    row_buffer['token_revs'].append(num_token_revs)
-                    row_buffer['tokens_added'].append(num_tokens)
-                    row_buffer['tokens_removed'].append(len(tokens_removed))
-                    row_buffer['tokens_window'].append(len(window) - (i + 1))
+                if self.persist != PersistMethod.none:
+                    window = deque(maxlen=PERSISTENCE_RADIUS)
+
+                    row_buffer['token_revs'] = []
+                    row_buffer['tokens_added'] = []
+                    row_buffer['tokens_removed'] = []
+                    row_buffer['tokens_window'] = []
+
+                    if self.persist == PersistMethod.sequence:
+                        state = mwpersistence.DiffState(SequenceMatcher(tokenizer=wikitext_split),
+                                                        revert_radius=PERSISTENCE_RADIUS)
+                    elif self.persist == PersistMethod.segment:
+                        state = mwpersistence.DiffState(SegmentMatcher(tokenizer=wikitext_split),
+                                                        revert_radius=PERSISTENCE_RADIUS)
+                    elif self.persist == PersistMethod.wikidiff2:
+                        state = mwpersistence.DiffState(wikidiff_matcher,
+                                                        revert_radius=PERSISTENCE_RADIUS)
+                    else:
+                        from mw.lib import persistence
+                        state = persistence.State()
+
+                    # Skip the carried-over text: its row was written with the previous batch.
+                    for idx, text in enumerate(revision_texts[offset:]):
+                        rev_id = row_buffer['revid'][idx]
+                        if self.persist != PersistMethod.legacy:
+                            _, tokens_added, tokens_removed = state.update(text, rev_id)
+                        else:
+                            _, tokens_added, tokens_removed = state.process(text, rev_id)
+
+                        window.append((rev_id, tokens_added, tokens_removed))
+
+                        if len(window) == PERSISTENCE_RADIUS:
+                            old_rev_id, old_tokens_added, old_tokens_removed = window.popleft()
+                            num_token_revs, num_tokens = calculate_persistence(old_tokens_added)
+
+                            row_buffer['token_revs'].append(num_token_revs)
+                            row_buffer['tokens_added'].append(num_tokens)
+                            row_buffer['tokens_removed'].append(len(old_tokens_removed))
+                            row_buffer['tokens_window'].append(PERSISTENCE_RADIUS - 1)
+
+                    # print out metadata for the last RADIUS revisions
+                    for i, item in enumerate(window):
+                        # if the window was full, we've already printed item 0
+                        if len(window) == PERSISTENCE_RADIUS and i == 0:
+                            continue
+
+                        rev_id, tokens_added, tokens_removed = item
+                        num_token_revs, num_tokens = calculate_persistence(tokens_added)
+
+                        row_buffer['token_revs'].append(num_token_revs)
+                        row_buffer['tokens_added'].append(num_tokens)
+                        row_buffer['tokens_removed'].append(len(tokens_removed))
+                        row_buffer['tokens_window'].append(len(window) - (i + 1))
 
-            if not self.text:
-                del row_buffer['text']
-
-            if self.partition_namespaces is True:
-                writer = pq_writers[page.mwpage.namespace]
-
-            writer.write(pa.table(row_buffer, schema=schema))
+                if not self.text:
+                    del row_buffer['text']
+
+                if self.partition_namespaces is True:
+                    writer = pq_writers[page.mwpage.namespace]
+
+                if output_buffer is None:
+                    output_buffer = row_buffer
+                else:
+                    for k in output_buffer:
+                        output_buffer[k].extend(row_buffer[k])
+
+                record_batch = pa.record_batch(output_buffer, schema=schema)
+                writer.write_batch(record_batch)
 
             page_count += 1
 
         print("Done: %s revisions and %s pages." % (rev_count, page_count),
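
A note on the batching pattern above: more_itertools.chunked lazily yields lists of at most buffer_size items, and carrying last_rev_text across batches gives the first edit session of each batch a predecessor to diff against. A minimal sketch of the same pattern, separate from wikiq (diff_pairs and its arguments are illustrative names, not project identifiers):

from more_itertools import chunked

def diff_pairs(revision_texts, buffer_size=200):
    """Yield (previous_text, current_text) pairs one batch at a time."""
    last_text = None  # plays the role of last_rev_text in the diff above
    for batch in chunked(revision_texts, buffer_size):
        # Prepend the carried-over text ("" before the very first revision)
        # so the first revision of each batch still has a predecessor.
        texts = [last_text if last_text is not None else ""] + list(batch)
        for prev, curr in zip(texts, texts[1:]):
            yield prev, curr
        last_text = texts[-1]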
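The persistence bookkeeping relies on the same bounded-window idea: a revision's token statistics are emitted once PERSISTENCE_RADIUS - 1 later revisions have been seen, and whatever remains in the deque at the end is flushed with a shrinking tokens_window. A sketch of just that windowing logic (RADIUS and window_stats are stand-in names, not wikiq identifiers):

from collections import deque

RADIUS = 7  # stand-in for PERSISTENCE_RADIUS

def window_stats(rev_ids):
    """Yield (rev_id, tokens_window) once each revision's window is known."""
    window = deque(maxlen=RADIUS)
    for rev_id in rev_ids:
        window.append(rev_id)
        if len(window) == RADIUS:
            # This revision has now been followed by RADIUS - 1 newer ones.
            yield window.popleft(), RADIUS - 1
    # Flush the tail: later revisions saw fewer successors before the end.
    for i, rev_id in enumerate(window):
        yield rev_id, len(window) - (i + 1)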