Add a test for two wikiq jobs running in the same output directory.
This commit is contained in:
@@ -399,3 +399,68 @@ def test_cleanup_original_missing_temp_valid_no_checkpoint():
|
||||
assert resume_point == (30, 300), f"Expected (30, 300), got {resume_point}"
|
||||
|
||||
print("Original missing, temp valid, no checkpoint test passed!")
|
||||
|
||||
|
||||
def test_concurrent_jobs_different_input_files():
    """Test that merge only processes temp files for the current input file.

    When multiple wikiq processes write to the same partitioned output directory
    with different input files, each process should only merge its own temp files.
    """
    from wikiq.resume import merge_partitioned_namespaces

    with tempfile.TemporaryDirectory() as tmpdir:
        # Build the partitioned output layout: one directory per namespace.
        ns0_dir = os.path.join(tmpdir, "namespace=0")
        ns1_dir = os.path.join(tmpdir, "namespace=1")
        partitions = (ns0_dir, ns1_dir)
        for partition in partitions:
            os.makedirs(partition)

        # Two distinct input files, as if two concurrent wikiq jobs share
        # this output directory.
        file1 = "enwiki-20250123-pages-meta-history24-p1p100.parquet"
        file2 = "enwiki-20250123-pages-meta-history24-p101p200.parquet"

        # For each input file, an "original" table plus a ".resume_temp"
        # table, written into every namespace partition.
        table1_orig = pa.table({"articleid": [1, 2], "revid": [10, 20]})
        table1_temp = pa.table({"articleid": [3, 4], "revid": [30, 40]})
        table2_orig = pa.table({"articleid": [100, 200], "revid": [1000, 2000]})
        table2_temp = pa.table({"articleid": [300, 400], "revid": [3000, 4000]})
        for partition in partitions:
            pq.write_table(table1_orig, os.path.join(partition, file1))
            pq.write_table(table1_temp, os.path.join(partition, file1 + ".resume_temp"))
            pq.write_table(table2_orig, os.path.join(partition, file2))
            pq.write_table(table2_temp, os.path.join(partition, file2 + ".resume_temp"))

        # Merge temp output for file1 only; file2 belongs to the "other" job.
        merge_partitioned_namespaces(tmpdir, ".resume_temp", file1)

        # file1's temp files should have been consumed by the merge.
        assert not os.path.exists(os.path.join(ns0_dir, file1 + ".resume_temp")), \
            "file1 temp should be merged in ns0"
        assert not os.path.exists(os.path.join(ns1_dir, file1 + ".resume_temp")), \
            "file1 temp should be merged in ns1"

        # file1's originals now hold the original rows plus the temp rows.
        merged1_ns0 = pq.read_table(os.path.join(ns0_dir, file1))
        merged1_ns1 = pq.read_table(os.path.join(ns1_dir, file1))
        assert merged1_ns0.num_rows == 4, f"file1 ns0 should have 4 rows after merge, got {merged1_ns0.num_rows}"
        assert merged1_ns1.num_rows == 4, f"file1 ns1 should have 4 rows after merge, got {merged1_ns1.num_rows}"

        # file2's temp files must survive untouched for the other job.
        assert os.path.exists(os.path.join(ns0_dir, file2 + ".resume_temp")), \
            "file2 temp should NOT be touched in ns0"
        assert os.path.exists(os.path.join(ns1_dir, file2 + ".resume_temp")), \
            "file2 temp should NOT be touched in ns1"

        # file2's originals must keep only their original rows.
        orig2_ns0 = pq.read_table(os.path.join(ns0_dir, file2))
        orig2_ns1 = pq.read_table(os.path.join(ns1_dir, file2))
        assert orig2_ns0.num_rows == 2, "file2 ns0 should still have 2 rows"
        assert orig2_ns1.num_rows == 2, "file2 ns1 should still have 2 rows"

        print("Concurrent jobs with different input files test passed!")
|
||||
|
||||
Reference in New Issue
Block a user