1#!/usr/bin/env python3 2""" 3# Created: Mon Jun 20 19:33:25 2022 (-0400) 4# @author: Jacob Faibussowitsch 5""" 6from __future__ import annotations 7 8from .._typing import * 9 10from ._src_pos import SourceRange 11from ._attr_cache import AttributeCache 12 13class Delta: 14 __slots__ = 'value', 'extent', 'offset' 15 16 value: str 17 extent: SourceRange 18 offset: int 19 20 def __init__(self, value: str, extent: SourceRange, ctxlines: int) -> None: 21 r"""Construct a `Delta` 22 23 Parameters 24 ---------- 25 value : 26 the value to replace the `extent` with 27 extent : 28 a source range to be replaced 29 ctxlines : 30 number of lines before and after -- context lines -- to include in the source 31 """ 32 self.value = str(value) 33 self.extent = extent 34 begin = extent.start 35 self.offset = begin.offset - begin.column - sum( 36 map(len, extent.raw(num_context=ctxlines).splitlines(True)[:ctxlines]) 37 ) + 1 38 return 39 40 def deleter(self) -> bool: 41 r"""Is this `Delta` a deletion delta 42 43 Returns 44 ------- 45 ret : 46 True if this `Delta` deletes its entire range, False otherwise 47 """ 48 return self.value == '' 49 50 def apply(self, src: str, offset: int) -> tuple[str, int]: 51 r"""Apply the delta 52 53 Parameters 54 ---------- 55 src : 56 the source to modify 57 offset : 58 offset into `src` at which this delta supposedly applies 59 60 Returns 61 ------- 62 new_src, new_offset : 63 the updated `src` and the new offset value 64 """ 65 extent = self.extent 66 src_offset = offset - self.offset 67 begin_offset = extent.start.offset + src_offset 68 end_offset = extent.end.offset + src_offset 69 new_src = f'{src[:begin_offset]}{self.value}{src[end_offset:]}' 70 new_offset = offset + len(new_src) - len(src) 71 return new_src, new_offset 72 73 def is_deletion_superset_of(self, other: Delta) -> bool: 74 r"""Determine if self's change deletes all of other's extent. 75 76 If this is the case, `other` is a pointless patch and can be discarded 77 78 Parameters 79 ---------- 80 other : 81 the other `Delta` 82 83 Returns 84 ------- 85 ret : 86 True if `self` deletes all of `other`, False otherwise 87 """ 88 return self.deleter() and other.extent in self.extent 89 90class Patch(AttributeCache): 91 __global_counter: ClassVar[int] = 0 92 __slots__ = 'extent', 'ctxlines', 'deltas', 'weak_data', '_cache', 'id' 93 94 extent: SourceRange 95 ctxlines: int 96 deltas: tuple[Delta, ...] 97 weak_data: list[WeakListRef] 98 id: int 99 100 def __init__(self, src_range: SourceRangeLike, value: str, contextlines: int = 2) -> None: 101 r"""Construct a `Patch` 102 103 Parameters 104 ---------- 105 src_range : 106 the range in source to patch 107 value : 108 the replacement string for `src_range` 109 contextlines : 2, optional 110 the number of lines before and after `src_range` to include in the source 111 """ 112 super().__init__() 113 self.extent = SourceRange.cast(src_range) 114 self.ctxlines = contextlines 115 self.deltas = (Delta(value, self.extent, self.ctxlines),) 116 self.weak_data = [] 117 self.id = Patch.__global_counter 118 Patch.__global_counter += 1 119 return 120 121 @classmethod 122 def from_cursor(cls, cursor: CursorLike, value: str, **kwargs) -> Patch: 123 r"""Construct a `Patch` from a cursor 124 125 Parameters 126 ---------- 127 cursor : 128 the cursor to take the extent from 129 value : 130 the value to replace the cursors extent with 131 **kwargs : optional 132 additional keyword arguments to pass to the `Patch` constructor 133 134 Returns 135 ------- 136 patch : 137 the `Patch` object 138 """ 139 return cls(cursor.extent, value, **kwargs) 140 141 def _make_source(self) -> str: 142 r"""Instantiate the initial raw source for this `Patch` 143 144 Returns 145 ------- 146 src : 147 the text 148 """ 149 return self.extent.raw(num_context=self.ctxlines) 150 151 def _contiguous_extent(self) -> bool: 152 r"""Does my extent (which is the union of the extents of my all my deltas) have no holes? 153 154 Returns 155 ------- 156 ret : 157 True if this `Patch` is contiguous, False otherwise 158 """ 159 cache_entry = 'contiguous' 160 deltas = self.deltas 161 if len(deltas) == 1: 162 ret: bool = self._cache.setdefault(cache_entry, True) 163 else: 164 ret = self._get_cached( 165 cache_entry, lambda: all(p.extent.overlaps(c.extent) for p, c in zip(deltas[:-1], deltas[1:])) 166 ) 167 return ret 168 169 170 def discard(self) -> None: 171 r"""Drop the error messages corresponding to this patch from the linter""" 172 del_indices = [] 173 self_id = self.id 174 assert self_id >= 0 175 for i, weak_ref in enumerate(self.weak_data): 176 elist = weak_ref() 177 178 if elist is not None: 179 del_indices.append(i) 180 # I don't know how this list could ever be longer than a single element, but I 181 # guess it does not hurt to handle this case? 182 idx = [eid for eid, (_, _, patch_id) in enumerate(elist) if patch_id == self_id] 183 if not idx: 184 raise RuntimeError('could not locate weakref idx for patch') 185 for i in reversed(idx): 186 del elist[i] # delete our entry in the error message list 187 188 for i in reversed(del_indices): 189 del self.weak_data[i] 190 return 191 192 def attach(self, cursor_id_errors: WeakListRef) -> None: 193 r"""Attach a weak reference to this `Patch`s entries in the linters errors 194 195 Parameters 196 ---------- 197 cursor_id_errors : 198 the linters cursor-specific list of errors 199 200 Notes 201 ----- 202 This is kind of a cludge and should probably be removed 203 """ 204 self.weak_data.append(cursor_id_errors) 205 return 206 207 def is_deletion_superset_of(self, other: Patch) -> bool: 208 r"""Determine if any of self's deltas delete all of other's extent. 209 210 Parameters 211 ---------- 212 other : 213 the other patch to compare against 214 215 Returns 216 ------- 217 ret : 218 True if `self` deletes all of `other`, False otherwise 219 """ 220 assert not self is other 221 oextent = other.extent 222 if oextent in self.extent: 223 deltas = self.deltas 224 # first check if any one delta deletes all of other, then check if all deltas are deleters, 225 return any(d.deleter() and oextent in d.extent for d in deltas) or \ 226 (self._contiguous_extent() and all(d.deleter() for d in deltas)) 227 return False 228 229 @staticmethod 230 def cull_deltas(deltas: tuple[Delta, ...]) -> tuple[Delta, ...]: 231 r"""Remove any pointless `Delta`s 232 233 Parameters 234 ---------- 235 deltas : 236 a set of `Delta`'s to cull 237 238 Returns 239 ------- 240 ret : 241 a typle of `Delta`'s which has all deleted subsets removed, i.e. no `Delta` should be a deletion 242 superset of any other 243 """ 244 return tuple([ 245 d_i for d_i in deltas if not any( 246 d_j.is_deletion_superset_of(d_i) for d_j in deltas if d_j is not d_i 247 ) 248 ]) 249 250 def merge(self, other: Patch) -> Patch: 251 r"""Merge a patch with another patch. 252 253 If either of the patches is a 'deletion superset' of the other the redundant patch is discarded 254 from the linter. 255 256 Parameters 257 ---------- 258 other : 259 the `Patch` to merge with 260 261 Returns 262 ------- 263 ret : 264 the merged `Patch` 265 266 Raises 267 ------ 268 TypeError 269 if `other` is not a `Patch` 270 """ 271 if not isinstance(other, type(self)): 272 raise TypeError(type(other)) 273 274 if self is other: 275 return self 276 277 if self.is_deletion_superset_of(other): 278 other.discard() 279 return self 280 281 if other.is_deletion_superset_of(self): 282 self.discard() 283 return other 284 285 assert self._make_source() == other._make_source(), 'Need to update offset calculation to handle arbitrary src' 286 assert self.ctxlines == other.ctxlines, 'Need to update ctxlines to handle arbitrary src' 287 288 self.extent = self.extent.merge_with(other.extent) 289 # uncomment when it handles arbitrary source 290 # self.src = self._make_source() 291 # fixes and ranges must be applied in order 292 combined = self.cull_deltas(self.deltas + other.deltas) 293 argsort = sorted(range(len(combined)), key=lambda x: combined[x].extent) 294 self.deltas = tuple(combined[i] for i in argsort) 295 self._cache = {} 296 self.weak_data.extend(other.weak_data) 297 return self 298 299 def collapse(self) -> str: 300 r"""Collapses a the list of fixes and produces into a modified output 301 302 Returns 303 ------- 304 src: 305 the patched source 306 """ 307 # Fixes probably should not overwrite each other (for now), so we error out, but this 308 # is arguably a completely valid case. I just have not seen an example of it that I 309 # can use to debug with yet. 310 def do_collapse() -> str: 311 idx_delta = 0 312 src = old_src = self._make_source() 313 for delta in self.deltas: 314 src, idx_delta = delta.apply(src, idx_delta) 315 316 assert src != old_src, 'Patch did not seem to do anything!' 317 return src 318 319 return self._get_cached('fixed', do_collapse) 320 321 322 def view(self) -> None: 323 r"""Visualize the action of the `Patch`""" 324 import difflib 325 import petsclinter as pl 326 327 idx_delta = 0 328 before = self._make_source() 329 for i, delta in enumerate(self.deltas): 330 pl.sync_print('Delta:', i, f'({delta})') 331 after, idx_delta = delta.apply(before, idx_delta) 332 pl.sync_print(''.join(difflib.unified_diff( 333 before.splitlines(True), after.splitlines(True), fromfile='Original', tofile='Modified' 334 ))) 335 before = after 336 return 337