1#!/usr/bin/env python 2# -*- coding: UTF-8 -*- 3import warnings 4from collections import defaultdict 5import sys 6import re 7import xml.etree.ElementTree as ET 8import xml.dom.minidom 9 10IS_PY2 = sys.version_info[0] == 2 11 12try: 13 # Python 2 14 unichr 15except NameError: # pragma: nocover 16 # Python 3 17 unichr = chr 18 19""" 20Based on the understanding of what Jenkins can parse for JUnit XML files. 21 22<?xml version="1.0" encoding="utf-8"?> 23<testsuites errors="1" failures="1" tests="4" time="45"> 24 <testsuite errors="1" failures="1" hostname="localhost" id="0" name="test1" 25 package="testdb" tests="4" timestamp="2012-11-15T01:02:29"> 26 <properties> 27 <property name="assert-passed" value="1"/> 28 </properties> 29 <testcase classname="testdb.directory" name="1-passed-test" time="10"/> 30 <testcase classname="testdb.directory" name="2-failed-test" time="20"> 31 <failure message="Assertion FAILED: failed assert" type="failure"> 32 the output of the testcase 33 </failure> 34 </testcase> 35 <testcase classname="package.directory" name="3-errord-test" time="15"> 36 <error message="Assertion ERROR: error assert" type="error"> 37 the output of the testcase 38 </error> 39 </testcase> 40 <testcase classname="package.directory" name="3-skipped-test" time="0"> 41 <skipped message="SKIPPED Test" type="skipped"> 42 the output of the testcase 43 </skipped> 44 </testcase> 45 <testcase classname="testdb.directory" name="3-passed-test" time="10"> 46 <system-out> 47 I am system output 48 </system-out> 49 <system-err> 50 I am the error output 51 </system-err> 52 </testcase> 53 </testsuite> 54</testsuites> 55""" 56 57 58def decode(var, encoding): 59 """ 60 If not already unicode, decode it. 61 """ 62 if IS_PY2: 63 if isinstance(var, unicode): # noqa: F821 64 ret = var 65 elif isinstance(var, str): 66 if encoding: 67 ret = var.decode(encoding) 68 else: 69 ret = unicode(var) # noqa: F821 70 else: 71 ret = unicode(var) # noqa: F821 72 else: 73 ret = str(var) 74 return ret 75 76 77class TestSuite(object): 78 """ 79 Suite of test cases. 80 Can handle unicode strings or binary strings if their encoding is provided. 81 """ 82 83 def __init__( 84 self, 85 name, 86 test_cases=None, 87 hostname=None, 88 id=None, 89 package=None, 90 timestamp=None, 91 properties=None, 92 file=None, 93 log=None, 94 url=None, 95 stdout=None, 96 stderr=None, 97 ): 98 self.name = name 99 if not test_cases: 100 test_cases = [] 101 try: 102 iter(test_cases) 103 except TypeError: 104 raise TypeError("test_cases must be a list of test cases") 105 self.test_cases = test_cases 106 self.timestamp = timestamp 107 self.hostname = hostname 108 self.id = id 109 self.package = package 110 self.file = file 111 self.log = log 112 self.url = url 113 self.stdout = stdout 114 self.stderr = stderr 115 self.properties = properties 116 117 def build_xml_doc(self, encoding=None): 118 """ 119 Builds the XML document for the JUnit test suite. 120 Produces clean unicode strings and decodes non-unicode with the help of encoding. 121 @param encoding: Used to decode encoded strings. 122 @return: XML document with unicode string elements 123 """ 124 125 # build the test suite element 126 test_suite_attributes = dict() 127 if any(c.assertions for c in self.test_cases): 128 test_suite_attributes["assertions"] = str(sum([int(c.assertions) for c in self.test_cases if c.assertions])) 129 test_suite_attributes["disabled"] = str(len([c for c in self.test_cases if not c.is_enabled])) 130 test_suite_attributes["errors"] = str(len([c for c in self.test_cases if c.is_error()])) 131 test_suite_attributes["failures"] = str(len([c for c in self.test_cases if c.is_failure()])) 132 test_suite_attributes["name"] = decode(self.name, encoding) 133 test_suite_attributes["skipped"] = str(len([c for c in self.test_cases if c.is_skipped()])) 134 test_suite_attributes["tests"] = str(len(self.test_cases)) 135 test_suite_attributes["time"] = str(sum(c.elapsed_sec for c in self.test_cases if c.elapsed_sec)) 136 137 if self.hostname: 138 test_suite_attributes["hostname"] = decode(self.hostname, encoding) 139 if self.id: 140 test_suite_attributes["id"] = decode(self.id, encoding) 141 if self.package: 142 test_suite_attributes["package"] = decode(self.package, encoding) 143 if self.timestamp: 144 test_suite_attributes["timestamp"] = decode(self.timestamp, encoding) 145 if self.file: 146 test_suite_attributes["file"] = decode(self.file, encoding) 147 if self.log: 148 test_suite_attributes["log"] = decode(self.log, encoding) 149 if self.url: 150 test_suite_attributes["url"] = decode(self.url, encoding) 151 152 xml_element = ET.Element("testsuite", test_suite_attributes) 153 154 # add any properties 155 if self.properties: 156 props_element = ET.SubElement(xml_element, "properties") 157 for k, v in self.properties.items(): 158 attrs = {"name": decode(k, encoding), "value": decode(v, encoding)} 159 ET.SubElement(props_element, "property", attrs) 160 161 # add test suite stdout 162 if self.stdout: 163 stdout_element = ET.SubElement(xml_element, "system-out") 164 stdout_element.text = decode(self.stdout, encoding) 165 166 # add test suite stderr 167 if self.stderr: 168 stderr_element = ET.SubElement(xml_element, "system-err") 169 stderr_element.text = decode(self.stderr, encoding) 170 171 # test cases 172 for case in self.test_cases: 173 test_case_attributes = dict() 174 test_case_attributes["name"] = decode(case.name, encoding) 175 if case.assertions: 176 # Number of assertions in the test case 177 test_case_attributes["assertions"] = "%d" % case.assertions 178 if case.elapsed_sec: 179 test_case_attributes["time"] = "%f" % case.elapsed_sec 180 if case.timestamp: 181 test_case_attributes["timestamp"] = decode(case.timestamp, encoding) 182 if case.classname: 183 test_case_attributes["classname"] = decode(case.classname, encoding) 184 if case.status: 185 test_case_attributes["status"] = decode(case.status, encoding) 186 if case.category: 187 test_case_attributes["class"] = decode(case.category, encoding) 188 if case.file: 189 test_case_attributes["file"] = decode(case.file, encoding) 190 if case.line: 191 test_case_attributes["line"] = decode(case.line, encoding) 192 if case.log: 193 test_case_attributes["log"] = decode(case.log, encoding) 194 if case.url: 195 test_case_attributes["url"] = decode(case.url, encoding) 196 197 test_case_element = ET.SubElement(xml_element, "testcase", test_case_attributes) 198 199 # failures 200 for failure in case.failures: 201 if failure["output"] or failure["message"]: 202 attrs = {"type": "failure"} 203 if failure["message"]: 204 attrs["message"] = decode(failure["message"], encoding) 205 if failure["type"]: 206 attrs["type"] = decode(failure["type"], encoding) 207 failure_element = ET.Element("failure", attrs) 208 if failure["output"]: 209 failure_element.text = decode(failure["output"], encoding) 210 test_case_element.append(failure_element) 211 212 # errors 213 for error in case.errors: 214 if error["message"] or error["output"]: 215 attrs = {"type": "error"} 216 if error["message"]: 217 attrs["message"] = decode(error["message"], encoding) 218 if error["type"]: 219 attrs["type"] = decode(error["type"], encoding) 220 error_element = ET.Element("error", attrs) 221 if error["output"]: 222 error_element.text = decode(error["output"], encoding) 223 test_case_element.append(error_element) 224 225 # skippeds 226 for skipped in case.skipped: 227 attrs = {"type": "skipped"} 228 if skipped["message"]: 229 attrs["message"] = decode(skipped["message"], encoding) 230 skipped_element = ET.Element("skipped", attrs) 231 if skipped["output"]: 232 skipped_element.text = decode(skipped["output"], encoding) 233 test_case_element.append(skipped_element) 234 235 # test stdout 236 if case.stdout: 237 stdout_element = ET.Element("system-out") 238 stdout_element.text = decode(case.stdout, encoding) 239 test_case_element.append(stdout_element) 240 241 # test stderr 242 if case.stderr: 243 stderr_element = ET.Element("system-err") 244 stderr_element.text = decode(case.stderr, encoding) 245 test_case_element.append(stderr_element) 246 247 return xml_element 248 249 @staticmethod 250 def to_xml_string(test_suites, prettyprint=True, encoding=None): 251 """ 252 Returns the string representation of the JUnit XML document. 253 @param encoding: The encoding of the input. 254 @return: unicode string 255 """ 256 warnings.warn( 257 "Testsuite.to_xml_string is deprecated. It will be removed in version 2.0.0. " 258 "Use function to_xml_report_string", 259 DeprecationWarning, 260 ) 261 return to_xml_report_string(test_suites, prettyprint, encoding) 262 263 @staticmethod 264 def to_file(file_descriptor, test_suites, prettyprint=True, encoding=None): 265 """ 266 Writes the JUnit XML document to a file. 267 """ 268 warnings.warn( 269 "Testsuite.to_file is deprecated. It will be removed in version 2.0.0. Use function to_xml_report_file", 270 DeprecationWarning, 271 ) 272 to_xml_report_file(file_descriptor, test_suites, prettyprint, encoding) 273 274 275def to_xml_report_string(test_suites, prettyprint=True, encoding=None): 276 """ 277 Returns the string representation of the JUnit XML document. 278 @param encoding: The encoding of the input. 279 @return: unicode string 280 """ 281 282 try: 283 iter(test_suites) 284 except TypeError: 285 raise TypeError("test_suites must be a list of test suites") 286 287 xml_element = ET.Element("testsuites") 288 attributes = defaultdict(int) 289 for ts in test_suites: 290 ts_xml = ts.build_xml_doc(encoding=encoding) 291 for key in ["disabled", "errors", "failures", "tests"]: 292 attributes[key] += int(ts_xml.get(key, 0)) 293 for key in ["time"]: 294 attributes[key] += float(ts_xml.get(key, 0)) 295 xml_element.append(ts_xml) 296 for key, value in attributes.items(): 297 xml_element.set(key, str(value)) 298 299 xml_string = ET.tostring(xml_element, encoding=encoding) 300 # is encoded now 301 xml_string = _clean_illegal_xml_chars(xml_string.decode(encoding or "utf-8")) 302 # is unicode now 303 304 if prettyprint: 305 # minidom.parseString() works just on correctly encoded binary strings 306 xml_string = xml_string.encode(encoding or "utf-8") 307 xml_string = xml.dom.minidom.parseString(xml_string) 308 # toprettyxml() produces unicode if no encoding is being passed or binary string with an encoding 309 xml_string = xml_string.toprettyxml(encoding=encoding) 310 if encoding: 311 xml_string = xml_string.decode(encoding) 312 # is unicode now 313 return xml_string 314 315 316def to_xml_report_file(file_descriptor, test_suites, prettyprint=True, encoding=None): 317 """ 318 Writes the JUnit XML document to a file. 319 """ 320 xml_string = to_xml_report_string(test_suites, prettyprint=prettyprint, encoding=encoding) 321 # has problems with encoded str with non-ASCII (non-default-encoding) characters! 322 file_descriptor.write(xml_string) 323 324 325def _clean_illegal_xml_chars(string_to_clean): 326 """ 327 Removes any illegal unicode characters from the given XML string. 328 329 @see: http://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python 330 """ 331 332 illegal_unichrs = [ 333 (0x00, 0x08), 334 (0x0B, 0x1F), 335 (0x7F, 0x84), 336 (0x86, 0x9F), 337 (0xD800, 0xDFFF), 338 (0xFDD0, 0xFDDF), 339 (0xFFFE, 0xFFFF), 340 (0x1FFFE, 0x1FFFF), 341 (0x2FFFE, 0x2FFFF), 342 (0x3FFFE, 0x3FFFF), 343 (0x4FFFE, 0x4FFFF), 344 (0x5FFFE, 0x5FFFF), 345 (0x6FFFE, 0x6FFFF), 346 (0x7FFFE, 0x7FFFF), 347 (0x8FFFE, 0x8FFFF), 348 (0x9FFFE, 0x9FFFF), 349 (0xAFFFE, 0xAFFFF), 350 (0xBFFFE, 0xBFFFF), 351 (0xCFFFE, 0xCFFFF), 352 (0xDFFFE, 0xDFFFF), 353 (0xEFFFE, 0xEFFFF), 354 (0xFFFFE, 0xFFFFF), 355 (0x10FFFE, 0x10FFFF), 356 ] 357 358 illegal_ranges = ["%s-%s" % (unichr(low), unichr(high)) for (low, high) in illegal_unichrs if low < sys.maxunicode] 359 360 illegal_xml_re = re.compile('[%s]' % ''.join(illegal_ranges)) 361 return illegal_xml_re.sub("", string_to_clean) 362 363 364class TestCase(object): 365 """A JUnit test case with a result and possibly some stdout or stderr""" 366 367 def __init__( 368 self, 369 name, 370 classname=None, 371 elapsed_sec=None, 372 stdout=None, 373 stderr=None, 374 assertions=None, 375 timestamp=None, 376 status=None, 377 category=None, 378 file=None, 379 line=None, 380 log=None, 381 url=None, 382 allow_multiple_subelements=False, 383 ): 384 self.name = name 385 self.assertions = assertions 386 self.elapsed_sec = elapsed_sec 387 self.timestamp = timestamp 388 self.classname = classname 389 self.status = status 390 self.category = category 391 self.file = file 392 self.line = line 393 self.log = log 394 self.url = url 395 self.stdout = stdout 396 self.stderr = stderr 397 398 self.is_enabled = True 399 self.errors = [] 400 self.failures = [] 401 self.skipped = [] 402 self.allow_multiple_subalements = allow_multiple_subelements 403 404 def add_error_info(self, message=None, output=None, error_type=None): 405 """Adds an error message, output, or both to the test case""" 406 error = {} 407 error["message"] = message 408 error["output"] = output 409 error["type"] = error_type 410 if self.allow_multiple_subalements: 411 if message or output: 412 self.errors.append(error) 413 elif not len(self.errors): 414 self.errors.append(error) 415 else: 416 if message: 417 self.errors[0]["message"] = message 418 if output: 419 self.errors[0]["output"] = output 420 if error_type: 421 self.errors[0]["type"] = error_type 422 423 def add_failure_info(self, message=None, output=None, failure_type=None): 424 """Adds a failure message, output, or both to the test case""" 425 failure = {} 426 failure["message"] = message 427 failure["output"] = output 428 failure["type"] = failure_type 429 if self.allow_multiple_subalements: 430 if message or output: 431 self.failures.append(failure) 432 elif not len(self.failures): 433 self.failures.append(failure) 434 else: 435 if message: 436 self.failures[0]["message"] = message 437 if output: 438 self.failures[0]["output"] = output 439 if failure_type: 440 self.failures[0]["type"] = failure_type 441 442 def add_skipped_info(self, message=None, output=None): 443 """Adds a skipped message, output, or both to the test case""" 444 skipped = {} 445 skipped["message"] = message 446 skipped["output"] = output 447 if self.allow_multiple_subalements: 448 if message or output: 449 self.skipped.append(skipped) 450 elif not len(self.skipped): 451 self.skipped.append(skipped) 452 else: 453 if message: 454 self.skipped[0]["message"] = message 455 if output: 456 self.skipped[0]["output"] = output 457 458 def is_failure(self): 459 """returns true if this test case is a failure""" 460 return sum(1 for f in self.failures if f["message"] or f["output"]) > 0 461 462 def is_error(self): 463 """returns true if this test case is an error""" 464 return sum(1 for e in self.errors if e["message"] or e["output"]) > 0 465 466 def is_skipped(self): 467 """returns true if this test case has been skipped""" 468 return len(self.skipped) > 0 469