xref: /petsc/lib/petsc/bin/maint/petsclinter/petsclinter/classes/_pool.py (revision b0dcfd164860a975c76f90dabf1036901aab1c4e)
1#!/usr/bin/env python3
2"""
3# Created: Mon Jun 20 17:59:46 2022 (-0400)
4# @author: Jacob Faibussowitsch
5"""
6from __future__ import annotations
7
8import os
9import abc
10import enum
11import queue
12import multiprocessing as mp
13import multiprocessing.synchronize
14import petsclinter     as pl
15
16from .._typing import *
17
18from ..util._timeout import timeout, TimeoutError
19
20from ._linter import Linter
21from ._path   import Path
22
_T = TypeVar('_T')  # generic element type, used by finalize()'s prune helper

# directory names to exclude from processing, case sensitive
exclude_dir_names = {
  'tests', 'tutorials', 'output', 'input', 'python', 'benchmarks', 'docs', 'binding', 'contrib',
  'ftn-mod', 'ftn-src', 'ftn-custom', 'ftn-kernels',
  'perfstubs', 'yaml'
}
# directory suffixes to exclude from processing, case sensitive
exclude_dir_suffixes  = ('.dSYM', '.DS_Store')
# file extensions to process, case sensitive
allow_file_extensions = ('.c', '.cpp', '.cxx', '.cu', '.cc', '.h', '.hpp', '.inc')
35
class WorkerPoolBase(abc.ABC):
  r"""Common state and driver logic shared by the serial and parallel worker pools

  Accumulates the diagnostics (warnings, fixed and unfixed errors) and generated
  patches produced while linting, and implements the directory-walking front end
  (`walk`) plus result collection (`finalize`). Subclasses provide the actual
  file-processing machinery through the abstract `_setup`, `_consume_results`,
  `_finalize` and `put` hooks.
  """
  __slots__ = ('verbose', 'warnings', 'errors_left', 'errors_fixed', 'patches')

  verbose: int                       # verbosity level; 0 silences _vprint()
  warnings: list[CondensedDiags]     # collected warning diagnostics
  errors_left: list[CondensedDiags]  # diagnostics that remain unfixed
  errors_fixed: list[CondensedDiags] # diagnostics that were automatically fixed
  patches: list[PathDiffPair]        # generated (path, diff) pairs

  class QueueSignal(enum.IntEnum):
    """
    Various signals to indicate return type on the data queue from child processes
    """
    FILE_PATH  = enum.auto()
    EXIT_QUEUE = enum.auto()

  def __init__(self, verbose: int) -> None:
    r"""Construct a `WorkerPoolBase`

    Parameters
    ----------
    verbose :
      whether to print verbose logging output (at level)
    """
    super().__init__()
    self.verbose      = verbose
    self.warnings     = []
    self.errors_left  = []
    self.errors_fixed = []
    self.patches      = []
    return

  def _vprint(self: PoolImpl, *args, **kwargs) -> None:
    r"""Prints output via `pl.sync_print`, but only in verbose mode"""
    if self.verbose:
      pl.sync_print(*args, **kwargs)
    return

  @abc.abstractmethod
  def _setup(self: PoolImpl, compiler_flags: list[str], clang_lib: PathLike, clang_options: CXTranslationUnit, clang_compat_check: bool, werror: bool) -> None:
    r"""Subclass hook: prepare the pool for processing; called from `setup()`"""
    return

  @abc.abstractmethod
  def _consume_results(self) -> None:
    r"""Subclass hook: drain any pending results; called periodically from `walk()`"""
    return

  @abc.abstractmethod
  def _finalize(self) -> None:
    r"""Subclass hook: shut the pool down; called from `finalize()`"""
    return

  @abc.abstractmethod
  def put(self: PoolImpl, item: PathLike) -> None:
    r"""Subclass hook: submit a file path for processing"""
    raise NotImplementedError

  def setup(self: PoolImpl, compiler_flags: list[str], clang_lib: Optional[PathLike] = None, clang_options: Optional[CXTranslationUnit] = None, clang_compat_check: bool = True, werror: bool = False) -> PoolImpl:
    r"""Set up a `WorkerPool` instance

    Parameters
    ----------
    compiler_flags :
      the list of compiler flags to pass to the `Linter`
    clang_lib : optional
      the path to libclang; when omitted libclang must already be initialized, and
      its path is recovered from `clang.cindex`
    clang_options: optional
      the options to pass to the `Linter`, defaults to `petsclinter.util.base_clang_options`
    clang_compat_check: optional
      whether to do compatibility checks (if this initializes libclang)
    werror:
      whether to treat warnings as errors

    Returns
    -------
    self:
      the `WorkerPool` instance
    """
    if clang_lib is None:
      import clang.cindex as clx # type: ignore[import]
      assert clx.conf.loaded, 'Must initialize libClang first'
      clang_lib = clx.conf.get_filename()

    if clang_options is None:
      clang_options = pl.util.base_clang_options

    self._setup(compiler_flags, clang_lib, clang_options, clang_compat_check, werror)
    return self

  def walk(self: PoolImpl, src_path_list: Sequence[PathLike], exclude_dirs: Optional[Collection[str]] = None, exclude_dir_suff: Optional[tuple[str, ...]] = None, allow_file_suff: Optional[tuple[str, ...]] = None) -> PoolImpl:
    r"""Walk `src_path_list` and process it

    Parameters
    ----------
    src_path_list :
      a list of paths to process; plain files are submitted directly, directories are
      walked recursively
    exclude_dirs : optional
      a list or set to exclude from processing, defaults to `exclude_dir_names`
    exclude_dir_suff : optional
      a set of suffixes to ignore, defaults to `exclude_dir_suffixes`
    allow_file_suff : optional
      a list of suffixes to explicitly allow, defaults to `allow_file_extensions`

    Returns
    -------
    self :
      the `WorkerPool` instance
    """
    if exclude_dirs is None:
      exclude_dirs = exclude_dir_names
    if exclude_dir_suff is None:
      exclude_dir_suff = exclude_dir_suffixes
    if allow_file_suff is None:
      allow_file_suff = allow_file_extensions

    # NOTE(review): each src_path is assumed to be a Path-like object supporting
    # is_file() and the '/' operator -- confirm against callers
    for src_path in src_path_list:
      if src_path.is_file():
        self.put(src_path)
        continue

      # record the (non-excluded) first-level subdirectories of src_path; reaching one
      # of these during the full walk below marks a good point to drain results
      _, dirs, _   = next(os.walk(src_path))
      dir_gen      = (d for d in dirs if d not in exclude_dirs)
      initial_dirs = {str(src_path / d) for d in dir_gen if not d.endswith(exclude_dir_suff)}
      for root, dirs, files in os.walk(src_path):
        self._vprint('Processing directory', root)
        # prune excluded directories in place so os.walk() does not descend into them
        dirs[:] = [d for d in dirs if d not in exclude_dirs]
        dirs[:] = [d for d in dirs if not d.endswith(exclude_dir_suff)]
        for filename in (os.path.join(root, f) for f in files if f.endswith(allow_file_suff)):
          self.put(Path(filename))
        # Every time we reach another top-level node we consume some of the results. This
        # makes the eventual consume-until-empty loop much faster since the queue is not
        # as backed up
        if root in initial_dirs:
          self._consume_results()
    return self

  def finalize(self: PoolImpl) -> tuple[list[CondensedDiags], list[CondensedDiags], list[CondensedDiags], list[PathDiffPair]]:
    r"""Finalize the queue and return the results

    Returns
    -------
    warnings :
      the list of warnings
    errors_left :
      the remaining (unfixed) errors
    errors_fixed :
      the fixed errors
    patches :
      the generated patches

    Notes
    -----
    Empty entries are pruned from all four result lists.

    If running in parallel, and workers fail to finalize in time, calls `self.__crash_and_burn()`
    """
    # drop falsy (empty) entries accumulated from files that produced no diagnostics
    def prune(container: list[_T]) -> list[_T]:
      return [item for item in container if item]

    self._finalize()
    warnings     = prune(self.warnings)
    errors_left  = prune(self.errors_left)
    errors_fixed = prune(self.errors_fixed)
    patches      = prune(self.patches)
    return warnings, errors_left, errors_fixed, patches
