Add some descriptive comments.
This commit is contained in:
parent
c285402683
commit
bb83d62b74
90
wikiq
90
wikiq
@ -26,7 +26,7 @@ from deltas import SequenceMatcher
|
|||||||
from deltas import SegmentMatcher
|
from deltas import SegmentMatcher
|
||||||
|
|
||||||
import dataclasses as dc
|
import dataclasses as dc
|
||||||
from dataclasses import dataclass, make_dataclass
|
from dataclasses import dataclass
|
||||||
import pyarrow as pa
|
import pyarrow as pa
|
||||||
import pyarrow.parquet as pq
|
import pyarrow.parquet as pq
|
||||||
|
|
||||||
@ -133,6 +133,11 @@ class WikiqPage():
|
|||||||
return next(self.__revisions)
|
return next(self.__revisions)
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
A RegexPair is defined by a regular expression (pattern) and a label.
|
||||||
|
The pattern can include capture groups. If it does then each capture group will have a resulting column in the output.
|
||||||
|
If the pattern does not include a capture group, then only one output column will result.
|
||||||
|
"""
|
||||||
class RegexPair(object):
|
class RegexPair(object):
|
||||||
def __init__(self, pattern, label):
|
def __init__(self, pattern, label):
|
||||||
self.pattern = re.compile(pattern)
|
self.pattern = re.compile(pattern)
|
||||||
@ -201,6 +206,22 @@ class RegexPair(object):
|
|||||||
|
|
||||||
return rev_data
|
return rev_data
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
We used to use a dictionary to collect fields for the output.
|
||||||
|
Now we use dataclasses. Compared to a dictionary, this should help:
|
||||||
|
- prevent some bugs
|
||||||
|
- make it easier to output parquet data.
|
||||||
|
- use class attribute '.' syntax instead of dictionary syntax.
|
||||||
|
- improve support for tooling (autocomplete, type hints)
|
||||||
|
- use type information to define formatting rules
|
||||||
|
|
||||||
|
Depending on the parameters passed into Wikiq, the output schema can be different.
|
||||||
|
Therefore, we need to end up constructing a dataclass with the correct output schema.
|
||||||
|
It also needs to have the correct pyarrow schema so we can write parquet files.
|
||||||
|
|
||||||
|
The RevDataBase type has all the fields that will be output no matter how wikiq is invoked.
|
||||||
|
"""
|
||||||
@dataclass()
|
@dataclass()
|
||||||
class RevDataBase():
|
class RevDataBase():
|
||||||
revid: int
|
revid: int
|
||||||
@ -218,10 +239,16 @@ class RevDataBase():
|
|||||||
editor: str = None
|
editor: str = None
|
||||||
anon: bool = None
|
anon: bool = None
|
||||||
|
|
||||||
|
# toggles url encoding. this isn't a dataclass field since it doesn't have a type annotation
|
||||||
urlencode = False
|
urlencode = False
|
||||||
|
|
||||||
|
# defines pyarrow schema.
|
||||||
|
# each field in the data class needs an entry in this array.
|
||||||
|
# the names should match and be in the same order.
|
||||||
|
# this isn't a dataclass field since it doesn't have a type annotation
|
||||||
pa_schema_fields = [
|
pa_schema_fields = [
|
||||||
pa.field("revid", pa.int64()),
|
pa.field("revid", pa.int64()),
|
||||||
pa.field("date_time",pa.timestamp('ms')),
|
pa.field("date_time", pa.timestamp('ms')),
|
||||||
pa.field("articleid",pa.int64()),
|
pa.field("articleid",pa.int64()),
|
||||||
pa.field("editorid",pa.int64()),
|
pa.field("editorid",pa.int64()),
|
||||||
pa.field("title",pa.string()),
|
pa.field("title",pa.string()),
|
||||||
@ -236,9 +263,11 @@ class RevDataBase():
|
|||||||
pa.field("anon",pa.bool_())
|
pa.field("anon",pa.bool_())
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# pyarrow is a columnar format, so most of the work happens in the flush_parquet_buffer function
|
||||||
def to_pyarrow(self):
|
def to_pyarrow(self):
|
||||||
return dc.astuple(self)
|
return dc.astuple(self)
|
||||||
|
|
||||||
|
# logic to convert each field into the wikiq tsv format goes here.
|
||||||
def to_tsv_row(self):
|
def to_tsv_row(self):
|
||||||
|
|
||||||
row = []
|
row = []
|
||||||
@ -275,12 +304,26 @@ class RevDataBase():
|
|||||||
def header_row(self):
|
def header_row(self):
|
||||||
return '\t'.join(map(lambda f: f.name, dc.fields(self)))
|
return '\t'.join(map(lambda f: f.name, dc.fields(self)))
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
If collapse=True we'll use a RevDataCollapse dataclass.
|
||||||
|
This class inherits from RevDataBase. This means that it has all the same fields and functions.
|
||||||
|
|
||||||
|
It just adds a new field and updates the pyarrow schema.
|
||||||
|
|
||||||
|
"""
|
||||||
@dataclass()
|
@dataclass()
|
||||||
class RevDataCollapse(RevDataBase):
|
class RevDataCollapse(RevDataBase):
|
||||||
collapsed_revs:int = None
|
collapsed_revs:int = None
|
||||||
|
|
||||||
pa_collapsed_revs_schema = pa.field('collapsed_revs',pa.int64())
|
pa_collapsed_revs_schema = pa.field('collapsed_revs',pa.int64())
|
||||||
pa_schema_fields = RevDataBase.pa_schema_fields + [pa_collapsed_revs_schema]
|
pa_schema_fields = RevDataBase.pa_schema_fields + [pa_collapsed_revs_schema]
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
If persistence data is to be computed we'll need the fields added by RevDataPersistence.
|
||||||
|
|
||||||
|
"""
|
||||||
@dataclass()
|
@dataclass()
|
||||||
class RevDataPersistence(RevDataBase):
|
class RevDataPersistence(RevDataBase):
|
||||||
token_revs:int = None
|
token_revs:int = None
|
||||||
@ -296,6 +339,10 @@ class RevDataPersistence(RevDataBase):
|
|||||||
|
|
||||||
pa_schema_fields = RevDataBase.pa_schema_fields + pa_persistence_schema_fields
|
pa_schema_fields = RevDataBase.pa_schema_fields + pa_persistence_schema_fields
|
||||||
|
|
||||||
|
"""
|
||||||
|
class RevDataCollapsePersistence uses multiple inheritence to make a class that has both persistence and collapse fields.
|
||||||
|
|
||||||
|
"""
|
||||||
@dataclass()
|
@dataclass()
|
||||||
class RevDataCollapsePersistence(RevDataCollapse, RevDataPersistence):
|
class RevDataCollapsePersistence(RevDataCollapse, RevDataPersistence):
|
||||||
pa_schema_fields = RevDataCollapse.pa_schema_fields + RevDataPersistence.pa_persistence_schema_fields
|
pa_schema_fields = RevDataCollapse.pa_schema_fields + RevDataPersistence.pa_persistence_schema_fields
|
||||||
@ -323,6 +370,9 @@ class WikiqParser():
|
|||||||
self.regex_revision_pairs = self.make_matchmake_pairs(regex_match_revision, regex_revision_label)
|
self.regex_revision_pairs = self.make_matchmake_pairs(regex_match_revision, regex_revision_label)
|
||||||
self.regex_comment_pairs = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
|
self.regex_comment_pairs = self.make_matchmake_pairs(regex_match_comment, regex_comment_label)
|
||||||
|
|
||||||
|
|
||||||
|
# This is where we set the type for revdata.
|
||||||
|
|
||||||
if self.collapse_user is True:
|
if self.collapse_user is True:
|
||||||
if self.persist == PersistMethod.none:
|
if self.persist == PersistMethod.none:
|
||||||
revdata_type = RevDataCollapse
|
revdata_type = RevDataCollapse
|
||||||
@ -333,16 +383,23 @@ class WikiqParser():
|
|||||||
else:
|
else:
|
||||||
revdata_type = RevDataBase
|
revdata_type = RevDataBase
|
||||||
|
|
||||||
|
# if there are regex fields, we need to add them to the revdata type.
|
||||||
regex_fields = [(field.name, list[str], dc.field(default=None)) for field in self.regex_schemas]
|
regex_fields = [(field.name, list[str], dc.field(default=None)) for field in self.regex_schemas]
|
||||||
|
|
||||||
self.revdata_type = make_dataclass('RevData_Parser',
|
# make_dataclass is a function that defines a new dataclass type.
|
||||||
fields=regex_fields,
|
# here we extend the type we have already chosen and add the regular expression types
|
||||||
bases=(revdata_type,))
|
self.revdata_type = dc.make_dataclass('RevData_Parser',
|
||||||
|
fields=regex_fields,
|
||||||
|
bases=(revdata_type,))
|
||||||
|
|
||||||
|
# we also need to make sure that we have the right pyarrow schema
|
||||||
self.revdata_type.pa_schema_fields = revdata_type.pa_schema_fields + self.regex_schemas
|
self.revdata_type.pa_schema_fields = revdata_type.pa_schema_fields + self.regex_schemas
|
||||||
|
|
||||||
self.revdata_type.urlencode = self.urlencode
|
self.revdata_type.urlencode = self.urlencode
|
||||||
|
|
||||||
|
self.schema = pa.schema(self.revdata_type.pa_schema_fields)
|
||||||
|
|
||||||
|
# here we initialize the variables we need for output.
|
||||||
if output_parquet is True:
|
if output_parquet is True:
|
||||||
self.output_parquet = True
|
self.output_parquet = True
|
||||||
self.pq_writer = None
|
self.pq_writer = None
|
||||||
@ -451,6 +508,7 @@ class WikiqParser():
|
|||||||
# Iterate through a page's revisions
|
# Iterate through a page's revisions
|
||||||
for rev in page:
|
for rev in page:
|
||||||
|
|
||||||
|
# create a new data object instead of a dictionary.
|
||||||
rev_data = self.revdata_type(revid = rev.id,
|
rev_data = self.revdata_type(revid = rev.id,
|
||||||
date_time = datetime.fromtimestamp(rev.timestamp.unix(), tz=timezone.utc),
|
date_time = datetime.fromtimestamp(rev.timestamp.unix(), tz=timezone.utc),
|
||||||
articleid = page.id,
|
articleid = page.id,
|
||||||
@ -556,6 +614,7 @@ class WikiqParser():
|
|||||||
print("Done: %s revisions and %s pages." % (rev_count, page_count),
|
print("Done: %s revisions and %s pages." % (rev_count, page_count),
|
||||||
file=sys.stderr)
|
file=sys.stderr)
|
||||||
|
|
||||||
|
# remember to flush the parquet_buffer if we're done
|
||||||
if self.output_parquet is True:
|
if self.output_parquet is True:
|
||||||
self.flush_parquet_buffer()
|
self.flush_parquet_buffer()
|
||||||
self.pq_writer.close()
|
self.pq_writer.close()
|
||||||
@ -564,6 +623,10 @@ class WikiqParser():
|
|||||||
self.output_file.close()
|
self.output_file.close()
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
For performance reasons it's better to write parquet in batches instead of one row at a time.
|
||||||
|
So this function just puts the data on a buffer. If the buffer is full, then it gets flushed (written).
|
||||||
|
"""
|
||||||
def write_parquet_row(self, rev_data):
|
def write_parquet_row(self, rev_data):
|
||||||
padata = rev_data.to_pyarrow()
|
padata = rev_data.to_pyarrow()
|
||||||
self.parquet_buffer.append(padata)
|
self.parquet_buffer.append(padata)
|
||||||
@ -572,10 +635,16 @@ class WikiqParser():
|
|||||||
self.flush_parquet_buffer()
|
self.flush_parquet_buffer()
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
Function that actually writes data to the parquet file.
|
||||||
|
It needs to transpose the data from row-by-row to column-by-column
|
||||||
|
"""
|
||||||
def flush_parquet_buffer(self):
|
def flush_parquet_buffer(self):
|
||||||
schema = pa.schema(self.revdata_type.pa_schema_fields)
|
|
||||||
|
|
||||||
def row_to_col(rg, types):
|
"""
|
||||||
|
Returns the pyarrow table that we'll write
|
||||||
|
"""
|
||||||
|
def rows_to_table(rg, schema):
|
||||||
cols = []
|
cols = []
|
||||||
first = rg[0]
|
first = rg[0]
|
||||||
for col in first:
|
for col in first:
|
||||||
@ -586,17 +655,18 @@ class WikiqParser():
|
|||||||
cols[j].append(row[j])
|
cols[j].append(row[j])
|
||||||
|
|
||||||
arrays = []
|
arrays = []
|
||||||
for col, typ in zip(cols, types):
|
for col, typ in zip(cols, schema.types):
|
||||||
arrays.append(pa.array(col, typ))
|
arrays.append(pa.array(col, typ))
|
||||||
return arrays
|
return pa.Table.from_arrays(arrays, schema=schema)
|
||||||
|
|
||||||
outtable = pa.Table.from_arrays(row_to_col(self.parquet_buffer, schema.types), schema=schema)
|
outtable = rows_to_table(self.parquet_buffer, self.schema)
|
||||||
if self.pq_writer is None:
|
if self.pq_writer is None:
|
||||||
self.pq_writer = pq.ParquetWriter(self.output_file, schema, flavor='spark')
|
self.pq_writer = pq.ParquetWriter(self.output_file, schema, flavor='spark')
|
||||||
|
|
||||||
self.pq_writer.write_table(outtable)
|
self.pq_writer.write_table(outtable)
|
||||||
self.parquet_buffer = []
|
self.parquet_buffer = []
|
||||||
|
|
||||||
|
# depending on if we are configured to write tsv or parquet, we'll call a different function.
|
||||||
def print_rev_data(self, rev_data):
|
def print_rev_data(self, rev_data):
|
||||||
if self.output_parquet is False:
|
if self.output_parquet is False:
|
||||||
printfunc = self.write_tsv_row
|
printfunc = self.write_tsv_row
|
||||||
|
Loading…
Reference in New Issue
Block a user