from __future__ import generators
from __future__ import print_function

from functools import reduce


class DirectedGraph(object):
    '''This class is for directed graphs with vertices of arbitrary type

    - self.vertices is the list of vertices, in insertion order
    - self.inEdges maps each vertex to the list of vertices with an edge entering it
    - self.outEdges maps each vertex to the list of vertices reached by an edge leaving it
    Vertices are used as dictionary keys, so they must be hashable. None is never
    stored as a vertex.'''

    def __init__(self, vertices = ()):
        '''Create a graph containing the given vertices and no edges'''
        self.vertices = []
        self.inEdges = {}
        self.outEdges = {}
        # An explicit loop is required: map() is lazy on Python 3 and would add nothing
        for vertex in vertices:
            self.addVertex(vertex)

    def __len__(self):
        '''Return the number of vertices'''
        return len(self.vertices)

    def __str__(self):
        '''Return a one line summary giving the vertex and edge counts'''
        # Every edge appears exactly once in the inEdges list of its target
        edgeCount = sum(len(edgeList) for edgeList in self.inEdges.values())
        return 'DirectedGraph with '+str(len(self.vertices))+' vertices and '+str(edgeCount)+' edges'

    def addVertex(self, vertex):
        '''Add a vertex if it does not already exist in the vertex list
           - None is ignored
           - An existing vertex keeps its edges'''
        if vertex is None:
            return
        if vertex not in self.vertices:
            self.vertices.append(vertex)
            # Creates the empty in/out edge lists for the new vertex
            self.clearEdges(vertex)

    def _addEdge(self, source, target):
        '''Record the edge source -> target in both edge maps, ignoring None endpoints and duplicates'''
        if source is None or target is None:
            return
        if source not in self.inEdges[target]:
            self.inEdges[target].append(source)
        if target not in self.outEdges[source]:
            self.outEdges[source].append(target)

    def addEdges(self, vertex, inputs = (), outputs = ()):
        '''Define the in and out edges for a vertex by listing the other vertices defining the edges
           - inputs are the vertices with an edge into vertex
           - outputs are the vertices reached by an edge from vertex
           - If any vertex does not exist in the graph, it is created'''
        self.addVertex(vertex)
        for source in inputs:
            self.addVertex(source)
            self._addEdge(source, vertex)
        for target in outputs:
            self.addVertex(target)
            self._addEdge(vertex, target)

    def getEdges(self, vertex):
        '''Return the tuple (inEdges, outEdges) for the vertex
           - These are the internal lists, not copies
           - Raises KeyError if the vertex is not in the graph'''
        return (self.inEdges[vertex], self.outEdges[vertex])

    def clearEdges(self, vertex, inOnly = 0, outOnly = 0):
        '''Reset the edge lists of the vertex to empty
           - If inOnly is True, only the in edges are reset
           - If outOnly is True, only the out edges are reset
           - Raises RuntimeError if both flags are given
           - The lists held by neighboring vertices are not touched'''
        if inOnly and outOnly:
            raise RuntimeError('Inconsistent arguments')
        if not outOnly:
            self.inEdges[vertex] = []
        if not inOnly:
            self.outEdges[vertex] = []

    def removeVertex(self, vertex):
        '''Remove a vertex if it already exists in the vertex list
           - Also removes all associated edges'''
        if vertex is None or vertex not in self.vertices:
            return
        self.vertices.remove(vertex)
        del self.inEdges[vertex]
        del self.outEdges[vertex]
        # Purge the vertex from the edge lists of every remaining vertex
        for other in self.vertices:
            if vertex in self.inEdges[other]:
                self.inEdges[other].remove(vertex)
            if vertex in self.outEdges[other]:
                self.outEdges[other].remove(vertex)

    def replaceVertex(self, vertex, newVertex):
        '''Replace a vertex with newVertex if it already exists in the vertex list
           - Also transfers all associated edges
           - Does nothing if either argument is None, if vertex is not in the graph,
             or if both arguments are the same vertex'''
        if vertex is None or newVertex is None:
            return
        if vertex not in self.vertices:
            # Nothing to replace, as promised by the contract above
            return
        if vertex == newVertex:
            # Transferring edges onto itself and then removing it would delete the vertex
            return
        self.addEdges(newVertex, self.inEdges[vertex], self.outEdges[vertex])
        self.removeVertex(vertex)

    def addSubgraph(self, graph):
        '''Add the vertices and edges of another graph into this one'''
        for vertex in graph.vertices:
            self.addVertex(vertex)
        for vertex in graph.vertices:
            inputs, outputs = graph.getEdges(vertex)
            self.addEdges(vertex, inputs, outputs)

    def removeSubgraph(self, graph):
        '''Remove the vertices and edges of a subgraph, and all the edges connected to it'''
        # Iterate over a snapshot so that graph may safely be this graph itself
        for vertex in list(graph.vertices):
            self.removeVertex(vertex)

    def printIndent(self, indent):
        '''Write indent copies of the indentation string to stdout, without a newline'''
        import sys
        for i in range(indent):
            sys.stdout.write(' ')

    def display(self):
        '''Print the vertices in breadth-first order, indented by their level
           - Each line shows the index of the vertex, the module of its class, and the
             indices of its in and out neighbors
           - Relies on breadthFirstSearch() storing a level attribute on each vertex'''
        print('I am a DirectedGraph with '+str(len(self.vertices))+' vertices')
        for vertex in DirectedGraph.breadthFirstSearch(self):
            self.printIndent(vertex.__level)
            # Build real lists: str() of a lazy map object is useless on Python 3
            inIndices = [self.vertices.index(v) for v in self.inEdges[vertex]]
            outIndices = [self.vertices.index(v) for v in self.outEdges[vertex]]
            print('('+str(self.vertices.index(vertex))+') '+str(vertex.__class__.__module__)+' in: '+str(inIndices)+' out: '+str(outIndices))

    def appendGraph(self, graph):
        '''Join every leaf of this graph to every root of the input graph, leaving the result in this graph
           - Returns this graph'''
        # The leaves must be captured before the merge changes them
        leaves = DirectedGraph.getLeaves(self)
        roots = DirectedGraph.getRoots(graph)
        self.addSubgraph(graph)
        for leaf in leaves:
            self.addEdges(leaf, outputs = roots)
        return self

    def prependGraph(self, graph):
        '''Join every leaf of the input graph to every root of this graph, leaving the result in this graph
           - Returns this graph'''
        # The roots must be captured before the merge changes them
        roots = DirectedGraph.getRoots(self)
        self.addSubgraph(graph)
        for leaf in DirectedGraph.getLeaves(graph):
            self.addEdges(leaf, outputs = roots)
        return self

    @staticmethod
    def getRoots(graph):
        '''Return all the sources in the graph (nodes without entering edges)'''
        return [v for v in graph.vertices if not graph.getEdges(v)[0]]

    @staticmethod
    def getLeaves(graph):
        '''Return all the sinks in the graph (nodes without exiting edges)'''
        return [v for v in graph.vertices if not graph.getEdges(v)[1]]

    @staticmethod
    def depthFirstVisit(graph, vertex, seen = None, returnFinished = 0, outEdges = 1):
        '''This is a generator returning vertices in a depth-first traversal only for the subtree rooted at vertex
           - seen is the list of already visited vertices, and is extended in place
           - If returnFinished is True, return a vertex when it finishes
           - Otherwise, return a vertex when it is first seen
           - If outEdges is True, proceed along these, otherwise use inEdges'''
        if seen is None:
            seen = []
        seen.append(vertex)
        if not returnFinished:
            yield vertex
        # getEdges() returns (inEdges, outEdges), so the flag selects the tuple slot
        for neighbor in graph.getEdges(vertex)[1 if outEdges else 0]:
            if neighbor not in seen:
                # The for loop already handles exhaustion of the nested generator
                for descendant in DirectedGraph.depthFirstVisit(graph, neighbor, seen, returnFinished, outEdges):
                    yield descendant
        if returnFinished:
            yield vertex

    @staticmethod
    def depthFirstSearch(graph, returnFinished = 0, outEdges = 1):
        '''This is a generator returning vertices in a depth-first traversal
           - Every vertex of the graph is visited exactly once
           - If returnFinished is True, return a vertex when it finishes
           - Otherwise, return a vertex when it is first seen
           - If outEdges is True, proceed along these, otherwise use inEdges'''
        seen = []
        for vertex in graph.vertices:
            if vertex not in seen:
                for v in DirectedGraph.depthFirstVisit(graph, vertex, seen, returnFinished, outEdges):
                    yield v

    # NOTE: intentionally not wrapped in staticmethod, so that both
    # DirectedGraph.breadthFirstSearch(g) and g.breadthFirstSearch() keep working
    def breadthFirstSearch(graph, returnFinished = 0):
        '''This is a generator returning vertices in a breadth-first traversal
           - Only the vertices reachable from the first root of the graph are visited
           - If returnFinished is True, return a vertex when it finishes
           - Otherwise, return a vertex when it is first seen
           - The depth of each visited vertex is stored on it as a (name mangled) level
             attribute, so vertices must accept attribute assignment'''
        queue = DirectedGraph.getRoots(graph)[0:1]
        if not queue:
            return
        root = queue[0]
        seen = [root]
        # Always initialize the root level: the children compute theirs from it,
        # whichever reporting mode is in use
        root.__level = 0
        if not returnFinished:
            yield root
        while queue:
            # The oldest entry lives at the end of the list
            vertex = queue[-1]
            for v in graph.getEdges(vertex)[1]:
                if v not in seen:
                    seen.append(v)
                    v.__level = vertex.__level + 1
                    queue.insert(0, v)
                    if not returnFinished:
                        yield v
            vertex = queue.pop()
            if returnFinished:
                yield vertex

    @staticmethod
    def topologicalSort(graph, start = None, outEdges = 1):
        '''This is a generator returning the vertices in topological order
           - The order is the reverse of the depth-first finishing order
           - If start is given, only the subtree rooted at start is sorted
           - If outEdges is True, proceed along these, otherwise use inEdges'''
        if start is None:
            vertices = list(DirectedGraph.depthFirstSearch(graph, returnFinished = 1, outEdges = outEdges))
        else:
            vertices = list(DirectedGraph.depthFirstVisit(graph, start, returnFinished = 1, outEdges = outEdges))
        vertices.reverse()
        for vertex in vertices:
            yield vertex