196
class ParallelPool(WorkerPoolBase):
  r"""A `WorkerPoolBase` that fans file processing out to child processes

  Files are sent to workers over `input_queue`; workers report diagnostics back on
  `return_queue` and fatal errors on `error_queue`.
  """
  __slots__ = ('input_queue', 'error_queue', 'return_queue', 'lock', 'workers', 'num_workers')

  class SendPacket(NamedTuple):
    # a command for a worker: either a file path to process or an exit signal
    type: WorkerPoolBase.QueueSignal
    data: Optional[PathLike]

  class ReturnPacket(NamedTuple):
    # one worker's results for a single processed file
    patches: List[PathDiffPair]
    errors_left: CondensedDiags
    errors_fixed: CondensedDiags
    warnings: CondensedDiags

  # aliases are quoted since the multiprocessing queue types are not subscriptable at
  # runtime
  CommandQueueType: TypeAlias = 'mp.JoinableQueue[SendPacket]'
  ErrorQueueType: TypeAlias   = 'mp.Queue[str]'
  ReturnQueueType: TypeAlias  = 'mp.Queue[ReturnPacket]'
  LockType: TypeAlias         = mp.synchronize.Lock
  input_queue: CommandQueueType
  error_queue: ErrorQueueType
  return_queue: ReturnQueueType
  lock: LockType
  workers: list[mp.Process]
  num_workers: int

  def __init__(self, num_workers: int, verbose: int) -> None:
    r"""Construct a `ParallelPool`

    Parameters
    ----------
    num_workers :
      how many worker processes to spawn
    verbose :
      whether to print verbose output
    """
    super().__init__(verbose)
    self.input_queue  = mp.JoinableQueue()
    self.error_queue  = mp.Queue()
    self.return_queue = mp.Queue()
    lock              = mp.Lock()
    self.lock         = lock
    self.workers      = []
    self.num_workers  = num_workers

    # wrap pl.sync_print so that output from this (parent) process is serialized with
    # worker output via the shared lock
    old_sync_print = pl.sync_print
    def lock_sync_print(*args, **kwargs) -> None:
      with lock:
        old_sync_print(*args, **kwargs)
      return
    pl.sync_print = lock_sync_print
    return

  @timeout(seconds=10)
  def __crash_and_burn(self, message: str) -> NoReturn:
    r"""Forcefully annihilate the pool and crash the program

    Parameters
    ----------
    message :
      an informative message to print on crashing

    Raises
    ------
    RuntimeError :
      raises a RuntimeError in all cases
    """
    for worker in self.workers:
      if worker is not None:
        try:
          worker.terminate()
        except Exception:
          # best-effort; the worker may already be dead. Deliberately not a bare
          # except so KeyboardInterrupt/SystemExit still propagate
          pass
    raise RuntimeError(message)

  def _consume_results(self) -> None:
    r"""Consume pending results from the queue

    Raises
    ------
    ValueError :
      if an unknown QueueSignal is returned from the pipe
    """
    self.check()
    return_q = self.return_queue
    try:
      qsize_mess = str(return_q.qsize())
    except NotImplementedError:
      # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.qsize
      #
      # Note that this may raise NotImplementedError on Unix platforms like macOS where
      # sem_getvalue() is not implemented.
      qsize_mess = '0' if return_q.empty() else 'unknown (not implemented on platform)'
    self._vprint('Estimated number of results:', qsize_mess)

    while not return_q.empty():
      try:
        packet = return_q.get(timeout=1)
      except queue.Empty:
        # this should never really happen since this thread is the only consumer of the
        # queue, but it's here just in case
        break
      if not isinstance(packet, self.ReturnPacket):
        try: # type: ignore[unreachable]
          self.__crash_and_burn('')
        except RuntimeError:
          pass
        raise ValueError(type(packet))
      self.errors_left.append(packet.errors_left)
      self.errors_fixed.append(packet.errors_fixed)
      self.patches.extend(packet.patches)
      self.warnings.append(packet.warnings)
    return

  def _setup(self, compiler_flags: list[str], clang_lib: PathLike, clang_options: CXTranslationUnit, clang_compat_check: bool, werror: bool) -> None:
    r"""Setup a `ParallelPool`

    Parameters
    ----------
    compiler_flags :
      the list of general compiler flags to parse the files with
    clang_lib :
      the path to libclang
    clang_options :
      the translation unit options to parse the files with
    clang_compat_check :
      whether to do compatibility checks for libclang
    werror :
      whether to consider warnings as errors

    Notes
    -----
    This routine is what actually spawns the processes
    """
    from ..queue_main       import queue_main
    from ..checks._register import check_function_map, classid_map
    from ._diag             import DiagnosticManager

    # in case we are double-calling this
    self._flush_workers()
    assert len(self.workers) == 0
    for i in range(self.num_workers):
      worker = mp.Process(
        target=queue_main,
        args=(
          clang_lib, clang_compat_check, check_function_map, classid_map, DiagnosticManager,
          compiler_flags, clang_options, self.verbose, werror, self.error_queue, self.return_queue,
          self.input_queue, self.lock
        ),
        name=f'[{i}]'
      )
      worker.start()
      self.workers.append(worker)
    return

  def _flush_workers(self) -> None:
    r"""Reap any existing workers

    Notes
    -----
    Tries to gracefully terminate any existing workers, but will crash if it is not able to. Does
    nothing if no workers active.
    """
    @timeout(seconds=3)
    def timeout_join() -> None:
      self.input_queue.join()
      return

    # join here to colocate error messages if needs be
    try:
      timeout_join()
    except TimeoutError:
      # OK if we timeout, likely there is a problem so this join is a deadlock
      pass
    self.check()
    # send stop-signal to child processes
    for _ in self.workers:
      self.put_packet(self.SendPacket(type=self.QueueSignal.EXIT_QUEUE, data=None))

    self._consume_results()
    # If there is a lot of data being sent from the child processes, they may not fully
    # flush in time for us to catch their output in the first consume_results() call.
    #
    # We need to spin (and continue consuming results) until all processes have
    # exited, since they will only fully exit once they flush their pipes.
    @timeout(seconds=60)
    def reap_workers() -> None:
      while True:
        live_list = [w.is_alive() for w in self.workers]
        for worker, alive in zip(self.workers, live_list):
          self._vprint(
            'Checking whether process', worker.name, 'has finished:', 'no' if alive else 'yes'
          )
          if alive:
            worker.join(timeout=1)
            self._consume_results()
        if sum(live_list) == 0:
          break
      return

    try:
      reap_workers()
    except TimeoutError:
      mess = '\n'.join(
        f'{worker.name}: {"alive" if worker.is_alive() else "terminated"}' for worker in self.workers
      )
      self.__crash_and_burn(f'Timed out! Workers failed to terminate:\n{mess}')
    self.workers = []
    return

  def _finalize(self) -> None:
    r"""Finalize a `ParallelPool`

    Notes
    -----
    In addition to reaping all workers, this also closes all queues
    """
    self._flush_workers()
    self.error_queue.close()
    self.return_queue.close()
    return

  def check(self) -> None:
    r"""Check for errors from the queue

    Notes
    -----
    Calls `self.__crash_and_burn()` if any errors are detected, but does nothing if running in
    serial
    """
    stop_multiproc = False
    timeout_it     = 0
    max_timeouts   = 3
    while not self.error_queue.empty():
      # while this does get recreated for every error, we do not want to needlessly
      # reinitialize it when no errors exist. If we get to this point however we no longer
      # care about performance as we are about to crash everything.
      try:
        exception = self.error_queue.get(timeout=.5)
      except queue.Empty:
        # Queue is not empty (we were in the loop), but we timed out on the get. Should
        # not happen yet here we are. Try a few couple more times, otherwise bail
        timeout_it += 1
        if timeout_it > max_timeouts:
          break
        continue

      err_bars = ''.join(['[ERROR]', 85 * '-', '[ERROR]\n'])
      try:
        err_mess = f'{err_bars}{str(exception)}{err_bars}'
      except Exception:
        # str() on the received object failed; fall back to printing it raw
        err_mess = exception

      print(err_mess, flush=True)
      stop_multiproc = True
      timeout_it     = 0
    if stop_multiproc:
      self.__crash_and_burn('Error in child process detected')
    return

  def put_packet(self, packet: SendPacket) -> None:
    r"""Put a `SendPacket` onto the queue

    Parameters
    ----------
    packet :
      the packet
    """
    # continuously put files onto the queue, if the queue is full we block for 2
    # seconds and if we still cannot insert to the queue we check children for
    # errors. If no errors are found we try again.
    assert isinstance(packet, self.SendPacket)
    while True:
      try:
        self.input_queue.put(packet, True, 2)
      except queue.Full:
        # we don't want to join here since a child may have encountered an error!
        self.check()
        self._consume_results()
      else:
        # only get here if put is successful
        break
    return

  def put(self, path: PathLike) -> None:
    r"""Put a filepath into the `ParallelPool`s processing queue

    Parameters
    ----------
    path :
      the path to be processed
    """
    return self.put_packet(self.SendPacket(type=self.QueueSignal.FILE_PATH, data=path))
