make --resume work with partitioned namespaces.

Nathan TeBlunthuis 2025-12-01 07:19:52 -08:00
parent 3c26185739
commit b46f98a875
2 changed files with 257 additions and 54 deletions

View File

@@ -347,13 +347,24 @@ class WikiqParser:
         # Track whether we've passed the resume point
         found_resume_point = self.resume_from_revid is None

-        # When resuming with parquet, write new data to temp file and merge at the end
+        # When resuming with parquet, write new data to temp file/directory and merge at the end
         original_output_file = None
         temp_output_file = None
         if self.resume_from_revid is not None and self.output_parquet:
             if isinstance(self.output_file, str) and os.path.exists(self.output_file):
                 original_output_file = self.output_file
+                # For partitioned namespaces, create a temp directory; for single files, create a temp file path
                 temp_output_file = self.output_file + ".resume_temp"
+                # Remove temp file/dir if it exists from a previous failed run
+                if os.path.exists(temp_output_file):
+                    import shutil
+                    if os.path.isdir(temp_output_file):
+                        shutil.rmtree(temp_output_file)
+                    else:
+                        os.remove(temp_output_file)
+                # For partitioned namespaces, create the directory now; for single files it will be created by ParquetWriter
+                if self.partition_namespaces:
+                    os.makedirs(temp_output_file, exist_ok=True)
                 self.output_file = temp_output_file

         # Construct dump file iterator
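The write-aside is necessary because a finished parquet file cannot be appended to in place: the footer is written once, so new row groups must go to a fresh file and the two files combined afterwards. A minimal sketch of that constraint with pyarrow (table contents hypothetical):

    import pyarrow as pa
    import pyarrow.parquet as pq

    t = pa.table({"revid": [1, 2, 3]})
    pq.write_table(t, "a.parquet")  # creates the file in one shot
    pq.write_table(t, "a.parquet")  # writing again replaces it; there is no append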
@@ -780,43 +791,26 @@ class WikiqParser:
         if original_output_file is not None and temp_output_file is not None:
             print("Merging resumed data with existing output...", file=sys.stderr)
             try:
-                # Create a merged output file
-                merged_output_file = original_output_file + ".merged"
-
-                # Open the original file
-                original_pq = pq.ParquetFile(original_output_file)
-                # Open the temp file
-                temp_pq = pq.ParquetFile(temp_output_file)
-
-                # Create a writer for the merged file
-                merged_writer = None
-
-                # Copy all row groups from the original file
-                for i in range(original_pq.num_row_groups):
-                    row_group = original_pq.read_row_group(i)
-                    if merged_writer is None:
-                        merged_writer = pq.ParquetWriter(
-                            merged_output_file,
-                            row_group.schema,
-                            flavor="spark"
-                        )
-                    merged_writer.write_table(row_group)
-
-                # Append all row groups from the temp file
-                for i in range(temp_pq.num_row_groups):
-                    row_group = temp_pq.read_row_group(i)
-                    merged_writer.write_table(row_group)
-
-                # Close the writer
-                if merged_writer is not None:
-                    merged_writer.close()
-
-                # Replace the original file with the merged file
-                os.remove(original_output_file)
-                os.rename(merged_output_file, original_output_file)
-
-                # Clean up the temp file
-                os.remove(temp_output_file)
+                # Check if we're merging partitioned namespaces or single files
+                if os.path.isdir(original_output_file):
+                    # Merge partitioned namespace directories
+                    self._merge_partitioned_namespaces(original_output_file, temp_output_file)
+                else:
+                    # Merge single parquet files
+                    merged_output_file = original_output_file + ".merged"
+                    merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
+
+                    # Replace the original file with the merged file
+                    os.remove(original_output_file)
+                    os.rename(merged_output_file, original_output_file)
+
+                # Clean up the temp file/directory
+                if os.path.exists(temp_output_file):
+                    if os.path.isdir(temp_output_file):
+                        import shutil
+                        shutil.rmtree(temp_output_file)
+                    else:
+                        os.remove(temp_output_file)

                 print("Merge complete.", file=sys.stderr)
             except Exception as e:
@@ -824,6 +818,50 @@ class WikiqParser:
                 print(f"New data saved in: {temp_output_file}", file=sys.stderr)
                 raise

+    def _merge_partitioned_namespaces(self, original_output_dir, temp_output_dir):
+        """
+        Merge partitioned namespace directories.
+        For each namespace partition in the temp directory, merge its parquet files with the original.
+        """
+        import shutil
+
+        # Get all namespace directories from temp
+        temp_namespace_dirs = [d for d in os.listdir(temp_output_dir) if d.startswith('namespace=')]
+
+        for ns_dir in temp_namespace_dirs:
+            temp_ns_path = os.path.join(temp_output_dir, ns_dir)
+            original_ns_path = os.path.join(original_output_dir, ns_dir)
+
+            # Find parquet files in the temp namespace directory
+            temp_parquet_files = [f for f in os.listdir(temp_ns_path) if f.endswith('.parquet')]
+            if not temp_parquet_files:
+                continue
+            temp_parquet_path = os.path.join(temp_ns_path, temp_parquet_files[0])
+
+            # Check if the namespace partition exists in the original directory
+            if os.path.exists(original_ns_path):
+                # Namespace partition exists, merge the files
+                original_parquet_files = [f for f in os.listdir(original_ns_path) if f.endswith('.parquet')]
+                if not original_parquet_files:
+                    # No parquet file in original, just copy the temp file
+                    shutil.copy(temp_parquet_path, os.path.join(original_ns_path, temp_parquet_files[0]))
+                else:
+                    original_parquet_path = os.path.join(original_ns_path, original_parquet_files[0])
+                    merged_parquet_path = original_parquet_path + ".merged"
+
+                    # Merge the files
+                    merge_parquet_files(original_parquet_path, temp_parquet_path, merged_parquet_path)
+
+                    # Replace the original file with the merged file
+                    os.remove(original_parquet_path)
+                    os.rename(merged_parquet_path, original_parquet_path)
+            else:
+                # Namespace partition doesn't exist in original, create it
+                shutil.copytree(temp_ns_path, original_ns_path)
+

 def match_archive_suffix(input_filename):
     if re.match(r".*\.7z$", input_filename):
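For orientation, the layout this merge walks is hive-style partitioning: an output directory holding namespace=0/, namespace=1/, and so on, each containing one parquet file. A merged directory of that shape can be read back as a single table with pyarrow's dataset API; a minimal sketch, with a hypothetical output path:

    import pyarrow.dataset as ds

    # "hive" partitioning reconstructs the namespace column
    # from the namespace=<n> directory names on read.
    dataset = ds.dataset("sailormoon.parquet", format="parquet", partitioning="hive")
    table = dataset.to_table()
    print(table.num_rows)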
@@ -864,37 +902,108 @@ def open_output_file(input_filename):
     return output_file

+def merge_parquet_files(original_path, temp_path, merged_path):
+    """
+    Merge two parquet files by copying all row groups from original and temp into merged.
+    """
+    original_pq = pq.ParquetFile(original_path)
+    temp_pq = pq.ParquetFile(temp_path)
+
+    merged_writer = None
+
+    # Copy all row groups from the original file
+    for i in range(original_pq.num_row_groups):
+        row_group = original_pq.read_row_group(i)
+        if merged_writer is None:
+            merged_writer = pq.ParquetWriter(
+                merged_path,
+                row_group.schema,
+                flavor="spark"
+            )
+        merged_writer.write_table(row_group)
+
+    # Append all row groups from the temp file
+    for i in range(temp_pq.num_row_groups):
+        row_group = temp_pq.read_row_group(i)
+        merged_writer.write_table(row_group)
+
+    # Close the writer
+    if merged_writer is not None:
+        merged_writer.close()
+

 def get_last_revid_from_parquet(output_file):
     """
-    Read the last revid from a parquet file by reading only the last row group.
+    Read the last revid from a parquet file or partitioned namespace directory.
     Returns None if the file doesn't exist or is empty.
+    Handles both single files and partitioned namespace structures (namespace=*/file.parquet).
+    For partitioned namespaces, finds the most recently modified partition and reads from it.
     """
     try:
-        file_path = output_file
-        if not os.path.exists(file_path):
+        if not os.path.exists(output_file):
             return None

-        # Open the parquet file
-        parquet_file = pq.ParquetFile(file_path)
-
-        # Get the number of row groups
-        num_row_groups = parquet_file.num_row_groups
-        if num_row_groups == 0:
-            return None
-
-        # Read only the last row group, and only the revid column
-        last_row_group = parquet_file.read_row_group(num_row_groups - 1, columns=['revid'])
-
-        if last_row_group.num_rows == 0:
-            return None
-
-        # Get the last revid from this row group
-        last_revid = last_row_group.column('revid')[-1].as_py()
-        return last_revid
+        # Check if this is a partitioned namespace directory
+        if os.path.isdir(output_file):
+            # Find all namespace=* subdirectories
+            namespace_dirs = [d for d in os.listdir(output_file) if d.startswith('namespace=')]
+            if not namespace_dirs:
+                return None
+
+            # Find the most recently modified namespace partition
+            most_recent_ns = None
+            most_recent_mtime = -1
+            for ns_dir in namespace_dirs:
+                ns_path = os.path.join(output_file, ns_dir)
+                mtime = os.path.getmtime(ns_path)
+                if mtime > most_recent_mtime:
+                    most_recent_mtime = mtime
+                    most_recent_ns = ns_path
+
+            if most_recent_ns is None:
+                return None
+
+            # Find the parquet file in the most recent namespace directory
+            parquet_files = [f for f in os.listdir(most_recent_ns) if f.endswith('.parquet')]
+            if not parquet_files:
+                return None
+
+            parquet_path = os.path.join(most_recent_ns, parquet_files[0])
+            parquet_file = pq.ParquetFile(parquet_path)
+            if parquet_file.num_row_groups == 0:
+                return None
+
+            # Read only the last row group, and only the revid column
+            last_row_group = parquet_file.read_row_group(parquet_file.num_row_groups - 1, columns=['revid'])
+            if last_row_group.num_rows == 0:
+                return None
+
+            # Get the last revid from this row group
+            last_revid = last_row_group.column('revid')[-1].as_py()
+            return last_revid
+        else:
+            # Single parquet file
+            parquet_file = pq.ParquetFile(output_file)
+            if parquet_file.num_row_groups == 0:
+                return None
+
+            # Read only the last row group, and only the revid column
+            last_row_group = parquet_file.read_row_group(parquet_file.num_row_groups - 1, columns=['revid'])
+            if last_row_group.num_rows == 0:
+                return None
+
+            # Get the last revid from this row group
+            last_revid = last_row_group.column('revid')[-1].as_py()
+            return last_revid
     except Exception as e:
-        print(f"Error reading last revid from {file_path}: {e}", file=sys.stderr)
+        print(f"Error reading last revid from {output_file}: {e}", file=sys.stderr)
        return None

View File

@@ -7,6 +7,7 @@ from typing import Final, Union
 import pytest
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 from pandas import DataFrame
 from pandas.testing import assert_frame_equal, assert_series_equal
@@ -536,3 +537,96 @@ def test_resume_with_diff():
     assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
     print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
+
+
+def test_resume_with_partition_namespaces():
+    """Test that --resume works correctly with --partition-namespaces."""
+    import pyarrow.parquet as pq
+
+    # First, create a complete baseline output with partition-namespaces
+    tester_full = WikiqTester(SAILORMOON, "resume_partition_full", in_compression="7z", out_format="parquet")
+    try:
+        tester_full.call_wikiq("--partition-namespaces", "--fandom-2020")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    # Read the full output from the partitioned directory
+    full_output_dir = tester_full.output
+    namespace_dirs = [d for d in os.listdir(full_output_dir) if d.startswith('namespace=')]
+    if not namespace_dirs:
+        pytest.fail("No namespace directories found in output")
+
+    # Collect all revisions from all namespaces
+    full_revids = []
+    for ns_dir in sorted(namespace_dirs):
+        parquet_files = [f for f in os.listdir(os.path.join(full_output_dir, ns_dir)) if f.endswith('.parquet')]
+        if parquet_files:
+            ns_parquet_path = os.path.join(full_output_dir, ns_dir, parquet_files[0])
+            pf = pq.ParquetFile(ns_parquet_path)
+            table = pf.read(columns=['revid'])
+            revids = table.column('revid').to_pylist()
+            full_revids.extend(revids)
+
+    full_revids_sorted = sorted(set(full_revids))
+    total_revisions = len(full_revids_sorted)
+
+    # Get a revid about 1/3 through to use as the resume point
+    resume_idx = total_revisions // 3
+    resume_revid = full_revids_sorted[resume_idx]
+    print(f"Total revisions: {total_revisions}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
+
+    # Create a partial output by manually creating the partitioned structure
+    tester_partial = WikiqTester(SAILORMOON, "resume_partition_partial", in_compression="7z", out_format="parquet")
+    partial_output_dir = tester_partial.output
+
+    # Copy the full partitioned output to the partial directory
+    for ns_dir in namespace_dirs:
+        src_ns_path = os.path.join(full_output_dir, ns_dir)
+        dst_ns_path = os.path.join(partial_output_dir, ns_dir)
+        shutil.copytree(src_ns_path, dst_ns_path)
+
+    # Now filter each namespace file to only include revisions up to resume_idx
+    revised_data_count = 0
+    for ns_dir in namespace_dirs:
+        parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')]
+        if parquet_files:
+            ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0])
+            pf = pq.ParquetFile(ns_parquet_path)
+            table = pf.read()
+            # Filter to only rows up to the resume point
+            revids = table.column('revid').to_pylist()
+            mask = pa.array([revid <= resume_revid for revid in revids], type=pa.bool_())
+            partial_table = table.filter(mask)
+            revised_data_count += len(partial_table)
+            # Write back the filtered data
+            pq.write_table(partial_table, ns_parquet_path)
+
+    print(f"Created partial output with {revised_data_count} revisions (up to revid {resume_revid})")
+
+    # Now resume from the partial output
+    try:
+        tester_partial.call_wikiq("--partition-namespaces", "--fandom-2020", "--resume")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    # Read the resumed output and collect revids
+    resumed_revids = []
+    for ns_dir in namespace_dirs:
+        parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')]
+        if parquet_files:
+            ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0])
+            pf = pq.ParquetFile(ns_parquet_path)
+            table = pf.read(columns=['revid'])
+            revids = table.column('revid').to_pylist()
+            resumed_revids.extend(revids)
+
+    resumed_revids_sorted = sorted(set(resumed_revids))
+
+    # Compare the revids
+    assert resumed_revids_sorted == full_revids_sorted, f"Resumed revids mismatch: {len(resumed_revids_sorted)} vs {len(full_revids_sorted)}"
+    print(f"Resume with partition-namespaces test passed! Original: {len(full_revids_sorted)} revisions, Resumed: {len(resumed_revids_sorted)} revisions")
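During development this scenario can be exercised in isolation with pytest's keyword filter; a small sketch (how it is invoked depends on the checkout):

    import pytest

    # Select only the partition-namespaces resume test by name.
    pytest.main(["-k", "test_resume_with_partition_namespaces"])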