249 lines
8.7 KiB
Python
249 lines
8.7 KiB
Python
import unittest
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
from shutil import copyfile
|
|
import pdb
|
|
|
|
# with / without pwr DONE
|
|
# with / without url encode DONE
|
|
# with / without collapse user DONE
|
|
# with output to stdout DONE
|
|
# note that the persistence radius is 7 by default
|
|
# reading various file formats including
|
|
# 7z, gz, bz2, xml DONE
|
|
# wikia and wikipedia data DONE
|
|
# malformed xmls DONE
|
|
|
|
class Test_Wikipedia(unittest.TestCase):
    """Regression test for wikiq run on the bz2-compressed ``ikwiki`` dump.

    wikiq is invoked on the dump under ``dumps/`` and the TSV found at
    ``self.call_output`` afterwards is compared, line by line, with the
    stored file of the same name under ``baseline_output/``.
    """

    def setUp(self):
        """Define the input, output and baseline paths and create the output dir."""
        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        # The tests expect wikiq to leave its result here after ``-o``.
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

        os.makedirs(self.test_output_dir, exist_ok=True)

    def _check_against_baseline(self, test_filename, extra_args=""):
        """Run wikiq with ``extra_args`` and compare its output to the baseline.

        Args:
            test_filename: Name under which the output is kept in the test
                output directory; the baseline file carries the same name.
            extra_args: Text appended verbatim to the wikiq command line
                (include the leading space).
        """
        from itertools import zip_longest

        test_file = os.path.join(self.test_output_dir, test_filename)
        # Remove leftovers from earlier runs. Without this, a wikiq run that
        # produces nothing would be masked by a stale output file.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call.format(self.input_file, self.test_output_dir) + extra_args
        # run() drains stdout while waiting; Popen.wait() with stdout=PIPE can
        # deadlock once the child fills the pipe buffer.
        subprocess.run(call, stdout=subprocess.PIPE, shell=True)

        self.assertTrue(os.path.exists(self.call_output),
                        "wikiq did not produce {0}".format(self.call_output))
        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        with open(test_file) as test_lines, open(baseline_file) as baseline_lines:
            # zip_longest pads the shorter file with None, so missing or extra
            # lines fail the comparison instead of being silently ignored.
            paired = zip_longest(test_lines, baseline_lines)
            for lineno, (test, baseline) in enumerate(paired, start=1):
                self.assertEqual(test, baseline,
                                 "mismatch at line {0} of {1}".format(lineno, test_filename))

    def test_WP_url_encode(self):
        """Output produced with ``--url-encode`` matches the stored baseline."""
        self._check_against_baseline("url-encode_" + self.wikiq_out_name, " --url-encode")
|
|
|
|
|
|
class Test_Basic(unittest.TestCase):
    """Regression tests for wikiq run on the 7z-compressed ``sailormoon`` dump.

    Each test invokes wikiq with one option set and compares the TSV found
    at ``self.call_output`` afterwards, line by line, with the stored file
    of the same name under ``baseline_output/``.
    """

    def setUp(self):
        """Define the input, output and baseline paths and create the output dir."""
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        # The tests expect wikiq to leave its result here after ``-o``.
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

        os.makedirs(self.test_output_dir, exist_ok=True)

    def _check_against_baseline(self, test_filename, extra_args=""):
        """Run wikiq with ``extra_args`` and compare its output to the baseline.

        Args:
            test_filename: Name under which the output is kept in the test
                output directory; the baseline file carries the same name.
            extra_args: Text appended verbatim to the wikiq command line
                (include the leading space).
        """
        from itertools import zip_longest

        test_file = os.path.join(self.test_output_dir, test_filename)
        # Remove leftovers from earlier runs or sibling tests. Every test in
        # this class shares ``self.call_output``, so a stale file could
        # otherwise be copied and compared in place of fresh output.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call.format(self.input_file, self.test_output_dir) + extra_args
        # run() drains stdout while waiting; Popen.wait() with stdout=PIPE can
        # deadlock once the child fills the pipe buffer.
        subprocess.run(call, stdout=subprocess.PIPE, shell=True)

        self.assertTrue(os.path.exists(self.call_output),
                        "wikiq did not produce {0}".format(self.call_output))
        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        with open(test_file) as test_lines, open(baseline_file) as baseline_lines:
            # zip_longest pads the shorter file with None, so missing or extra
            # lines fail the comparison instead of being silently ignored.
            paired = zip_longest(test_lines, baseline_lines)
            for lineno, (test, baseline) in enumerate(paired, start=1):
                self.assertEqual(test, baseline,
                                 "mismatch at line {0} of {1}".format(lineno, test_filename))

    def test_noargs(self):
        """Output produced with no extra options matches the stored baseline."""
        self._check_against_baseline("noargs_" + self.wikiq_out_name)

    def test_collapse_user(self):
        """Output produced with ``--collapse-user`` matches the stored baseline."""
        self._check_against_baseline("collapse-user_" + self.wikiq_out_name,
                                     " --collapse-user")

    def test_pwr_legacy(self):
        """Output produced with ``--persistence-legacy`` matches the stored baseline."""
        self._check_against_baseline("persistence_legacy_" + self.wikiq_out_name,
                                     " --persistence-legacy")

    def test_url_encode(self):
        """Output produced with ``--url-encode`` matches the stored baseline."""
        self._check_against_baseline("url-encode_" + self.wikiq_out_name,
                                     " --url-encode")
