"""Implementation of gSpan.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import collections import copy import itertools import time from .graph import AUTO_EDGE_ID from .graph import Graph from .graph import VACANT_GRAPH_ID from .graph import VACANT_VERTEX_LABEL def record_timestamp(func): """Record timestamp before and after call of `func`.""" def deco(self): self.timestamps[func.__name__ + '_in'] = time.time() func(self) self.timestamps[func.__name__ + '_out'] = time.time() return deco class DFSedge(object): """DFSedge class.""" def __init__(self, frm, to, vevlb): """Initialize DFSedge instance.""" self.frm = frm self.to = to self.vevlb = vevlb def __eq__(self, other): """Check equivalence of DFSedge.""" return (self.frm == other.frm and self.to == other.to and self.vevlb == other.vevlb) def __ne__(self, other): """Check if not equal.""" return not self.__eq__(other) def __repr__(self): """Represent DFScode in string way.""" return '(frm={}, to={}, vevlb={})'.format( self.frm, self.to, self.vevlb ) class DFScode(list): """DFScode is a list of DFSedge.""" def __init__(self): """Initialize DFScode.""" super().__init__() self.rmpath = list() def __eq__(self, other): """Check equivalence of DFScode.""" la, lb = len(self), len(other) if la != lb: return False for i in range(la): if self[i] != other[i]: return False return True def __ne__(self, other): """Check if not equal.""" return not self.__eq__(other) def __repr__(self): """Represent DFScode in string way.""" return ''.join(['[', ','.join( [str(dfsedge) for dfsedge in self]), ']'] ) def push_back(self, frm, to, vevlb): """Update DFScode by adding one edge.""" self.append(DFSedge(frm, to, vevlb)) return self def to_graph(self, gid=VACANT_GRAPH_ID, is_undirected=True): """Construct a graph according to the dfs code.""" g = Graph(gid, is_undirected=is_undirected, eid_auto_increment=True) for dfsedge in self: frm, to, (vlb1, elb, vlb2) = dfsedge.frm, dfsedge.to, dfsedge.vevlb if vlb1 != VACANT_VERTEX_LABEL: g.add_vertex(frm, vlb1) if vlb2 != VACANT_VERTEX_LABEL: g.add_vertex(to, vlb2) g.add_edge(AUTO_EDGE_ID, frm, to, elb) return g def from_graph(self, g): """Build DFScode from graph `g`.""" raise NotImplementedError('Not inplemented yet.') def build_rmpath(self): """Build right most path.""" self.rmpath = list() old_frm = None for i in range(len(self) - 1, -1, -1): dfsedge = self[i] frm, to = dfsedge.frm, dfsedge.to if frm < to and (old_frm is None or to == old_frm): self.rmpath.append(i) old_frm = frm return self def get_num_vertices(self): """Return number of vertices in the corresponding graph.""" return len(set( [dfsedge.frm for dfsedge in self] + [dfsedge.to for dfsedge in self] )) class PDFS(object): """PDFS class.""" def __init__(self, gid=VACANT_GRAPH_ID, edge=None, prev=None): """Initialize PDFS instance.""" self.gid = gid self.edge = edge self.prev = prev class Projected(list): """Projected is a list of PDFS. Each element of Projected is a projection one frequent graph in one original graph. """ def __init__(self): """Initialize Projected instance.""" super(Projected, self).__init__() def push_back(self, gid, edge, prev): """Update this Projected instance.""" self.append(PDFS(gid, edge, prev)) return self class History(object): """History class.""" def __init__(self, g, pdfs): """Initialize History instance.""" super(History, self).__init__() self.edges = list() self.vertices_used = collections.defaultdict(int) self.edges_used = collections.defaultdict(int) if pdfs is None: return while pdfs: e = pdfs.edge self.edges.append(e) (self.vertices_used[e.frm], self.vertices_used[e.to], self.edges_used[e.eid]) = 1, 1, 1 pdfs = pdfs.prev self.edges = self.edges[::-1] def has_vertex(self, vid): """Check if the vertex with vid exists in the history.""" return self.vertices_used[vid] == 1 def has_edge(self, eid): """Check if the edge with eid exists in the history.""" return self.edges_used[eid] == 1 class gSpan(object): """`gSpan` algorithm.""" def __init__(self, task, min_num_vertices=1, max_num_vertices=float('inf'), is_undirected=True, verbose=False, visualize=False, where=False): """Initialize gSpan instance.""" self._is_undirected = is_undirected self._task = task self._database = task.database self._min_num_vertices = min_num_vertices self._max_num_vertices = max_num_vertices self._DFScode = DFScode() self._support = 0 self._frequent_size1_subgraphs = list() # Include subgraphs with # any num(but >= 2, <= max_num_vertices) of vertices. self._counter = itertools.count() self._verbose = verbose self._visualize = visualize self._where = where self.timestamps = dict() if self._max_num_vertices < self._min_num_vertices: print('Max number of vertices can not be smaller than ' 'min number of that.\n' 'Set max_num_vertices = min_num_vertices.') self._max_num_vertices = self._min_num_vertices def time_stats(self): """Print stats of time.""" func_names = ['run'] time_deltas = collections.defaultdict(float) for fn in func_names: time_deltas[fn] = round( self.timestamps[fn + '_out'] - self.timestamps[fn + '_in'], 2 ) print('Total:\t{} s'.format(time_deltas['run'])) return self def _get_gid_subsets(self, projected): subsets = [[] for _ in self._task.gid_subsets] gids = set([g.gid for g in projected]) for gid in gids: subsets[self._gid_subset_ids[gid]].append(gid) return subsets @record_timestamp def run(self): """Run the gSpan algorithm.""" root = collections.defaultdict(Projected) gids = set([gid for gid_subset in self._task.gid_subsets for gid in gid_subset]) self._gid_subset_ids = {} for i, gid_subset in enumerate(self._task.gid_subsets): for gid in gid_subset: self._gid_subset_ids[gid] = i for gid in gids: g = self._database._graphs[gid] for vid, v in g.vertices.items(): edges = self._get_forward_root_edges(g, vid) for e in edges: root[(v.vlb, e.elb, g.vertices[e.to].vlb)].append( PDFS(gid, e, None) ) for vevlb, projected in root.items(): self._DFScode.append(DFSedge(0, 1, vevlb)) self._subgraph_mining(projected) self._DFScode.pop() def _report(self, projected): self._frequent_subgraphs.append(copy.copy(self._DFScode)) if self._DFScode.get_num_vertices() < self._min_num_vertices: return g = self._DFScode.to_graph(gid=next(self._counter), is_undirected=self._is_undirected) display_str = g.display() print('\nSupport: {}'.format(self._support)) if self._visualize: g.plot() if self._where: print('where: {}'.format(list(set([p.gid for p in projected])))) print('\n-----------------\n') def print_results(self): for i, subgraph in enumerate(self._frequent_subgraphs): g = subgraph.to_graph(gid=next(self._counter), is_undirected=self._is_undirected) g.display() print(self._subgraph_occurrences[i]) def _get_forward_root_edges(self, g, frm): result = [] v_frm = g.vertices[frm] for to, e in v_frm.edges.items(): if (not self._is_undirected) or v_frm.vlb <= g.vertices[to].vlb: result.append(e) return result def _get_backward_edge(self, g, e1, e2, history): if self._is_undirected and e1 == e2: return None for to, e in g.vertices[e2.to].edges.items(): if history.has_edge(e.eid) or e.to != e1.frm: continue # if reture here, then self._DFScodep[0] != dfs_code_min[0] # should be checked in _is_min(). or: if self._is_undirected: if e1.elb < e.elb or ( e1.elb == e.elb and g.vertices[e1.to].vlb <= g.vertices[e2.to].vlb): return e else: if g.vertices[e1.frm].vlb < g.vertices[e2.to] or ( g.vertices[e1.frm].vlb == g.vertices[e2.to] and e1.elb <= e.elb): return e # if e1.elb < e.elb or (e1.elb == e.elb and # g.vertices[e1.to].vlb <= g.vertices[e2.to].vlb): # return e return None def _get_forward_pure_edges(self, g, rm_edge, min_vlb, history): result = [] for to, e in g.vertices[rm_edge.to].edges.items(): if min_vlb <= g.vertices[e.to].vlb and ( not history.has_vertex(e.to)): result.append(e) return result def _get_forward_rmpath_edges(self, g, rm_edge, min_vlb, history): result = [] to_vlb = g.vertices[rm_edge.to].vlb for to, e in g.vertices[rm_edge.frm].edges.items(): new_to_vlb = g.vertices[to].vlb if (rm_edge.to == e.to or min_vlb > new_to_vlb or history.has_vertex(e.to)): continue if rm_edge.elb < e.elb or (rm_edge.elb == e.elb and to_vlb <= new_to_vlb): result.append(e) return result def _is_min(self): if self._verbose: print('is_min: checking {}'.format(self._DFScode)) if len(self._DFScode) == 1: return True g = self._DFScode.to_graph(gid=VACANT_GRAPH_ID, is_undirected=self._is_undirected) dfs_code_min = DFScode() root = collections.defaultdict(Projected) for vid, v in g.vertices.items(): edges = self._get_forward_root_edges(g, vid) for e in edges: root[(v.vlb, e.elb, g.vertices[e.to].vlb)].append( PDFS(g.gid, e, None)) min_vevlb = min(root.keys()) dfs_code_min.append(DFSedge(0, 1, min_vevlb)) # No need to check if is min code because of pruning in get_*_edge*. def project_is_min(projected): dfs_code_min.build_rmpath() rmpath = dfs_code_min.rmpath min_vlb = dfs_code_min[0].vevlb[0] maxtoc = dfs_code_min[rmpath[0]].to backward_root = collections.defaultdict(Projected) flag, newto = False, 0, end = 0 if self._is_undirected else -1 for i in range(len(rmpath) - 1, end, -1): if flag: break for p in projected: history = History(g, p) e = self._get_backward_edge(g, history.edges[rmpath[i]], history.edges[rmpath[0]], history) if e is not None: backward_root[e.elb].append(PDFS(g.gid, e, p)) newto = dfs_code_min[rmpath[i]].frm flag = True if flag: backward_min_elb = min(backward_root.keys()) dfs_code_min.append(DFSedge( maxtoc, newto, (VACANT_VERTEX_LABEL, backward_min_elb, VACANT_VERTEX_LABEL) )) idx = len(dfs_code_min) - 1 if self._DFScode[idx] != dfs_code_min[idx]: return False return project_is_min(backward_root[backward_min_elb]) forward_root = collections.defaultdict(Projected) flag, newfrm = False, 0 for p in projected: history = History(g, p) edges = self._get_forward_pure_edges(g, history.edges[rmpath[0]], min_vlb, history) if len(edges) > 0: flag = True newfrm = maxtoc for e in edges: forward_root[ (e.elb, g.vertices[e.to].vlb) ].append(PDFS(g.gid, e, p)) for rmpath_i in rmpath: if flag: break for p in projected: history = History(g, p) edges = self._get_forward_rmpath_edges(g, history.edges[ rmpath_i], min_vlb, history) if len(edges) > 0: flag = True newfrm = dfs_code_min[rmpath_i].frm for e in edges: forward_root[ (e.elb, g.vertices[e.to].vlb) ].append(PDFS(g.gid, e, p)) if not flag: return True forward_min_evlb = min(forward_root.keys()) dfs_code_min.append(DFSedge( newfrm, maxtoc + 1, (VACANT_VERTEX_LABEL, forward_min_evlb[0], forward_min_evlb[1])) ) idx = len(dfs_code_min) - 1 if self._DFScode[idx] != dfs_code_min[idx]: return False return project_is_min(forward_root[forward_min_evlb]) res = project_is_min(root[min_vevlb]) return res def _subgraph_mining(self, projected): gid_subsets = self._get_gid_subsets(projected) if self._task.prune(gid_subsets): return if not self._is_min(): return self._task.store(repr(self._DFScode), gid_subsets) num_vertices = self._DFScode.get_num_vertices() self._DFScode.build_rmpath() rmpath = self._DFScode.rmpath maxtoc = self._DFScode[rmpath[0]].to min_vlb = self._DFScode[0].vevlb[0] forward_root = collections.defaultdict(Projected) backward_root = collections.defaultdict(Projected) for p in projected: g = self._database._graphs[p.gid] history = History(g, p) # backward for rmpath_i in rmpath[::-1]: e = self._get_backward_edge(g, history.edges[rmpath_i], history.edges[rmpath[0]], history) if e is not None: backward_root[ (self._DFScode[rmpath_i].frm, e.elb) ].append(PDFS(g.gid, e, p)) # pure forward if num_vertices >= self._max_num_vertices: continue edges = self._get_forward_pure_edges(g, history.edges[rmpath[0]], min_vlb, history) for e in edges: forward_root[ (maxtoc, e.elb, g.vertices[e.to].vlb) ].append(PDFS(g.gid, e, p)) # rmpath forward for rmpath_i in rmpath: edges = self._get_forward_rmpath_edges(g, history.edges[rmpath_i], min_vlb, history) for e in edges: forward_root[ (self._DFScode[rmpath_i].frm, e.elb, g.vertices[e.to].vlb) ].append(PDFS(g.gid, e, p)) # backward for to, elb in backward_root: self._DFScode.append(DFSedge( maxtoc, to, (VACANT_VERTEX_LABEL, elb, VACANT_VERTEX_LABEL)) ) self._subgraph_mining(backward_root[(to, elb)]) self._DFScode.pop() # forward # No need to check if num_vertices >= self._max_num_vertices. # Because forward_root has no element. for frm, elb, vlb2 in forward_root: self._DFScode.append(DFSedge( frm, maxtoc + 1, (VACANT_VERTEX_LABEL, elb, vlb2)) ) self._subgraph_mining(forward_root[(frm, elb, vlb2)]) self._DFScode.pop() return self