From b46f98a875d3547f091f38d900e578e634e93a57 Mon Sep 17 00:00:00 2001
From: Nathan TeBlunthuis
Date: Mon, 1 Dec 2025 07:19:52 -0800
Subject: [PATCH] make --resume work with partitioned namespaces.

---
 src/wikiq/__init__.py   | 217 ++++++++++++++++++++++++++++++----------
 test/Wikiq_Unit_Test.py |  94 +++++++++++++++++
 2 files changed, 257 insertions(+), 54 deletions(-)

diff --git a/src/wikiq/__init__.py b/src/wikiq/__init__.py
index aff8981..22ea3e2 100755
--- a/src/wikiq/__init__.py
+++ b/src/wikiq/__init__.py
@@ -347,13 +347,24 @@ class WikiqParser:
         # Track whether we've passed the resume point
         found_resume_point = self.resume_from_revid is None
 
-        # When resuming with parquet, write new data to temp file and merge at the end
+        # When resuming with parquet, write new data to temp file/directory and merge at the end
         original_output_file = None
         temp_output_file = None
         if self.resume_from_revid is not None and self.output_parquet:
             if isinstance(self.output_file, str) and os.path.exists(self.output_file):
                 original_output_file = self.output_file
+                # For partitioned namespaces, create a temp directory; for single files, create a temp file path
                 temp_output_file = self.output_file + ".resume_temp"
+                # Remove temp file/dir if it exists from a previous failed run
+                if os.path.exists(temp_output_file):
+                    import shutil
+                    if os.path.isdir(temp_output_file):
+                        shutil.rmtree(temp_output_file)
+                    else:
+                        os.remove(temp_output_file)
+                # For partitioned namespaces, create the directory now; for single files it will be created by ParquetWriter
+                if self.partition_namespaces:
+                    os.makedirs(temp_output_file, exist_ok=True)
                 self.output_file = temp_output_file
 
         # Construct dump file iterator
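The hunk above pre-creates the temp path as a directory because, with --partition-namespaces, wikiq's parquet output is a hive-style tree of namespace=<N> subdirectories rather than a single file. As a point of reference, here is a minimal sketch (not part of the patch) of that layout built with plain pyarrow; the partition column name matches the namespace= prefix this patch matches on, while the file names inside each partition are whatever the writer generates:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({
        "revid": [1, 2, 3, 4],
        "namespace": [0, 0, 1, 1],
    })
    # Produces out/namespace=0/<part>.parquet and out/namespace=1/<part>.parquet;
    # the ".resume_temp" directory created above mirrors this shape.
    pq.write_to_dataset(table, root_path="out", partition_cols=["namespace"])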
@@ -780,43 +791,26 @@ class WikiqParser:
         if original_output_file is not None and temp_output_file is not None:
             print("Merging resumed data with existing output...", file=sys.stderr)
             try:
-                # Create a merged output file
-                merged_output_file = original_output_file + ".merged"
+                # Check if we're merging partitioned namespaces or single files
+                if os.path.isdir(original_output_file):
+                    # Merge partitioned namespace directories
+                    self._merge_partitioned_namespaces(original_output_file, temp_output_file)
+                else:
+                    # Merge single parquet files
+                    merged_output_file = original_output_file + ".merged"
+                    merge_parquet_files(original_output_file, temp_output_file, merged_output_file)
 
-                # Open the original file
-                original_pq = pq.ParquetFile(original_output_file)
-                # Open the temp file
-                temp_pq = pq.ParquetFile(temp_output_file)
+                    # Replace the original file with the merged file
+                    os.remove(original_output_file)
+                    os.rename(merged_output_file, original_output_file)
 
-                # Create a writer for the merged file
-                merged_writer = None
-
-                # Copy all row groups from the original file
-                for i in range(original_pq.num_row_groups):
-                    row_group = original_pq.read_row_group(i)
-                    if merged_writer is None:
-                        merged_writer = pq.ParquetWriter(
-                            merged_output_file,
-                            row_group.schema,
-                            flavor="spark"
-                        )
-                    merged_writer.write_table(row_group)
-
-                # Append all row groups from the temp file
-                for i in range(temp_pq.num_row_groups):
-                    row_group = temp_pq.read_row_group(i)
-                    merged_writer.write_table(row_group)
-
-                # Close the writer
-                if merged_writer is not None:
-                    merged_writer.close()
-
-                # Replace the original file with the merged file
-                os.remove(original_output_file)
-                os.rename(merged_output_file, original_output_file)
-
-                # Clean up the temp file
-                os.remove(temp_output_file)
+                # Clean up the temp file/directory
+                if os.path.exists(temp_output_file):
+                    if os.path.isdir(temp_output_file):
+                        import shutil
+                        shutil.rmtree(temp_output_file)
+                    else:
+                        os.remove(temp_output_file)
 
                 print("Merge complete.", file=sys.stderr)
             except Exception as e:
@@ -824,6 +818,50 @@ class WikiqParser:
                 print(f"New data saved in: {temp_output_file}", file=sys.stderr)
                 raise
 
+    def _merge_partitioned_namespaces(self, original_output_dir, temp_output_dir):
+        """
+        Merge partitioned namespace directories.
+        For each namespace partition in the temp directory, merge its parquet files with the original.
+        """
+        import shutil
+
+        # Get all namespace directories from temp
+        temp_namespace_dirs = [d for d in os.listdir(temp_output_dir) if d.startswith('namespace=')]
+
+        for ns_dir in temp_namespace_dirs:
+            temp_ns_path = os.path.join(temp_output_dir, ns_dir)
+            original_ns_path = os.path.join(original_output_dir, ns_dir)
+
+            # Find parquet files in the temp namespace directory
+            temp_parquet_files = [f for f in os.listdir(temp_ns_path) if f.endswith('.parquet')]
+
+            if not temp_parquet_files:
+                continue
+
+            temp_parquet_path = os.path.join(temp_ns_path, temp_parquet_files[0])
+
+            # Check if the namespace partition exists in the original directory
+            if os.path.exists(original_ns_path):
+                # Namespace partition exists, merge the files
+                original_parquet_files = [f for f in os.listdir(original_ns_path) if f.endswith('.parquet')]
+
+                if not original_parquet_files:
+                    # No parquet file in original, just copy the temp file
+                    shutil.copy(temp_parquet_path, os.path.join(original_ns_path, temp_parquet_files[0]))
+                else:
+                    original_parquet_path = os.path.join(original_ns_path, original_parquet_files[0])
+                    merged_parquet_path = original_parquet_path + ".merged"
+
+                    # Merge the files
+                    merge_parquet_files(original_parquet_path, temp_parquet_path, merged_parquet_path)
+
+                    # Replace the original file with the merged file
+                    os.remove(original_parquet_path)
+                    os.rename(merged_parquet_path, original_parquet_path)
+            else:
+                # Namespace partition doesn't exist in original, create it
+                shutil.copytree(temp_ns_path, original_ns_path)
+
 
 def match_archive_suffix(input_filename):
     if re.match(r".*\.7z$", input_filename):
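To make the contract of _merge_partitioned_namespaces concrete: every namespace=<N> partition in the temp tree has its rows appended after the matching original partition's rows, and partitions that exist only in the temp tree are copied across wholesale. A sketch under illustrative names (part0.parquet is a stand-in; the helper takes whatever single parquet file it finds per partition, and is called as a method on the parser):

    import os
    import pyarrow as pa
    import pyarrow.parquet as pq

    os.makedirs("orig/namespace=0", exist_ok=True)
    os.makedirs("temp/namespace=0", exist_ok=True)
    os.makedirs("temp/namespace=1", exist_ok=True)
    pq.write_table(pa.table({"revid": [1, 2]}), "orig/namespace=0/part0.parquet")
    pq.write_table(pa.table({"revid": [3, 4]}), "temp/namespace=0/part0.parquet")
    pq.write_table(pa.table({"revid": [5]}), "temp/namespace=1/part0.parquet")

    # After parser._merge_partitioned_namespaces("orig", "temp"):
    #   orig/namespace=0/part0.parquet holds revids [1, 2, 3, 4]
    #   orig/namespace=1/part0.parquet is copied across and holds revid [5]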
""" try: - file_path = output_file - - if not os.path.exists(file_path): + if not os.path.exists(output_file): return None - # Open the parquet file - parquet_file = pq.ParquetFile(file_path) + # Check if this is a partitioned namespace directory + if os.path.isdir(output_file): + # Find all namespace=* subdirectories + namespace_dirs = [d for d in os.listdir(output_file) if d.startswith('namespace=')] - # Get the number of row groups - num_row_groups = parquet_file.num_row_groups + if not namespace_dirs: + return None - if num_row_groups == 0: - return None + # Find the most recently modified namespace partition + most_recent_ns = None + most_recent_mtime = -1 - # Read only the last row group, and only the revid column - last_row_group = parquet_file.read_row_group(num_row_groups - 1, columns=['revid']) + for ns_dir in namespace_dirs: + ns_path = os.path.join(output_file, ns_dir) + mtime = os.path.getmtime(ns_path) + if mtime > most_recent_mtime: + most_recent_mtime = mtime + most_recent_ns = ns_path - if last_row_group.num_rows == 0: - return None + if most_recent_ns is None: + return None - # Get the last revid from this row group - last_revid = last_row_group.column('revid')[-1].as_py() - return last_revid + # Find the parquet file in the most recent namespace directory + parquet_files = [f for f in os.listdir(most_recent_ns) if f.endswith('.parquet')] + + if not parquet_files: + return None + + parquet_path = os.path.join(most_recent_ns, parquet_files[0]) + parquet_file = pq.ParquetFile(parquet_path) + + if parquet_file.num_row_groups == 0: + return None + + # Read only the last row group, and only the revid column + last_row_group = parquet_file.read_row_group(parquet_file.num_row_groups - 1, columns=['revid']) + + if last_row_group.num_rows == 0: + return None + + # Get the last revid from this row group + last_revid = last_row_group.column('revid')[-1].as_py() + return last_revid + else: + # Single parquet file + parquet_file = pq.ParquetFile(output_file) + + if parquet_file.num_row_groups == 0: + return None + + # Read only the last row group, and only the revid column + last_row_group = parquet_file.read_row_group(parquet_file.num_row_groups - 1, columns=['revid']) + + if last_row_group.num_rows == 0: + return None + + # Get the last revid from this row group + last_revid = last_row_group.column('revid')[-1].as_py() + return last_revid except Exception as e: - print(f"Error reading last revid from {file_path}: {e}", file=sys.stderr) + print(f"Error reading last revid from {output_file}: {e}", file=sys.stderr) return None diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py index 421bdfd..b351115 100644 --- a/test/Wikiq_Unit_Test.py +++ b/test/Wikiq_Unit_Test.py @@ -7,6 +7,7 @@ from typing import Final, Union import pytest import numpy as np import pandas as pd +import pyarrow as pa from pandas import DataFrame from pandas.testing import assert_frame_equal, assert_series_equal @@ -536,3 +537,96 @@ def test_resume_with_diff(): assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False) print(f"Resume with diff test passed! 
diff --git a/test/Wikiq_Unit_Test.py b/test/Wikiq_Unit_Test.py
index 421bdfd..b351115 100644
--- a/test/Wikiq_Unit_Test.py
+++ b/test/Wikiq_Unit_Test.py
@@ -7,6 +7,7 @@ from typing import Final, Union
 import pytest
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 from pandas import DataFrame
 from pandas.testing import assert_frame_equal, assert_series_equal
 
@@ -536,3 +537,96 @@ def test_resume_with_diff():
     assert_frame_equal(resumed_df, full_df, check_like=True, check_dtype=False)
 
     print(f"Resume with diff test passed! Original: {len(full_df)} rows, Resumed: {len(resumed_df)} rows")
+
+
+def test_resume_with_partition_namespaces():
+    """Test that --resume works correctly with --partition-namespaces."""
+    import pyarrow.parquet as pq
+
+    # First, create a complete baseline output with partition-namespaces
+    tester_full = WikiqTester(SAILORMOON, "resume_partition_full", in_compression="7z", out_format="parquet")
+
+    try:
+        tester_full.call_wikiq("--partition-namespaces", "--fandom-2020")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    # Read the full output from the partitioned directory
+    full_output_dir = tester_full.output
+    namespace_dirs = [d for d in os.listdir(full_output_dir) if d.startswith('namespace=')]
+
+    if not namespace_dirs:
+        pytest.fail("No namespace directories found in output")
+
+    # Collect all revisions from all namespaces
+    full_revids = []
+    for ns_dir in sorted(namespace_dirs):
+        parquet_files = [f for f in os.listdir(os.path.join(full_output_dir, ns_dir)) if f.endswith('.parquet')]
+        if parquet_files:
+            ns_parquet_path = os.path.join(full_output_dir, ns_dir, parquet_files[0])
+            pf = pq.ParquetFile(ns_parquet_path)
+            table = pf.read(columns=['revid'])
+            revids = table.column('revid').to_pylist()
+            full_revids.extend(revids)
+
+    full_revids_sorted = sorted(set(full_revids))
+    total_revisions = len(full_revids_sorted)
+
+    # Get a revid about 1/3 through to use as the resume point
+    resume_idx = total_revisions // 3
+    resume_revid = full_revids_sorted[resume_idx]
+
+    print(f"Total revisions: {total_revisions}, Resume point: {resume_idx}, Resume revid: {resume_revid}")
+
+    # Create a partial output by manually creating the partitioned structure
+    tester_partial = WikiqTester(SAILORMOON, "resume_partition_partial", in_compression="7z", out_format="parquet")
+    partial_output_dir = tester_partial.output
+
+    # Copy the full partitioned output to the partial directory
+    for ns_dir in namespace_dirs:
+        src_ns_path = os.path.join(full_output_dir, ns_dir)
+        dst_ns_path = os.path.join(partial_output_dir, ns_dir)
+        shutil.copytree(src_ns_path, dst_ns_path)
+
+    # Now filter each namespace file to only include revisions up to resume_revid
+    revised_data_count = 0
+    for ns_dir in namespace_dirs:
+        parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')]
+        if parquet_files:
+            ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0])
+            pf = pq.ParquetFile(ns_parquet_path)
+            table = pf.read()
+
+            # Filter to only rows up to the resume point
+            revids = table.column('revid').to_pylist()
+            mask = pa.array([revid <= resume_revid for revid in revids], type=pa.bool_())
+            partial_table = table.filter(mask)
+            revised_data_count += len(partial_table)
+
+            # Write back the filtered data
+            pq.write_table(partial_table, ns_parquet_path)
+
+    print(f"Created partial output with {revised_data_count} revisions (up to revid {resume_revid})")
+
+    # Now resume from the partial output
+    try:
+        tester_partial.call_wikiq("--partition-namespaces", "--fandom-2020", "--resume")
+    except subprocess.CalledProcessError as exc:
+        pytest.fail(exc.stderr.decode("utf8"))
+
+    # Read the resumed output and collect revids
+    resumed_revids = []
+    for ns_dir in namespace_dirs:
+        parquet_files = [f for f in os.listdir(os.path.join(partial_output_dir, ns_dir)) if f.endswith('.parquet')]
+        if parquet_files:
+            ns_parquet_path = os.path.join(partial_output_dir, ns_dir, parquet_files[0])
+            pf = pq.ParquetFile(ns_parquet_path)
+            table = pf.read(columns=['revid'])
+            revids = table.column('revid').to_pylist()
+            resumed_revids.extend(revids)
+
+    resumed_revids_sorted = sorted(set(resumed_revids))
+
+    # Compare the revids
+    assert resumed_revids_sorted == full_revids_sorted, f"Resumed revids mismatch: {len(resumed_revids_sorted)} vs {len(full_revids_sorted)}"
+
+    print(f"Resume with partition-namespaces test passed! Original: {len(full_revids_sorted)} revisions, Resumed: {len(resumed_revids_sorted)} revisions")
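One design note on the test's filter step: the Python-level list comprehension that builds the mask works, but the same mask can be computed natively by Arrow without materializing revids as a Python list. An equivalent sketch using the same variable names as the loop above:

    import pyarrow.compute as pc

    # Boolean mask over the revid column, evaluated inside Arrow.
    mask = pc.less_equal(table.column('revid'), resume_revid)
    partial_table = table.filter(mask)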