224 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			224 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| # Test related variables
 | |
| # By default, TEST_DIR is created under WORKDIR
 | |
| TEST_DIR ?= "${WORKDIR}/qemuimagetest"
 | |
| TEST_LOG ?= "${LOG_DIR}/qemuimagetests"
 | |
| TEST_RESULT ?= "${TEST_DIR}/result"
 | |
| TEST_TMP ?= "${TEST_DIR}/tmp"
 | |
| TEST_SCEN ?= "sanity"
 | |
| TEST_STATUS ?= "${TEST_TMP}/status"
 | |
| TARGET_IPSAVE ?= "${TEST_TMP}/target_ip"
 | |
| TEST_SERIALIZE ?= "1"
 | |
| 
 | |
| python do_qemuimagetest() {
 | |
|     qemuimagetest_main(d)
 | |
| }
 | |
| addtask qemuimagetest before do_build after do_rootfs
 | |
| do_qemuimagetest[nostamp] = "1"
 | |
| do_qemuimagetest[depends] += "qemu-native:do_populate_sysroot"
 | |
| 
 | |
| python do_qemuimagetest_standalone() {
 | |
|     qemuimagetest_main(d)
 | |
| }
 | |
| addtask qemuimagetest_standalone
 | |
| do_qemuimagetest_standalone[nostamp] = "1"
 | |
| do_qemuimagetest_standalone[depends] += "qemu-native:do_populate_sysroot"
 | |
| 
 | |
| def qemuimagetest_main(d):
 | |
|     import sys
 | |
|     import re
 | |
|     import os
 | |
|     import shutil
 | |
|     
 | |
|     """
 | |
|     Test Controller for automated testing.
 | |
|     """
 | |
|     
 | |
|     casestr = re.compile(r'(?P<scen>\w+\b):(?P<case>\S+$)')
 | |
|     resultstr = re.compile(r'\s*(?P<case>\w+)\s*(?P<pass>\d+)\s*(?P<fail>\d+)\s*(?P<noresult>\d+)')
 | |
|     machine = d.getVar('MACHINE', 1)
 | |
|     pname = d.getVar('PN', 1)
 | |
|     
 | |
|     """function to save test cases running status"""
 | |
|     def teststatus(test, status, index, length):
 | |
|         test_status = d.getVar('TEST_STATUS', 1)
 | |
|         if not os.path.exists(test_status):
 | |
|             raise bb.build.FuncFailed("No test status file existing under TEST_TMP")
 | |
| 
 | |
|         f = open(test_status, "w")
 | |
|         f.write("\t%-15s%-15s%-15s%-15s\n" % ("Case", "Status", "Number", "Total"))
 | |
|         f.write("\t%-15s%-15s%-15s%-15s\n" % (case, status, index, length))
 | |
|         f.close()
 | |
| 
 | |
|     """funtion to run each case under scenario"""
 | |
|     def runtest(scen, case, fulltestpath):
 | |
|         resultpath = d.getVar('TEST_RESULT', 1)
 | |
|         tmppath = d.getVar('TEST_TMP', 1)
 | |
| 
 | |
|         """initialize log file for testcase"""
 | |
|         logpath = d.getVar('TEST_LOG', 1)
 | |
|         bb.utils.mkdirhier("%s/%s" % (logpath, scen))
 | |
|         caselog = os.path.join(logpath, "%s/log_%s.%s" % (scen, case, d.getVar('DATETIME', 1)))
 | |
|         os.system("touch %s" % caselog)
 | |
|         
 | |
|         """export TEST_TMP, TEST_RESULT, DEPLOY_DIR and QEMUARCH"""
 | |
|         os.environ["PATH"] = d.getVar("PATH", True)
 | |
|         os.environ["TEST_TMP"] = tmppath
 | |
|         os.environ["TEST_RESULT"] = resultpath
 | |
|         os.environ["DEPLOY_DIR"] = d.getVar("DEPLOY_DIR", True)
 | |
|         os.environ["QEMUARCH"] = machine
 | |
