#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Build JUnit XML reports from TestSuite / TestCase objects.

Based on the understanding of what Jenkins can parse for JUnit XML files.

<?xml version="1.0" encoding="utf-8"?>
<testsuites errors="1" failures="1" tests="4" time="45">
    <testsuite errors="1" failures="1" hostname="localhost" id="0" name="test1"
               package="testdb" tests="4" timestamp="2012-11-15T01:02:29">
        <properties>
            <property name="assert-passed" value="1"/>
        </properties>
        <testcase classname="testdb.directory" name="1-passed-test" time="10"/>
        <testcase classname="testdb.directory" name="2-failed-test" time="20">
            <failure message="Assertion FAILED: failed assert" type="failure">
                the output of the testcase
            </failure>
        </testcase>
        <testcase classname="package.directory" name="3-errord-test" time="15">
            <error message="Assertion ERROR: error assert" type="error">
                the output of the testcase
            </error>
        </testcase>
        <testcase classname="package.directory" name="3-skipped-test" time="0">
            <skipped message="SKIPPED Test" type="skipped">
                the output of the testcase
            </skipped>
        </testcase>
        <testcase classname="testdb.directory" name="3-passed-test" time="10">
            <system-out>
                I am system output
            </system-out>
            <system-err>
                I am the error output
            </system-err>
        </testcase>
    </testsuite>
