use dataclasses and pyarrow for types.

This commit is contained in:
Nathan TeBlunthuis 2021-10-17 20:21:22 -07:00
parent d8d20f670b
commit ae9a241747
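
The change this commit works toward: each output row becomes a typed dataclass, with a class-level list of pa.field() entries describing the matching pyarrow schema. A minimal, self-contained sketch of that pattern (names here are illustrative, not wikiq's):

    import dataclasses
    from dataclasses import dataclass
    from datetime import datetime
    import pyarrow as pa

    @dataclass()
    class Row:
        # required fields first; optional fields default to None
        revid: int
        date_time: datetime
        sha1: str = None

        # class-level schema, kept in the same order as the fields above
        pa_schema_fields = [
            pa.field("revid", pa.int64()),
            pa.field("date_time", pa.timestamp('ms')),
            pa.field("sha1", pa.string()),
        ]

    # row tuples transpose cleanly into typed arrow columns
    rows = [Row(1, datetime(2021, 10, 17)), Row(2, datetime(2021, 10, 18), "ab12")]
    cols = list(zip(*(dataclasses.astuple(r) for r in rows)))
    schema = pa.schema(Row.pa_schema_fields)
    arrays = [pa.array(col, type=f.type) for col, f in zip(cols, schema)]
    table = pa.Table.from_arrays(arrays, schema=schema)  # 2 rows, 3 columns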

wikiq

@@ -8,6 +8,8 @@ import argparse
 import sys
 import os, os.path
 import re
+from datetime import datetime
+import dataclasses
 from subprocess import Popen, PIPE
 from collections import deque
@@ -29,8 +30,6 @@ import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
-from typing import List
-
 class PersistMethod:
     none = 0
     sequence = 1
@@ -189,33 +188,120 @@ class RegexPair(object):
             temp_dict[self.label] = None
 
         # update rev_data with our new columns
-        rev_data.update(temp_dict)
+        for k, v in temp_dict.items():
+            setattr(rev_data, k, v)
 
         return rev_data
 
-@dataclass
-class RevData():
+@dataclass()
+class RevDataBase():
     revid: int
     date_time: datetime
     articleid: int
     editorid: int
     title: str
     namespace: int
     deleted: bool
-    text_chars: int
-    revert: bool
-    reverteds: list[bool]
-    sha1: str
-    text_chars: int
-    revert: bool
-    reverteds: list[int]
-    minor: bool
-    editor: str
-    anon: bool
-    collapsed_revs:int
-    token_revs:int
-    tokens_added:int
-    tokens_removed:int
-    tokens_window:int
+    text_chars: int = None
+    revert: bool = None
+    reverteds: list[int] = None
+    sha1: str = None
+    minor: bool = None
+    editor: str = None
+    anon: bool = None
+
+    pa_schema_fields = [
+        pa.field("revid", pa.int64()),
+        pa.field("date_time", pa.timestamp('ms')),
+        pa.field("articleid", pa.int64()),
+        pa.field("editorid", pa.int64()),
+        pa.field("title", pa.string()),
+        pa.field("namespace", pa.int32()),
+        pa.field("deleted", pa.bool_()),
+        pa.field("text_chars", pa.int32()),
+        pa.field("revert", pa.bool_()),
+        pa.field("reverteds", pa.list_(pa.int64())),
+        pa.field("sha1", pa.string()),
+        pa.field("minor", pa.bool_()),
+        pa.field("editor", pa.string()),
+        pa.field("anon", pa.bool_())
+    ]
+
+    def to_pyarrow(self):
+        return dataclasses.astuple(self)
+
+    def to_tsv_row(self):
+        row = []
+        for f in dataclasses.fields(self):
+            val = getattr(self, f.name)
+            if val is None:
+                row.append("")
+            elif f.type == bool:
+                row.append("TRUE" if val else "FALSE")
+            elif f.type == datetime:
+                row.append(val.strftime('%Y-%m-%d %H:%M:%S'))
+            elif f.name in {'editor', 'title'}:
+                s = '"' + val + '"'
+                if f.name in TO_ENCODE:
+                    row.append(quote(str(val)))
+                else:
+                    row.append(s)
+            elif f.type == list[int]:
+                row.append('"' + ",".join([str(x) for x in val]) + '"')
+            elif f.type == str:
+                if f.name in TO_ENCODE:
+                    row.append(quote(str(val)))
+                else:
+                    row.append(val)
+            else:
+                row.append(str(val))
+        return '\t'.join(row)
+
+@dataclass()
+class RevDataCollapse(RevDataBase):
+    collapsed_revs: int = None
+    pa_collapsed_revs_schema = pa.field('collapsed_revs', pa.int64())
+    pa_schema_fields = RevDataBase.pa_schema_fields + [pa_collapsed_revs_schema]
+    pa_schema = pa.schema(pa_schema_fields)
+
+@dataclass()
+class RevDataPersistence(RevDataBase):
+    token_revs: int = None
+    tokens_added: int = None
+    tokens_removed: int = None
+    tokens_window: int = None
+    pa_persistence_schema_fields = [
+        pa.field("token_revs", pa.int64()),
+        pa.field("tokens_added", pa.int64()),
+        pa.field("tokens_removed", pa.int64()),
+        pa.field("tokens_window", pa.int64())]
+    pa_schema_fields = RevDataBase.pa_schema_fields + pa_persistence_schema_fields
+
+@dataclass()
+class RevDataCollapsePersistence(RevDataCollapse, RevDataPersistence):
+    pa_schema_fields = RevDataCollapse.pa_schema_fields + RevDataPersistence.pa_persistence_schema_fields
 
 class WikiqParser():
     def __init__(self, input_file, output_file, regex_match_revision, regex_match_comment, regex_revision_label, regex_comment_label, collapse_user=False, persist=None, urlencode=False, namespaces = None, revert_radius=15, output_parquet=True, parquet_buffer_size=2000):
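
The subclassing above composes schemas by list concatenation: each variant appends its own pa.field() entries to the base list. Because every optional field defaults to None, subclasses can add fields without tripping over dataclass ordering rules (a non-default field may not follow a default one). A small sketch of the idea, with illustrative names:

    from dataclasses import dataclass
    import pyarrow as pa

    @dataclass()
    class Base:
        revid: int = None
        pa_schema_fields = [pa.field("revid", pa.int64())]

    @dataclass()
    class Collapse(Base):
        collapsed_revs: int = None
        pa_schema_fields = Base.pa_schema_fields + [pa.field("collapsed_revs", pa.int64())]

    print([f.name for f in pa.schema(Collapse.pa_schema_fields)])
    # ['revid', 'collapsed_revs']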
@@ -227,13 +313,12 @@ class WikiqParser():
 
         self.collapse_user = collapse_user
         self.persist = persist
-        self.printed_header = False
         self.namespaces = []
         self.urlencode = urlencode
         self.revert_radius = revert_radius
-        self.parquet_buffer = []
-        self.parquet_buffer_size = parquet_buffer_size
+        self.output_buffer = []
+        self.output_buffer_size = parquet_buffer_size
 
         if namespaces is not None:
             self.namespace_filter = set(namespaces)
@@ -244,7 +329,26 @@ class WikiqParser():
 
         self.regex_revision_pairs = self.make_matchmake_pairs(regex_match_revision, regex_revision_label)
         self.regex_comment_pairs = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
 
+        if self.collapse_user is True:
+            if self.persist == PersistMethod.none:
+                revdata_type = RevDataCollapse
+            else:
+                revdata_type = RevDataCollapsePersistence
+        elif self.persist != PersistMethod.none:
+            revdata_type = RevDataPersistence
+        else:
+            revdata_type = RevDataBase
+
+        regex_fields = [(field.name, list[str], dataclasses.field(default=None))
+                        for field in self.regex_schemas]
+        self.revdata_type = dataclasses.make_dataclass('RevData_Parser',
+                                                       fields=regex_fields,
+                                                       bases=(revdata_type,))
+        self.revdata_type.pa_schema_fields = revdata_type.pa_schema_fields + self.regex_schemas
+
         if output_parquet is True:
             self.output_parquet = True
             self.pq_writer = None
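
dataclasses.make_dataclass is what lets the parser bolt user-supplied regex columns onto the chosen row type at runtime; note it needs the dataclasses module imported at module level (added alongside the datetime import in the first hunk). A sketch of the call with hypothetical names:

    import dataclasses
    from dataclasses import dataclass
    import pyarrow as pa

    @dataclass()
    class Base:
        revid: int = None
        pa_schema_fields = [pa.field("revid", pa.int64())]

    # pa fields for user-supplied regex captures; the label is hypothetical
    regex_schemas = [pa.field("mylabel", pa.list_(pa.string()))]

    regex_fields = [(f.name, list[str], dataclasses.field(default=None))
                    for f in regex_schemas]
    RevData_Parser = dataclasses.make_dataclass('RevData_Parser',
                                                fields=regex_fields,
                                                bases=(Base,))
    RevData_Parser.pa_schema_fields = Base.pa_schema_fields + regex_schemas

    row = RevData_Parser(revid=1, mylabel=["hit1", "hit2"])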
@@ -273,7 +377,7 @@ class WikiqParser():
 
         return rev_data
 
     def matchmake_revision(self, text, rev_data):
         return self.matchmake_pairs(text, rev_data, self.regex_revision_pairs)
 
     def matchmake_comment(self, comment, rev_data):
         return self.matchmake_pairs(comment, rev_data, self.regex_comment_pairs)
@@ -346,27 +450,17 @@ class WikiqParser():
 
             # Iterate through a page's revisions
             for rev in page:
 
-                # initialize rev_data
-                rev_data = {
-                    'revid' : rev.id,
-                    'date_time' : rev.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
-                    'articleid' : page.id,
-                    'editorid' : "" if rev.deleted.user == True or rev.user.id is None else rev.user.id,
-                    'title' : '"' + page.title + '"',
-                    'namespace' : namespace,
-                    'deleted' : "TRUE" if rev.deleted.text else "FALSE"
-                }
+                rev_data = self.revdata_type(revid=rev.id,
+                                             date_time=rev.timestamp,
+                                             articleid=page.id,
+                                             editorid=None if rev.deleted.user or rev.user.id is None else rev.user.id,
+                                             title=page.title,
+                                             namespace=namespace,
+                                             deleted=rev.deleted.text)
 
                 rev_data = self.matchmake(rev, rev_data)
 
                 # if revisions are deleted, /many/ things will be missing
-                if rev.deleted.text:
-                    rev_data['text_chars'] = ""
-                    rev_data['sha1'] = ""
-                    rev_data['revert'] = ""
-                    rev_data['reverteds'] = ""
-
-                else:
+                if not rev.deleted.text:
                     # rev.text can be None if the page has no text
                     if not rev.text:
                         rev.text = ""
@@ -378,33 +472,25 @@ class WikiqParser():
 
                     text_sha1 = sha1(bytes(rev.text, "utf8")).hexdigest()
 
-                    rev_data['sha1'] = text_sha1
+                    rev_data.sha1 = text_sha1
 
                     # TODO rev.bytes doesn't work.. looks like a bug
-                    rev_data['text_chars'] = len(rev.text)
+                    rev_data.text_chars = len(rev.text)
 
                     # generate revert data
                     revert = rev_detector.process(text_sha1, rev.id)
+                    rev_data.revert = revert is not None
 
                     if revert:
-                        rev_data['revert'] = "TRUE"
-                        rev_data['reverteds'] = '"' + ",".join([str(x) for x in revert.reverteds]) + '"'
-                    else:
-                        rev_data['revert'] = "FALSE"
-                        rev_data['reverteds'] = ""
+                        rev_data.reverteds = revert.reverteds
 
                 # if the fact that the edit was minor can be hidden, this might be an issue
-                rev_data['minor'] = "TRUE" if rev.minor else "FALSE"
+                rev_data.minor = rev.minor
 
                 if not rev.deleted.user:
                     # wrap user-defined editors in quotes for fread
-                    rev_data['editor'] = '"' + rev.user.text + '"'
-                    rev_data['anon'] = "TRUE" if rev.user.id == None else "FALSE"
-                else:
-                    rev_data['anon'] = ""
-                    rev_data['editor'] = ""
+                    rev_data.editor = rev.user.text
+                    rev_data.anon = rev.user.id is None
 
                 #if re.match(r'^#redirect \[\[.*\]\]', rev.text, re.I):
                 #    redirect = True
                 #else:
@@ -414,13 +500,11 @@ class WikiqParser():
 
                 # if collapse user was on, lets run that
                 if self.collapse_user:
-                    rev_data['collapsed_revs'] = rev.collapsed_revs
+                    rev_data.collapsed_revs = rev.collapsed_revs
 
                 if self.persist != PersistMethod.none:
-                    if rev.deleted.text:
-                        for k in ["token_revs", "tokens_added", "tokens_removed", "tokens_window"]:
-                            old_rev_data[k] = None
-                    else:
+                    if not rev.deleted.text:
 
                         if self.persist != PersistMethod.legacy:
                             _, tokens_added, tokens_removed = state.update(rev.text, rev.id)
@@ -435,12 +519,12 @@ class WikiqParser():
 
                         num_token_revs, num_tokens = calculate_persistence(old_tokens_added)
 
-                        old_rev_data["token_revs"] = num_token_revs
-                        old_rev_data["tokens_added"] = num_tokens
-                        old_rev_data["tokens_removed"] = len(old_tokens_removed)
-                        old_rev_data["tokens_window"] = PERSISTENCE_RADIUS-1
-
-                        self.print_rev_data(old_rev_data)
+                        rev_data.token_revs = num_token_revs
+                        rev_data.tokens_added = num_tokens
+                        rev_data.tokens_removed = len(old_tokens_removed)
+                        rev_data.tokens_window = PERSISTENCE_RADIUS-1
+
+                        self.print_rev_data(rev_data)
 
                 else:
                     self.print_rev_data(rev_data)
@@ -457,10 +541,10 @@ class WikiqParser():
 
                 rev_id, rev_data, tokens_added, tokens_removed = item
                 num_token_revs, num_tokens = calculate_persistence(tokens_added)
 
-                rev_data["token_revs"] = num_token_revs
-                rev_data["tokens_added"] = num_tokens
-                rev_data["tokens_removed"] = len(tokens_removed)
-                rev_data["tokens_window"] = len(window)-(i+1)
+                rev_data.token_revs = num_token_revs
+                rev_data.tokens_added = num_tokens
+                rev_data.tokens_removed = len(tokens_removed)
+                rev_data.tokens_window = len(window)-(i+1)
 
                 self.print_rev_data(rev_data)
 
         page_count += 1
@@ -477,52 +561,40 @@ class WikiqParser():
 
     def write_parquet_row(self, rev_data):
-        if 'deleted' in rev_data.keys():
-            rev_data['deleted'] = True if rev_data['deleted'] == "TRUE" else False
-
-        if 'minor' in rev_data.keys():
-            rev_data['minor'] = True if rev_data['minor'] == "TRUE" else False
-
-        if 'anon' in rev_data.keys():
-            rev_data['anon'] = True if rev_data['anon'] == "TRUE" else False
-
-        self.parquet_buffer.append(rev_data)
-
-        if len(self.parquet_buffer) >= self.parquet_buffer_size:
+        padata = rev_data.to_pyarrow()
+        self.output_buffer.append(padata)
+
+        if len(self.output_buffer) >= self.output_buffer_size:
             self.flush_parquet_buffer()
 
     def flush_parquet_buffer(self):
-        outtable = pd.DataFrame.from_records(self.parquet_buffer)
-        outtable = pa.Table.from_pandas(outtable)
-
-        if self.pq_writer is None:
-            schema = outtable.schema
-            for regex_schema in self.regex_schemas:
-                schema.append(regex_schema)
-            self.pq_writer = pq.ParquetWriter(self.output_file, schema, flavor='spark')
+        if len(self.output_buffer) == 0:
+            return
+
+        schema = pa.schema(self.revdata_type.pa_schema_fields)
+        cols = list(zip(*self.output_buffer))
+        arrays = [pa.array(col, type=f.type) for col, f in zip(cols, schema)]
+        outtable = pa.Table.from_arrays(arrays, schema=schema)
+
+        if self.pq_writer is None:
+            self.pq_writer = pq.ParquetWriter(self.output_file, schema, flavor='spark')
 
         self.pq_writer.write_table(outtable)
+        self.output_buffer = []
 
     def print_rev_data(self, rev_data):
         if self.output_parquet is False:
-            printfunc = lambda rev_data: print("\t".join(rev_data), file=self.output_file)
+            printfunc = self.write_tsv_row
         else:
             printfunc = self.write_parquet_row
 
-        # if it's the first time through, print the header
-        if self.urlencode:
-            for field in TO_ENCODE:
-                rev_data[field] = quote(str(rev_data[field]))
-
-        if not self.printed_header:
-            printfunc(rev_data)
-            self.printed_header = True
-
         printfunc(rev_data)
 
+    def write_tsv_row(self, rev_data):
+        self.output_buffer.append(rev_data.to_tsv_row())
+        if len(self.output_buffer) >= self.output_buffer_size:
+            self.flush_tsv_buffer()
+
+    def flush_tsv_buffer(self):
+        if self.output_header:
 
 def open_input_file(input_filename):
     if re.match(r'.*\.7z$', input_filename):
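
For reference, here is the buffered Parquet path that write_parquet_row / flush_parquet_buffer sketch out, as a standalone runnable toy. The names, schema, and output path are illustrative, not wikiq's; note flush_tsv_buffer is left unfinished in this commit, so only the Parquet side is shown:

    import dataclasses
    from dataclasses import dataclass
    import pyarrow as pa
    import pyarrow.parquet as pq

    @dataclass()
    class Row:
        revid: int = None
        sha1: str = None

    SCHEMA = pa.schema([pa.field("revid", pa.int64()),
                        pa.field("sha1", pa.string())])
    BUFFER_SIZE = 2000
    buffer = []
    writer = None

    def write_row(row):
        buffer.append(dataclasses.astuple(row))
        if len(buffer) >= BUFFER_SIZE:
            flush()

    def flush():
        global writer
        if not buffer:
            return
        # transpose buffered row tuples into typed columns
        cols = list(zip(*buffer))
        arrays = [pa.array(col, type=f.type) for col, f in zip(cols, SCHEMA)]
        if writer is None:
            writer = pq.ParquetWriter("out.parquet", SCHEMA, flavor='spark')
        writer.write_table(pa.Table.from_arrays(arrays, schema=SCHEMA))
        buffer.clear()

    for i in range(5000):
        write_row(Row(revid=i, sha1=f"{i:x}"))
    flush()        # flush the partial final buffer
    writer.close()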