491
class SerialPool(WorkerPoolBase):
  r"""A `WorkerPoolBase` that processes every file in the current process

  Holds a single `Linter` instance and runs it directly on each submitted path;
  results land straight in the base-class accumulators, so the consume/finalize
  hooks have nothing to do.
  """
  __slots__ = ('linter',)

  linter: Linter  # the linter used to process every submitted file

  def __init__(self, verbose: int) -> None:
    r"""Construct a `SerialPool`

    Parameters
    ----------
    verbose :
      whether to print verbose output
    """
    super().__init__(verbose)
    return

  def _setup(self, compiler_flags: list[str], clang_lib: PathLike, clang_options: CXTranslationUnit, clang_compat_check: bool, werror: bool) -> None:
    r"""Setup a `SerialPool` by constructing its `Linter` instance

    Parameters
    ----------
    compiler_flags :
      the list of general compiler flags to parse the files with
    clang_lib :
      the path to libclang (unused here; libclang is assumed initialized)
    clang_options :
      the translation unit options to parse the files with
    clang_compat_check :
      whether to do compatibility checks for libclang (unused here)
    werror :
      whether to consider warnings as errors
    """
    self.linter = Linter(
      compiler_flags, werror=werror, verbose=self.verbose, clang_options=clang_options
    )
    return

  def _consume_results(self) -> None:
    r"""Consume results for a `SerialPool`; a no-op since results are stored eagerly by `put()`"""
    return

  def _finalize(self) -> None:
    r"""Finalize a `SerialPool`; a no-op since there is nothing to shut down"""
    return

  def put(self, item: PathLike) -> None:
    r"""Process `item` immediately and record its diagnostics

    Parameters
    ----------
    item :
      the item to process
    """
    left, fixed, warned, diff_list = self.linter.parse(item).diagnostics()
    self.errors_left.append(left)
    self.errors_fixed.append(fixed)
    self.warnings.append(warned)
    self.patches.extend(diff_list)
    return
555
def WorkerPool(num_workers: int, verbose: int) -> Union[SerialPool, ParallelPool]:
  r"""Build the appropriate pool for the requested worker count

  Parameters
  ----------
  num_workers :
    the number of worker threads the pool should own
  verbose :
    what level to print verbose output at

  Returns
  -------
  pool :
    a `SerialPool` when `num_workers` is 0 or 1, otherwise a `ParallelPool`

  Notes
  -----
  If `num_workers` is < 0, then the number of processes is automatically determined, usually equal to
  the number logical cores for the current machine.
  """
  if num_workers < 0:
    # take number of cores - 1, up to a maximum of 16 as not to overload big machines
    num_workers = min(max(mp.cpu_count() - 1, 1), 16)

  parallel = num_workers not in (0, 1)
  if verbose:
    if parallel:
      pl.sync_print(f'Number of worker processes ({num_workers}) sufficient, enabling multiprocessing')
    else:
      pl.sync_print(f'Number of worker processes ({num_workers}) too small, disabling multiprocessing')
  return ParallelPool(num_workers, verbose) if parallel else SerialPool(verbose)
588