|         os.environ["QEMUTARGET"] = pname
 | |
|         os.environ["DISPLAY"] = d.getVar("DISPLAY", True)
 | |
|         os.environ["COREBASE"] = d.getVar("COREBASE", True)
 | |
|         os.environ["TOPDIR"] = d.getVar("TOPDIR", True)
 | |
|         os.environ["OE_TMPDIR"] = d.getVar("TMPDIR", True)
 | |
|         os.environ["TEST_STATUS"] = d.getVar("TEST_STATUS", True)
 | |
|         os.environ["TARGET_IPSAVE"] = d.getVar("TARGET_IPSAVE", True)
 | |
|         os.environ["TEST_SERIALIZE"] = d.getVar("TEST_SERIALIZE", True)
 | |
|         os.environ["SDK_NAME"] = d.getVar("SDK_NAME", True)
 | |
| 
 | |
|         """run Test Case"""
 | |
|         bb.note("Run %s test in scenario %s" % (case, scen))
 | |
|         os.system("%s" % fulltestpath)
 | |
|     
 | |
|     """function to check testcase list and remove inappropriate cases"""
 | |
|     def check_list(list):
 | |
| 	final_list = []
 | |
| 	for test in list:
 | |
| 	    (scen, case, fullpath) = test
 | |
| 
 | |
| 	    """Skip rpm/zypper if package_rpm not set for PACKAGE_CLASSES"""
 | |
| 	    if case.find("zypper") != -1 or case.find("rpm") != -1:
 | |
| 	        if d.getVar("PACKAGE_CLASSES", True).find("rpm", 0, 11) == -1:
 | |
|             	    bb.note("skip rpm/zypper cases since package_rpm not set in PACKAGE_CLASSES")
 | |
| 	 	    continue
 | |
| 	        else:
 | |
| 		    final_list.append((scen, case, fullpath))
 | |
| 	    else:
 | |
|                     final_list.append((scen, case, fullpath))
 | |
| 
 | |
| 	if not final_list:
 | |
| 	    raise bb.build.FuncFailed("There is no suitable testcase for this target")
 | |
| 
 | |
| 	return final_list
 | |
| 
 | |
|     """Generate testcase list in runtime"""
 | |
|     def generate_list(testlist):
 | |
|         list = []
 | |
| 	final_list = []
 | |
|         if len(testlist) == 0:
 | |
|             raise bb.build.FuncFailed("No testcase defined in TEST_SCEN")
 | |
| 
 | |
|         """check testcase folder and add case list according to TEST_SCEN"""
 | |
|         for item in testlist.split(" "):
 | |
|             n = casestr.match(item)
 | |
|             if n:
 | |
|                 item = n.group('scen')
 | |
|                 casefile = n.group('case')
 | |
|                 for dir in d.getVar("QEMUIMAGETESTS", True).split():
 | |
|                     fulltestcase = os.path.join(dir, item, casefile)
 | |
|                     if not os.path.isfile(fulltestcase):
 | |
|                         raise bb.build.FuncFailed("Testcase %s not found" % fulltestcase)
 | |
|                     list.append((item, casefile, fulltestcase))
 | |
|             else:
 | |
|                 for dir in d.getVar("QEMUIMAGETESTS", True).split():
 | |
|                     scenlist = os.path.join(dir, "scenario", machine, pname)
 | |
|                     if not os.path.isfile(scenlist):
 | |
|                         raise bb.build.FuncFailed("No scenario list file named %s found" % scenlist)
 | |
| 
 | |
|                     f = open(scenlist, "r")
 | |
|                     for line in f:
 | |
|                        if item != line.split()[0]:
 | |
|                            continue
 | |
|                        else:
 | |
|                            casefile = line.split()[1]
 | |
| 
 | |
|                        fulltestcase = os.path.join(dir, item, casefile)
 | |
|                        if not os.path.isfile(fulltestcase):
 | |
|                             raise bb.build.FuncFailed("Testcase %s not found" % fulltestcase)
 | |
|                        list.append((item, casefile, fulltestcase))
 | |
