# Reconstructed from git format-patch 6a4bf81e1a8fc0ce25b1e026ba1e617d18aaf8aa
# (Nathan TeBlunthuis, 2025-12-19): "add test for two wikiq jobs in the same
# directory." — new test appended to test/test_resume.py.
# NOTE(review): the extracted patch had collapsed newlines and a statement
# severed mid-assignment (table2_orig); restored to valid Python below.


def test_concurrent_jobs_different_input_files():
    """Test that merge only processes temp files for the current input file.

    When multiple wikiq processes write to the same partitioned output
    directory with different input files, each process should only merge
    its own temp files.

    Scenario: two simulated jobs (file1, file2) each leave an original
    parquet file and a ``.resume_temp`` sibling in two namespace
    partitions.  Merging with ``file1`` as the current input must fold
    file1's temp rows into file1's originals and must leave every file2
    artifact untouched.
    """
    from wikiq.resume import merge_partitioned_namespaces

    with tempfile.TemporaryDirectory() as tmpdir:
        # Create partitioned output structure (Hive-style namespace dirs).
        ns0_dir = os.path.join(tmpdir, "namespace=0")
        ns1_dir = os.path.join(tmpdir, "namespace=1")
        os.makedirs(ns0_dir)
        os.makedirs(ns1_dir)

        # Simulate two different input files producing output in the
        # same output directory (two concurrent wikiq jobs).
        file1 = "enwiki-20250123-pages-meta-history24-p1p100.parquet"
        file2 = "enwiki-20250123-pages-meta-history24-p101p200.parquet"

        # Create original and temp files for file1.
        table1_orig = pa.table({"articleid": [1, 2], "revid": [10, 20]})
        table1_temp = pa.table({"articleid": [3, 4], "revid": [30, 40]})
        pq.write_table(table1_orig, os.path.join(ns0_dir, file1))
        pq.write_table(table1_temp, os.path.join(ns0_dir, file1 + ".resume_temp"))
        pq.write_table(table1_orig, os.path.join(ns1_dir, file1))
        pq.write_table(table1_temp, os.path.join(ns1_dir, file1 + ".resume_temp"))

        # Create original and temp files for file2 (simulating another
        # concurrent job that has not merged yet).
        table2_orig = pa.table({"articleid": [100, 200], "revid": [1000, 2000]})
        table2_temp = pa.table({"articleid": [300, 400], "revid": [3000, 4000]})
        pq.write_table(table2_orig, os.path.join(ns0_dir, file2))
        pq.write_table(table2_temp, os.path.join(ns0_dir, file2 + ".resume_temp"))
        pq.write_table(table2_orig, os.path.join(ns1_dir, file2))
        pq.write_table(table2_temp, os.path.join(ns1_dir, file2 + ".resume_temp"))

        # Merge only file1's temp files.
        merge_partitioned_namespaces(tmpdir, ".resume_temp", file1)

        # Verify file1's temp files were merged and removed.
        assert not os.path.exists(os.path.join(ns0_dir, file1 + ".resume_temp")), \
            "file1 temp should be merged in ns0"
        assert not os.path.exists(os.path.join(ns1_dir, file1 + ".resume_temp")), \
            "file1 temp should be merged in ns1"

        # Verify file1's original now has the merged data (2 orig + 2 temp rows).
        merged1_ns0 = pq.read_table(os.path.join(ns0_dir, file1))
        merged1_ns1 = pq.read_table(os.path.join(ns1_dir, file1))
        assert merged1_ns0.num_rows == 4, \
            f"file1 ns0 should have 4 rows after merge, got {merged1_ns0.num_rows}"
        assert merged1_ns1.num_rows == 4, \
            f"file1 ns1 should have 4 rows after merge, got {merged1_ns1.num_rows}"

        # Verify file2's temp files are UNTOUCHED (still exist).
        assert os.path.exists(os.path.join(ns0_dir, file2 + ".resume_temp")), \
            "file2 temp should NOT be touched in ns0"
        assert os.path.exists(os.path.join(ns1_dir, file2 + ".resume_temp")), \
            "file2 temp should NOT be touched in ns1"

        # Verify file2's original is unchanged.
        orig2_ns0 = pq.read_table(os.path.join(ns0_dir, file2))
        orig2_ns1 = pq.read_table(os.path.join(ns1_dir, file2))
        assert orig2_ns0.num_rows == 2, "file2 ns0 should still have 2 rows"
        assert orig2_ns1.num_rows == 2, "file2 ns1 should still have 2 rows"

        print("Concurrent jobs with different input files test passed!")