</testsuites>
"""
from collections import defaultdict
import sys
import re
import xml.etree.ElementTree as ET
import xml.dom.minidom

try:
    # Python 2
    unichr
    PY2 = True
except NameError:  # pragma: nocover
    # Python 3
    unichr = chr
    PY2 = False


# Inclusive code point ranges that are stripped from the generated XML.
# @see: http://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python
_ILLEGAL_UNICHRS = [
    (0x00, 0x08), (0x0B, 0x1F), (0x7F, 0x84), (0x86, 0x9F),
    (0xD800, 0xDFFF), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF),
    (0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), (0x3FFFE, 0x3FFFF),
    (0x4FFFE, 0x4FFFF), (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
    (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), (0x9FFFE, 0x9FFFF),
    (0xAFFFE, 0xAFFFF), (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
    (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF),
    (0x10FFFE, 0x10FFFF)]


def _compile_illegal_xml_re():
    """Compile the character class matching every range in _ILLEGAL_UNICHRS."""
    # Ranges starting at or above sys.maxunicode are skipped because
    # unichr() cannot produce those code points on this interpreter.
    ranges = [u"%s-%s" % (unichr(low), unichr(high))
              for (low, high) in _ILLEGAL_UNICHRS
              if low < sys.maxunicode]
    return re.compile(u'[%s]' % u''.join(ranges))


# Compiled once at import time instead of on every cleaning call.
_ILLEGAL_XML_RE = _compile_illegal_xml_re()


def decode(var, encoding):
    """
    If not already unicode, decode it.

    @param var: Value to convert; text, binary string or any other object.
    @param encoding: Encoding used to decode binary strings. May be None.
    @return: unicode string (``unicode`` on Python 2, ``str`` on Python 3)
    """
    if PY2:
        if isinstance(var, unicode):
            ret = var
        elif isinstance(var, str):
            if encoding:
                ret = var.decode(encoding)
            else:
                ret = unicode(var)
        else:
            ret = unicode(var)
    elif isinstance(var, (bytes, bytearray)):
        # str(b'abc') would produce the repr "b'abc'", so binary input has
        # to be decoded explicitly; utf-8 is the fallback used elsewhere in
        # this module when no encoding is given.
        ret = var.decode(encoding or 'utf-8')
    else:
        ret = str(var)
    return ret


def _set_optional_attributes(attributes, candidates, encoding):
    """Copy each truthy (key, value) candidate into attributes, decoded."""
    for key, value in candidates:
        if value:
            attributes[key] = decode(value, encoding)


def _build_result_element(tag, message, output, result_type, encoding):
    """Create a failure/error/skipped element; its 'type' defaults to tag."""
    attrs = {'type': tag}
    if message:
        attrs['message'] = decode(message, encoding)
    if result_type:
        attrs['type'] = decode(result_type, encoding)
    element = ET.Element(tag, attrs)
    if output:
        element.text = decode(output, encoding)
    return element


class TestSuite(object):
    """
    Suite of test cases.
    Can handle unicode strings or binary strings if their encoding is provided.
    """

    def __init__(self, name, test_cases=None, hostname=None, id=None,
                 package=None, timestamp=None, properties=None, file=None,
                 log=None, url=None, stdout=None, stderr=None):
        """
        Stores the suite description; nothing is rendered until
        build_xml_doc() is called.

        @param name: Name of the suite.
        @param test_cases: Iterable of TestCase objects; a falsy value is
            replaced by an empty list.
        @param properties: Mapping rendered as <property name= value=> nodes.
        @param stdout: Text rendered as the suite's <system-out> node.
        @param stderr: Text rendered as the suite's <system-err> node.
        @raise TypeError: If test_cases is not iterable.

        The remaining arguments are rendered as attributes of the same name
        on the <testsuite> element when they are truthy.
        """
        self.name = name
        if not test_cases:
            test_cases = []
        try:
            iter(test_cases)
        except TypeError:
            raise TypeError('test_cases must be a list of test cases')
        self.test_cases = test_cases
        self.timestamp = timestamp
        self.hostname = hostname
        self.id = id
        self.package = package
        self.file = file
        self.log = log
        self.url = url
        self.stdout = stdout
        self.stderr = stderr
        self.properties = properties

    def build_xml_doc(self, encoding=None):
        """
        Builds the XML document for the JUnit test suite.
        Produces clean unicode strings and decodes non-unicode with the help of encoding.
        @param encoding: Used to decode encoded strings.
        @return: XML document with unicode string elements
        """
        cases = self.test_cases

        # build the test suite element
        suite_attrs = dict()
        suite_attrs['name'] = decode(self.name, encoding)
        if any(c.assertions for c in cases):
            suite_attrs['assertions'] = \
                str(sum(int(c.assertions) for c in cases if c.assertions))
        suite_attrs['disabled'] = \
            str(sum(1 for c in cases if not c.is_enabled))
        suite_attrs['failures'] = \
            str(sum(1 for c in cases if c.is_failure()))
        suite_attrs['errors'] = \
            str(sum(1 for c in cases if c.is_error()))
        suite_attrs['skipped'] = \
            str(sum(1 for c in cases if c.is_skipped()))
        suite_attrs['time'] = \
            str(sum(c.elapsed_sec for c in cases if c.elapsed_sec))
        suite_attrs['tests'] = str(len(cases))

        _set_optional_attributes(suite_attrs, (
            ('hostname', self.hostname),
            ('id', self.id),
            ('package', self.package),
            ('timestamp', self.timestamp),
            ('file', self.file),
            ('log', self.log),
            ('url', self.url),
        ), encoding)

        xml_element = ET.Element("testsuite", suite_attrs)

        # add any properties
        if self.properties:
            props_element = ET.SubElement(xml_element, "properties")
            for k, v in self.properties.items():
                attrs = {'name': decode(k, encoding), 'value': decode(v, encoding)}
                ET.SubElement(props_element, "property", attrs)

        # add test suite stdout
        if self.stdout:
            stdout_element = ET.SubElement(xml_element, "system-out")
            stdout_element.text = decode(self.stdout, encoding)

        # add test suite stderr
        if self.stderr:
            stderr_element = ET.SubElement(xml_element, "system-err")
            stderr_element.text = decode(self.stderr, encoding)

        # test cases
        for case in cases:
            case_attrs = dict()
            case_attrs['name'] = decode(case.name, encoding)
            if case.assertions:
                # Number of assertions in the test case; int() accepts the
                # same numeric strings as the suite-level total above.
                case_attrs['assertions'] = "%d" % int(case.assertions)
            if case.elapsed_sec:
                case_attrs['time'] = "%f" % case.elapsed_sec
            _set_optional_attributes(case_attrs, (
                ('timestamp', case.timestamp),
                ('classname', case.classname),
                ('status', case.status),
                ('class', case.category),
                ('file', case.file),
                ('line', case.line),
                ('log', case.log),
                ('url', case.url),
            ), encoding)

            test_case_element = ET.SubElement(
                xml_element, "testcase", case_attrs)

            # failures
            if case.is_failure():
                test_case_element.append(_build_result_element(
                    "failure", case.failure_message, case.failure_output,
                    case.failure_type, encoding))

            # errors
            if case.is_error():
                test_case_element.append(_build_result_element(
                    "error", case.error_message, case.error_output,
                    case.error_type, encoding))

            # skippeds (these carry no user-supplied type)
            if case.is_skipped():
                test_case_element.append(_build_result_element(
                    "skipped", case.skipped_message, case.skipped_output,
                    None, encoding))

            # test stdout
            if case.stdout:
                stdout_element = ET.SubElement(test_case_element, "system-out")
                stdout_element.text = decode(case.stdout, encoding)

            # test stderr
            if case.stderr:
                stderr_element = ET.SubElement(test_case_element, "system-err")
                stderr_element.text = decode(case.stderr, encoding)

        return xml_element

    @staticmethod
    def to_xml_string(test_suites, prettyprint=True, encoding=None):
        """
        Returns the string representation of the JUnit XML document.
        @param test_suites: Iterable of TestSuite objects.
        @param prettyprint: Re-indent the document through minidom when true.
        @param encoding: The encoding of the input.
        @return: unicode string
        @raise TypeError: If test_suites is not iterable.
        """

        try:
            iter(test_suites)
        except TypeError:
            raise TypeError('test_suites must be a list of test suites')

        xml_element = ET.Element("testsuites")
        # running totals copied onto the <testsuites> root element
        attributes = defaultdict(int)
        for ts in test_suites:
            ts_xml = ts.build_xml_doc(encoding=encoding)
            for key in ['failures', 'errors', 'tests', 'disabled']:
                attributes[key] += int(ts_xml.get(key, 0))
            for key in ['time']:
                attributes[key] += float(ts_xml.get(key, 0))
            xml_element.append(ts_xml)
        for key, value in attributes.items():
            xml_element.set(key, str(value))

        xml_string = ET.tostring(xml_element, encoding=encoding)
        # is encoded now
        xml_string = TestSuite._clean_illegal_xml_chars(
            xml_string.decode(encoding or 'utf-8'))
        # is unicode now

        if prettyprint:
            # minidom.parseString() works just on correctly encoded binary strings
            xml_string = xml_string.encode(encoding or 'utf-8')
            xml_string = xml.dom.minidom.parseString(xml_string)
            # toprettyxml() produces unicode if no encoding is being passed or binary string with an encoding
            xml_string = xml_string.toprettyxml(encoding=encoding)
            if encoding:
                xml_string = xml_string.decode(encoding)
            # is unicode now
        return xml_string

    @staticmethod
    def to_file(file_descriptor, test_suites, prettyprint=True, encoding=None):
        """
        Writes the JUnit XML document to a file.
        @param file_descriptor: Object whose write() receives the unicode document.
        @param test_suites: Iterable of TestSuite objects.
        @param prettyprint: Passed through to to_xml_string().
        @param encoding: Passed through to to_xml_string().
        """
        xml_string = TestSuite.to_xml_string(
            test_suites, prettyprint=prettyprint, encoding=encoding)
        # has problems with encoded str with non-ASCII (non-default-encoding) characters!
        file_descriptor.write(xml_string)

    @staticmethod
    def _clean_illegal_xml_chars(string_to_clean):
        """
        Removes any illegal unicode characters from the given XML string.

        @param string_to_clean: The serialized XML text.
        @return: The text with every character in the illegal ranges removed.
        @see: http://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python
        """
        return _ILLEGAL_XML_RE.sub('', string_to_clean)


class TestCase(object):
    """A JUnit test case with a result and possibly some stdout or stderr"""

    def __init__(self, name, classname=None, elapsed_sec=None, stdout=None,
                 stderr=None, assertions=None, timestamp=None, status=None,
                 category=None, file=None, line=None, log=None, group=None,
                 url=None):
        """
        Stores the test case description. Result details are attached
        afterwards with the add_*_info() methods.

        @param name: Name of the test case.
        @param elapsed_sec: Duration, rendered as the 'time' attribute.
        @param category: Rendered as the 'class' attribute.
        @param group: Accepted for compatibility; not stored.
        """
        self.name = name
        self.assertions = assertions
        self.elapsed_sec = elapsed_sec
        self.timestamp = timestamp
        self.classname = classname
        self.status = status
        self.category = category
        self.file = file
        self.line = line
        self.log = log
        self.url = url
        self.stdout = stdout
        self.stderr = stderr

        # cases with is_enabled set to False are counted as 'disabled'
        self.is_enabled = True
        self.error_message = None
        self.error_output = None
        self.error_type = None
        self.failure_message = None
        self.failure_output = None
        self.failure_type = None
        self.skipped_message = None
        self.skipped_output = None

    def add_error_info(self, message=None, output=None, error_type=None):
        """Adds an error message, output, or both to the test case"""
        if message:
            self.error_message = message
        if output:
            self.error_output = output
        if error_type:
            self.error_type = error_type

    def add_failure_info(self, message=None, output=None, failure_type=None):
        """Adds a failure message, output, or both to the test case"""
        if message:
            self.failure_message = message
        if output:
            self.failure_output = output
        if failure_type:
            self.failure_type = failure_type

    def add_skipped_info(self, message=None, output=None):
        """Adds a skipped message, output, or both to the test case"""
        if message:
            self.skipped_message = message
        if output:
            self.skipped_output = output

    def is_failure(self):
        """returns true if this test case is a failure"""
        return bool(self.failure_output or self.failure_message)

    def is_error(self):
        """returns true if this test case is an error"""
        return bool(self.error_output or self.error_message)

    def is_skipped(self):
        """returns true if this test case has been skipped"""
        return bool(self.skipped_output or self.skipped_message)