| 	final_list = check_list(list)
 | |
|         return final_list
 | |
| 
 | |
|     """Clean tmp folder for testing"""
 | |
|     def clean_tmp():
 | |
|         tmppath = d.getVar('TEST_TMP', 1)
 | |
| 
 | |
|         if os.path.isdir(tmppath):
 | |
|             for f in os.listdir(tmppath):
 | |
|                 tmpfile = os.path.join(tmppath, f)
 | |
| 		if os.path.isfile(tmpfile):
 | |
|                     os.remove(tmpfile)
 | |
|                 elif os.path.isdir(tmpfile):
 | |
|                     shutil.rmtree(tmpfile, True)
 | |
| 
 | |
|     """Before running testing, clean temp folder first"""
 | |
|     clean_tmp()
 | |
| 
 | |
|     """check testcase folder and create test log folder"""
 | |
|     testpath = d.getVar('TEST_DIR', 1)
 | |
|     bb.utils.mkdirhier(testpath)
 | |
|     
 | |
|     logpath = d.getVar('TEST_LOG', 1)
 | |
|     bb.utils.mkdirhier(logpath)
 | |
| 
 | |
|     tmppath = d.getVar('TEST_TMP', 1)
 | |
|     bb.utils.mkdirhier(tmppath)
 | |
| 
 | |
|     """initialize test status file"""
 | |
|     test_status = d.getVar('TEST_STATUS', 1)
 | |
|     if os.path.exists(test_status):
 | |
|         os.remove(test_status)
 | |
|     os.system("touch %s" % test_status)
 | |
| 
 | |
|     """initialize result file"""
 | |
|     resultpath = d.getVar('TEST_RESULT', 1)
 | |
|     bb.utils.mkdirhier(resultpath)
 | |
|     resultfile = os.path.join(resultpath, "testresult.%s" % d.getVar('DATETIME', 1))
 | |
|     sresultfile = os.path.join(resultpath, "testresult.log")
 | |
| 
 | |
|     machine = d.getVar('MACHINE', 1)
 | |
| 
 | |
|     if os.path.exists(sresultfile):
 | |
|         os.remove(sresultfile)
 | |
|     os.system("touch %s" % resultfile)
 | |
|     os.symlink(resultfile, sresultfile)
 | |
|     f = open(sresultfile, "a")
 | |
|     f.write("\tTest Result for %s %s\n" % (machine, pname))
 | |
|     f.write("\t%-15s%-15s%-15s%-15s\n" % ("Testcase", "PASS", "FAIL", "NORESULT"))
 | |
|     f.close()
 | |
|     
 | |
|     """generate pre-defined testcase list"""
 | |
|     testlist = d.getVar('TEST_SCEN', 1)
 | |
|     fulllist = generate_list(testlist)
 | |
| 
 | |
|     """Begin testing"""
 | |
|     for index,test in enumerate(fulllist):
 | |
|         (scen, case, fullpath) = test
 | |
|         teststatus(case, "running", index, (len(fulllist) - 1))
 | |
|         runtest(scen, case, fullpath)
 | |
|         teststatus(case, "finished", index, (len(fulllist) - 1))
 | |
|     
 | |
|     """Print Test Result"""
 | |
|     ret = 0
 | |
|     f = open(sresultfile, "r")
 | |
|     for line in f:
 | |
|         m = resultstr.match(line)
 | |
|         if m:
 | |
|             if m.group('fail') == "1":
 | |
|                 ret = 1
 | |
|             elif m.group('noresult') == "1":
 | |
|                 ret = 2
 | |
|             line = line.strip('\n')
 | |
|             bb.note(line)
 | |
|         else:
 | |
|             line = line.strip('\n')
 | |
|             bb.note(line)
 | |
|     f.close()
 | |
| 
 | |
|     """Clean temp files for testing"""
 | |
|     clean_tmp()
 | |
| 
 | |
|     if ret != 0:
 | |
|         raise bb.build.FuncFailed("Some testcases fail, pls. check test result and test log!!!")
 | |
| 
 | 
