initial work on parquet support

This commit is contained in:
Nathan TeBlunthuis 2021-10-17 13:22:22 -07:00
parent cdfa77d66d
commit d8d20f670b

wikiq

@@ -24,6 +24,13 @@ PERSISTENCE_RADIUS=7
 from deltas import SequenceMatcher
 from deltas import SegmentMatcher
+from dataclasses import dataclass
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+from typing import List
 
 class PersistMethod:
     none = 0
     sequence = 1
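For orientation, the minimal pyarrow pattern these new imports enable looks roughly like this (an illustrative sketch with made-up column names, not part of the commit):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # rows collected as dicts, converted through pandas into an Arrow table
    rows = [{"revid": 1, "minor": False}, {"revid": 2, "minor": True}]
    table = pa.Table.from_pandas(pd.DataFrame.from_records(rows))

    # a ParquetWriter keeps the file open so batches of rows can be appended
    writer = pq.ParquetWriter("revisions.parquet", table.schema, flavor="spark")
    writer.write_table(table)
    writer.close()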
@@ -174,44 +181,87 @@ class RegexPair(object):
         # there are no capture groups, we just search for all the matches of the regex
         else:
             #given that there are matches to be made
-            if self.pattern.search(content) is not None:
-                m = self.pattern.findall(content)
-                temp_dict[self.label] = ', '.join(m)
-            else:
-                temp_dict[self.label] = None
+            if type(content) in(str, bytes):
+                if self.pattern.search(content) is not None:
+                    m = self.pattern.findall(content)
+                    temp_dict[self.label] = ', '.join(m)
+                else:
+                    temp_dict[self.label] = None
 
         # update rev_data with our new columns
         rev_data.update(temp_dict)
         return rev_data
+
+@dataclass
+class RevData():
+    revid: int
+    date_time: datetime
+    articleid: int
+    editorid: int
+    title: str
+    namespace: int
+    deleted: bool
+    text_chars: int
+    revert: bool
+    reverteds: list[int]
+    sha1: str
+    minor: bool
+    editor: str
+    anon: bool
+    collapsed_revs: int
+    token_revs: int
+    tokens_added: int
+    tokens_removed: int
+    tokens_window: int
 
 class WikiqParser():
-    def __init__(self, input_file, output_file, regex_match_revision, regex_match_comment, regex_revision_label, regex_comment_label, collapse_user=False, persist=None, urlencode=False, namespaces = None, revert_radius=15):
+    def __init__(self, input_file, output_file, regex_match_revision, regex_match_comment, regex_revision_label, regex_comment_label, collapse_user=False, persist=None, urlencode=False, namespaces = None, revert_radius=15, output_parquet=True, parquet_buffer_size=2000):
         """
         Parameters:
            persist : what persistence method to use. Takes a PersistMethod value
         """
         self.input_file = input_file
-        self.output_file = output_file
         self.collapse_user = collapse_user
         self.persist = persist
         self.printed_header = False
         self.namespaces = []
         self.urlencode = urlencode
         self.revert_radius = revert_radius
+        self.parquet_buffer = []
+        self.parquet_buffer_size = parquet_buffer_size
 
         if namespaces is not None:
             self.namespace_filter = set(namespaces)
         else:
             self.namespace_filter = None
 
+        self.regex_schemas = []
         self.regex_revision_pairs = self.make_matchmake_pairs(regex_match_revision, regex_revision_label)
         self.regex_comment_pairs = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
 
+        if output_parquet is True:
+            self.output_parquet = True
+            self.pq_writer = None
+            self.output_file = output_file
+        else:
+            self.output_file = open(output_file,'w')
 
     def make_matchmake_pairs(self, patterns, labels):
         if (patterns is not None and labels is not None) and \
            (len(patterns) == len(labels)):
-            return [RegexPair(pattern, label) for pattern, label in zip(patterns, labels)]
+            result = []
+            for pattern, label in zip(patterns, labels):
+                result.append(RegexPair(pattern, label))
+                self.regex_schemas.append(pa.field(label, pa.list_(pa.string())))
+            return result
         elif (patterns is None and labels is None):
             return []
         else:
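The RevData dataclass is declared here but not yet wired into the writer; one way its annotations could be turned into an Arrow schema (purely illustrative, the type mapping below is an assumption and not code from the commit) is:

    import dataclasses
    from datetime import datetime
    import pyarrow as pa

    # hypothetical mapping from Python annotations to Arrow types
    PA_TYPES = {int: pa.int64(), str: pa.string(), bool: pa.bool_(),
                datetime: pa.timestamp('ms'), list[int]: pa.list_(pa.int64())}

    # build a schema from the RevData fields defined in the diff above
    schema = pa.schema([pa.field(f.name, PA_TYPES[f.type])
                        for f in dataclasses.fields(RevData)])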
@@ -301,7 +351,7 @@ class WikiqParser():
                 'revid':rev.id,
                 'date_time' : rev.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
                 'articleid' : page.id,
-                'editor_id' : "" if rev.deleted.user == True or rev.user.id is None else rev.user.id,
+                'editorid' : "" if rev.deleted.user == True or rev.user.id is None else rev.user.id,
                 'title' : '"' + page.title + '"',
                 'namespace' : namespace,
                 'deleted' : "TRUE" if rev.deleted.text else "FALSE"
@@ -411,7 +461,6 @@ class WikiqParser():
                     rev_data["tokens_added"] = num_tokens
                     rev_data["tokens_removed"] = len(tokens_removed)
                     rev_data["tokens_window"] = len(window)-(i+1)
-
                 self.print_rev_data(rev_data)
 
             page_count += 1
@@ -419,22 +468,65 @@ class WikiqParser():
         print("Done: %s revisions and %s pages." % (rev_count, page_count),
               file=sys.stderr)
 
+        if self.output_parquet is True:
+            self.flush_parquet_buffer()
+            self.pq_writer.close()
+        else:
+            output_file.close()
+
+    def write_parquet_row(self, rev_data):
+        if 'deleted' in rev_data.keys():
+            rev_data['deleted'] = True if rev_data['deleted'] == "TRUE" else False
+
+        if 'minor' in rev_data.keys():
+            rev_data['minor'] = True if rev_data['minor'] == "TRUE" else False
+
+        if 'anon' in rev_data.keys():
+            rev_data['anon'] = True if rev_data['anon'] == "TRUE" else False
+
+        self.parquet_buffer.append(rev_data)
+
+        if len(self.parquet_buffer) >= self.parquet_buffer_size:
+            self.flush_parquet_buffer()
+
+    def flush_parquet_buffer(self):
+        outtable = pd.DataFrame.from_records(self.parquet_buffer)
+        outtable = pa.Table.from_pandas(outtable)
+        if self.pq_writer is None:
+            schema = outtable.schema
+            for regex_schema in self.regex_schemas:
+                schema.append(regex_schema)
+            self.pq_writer = pq.ParquetWriter(self.output_file, schema, flavor='spark')
+        self.pq_writer.write_table(outtable)
+
     def print_rev_data(self, rev_data):
+        if self.output_parquet is False:
+            printfunc = lambda rev_data: print("\t".join(rev_data), file=self.output_file)
+        else:
+            printfunc = self.write_parquet_row
+
         # if it's the first time through, print the header
         if self.urlencode:
             for field in TO_ENCODE:
                 rev_data[field] = quote(str(rev_data[field]))
 
         if not self.printed_header:
-            print("\t".join([str(k) for k in sorted(rev_data.keys())]), file=self.output_file)
+            printfunc(rev_data)
             self.printed_header = True
 
-        print("\t".join([str(v) for k, v in sorted(rev_data.items())]), file=self.output_file)
+        printfunc(rev_data)
 
 def open_input_file(input_filename):
     if re.match(r'.*\.7z$', input_filename):
-        cmd = ["7za", "x", "-so", input_filename, '*']
+        cmd = ["7za", "x", "-so", input_filename, "*.xml"]
     elif re.match(r'.*\.gz$', input_filename):
         cmd = ["zcat", input_filename]
     elif re.match(r'.*\.bz2$', input_filename):
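One detail worth noting for flush_parquet_buffer: pyarrow schemas are immutable, so Schema.append returns a new schema rather than modifying the receiver. A standalone sketch of accumulating regex fields into a writer's schema (hypothetical field names, not code from the commit):

    import pyarrow as pa
    import pyarrow.parquet as pq

    # regex columns are modelled as list-of-string fields
    schema = pa.schema([pa.field("revid", pa.int64())])
    for regex_field in [pa.field("regex_label", pa.list_(pa.string()))]:
        schema = schema.append(regex_field)  # append() returns a new Schema
    writer = pq.ParquetWriter("revisions.parquet", schema, flavor='spark')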
@@ -447,13 +539,19 @@ def open_input_file(input_filename):
 
     return input_file
 
-def open_output_file(input_filename):
-    # create a regex that creates the output filename
+def get_output_filename(input_filename, parquet = False):
     output_filename = re.sub(r'\.(7z|gz|bz2)?$', '', input_filename)
     output_filename = re.sub(r'\.xml', '', output_filename)
-    output_filename = output_filename + ".tsv"
-    output_file = open(output_filename, "w")
+    if parquet is False:
+        output_filename = output_filename + ".tsv"
+    else:
+        output_filename = output_filename + ".parquet"
+    return output_filename
+
+def open_output_file(input_filename):
+    # create a regex that creates the output filename
+    output_filename = get_output_filename(input_filename, parquet = False)
+    output_file = open(output_filename, "w")
     return output_file
 
 parser = argparse.ArgumentParser(description='Parse MediaWiki XML database dumps into tab delimitted data.')
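A quick usage sketch of the get_output_filename helper above (paths hypothetical):

    get_output_filename("dumps/enwiki-latest.xml.bz2")                # -> "dumps/enwiki-latest.tsv"
    get_output_filename("dumps/enwiki-latest.xml.bz2", parquet=True)  # -> "dumps/enwiki-latest.parquet"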
@@ -463,7 +561,7 @@ parser.add_argument('dumpfiles', metavar="DUMPFILE", nargs="*", type=str,
                     help="Filename of the compressed or uncompressed XML database dump. If absent, we'll look for content on stdin and output on stdout.")
 
 parser.add_argument('-o', '--output-dir', metavar='DIR', dest='output_dir', type=str, nargs=1,
-                    help="Directory for output files.")
+                    help="Directory for output files. If it ends with .parquet output will be in parquet format.")
 
 parser.add_argument('-s', '--stdout', dest="stdout", action="store_true",
                     help="Write output to standard out (do not create dump file)")
@@ -502,6 +600,8 @@ parser.add_argument('-CPl', '--comment-pattern-label', dest="regex_comment_label",
 
 args = parser.parse_args()
 
 # set persistence method
 
 if args.persist is None:
@@ -519,6 +619,7 @@ else:
     namespaces = None
 
 if len(args.dumpfiles) > 0:
+    output_parquet = False
     for filename in args.dumpfiles:
         input_file = open_input_file(filename)
@@ -528,13 +629,16 @@ if len(args.dumpfiles) > 0:
         else:
             output_dir = "."
 
+        if output_dir.endswith(".parquet"):
+            output_parquet = True
+
         print("Processing file: %s" % filename, file=sys.stderr)
 
         if args.stdout:
             output_file = sys.stdout
         else:
             filename = os.path.join(output_dir, os.path.basename(filename))
-            output_file = open_output_file(filename)
+            output_file = get_output_filename(filename, parquet = output_parquet)
 
         wikiq = WikiqParser(input_file,
                             output_file,
@@ -546,13 +650,15 @@ if len(args.dumpfiles) > 0:
                             regex_match_revision = args.regex_match_revision,
                             regex_revision_label = args.regex_revision_label,
                             regex_match_comment = args.regex_match_comment,
-                            regex_comment_label = args.regex_comment_label)
+                            regex_comment_label = args.regex_comment_label,
+                            output_parquet=output_parquet)
 
+        print(wikiq.output_parquet)
         wikiq.process()
 
         # close things
         input_file.close()
-        output_file.close()
 
 else:
     wikiq = WikiqParser(sys.stdin,
                         sys.stdout,