|
|
|
|
class Test_Malformed(unittest.TestCase):
    """Checks how wikiq reports a malformed XML dump (``twinpeaks``)."""

    def setUp(self):
        """Define the input and output paths and create the output dir."""
        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

        os.makedirs(self.test_output_dir, exist_ok=True)

    def test_malformed_noargs(self):
        """The last line wikiq writes to stderr is the expected ParseError."""
        call = self.base_call.format(self.input_file, self.test_output_dir)
        proc = subprocess.Popen(call, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, shell=True)
        # communicate() reads both pipes until the child exits. Calling wait()
        # first can deadlock if the child fills either pipe buffer.
        outs, errs = proc.communicate()

        # Decode the bytes instead of splitting their repr() on a literal
        # backslash-n, which depends on how the repr quotes and escapes.
        errlines = errs.decode('utf-8', errors='replace').splitlines()
        self.assertTrue(errlines, "wikiq wrote nothing to stderr")
        self.assertEqual(errlines[-1],
                         'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
|
|
|
|
class Test_Stdout(unittest.TestCase):
    """Regression tests for wikiq run with ``--stdout`` on the ``sailormoon`` dump.

    The text captured from the process's standard output is compared, line
    by line, with a stored file under ``baseline_output/``.
    """

    def setUp(self):
        """Define the input and baseline paths used by the tests."""
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _check_stdout_against_baseline(self, baseline_name, extra_args=""):
        """Run wikiq with ``extra_args`` and compare its stdout to the baseline.

        Args:
            baseline_name: File name of the expected output inside
                ``self.baseline_output_dir``.
            extra_args: Text appended verbatim to the wikiq command line
                (include the leading space).
        """
        from itertools import zip_longest

        call = self.base_call.format(self.input_file) + extra_args
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        outs = proc.stdout.decode('utf-8')

        baseline_file = os.path.join(".", self.baseline_output_dir, baseline_name)
        test_lines = outs.splitlines(True)

        # Read the baseline as UTF-8 to match how stdout was decoded; the
        # ``with`` block closes the file even when an assertion fails.
        with open(baseline_file, encoding='utf-8') as baseline_lines:
            # zip_longest pads the shorter side with None, so missing or
            # extra lines fail the comparison instead of being ignored.
            paired = zip_longest(test_lines, baseline_lines)
            for lineno, (test, baseline) in enumerate(paired, start=1):
                self.assertEqual(test, baseline,
                                 "mismatch at line {0} of {1}".format(lineno, baseline_name))

    def test_noargs(self):
        """Stdout produced with no extra options matches the ``noargs_`` baseline."""
        self._check_stdout_against_baseline("noargs_" + self.wikiq_out_name)

    def test_persistence(self):
        """Stdout produced with ``--persistence`` matches the ``persistence_`` baseline."""
        self._check_stdout_against_baseline("persistence_" + self.wikiq_out_name,
                                            " --persistence")
|
|
|
|
# test_file = "noargs_" + self.wikiq_out_name
|
|
# copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
|
|
|
|
# baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
|
|
|
|
# test_lines = open(os.path.join(self.test_output_dir,test_file))
|
|
# baseline_lines = open(baseline_file)
|
|
# for test, baseline in zip(test_lines, baseline_lines):
|
|
# self.assertEqual(test,baseline)
|
|
# test_lines.close()
|
|
# baseline_lines.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run the tests in this module.
    unittest.main()
|