Merge branch 'parquet_support' into test-parquet

This commit is contained in:
Will Beason
2025-05-29 10:21:30 -05:00
5 changed files with 53 additions and 57 deletions

24
wikiq
View File

@@ -266,13 +266,13 @@ class RevDataBase:
pa.field("revid", pa.int64()),
pa.field("date_time", pa.timestamp('ms')),
pa.field("articleid", pa.int64()),
pa.field("editorid", pa.int64()),
pa.field("editorid", pa.int64(), nullable=True),
pa.field("title", pa.string()),
pa.field("namespace", pa.int32()),
pa.field("deleted", pa.bool_()),
pa.field("text_chars", pa.int32()),
pa.field("revert", pa.bool_()),
pa.field("reverteds", pa.list_(pa.int64())),
pa.field("revert", pa.bool_(), nullable=True),
pa.field("reverteds", pa.list_(pa.int64()), nullable=True),
pa.field("sha1", pa.string()),
pa.field("minor", pa.bool_()),
pa.field("editor", pa.string()),
@@ -280,7 +280,7 @@ class RevDataBase:
]
# pyarrow is a columnar format, so most of the work happens in the flush_parquet_buffer function
def to_pyarrow(self) -> tuple[Any, ...]:
def to_pyarrow(self):
return dc.astuple(self)
# logic to convert each field into the wikiq tsv format goes here.
@@ -732,16 +732,22 @@ class WikiqParser:
print(line, file=self.output_file)
def open_input_file(input_filename) -> TextIOWrapper | IO[Any] | IO[bytes]:
def match_archive_suffix(input_filename):
if re.match(r'.*\.7z$', input_filename):
cmd = ["7za", "x", "-so", input_filename, "*.xml"]
cmd = ["7za", "x", "-so", input_filename]
elif re.match(r'.*\.gz$', input_filename):
cmd = ["zcat", input_filename]
elif re.match(r'.*\.bz2$', input_filename):
cmd = ["bzcat", "-dk", input_filename]
else:
raise ValueError("Unrecognized file type: %s" % input_filename)
return cmd
def open_input_file(input_filename, fandom_2020=False):
cmd = match_archive_suffix(input_filename)
if fandom_2020:
cmd.append("*.xml")
try:
return Popen(cmd, stdout=PIPE).stdout
except NameError:
@@ -814,6 +820,10 @@ def main():
action='append',
help="The label for the outputted column based on matching the regex in comments.")
parser.add_argument('--fandom-2020', dest="fandom_2020",
action='store_true',
help="Whether the archive is from the fandom 2020 dumps by Wikiteam. These dumps can have multiple .xml files in their archives.")
args = parser.parse_args()
# set persistence method
@@ -835,7 +845,7 @@ def main():
if len(args.dumpfiles) > 0:
output_parquet = False
for filename in args.dumpfiles:
input_file = open_input_file(filename)
input_file = open_input_file(filename, args.fandom_2020)
# open directory for output
if args.output_dir: