xref: /petsc/lib/petsc/bin/maint/petsclinter/petsclinter/classes/_patch.py (revision 4c7cc9c8e01791debde927bc0816c9a347055c8f)
1#!/usr/bin/env python3
2"""
3# Created: Mon Jun 20 19:33:25 2022 (-0400)
4# @author: Jacob Faibussowitsch
5"""
6from __future__ import annotations
7
8from .._typing import *
9
10from ._src_pos    import SourceRange
11from ._attr_cache import AttributeCache
12
13class Delta:
14  __slots__ = 'value', 'extent', 'offset'
15
16  value: str
17  extent: SourceRange
18  offset: int
19
20  def __init__(self, value: str, extent: SourceRange, ctxlines: int) -> None:
21    r"""Construct a `Delta`
22
23    Parameters
24    ----------
25    value :
26      the value to replace the `extent` with
27    extent :
28      a source range to be replaced
29    ctxlines :
30      number of lines before and after -- context lines -- to include in the source
31    """
32    self.value  = str(value)
33    self.extent = extent
34    begin       = extent.start
35    self.offset = begin.offset - begin.column - sum(
36      map(len, extent.raw(num_context=ctxlines).splitlines(True)[:ctxlines])
37    ) + 1
38    return
39
40  def deleter(self) -> bool:
41    r"""Is this `Delta` a deletion delta
42
43    Returns
44    -------
45    ret :
46      True if this `Delta` deletes its entire range, False otherwise
47    """
48    return self.value == ''
49
50  def apply(self, src: str, offset: int) -> tuple[str, int]:
51    r"""Apply the delta
52
53    Parameters
54    ----------
55    src :
56      the source to modify
57    offset :
58      offset into `src` at which this delta supposedly applies
59
60    Returns
61    -------
62    new_src, new_offset :
63      the updated `src` and the new offset value
64    """
65    extent       = self.extent
66    src_offset   = offset - self.offset
67    begin_offset = extent.start.offset + src_offset
68    end_offset   = extent.end.offset + src_offset
69    new_src      = f'{src[:begin_offset]}{self.value}{src[end_offset:]}'
70    new_offset   = offset + len(new_src) - len(src)
71    return new_src, new_offset
72
73  def is_deletion_superset_of(self, other: Delta) -> bool:
74    r"""Determine if self's change deletes all of other's extent.
75
76    If this is the case, `other` is a pointless patch and can be discarded
77
78    Parameters
79    ----------
80    other :
81      the other `Delta`
82
83    Returns
84    -------
85    ret :
86      True if `self` deletes all of `other`, False otherwise
87    """
88    return self.deleter() and other.extent in self.extent
89
90class Patch(AttributeCache):
91  __global_counter: ClassVar[int] = 0
92  __slots__                       = 'extent', 'ctxlines', 'deltas', 'weak_data', '_cache', 'id'
93
94  extent: SourceRange
95  ctxlines: int
96  deltas: tuple[Delta, ...]
97  weak_data: list[WeakListRef]
98  id: int
99
100  def __init__(self, src_range: SourceRangeLike, value: str, contextlines: int = 2) -> None:
101    r"""Construct a `Patch`
102
103    Parameters
104    ----------
105    src_range :
106      the range in source to patch
107    value :
108      the replacement string for `src_range`
109    contextlines : 2, optional
110      the number of lines before and after `src_range` to include in the source
111    """
112    super().__init__()
113    self.extent    = SourceRange.cast(src_range)
114    self.ctxlines  = contextlines
115    self.deltas    = (Delta(value, self.extent, self.ctxlines),)
116    self.weak_data = []
117    self.id        = Patch.__global_counter
118    Patch.__global_counter += 1
119    return
120
121  @classmethod
122  def from_cursor(cls, cursor: CursorLike, value: str, **kwargs) -> Patch:
123    r"""Construct a `Patch` from a cursor
124
125    Parameters
126    ----------
127    cursor :
128      the cursor to take the extent from
129    value :
130      the value to replace the cursors extent with
131    **kwargs : optional
132      additional keyword arguments to pass to the `Patch` constructor
133
134    Returns
135    -------
136    patch :
137      the `Patch` object
138    """
139    return cls(cursor.extent, value, **kwargs)
140
141  def _make_source(self) -> str:
142    r"""Instantiate the initial raw source for this `Patch`
143
144    Returns
145    -------
146    src :
147      the text
148    """
149    return self.extent.raw(num_context=self.ctxlines)
150
151  def _contiguous_extent(self) -> bool:
152    r"""Does my extent (which is the union of the extents of my all my deltas) have no holes?
153
154    Returns
155    -------
156    ret :
157      True if this `Patch` is contiguous, False otherwise
158    """
159    cache_entry = 'contiguous'
160    deltas      = self.deltas
161    if len(deltas) == 1:
162      ret: bool = self._cache.setdefault(cache_entry, True)
163    else:
164      ret = self._get_cached(
165        cache_entry, lambda: all(p.extent.overlaps(c.extent) for p, c in zip(deltas[:-1], deltas[1:]))
166      )
167    return ret
168
169
170  def discard(self) -> None:
171    r"""Drop the error messages corresponding to this patch from the linter"""
172    del_indices = []
173    self_id     = self.id
174    assert self_id >= 0
175    for i, weak_ref in enumerate(self.weak_data):
176      elist = weak_ref()
177
178      if elist is not None:
179        del_indices.append(i)
180        # I don't know how this list could ever be longer than a single element, but I
181        # guess it does not hurt to handle this case?
182        idx = [eid for eid, (_, _, patch_id) in enumerate(elist) if patch_id == self_id]
183        if not idx:
184          raise RuntimeError('could not locate weakref idx for patch')
185        for i in reversed(idx):
186          del elist[i] # delete our entry in the error message list
187
188    for i in reversed(del_indices):
189      del self.weak_data[i]
190    return
191
192  def attach(self, cursor_id_errors: WeakListRef) -> None:
193    r"""Attach a weak reference to this `Patch`s entries in the linters errors
194
195    Parameters
196    ----------
197    cursor_id_errors :
198      the linters cursor-specific list of errors
199
200    Notes
201    -----
202    This is kind of a cludge and should probably be removed
203    """
204    self.weak_data.append(cursor_id_errors)
205    return
206
207  def is_deletion_superset_of(self, other: Patch) -> bool:
208    r"""Determine if any of self's deltas delete all of other's extent.
209
210    Parameters
211    ----------
212    other :
213      the other patch to compare against
214
215    Returns
216    -------
217    ret :
218      True if `self` deletes all of `other`, False otherwise
219    """
220    assert not self is other
221    oextent = other.extent
222    if oextent in self.extent:
223      deltas = self.deltas
224      # first check if any one delta deletes all of other, then check if all deltas are deleters,
225      return any(d.deleter() and oextent in d.extent for d in deltas) or \
226        (self._contiguous_extent() and all(d.deleter() for d in deltas))
227    return False
228
229  @staticmethod
230  def cull_deltas(deltas: tuple[Delta, ...]) -> tuple[Delta, ...]:
231    r"""Remove any pointless `Delta`s
232
233    Parameters
234    ----------
235    deltas :
236      a set of `Delta`'s to cull
237
238    Returns
239    -------
240    ret :
241      a typle of `Delta`'s which has all deleted subsets removed, i.e. no `Delta` should be a deletion
242      superset of any other
243    """
244    return tuple([
245      d_i for d_i in deltas if not any(
246        d_j.is_deletion_superset_of(d_i) for d_j in deltas if d_j is not d_i
247      )
248    ])
249
250  def merge(self, other: Patch) -> Patch:
251    r"""Merge a patch with another patch.
252
253    If either of the patches is a 'deletion superset' of the other the redundant patch is discarded
254    from the linter.
255
256    Parameters
257    ----------
258    other :
259      the `Patch` to merge with
260
261    Returns
262    -------
263    ret :
264      the merged `Patch`
265
266    Raises
267    ------
268    TypeError
269      if `other` is not a `Patch`
270    """
271    if not isinstance(other, type(self)):
272      raise TypeError(type(other))
273
274    if self is other:
275      return self
276
277    if self.is_deletion_superset_of(other):
278      other.discard()
279      return self
280
281    if other.is_deletion_superset_of(self):
282      self.discard()
283      return other
284
285    assert self._make_source() == other._make_source(), 'Need to update offset calculation to handle arbitrary src'
286    assert self.ctxlines == other.ctxlines, 'Need to update ctxlines to handle arbitrary src'
287
288    self.extent = self.extent.merge_with(other.extent)
289    # uncomment when it handles arbitrary source
290    # self.src    = self._make_source()
291    # fixes and ranges must be applied in order
292    combined    = self.cull_deltas(self.deltas + other.deltas)
293    argsort     = sorted(range(len(combined)), key=lambda x: combined[x].extent)
294    self.deltas = tuple(combined[i] for i in argsort)
295    self._cache = {}
296    self.weak_data.extend(other.weak_data)
297    return self
298
299  def collapse(self) -> str:
300    r"""Collapses a the list of fixes and produces into a modified output
301
302    Returns
303    -------
304    src:
305      the patched source
306    """
307    # Fixes probably should not overwrite each other (for now), so we error out, but this
308    # is arguably a completely valid case. I just have not seen an example of it that I
309    # can use to debug with yet.
310    def do_collapse() -> str:
311      idx_delta = 0
312      src       = old_src = self._make_source()
313      for delta in self.deltas:
314        src, idx_delta = delta.apply(src, idx_delta)
315
316      assert src != old_src, 'Patch did not seem to do anything!'
317      return src
318
319    return self._get_cached('fixed', do_collapse)
320
321
322  def view(self) -> None:
323    r"""Visualize the action of the `Patch`"""
324    import difflib
325    import petsclinter as pl
326
327    idx_delta = 0
328    before    = self._make_source()
329    for i, delta in enumerate(self.deltas):
330      pl.sync_print('Delta:', i, f'({delta})')
331      after, idx_delta = delta.apply(before, idx_delta)
332      pl.sync_print(''.join(difflib.unified_diff(
333        before.splitlines(True), after.splitlines(True), fromfile='Original', tofile='Modified'
334      )))
335      before = after
336    return
337