1#!/usr/bin/env python3 2""" 3# Created: Mon Jun 20 17:59:46 2022 (-0400) 4# @author: Jacob Faibussowitsch 5""" 6from __future__ import annotations 7 8import os 9import abc 10import enum 11import queue 12import multiprocessing as mp 13import multiprocessing.synchronize 14import petsclinter as pl 15 16from .._typing import * 17 18from ..util._timeout import timeout, TimeoutError 19 20from ._linter import Linter 21from ._path import Path 22 23_T = TypeVar('_T') 24 25# directory names to exclude from processing, case sensitive 26exclude_dir_names = { 27 'tests', 'tutorials', 'output', 'input', 'python', 'benchmarks', 'docs', 'binding', 'contrib', 28 'ftn-mod', 'ftn-src', 'ftn-custom', 'ftn-kernels', 29 'perfstubs', 'yaml' 30} 31# directory suffixes to exclude from processing, case sensitive 32exclude_dir_suffixes = ('.dSYM', '.DS_Store') 33# file extensions to process, case sensitve 34allow_file_extensions = ('.c', '.cpp', '.cxx', '.cu', '.cc', '.h', '.hpp', '.inc') 35 36class WorkerPoolBase(abc.ABC): 37 __slots__ = ('verbose', 'warnings', 'errors_left', 'errors_fixed', 'patches') 38 39 verbose: int 40 warnings: list[CondensedDiags] 41 errors_left: list[CondensedDiags] 42 errors_fixed: list[CondensedDiags] 43 patches: list[PathDiffPair] 44 45 class QueueSignal(enum.IntEnum): 46 """ 47 Various signals to indicate return type on the data queue from child processes 48 """ 49 FILE_PATH = enum.auto() 50 EXIT_QUEUE = enum.auto() 51 52 def __init__(self, verbose: int) -> None: 53 r"""Construct a `WoekerPoolBase` 54 55 Parameters 56 ---------- 57 verbose : 58 whether to print verbose logging output (at level) 59 """ 60 super().__init__() 61 self.verbose = verbose 62 self.warnings = [] 63 self.errors_left = [] 64 self.errors_fixed = [] 65 self.patches = [] 66 return 67 68 def _vprint(self: PoolImpl, *args, **kwargs) -> None: 69 r"""Prints output, but only in verbose mode""" 70 if self.verbose: 71 pl.sync_print(*args, **kwargs) 72 return 73 74 @abc.abstractmethod 75 def _setup(self: PoolImpl, compiler_flags: list[str], clang_lib: PathLike, clang_options: CXTranslationUnit, clang_compat_check: bool, werror: bool) -> None: 76 return 77 78 @abc.abstractmethod 79 def _consume_results(self) -> None: 80 return 81 82 @abc.abstractmethod 83 def _finalize(self) -> None: 84 return 85 86 @abc.abstractmethod 87 def put(self: PoolImpl, item: PathLike) -> None: 88 raise NotImplementedError 89 90 def setup(self: PoolImpl, compiler_flags: list[str], clang_lib: Optional[PathLike] = None, clang_options: Optional[CXTranslationUnit] = None, clang_compat_check: bool = True, werror: bool = False) -> PoolImpl: 91 r"""Set up a `WorkerPool` instance 92 93 Parameters 94 ---------- 95 compiler_flags : 96 the list of compiler flags to pass to the `Linter` 97 clang_lib : optional 98 the path to libclang 99 clang_options: optional 100 the options to pass to the `Linter`, defaults to `petsclinter.util.base_clang_options` 101 clang_compat_check: optional 102 whether to do compatibility checks (if this initializes libclang) 103 werror: 104 whether to treat warnings as errors 105 106 Returns 107 ------- 108 self: 109 the `WorkerPool` instance 110 """ 111 if clang_lib is None: 112 import clang.cindex as clx # type: ignore[import] 113 assert clx.conf.loaded, 'Must initialize libClang first' 114 clang_lib = clx.conf.get_filename() 115 116 if clang_options is None: 117 clang_options = pl.util.base_clang_options 118 119 self._setup(compiler_flags, clang_lib, clang_options, clang_compat_check, werror) 120 return self 121 122 def walk(self: PoolImpl, src_path_list: Sequence[PathLike], exclude_dirs: Optional[Collection[str]] = None, exclude_dir_suff: Optional[tuple[str, ...]] = None, allow_file_suff: Optional[tuple[str, ...]] = None) -> PoolImpl: 123 r"""Walk `src_path_list` and process it 124 125 Parameters 126 ---------- 127 src_path_list : 128 a list of paths to process 129 exclude_dirs : optional 130 a list or set to exclude from processing 131 exclude_dir_suff : optional 132 a set of suffixes to ignore 133 allow_file_suff : optional 134 a list of suffixes to explicitly allow 135 136 Returns 137 ------- 138 self : 139 the `WorkerPool` instance 140 """ 141 if exclude_dirs is None: 142 exclude_dirs = exclude_dir_names 143 if exclude_dir_suff is None: 144 exclude_dir_suff = exclude_dir_suffixes 145 if allow_file_suff is None: 146 allow_file_suff = allow_file_extensions 147 148 for src_path in src_path_list: 149 if src_path.is_file(): 150 self.put(src_path) 151 continue 152 153 _, dirs, _ = next(os.walk(src_path)) 154 dir_gen = (d for d in dirs if d not in exclude_dirs) 155 initial_dirs = {str(src_path / d) for d in dir_gen if not d.endswith(exclude_dir_suff)} 156 for root, dirs, files in os.walk(src_path): 157 self._vprint('Processing directory', root) 158 dirs[:] = [d for d in dirs if d not in exclude_dirs] 159 dirs[:] = [d for d in dirs if not d.endswith(exclude_dir_suff)] 160 for filename in (os.path.join(root, f) for f in files if f.endswith(allow_file_suff)): 161 self.put(Path(filename)) 162 # Every time we reach another top-level node we consume some of the results. This 163 # makes the eventual consume-until-empty loop much faster since the queue is not 164 # as backed up 165 if root in initial_dirs: 166 self._consume_results() 167 return self 168 169 def finalize(self: PoolImpl) -> tuple[list[CondensedDiags], list[CondensedDiags], list[CondensedDiags], list[PathDiffPair]]: 170 r"""Finalize the queue and return the results 171 172 Returns 173 ------- 174 warnings : 175 the list of warnings 176 errors_left : 177 the remaining (unfixed) errors 178 errors_fixed : 179 the fixed errors 180 patches : 181 the generated patches 182 183 Notes 184 ----- 185 If running in parallel, and workers fail to finalize in time, calls `self.__crash_and_burn()` 186 """ 187 def prune(container: list[_T]) -> list[_T]: 188 return [item for item in container if item] 189 190 self._finalize() 191 warnings = prune(self.warnings) 192 errors_left = prune(self.errors_left) 193 errors_fixed = prune(self.errors_fixed) 194 patches = prune(self.patches) 195 return warnings, errors_left, errors_fixed, patches 196 197class ParallelPool(WorkerPoolBase): 198 __slots__ = ('input_queue', 'error_queue', 'return_queue', 'lock', 'workers', 'num_workers') 199 200 class SendPacket(NamedTuple): 201 type: WorkerPoolBase.QueueSignal 202 data: Optional[PathLike] 203 204 class ReturnPacket(NamedTuple): 205 patches: List[PathDiffPair] 206 errors_left: CondensedDiags 207 errors_fixed: CondensedDiags 208 warnings: CondensedDiags 209 210 # use Union to get around not having TypeAlias until 3.10 211 CommandQueueType: TypeAlias = 'mp.JoinableQueue[SendPacket]' 212 ErrorQueueType: TypeAlias = 'mp.Queue[str]' 213 ReturnQueueType: TypeAlias = 'mp.Queue[ReturnPacket]' 214 LockType: TypeAlias = mp.synchronize.Lock 215 input_queue: CommandQueueType 216 error_queue: ErrorQueueType 217 return_queue: ReturnQueueType 218 lock: LockType 219 workers: list[mp.Process] 220 num_workers: int 221 222 def __init__(self, num_workers: int, verbose: int) -> None: 223 r"""Construct a `ParallelPool` 224 225 Parameters 226 ---------- 227 num_workers : 228 how many worker processes to spawn 229 verbose : 230 whether to print verbose output 231 """ 232 super().__init__(verbose) 233 self.input_queue = mp.JoinableQueue() 234 self.error_queue = mp.Queue() 235 self.return_queue = mp.Queue() 236 lock = mp.Lock() 237 self.lock = lock 238 self.workers = [] 239 self.num_workers = num_workers 240 241 old_sync_print = pl.sync_print 242 def lock_sync_print(*args, **kwargs) -> None: 243 with lock: 244 old_sync_print(*args, **kwargs) 245 return 246 pl.sync_print = lock_sync_print 247 return 248 249 @timeout(seconds=10) 250 def __crash_and_burn(self, message: str) -> NoReturn: 251 r"""Forcefully annihilate the pool and crash the program 252 253 Parameters 254 ---------- 255 message : 256 an informative message to print on crashing 257 258 Raises 259 ------ 260 RuntimeError : 261 raises a RuntimeError in all cases 262 """ 263 for worker in self.workers: 264 if worker is not None: 265 try: 266 worker.terminate() 267 except: 268 pass 269 raise RuntimeError(message) 270 271 def _consume_results(self) -> None: 272 r"""Consume pending results from the queue 273 274 Raises 275 ------ 276 ValueError : 277 if an unknown QueueSignal is returned from the pipe 278 """ 279 self.check() 280 return_q = self.return_queue 281 try: 282 qsize_mess = str(return_q.qsize()) 283 except NotImplementedError: 284 # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.qsize 285 # 286 # Note that this may raise NotImplementedError on Unix platforms like macOS where 287 # sem_getvalue() is not implemented. 288 qsize_mess = '0' if return_q.empty() else 'unknown (not implemented on platform)' 289 self._vprint('Estimated number of results:', qsize_mess) 290 291 while not return_q.empty(): 292 try: 293 packet = return_q.get(timeout=1) 294 except queue.Empty: 295 # this should never really happen since this thread is the only consumer of the 296 # queue, but it's here just in case 297 break 298 if not isinstance(packet, self.ReturnPacket): 299 try: # type: ignore[unreachable] 300 self.__crash_and_burn('') 301 except RuntimeError: 302 pass 303 raise ValueError(type(packet)) 304 self.errors_left.append(packet.errors_left) 305 self.errors_fixed.append(packet.errors_fixed) 306 self.patches.extend(packet.patches) 307 self.warnings.append(packet.warnings) 308 return 309 310 def _setup(self, compiler_flags: list[str], clang_lib: PathLike, clang_options: CXTranslationUnit, clang_compat_check: bool, werror: bool) -> None: 311 r"""Setup a `ParallelPool` 312 313 Parameters 314 ---------- 315 compiler_flags : 316 the list of general compiler flags to parse the files with 317 clang_lib : 318 the path to libclang 319 clang_options : 320 the translation unit options to parse the files with 321 clang_compat_check : 322 whether to do compatibility checks for libclang 323 werror : 324 whether to consider warnings as errors 325 326 Notes 327 ----- 328 This routine is what actually spawns the processes 329 """ 330 from ..queue_main import queue_main 331 from ..checks._register import check_function_map, classid_map 332 from ._diag import DiagnosticManager 333 334 # in case we are double-calling this 335 self._flush_workers() 336 assert len(self.workers) == 0 337 for i in range(self.num_workers): 338 worker = mp.Process( 339 target=queue_main, 340 args=( 341 clang_lib, clang_compat_check, check_function_map, classid_map, DiagnosticManager, 342 compiler_flags, clang_options, self.verbose, werror, self.error_queue, self.return_queue, 343 self.input_queue, self.lock 344 ), 345 name=f'[{i}]' 346 ) 347 worker.start() 348 self.workers.append(worker) 349 return 350 351 def _flush_workers(self) -> None: 352 r"""Reap any existing workers 353 354 Notes 355 ----- 356 Tries to gracefully terminate any existing workers, but will crash if it is not able to. Does 357 nothing if no workers active. 358 """ 359 import time 360 361 @timeout(seconds=3) 362 def timeout_join() -> None: 363 self.input_queue.join() 364 return 365 366 # join here to colocate error messages if needs be 367 try: 368 timeout_join() 369 except TimeoutError: 370 # OK if we timeout, likely there is a problem so this join is a deadlock 371 pass 372 self.check() 373 # send stop-signal to child processes 374 for _ in self.workers: 375 self.put_packet(self.SendPacket(type=self.QueueSignal.EXIT_QUEUE, data=None)) 376 377 self._consume_results() 378 # If there is a lot of data being sent from the child processes, they may not fully 379 # flush in time for us to catch their output in the first consume_results() call. 380 # 381 # We need to spin (and continue consuming results) until all processes have have 382 # exited, since they will only fully exit once they flush their pipes. 383 @timeout(seconds=60) 384 def reap_workers() -> None: 385 while 1: 386 live_list = [w.is_alive() for w in self.workers] 387 for worker, alive in zip(self.workers, live_list): 388 self._vprint( 389 'Checking whether process', worker.name, 'has finished:', 'no' if alive else 'yes' 390 ) 391 if alive: 392 worker.join(timeout=1) 393 self._consume_results() 394 if sum(live_list) == 0: 395 break 396 return 397 398 try: 399 reap_workers() 400 except TimeoutError: 401 mess = '\n'.join( 402 f'{worker.name}: {"alive" if worker.is_alive() else "terminated"}' for worker in self.workers 403 ) 404 self.__crash_and_burn(f'Timed out! Workers failed to terminate:\n{mess}') 405 self.workers = [] 406 return 407 408 def _finalize(self) -> None: 409 r"""Finalize a `ParallelPool` 410 411 Notes 412 ----- 413 In addition to reaping all workers, this also closes all queues 414 """ 415 self._flush_workers() 416 self.error_queue.close() 417 self.return_queue.close() 418 return 419 420 def check(self) -> None: 421 r"""Check for errors from the queue 422 423 Notes 424 ----- 425 Calls `self.__crash_and_burn()` if any errors are detected, but does nothing if running in 426 serial 427 """ 428 stop_multiproc = False 429 timeout_it = 0 430 max_timeouts = 3 431 while not self.error_queue.empty(): 432 # while this does get recreated for every error, we do not want to needlessly 433 # reinitialize it when no errors exist. If we get to this point however we no longer 434 # care about performance as we are about to crash everything. 435 try: 436 exception = self.error_queue.get(timeout=.5) 437 except queue.Empty: 438 # Queue is not empty (we were in the loop), but we timed out on the get. Should 439 # not happen yet here we are. Try a few couple more times, otherwise bail 440 timeout_it += 1 441 if timeout_it > max_timeouts: 442 break 443 continue 444 445 err_bars = ''.join(['[ERROR]', 85 * '-', '[ERROR]\n']) 446 try: 447 err_mess = f'{err_bars}{str(exception)}{err_bars}' 448 except: 449 err_mess = exception 450 451 print(err_mess, flush=True) 452 stop_multiproc = True 453 timeout_it = 0 454 if stop_multiproc: 455 self.__crash_and_burn('Error in child process detected') 456 return 457 458 def put_packet(self, packet: SendPacket) -> None: 459 r"""Put a `SendPacket` onto the queue 460 461 Parameters 462 ---------- 463 packet : 464 the packet 465 """ 466 # continuously put files onto the queue, if the queue is full we block for 467 # queueTimeout seconds and if we still cannot insert to the queue we check 468 # children for errors. If no errors are found we try again. 469 assert isinstance(packet, self.SendPacket) 470 while 1: 471 try: 472 self.input_queue.put(packet, True, 2) 473 except queue.Full: 474 # we don't want to join here since a child may have encountered an error! 475 self.check() 476 self._consume_results() 477 else: 478 # only get here if put is successful 479 break 480 return 481 482 def put(self, path: PathLike) -> None: 483 r"""Put an filepath into the `ParallelPool`s processing queue 484 485 Parameters 486 ---------- 487 path : 488 the path to be processed 489 """ 490 return self.put_packet(self.SendPacket(type=self.QueueSignal.FILE_PATH, data=path)) 491 492class SerialPool(WorkerPoolBase): 493 __slots__ = ('linter',) 494 495 linter: Linter 496 497 def __init__(self, verbose: int) -> None: 498 r"""Construct a `SerialPool` 499 500 Parameters 501 ---------- 502 verbose : 503 whether to print verbose output 504 """ 505 super().__init__(verbose) 506 return 507 508 def _setup(self, compiler_flags: list[str], clang_lib: PathLike, clang_options: CXTranslationUnit, clang_compat_check: bool, werror: bool) -> None: 509 r"""Setup a `SerialPool` 510 511 Parameters 512 ---------- 513 compiler_flags : 514 the list of general compiler flags to parse the files with 515 clang_lib : 516 the path to libclang 517 clang_options : 518 the translation unit options to parse the files with 519 clang_compat_check : 520 whether to do compatibility checks for libclang 521 werror : 522 whether to consider warnings as errors 523 524 Notes 525 ----- 526 This routine is what constructs the `Linter` instance 527 """ 528 self.linter = Linter( 529 compiler_flags, clang_options=clang_options, verbose=self.verbose, werror=werror 530 ) 531 return 532 533 def _consume_results(self) -> None: 534 r"""Consume results for a `SerialPool`, does nothing""" 535 return 536 537 def _finalize(self) -> None: 538 r"""Finalize a `SerialPool`, does nothing""" 539 return 540 541 def put(self, item: PathLike) -> None: 542 r"""Put an item into a `SerialPool`s processing queue 543 544 Parameters 545 ---------- 546 item : 547 the item to process 548 """ 549 err_left, err_fixed, warnings, patches = self.linter.parse(item).diagnostics() 550 self.errors_left.append(err_left) 551 self.errors_fixed.append(err_fixed) 552 self.warnings.append(warnings) 553 self.patches.extend(patches) 554 return 555 556def WorkerPool(num_workers: int, verbose: int) -> Union[SerialPool, ParallelPool]: 557 r"""Construct a `WorkerPool` 558 559 Parameters 560 ---------- 561 num_workers : 562 the number of worker threads the pool should own 563 verbose : 564 what level to print verbose output at 565 566 Returns 567 ------- 568 pool : 569 the pool instance 570 571 Notes 572 ----- 573 If `num_workers` is < 0, then the number of processes is automatically determined, usually equal to 574 the number logical cores for the current machine. 575 """ 576 if num_workers < 0: 577 # take number of cores - 1, up to a maximum of 16 as not to overload big machines 578 num_workers = min(max(mp.cpu_count() - 1, 1), 16) 579 580 if num_workers in (0, 1): 581 if verbose: 582 pl.sync_print(f'Number of worker processes ({num_workers}) too small, disabling multiprocessing') 583 return SerialPool(verbose) 584 else: 585 if verbose: 586 pl.sync_print(f'Number of worker processes ({num_workers}) sufficient, enabling multiprocessing') 587 return ParallelPool(num_workers, verbose) 588