Source code for bhopengraph.OpenGraph

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File name          : OpenGraph.py
# Author             : Remi Gascou (@podalirius_)
# Date created       : 12 Aug 2025

import json
from typing import Dict, List, Optional, Set

from bhopengraph.Edge import Edge
from bhopengraph.Node import Node


[docs] class OpenGraph(object): """ OpenGraph class for managing a graph structure compatible with BloodHound OpenGraph. Follows BloodHound OpenGraph schema requirements and best practices. Sources: - https://bloodhound.specterops.io/opengraph/schema#opengraph - https://bloodhound.specterops.io/opengraph/schema#minimal-working-json - https://bloodhound.specterops.io/opengraph/best-practices """
[docs] def __init__(self, source_kind: str = None): """ Initialize an OpenGraph. Args: - source_kind (str): Optional source kind for all nodes in the graph """ self.nodes: Dict[str, Node] = {} self.edges: Dict[str, Edge] = {} self.source_kind = source_kind
# Edges methods @staticmethod def _edge_key(edge: Edge) -> str: """ Generate a unique key for an edge based on start_node, end_node, and kind. Args: - edge (Edge): Edge to generate key for Returns: - str: Unique key for the edge """ return f"{edge.start_node}|{edge.end_node}|{edge.kind}"
[docs] def add_edge(self, edge: Edge) -> bool: """ Add an edge to the graph if it doesn't already exist and if the start and end nodes exist. Args: - edge (Edge): Edge to add Returns: - bool: True if edge was added, False if start or end node doesn't exist """ if edge.start_node not in self.nodes: return False if edge.end_node not in self.nodes: return False edge_key = self._edge_key(edge) if edge_key in self.edges: return False self.edges[edge_key] = edge return True
[docs] def add_edges(self, edges: List[Edge]) -> bool: """ Add a list of edges to the graph. Returns: - bool: True if all edges were added successfully, False if any failed """ success = True for edge in edges: if not self.add_edge(edge): success = False return success
[docs] def add_edge_without_validation(self, edge: Edge) -> bool: """ Add an edge to the graph. If an edge with the same key already exists, it will be overwritten. Args: - edge (Edge): Edge to add Returns: - bool: True if edge was added, False if edge is invalid """ if not isinstance(edge, Edge): return False edge_key = self._edge_key(edge) self.edges[edge_key] = edge return True
[docs] def add_edges_without_validation(self, edges: List[Edge]) -> bool: """ Add a list of edges to the graph without validation. Args: - edges (List[Edge]): List of edges to add Returns: - bool: True if edges were added successfully """ if not isinstance(edges, list): return False for edge in edges: self.add_edge_without_validation(edge) return True
[docs] def get_edges_by_kind(self, kind: str) -> List[Edge]: """ Get all edges of a specific kind. Args: - kind (str): Kind/type to filter by Returns: - List[Edge]: List of edges with the specified kind """ return [edge for edge in self.edges.values() if edge.kind == kind]
[docs] def get_edges_from_node(self, node_id: str) -> List[Edge]: """ Get all edges starting from a specific node. Args: - node_id (str): ID of the source node Returns: - List[Edge]: List of edges starting from the specified node """ return [edge for edge in self.edges.values() if edge.start_node == node_id]
[docs] def get_edges_to_node(self, node_id: str) -> List[Edge]: """ Get all edges ending at a specific node. Args: - node_id (str): ID of the destination node Returns: - List[Edge]: List of edges ending at the specified node """ return [edge for edge in self.edges.values() if edge.end_node == node_id]
[docs] def get_isolated_edges(self) -> List[Edge]: """ Get all edges that have no start or end node. These are edges that are not connected to any other nodes in the graph. Returns: - List[Edge]: List of edges with no start or end node """ return [ edge for edge in self.edges.values() if edge.start_node not in self.nodes or edge.end_node not in self.nodes ]
[docs] def get_isolated_edges_count(self) -> int: """ Get the total number of Isolated edges in the graph. These are edges that are not connected to any other nodes in the graph. Returns: - int: Number of Isolated edges """ return len(self.get_isolated_edges())
[docs] def get_edge_count(self) -> int: """ Get the total number of edges in the graph. Returns: - int: Number of edges """ return len(self.edges)
# Nodes methods
[docs] def add_node(self, node: Node) -> bool: """ Add a node to the graph. Args: - node (Node): Node to add Returns: - bool: True if node was added, False if node with same ID already exists """ if node.id in self.nodes: return False # Add source_kind to node kinds if specified if self.source_kind and self.source_kind not in node.kinds: node.add_kind(self.source_kind) self.nodes[node.id] = node return True
[docs] def add_nodes(self, nodes: List[Node]) -> bool: """ Add a list of nodes to the graph. """ for node in nodes: self.add_node(node) return True
[docs] def add_node_without_validation(self, node: Node) -> bool: """ Add a node to the graph without validation. Args: - node (Node): Node to add Returns: - bool: True if node was added, False if node is invalid """ if not isinstance(node, Node): return False self.nodes[node.id] = node return True
[docs] def add_nodes_without_validation(self, nodes: List[Node]) -> bool: """ Add a list of nodes to the graph without validation. Args: - nodes (List[Node]): List of nodes to add Returns: - bool: True if nodes were added successfully """ if not isinstance(nodes, list): return False for node in nodes: self.add_node_without_validation(node) return True
[docs] def get_node_by_id(self, id: str) -> Optional[Node]: """ Get a node by ID. Args: - id (str): ID of the node to retrieve Returns: - Node: The node if found, None otherwise """ return self.nodes.get(id)
[docs] def get_nodes_by_kind(self, kind: str) -> List[Node]: """ Get all nodes of a specific kind. Args: - kind (str): Kind/type to filter by Returns: - List[Node]: List of nodes with the specified kind """ return [node for node in self.nodes.values() if node.has_kind(kind)]
[docs] def get_node_count(self) -> int: """ Get the total number of nodes in the graph. Returns: - int: Number of nodes """ return len(self.nodes.keys())
[docs] def get_isolated_nodes(self) -> List[Node]: """ Get all nodes that have no edges. These are nodes that are not connected to any other nodes in the graph. Returns: - List[Node]: List of nodes with no edges """ return [ node for node in self.nodes.values() if not self.get_edges_from_node(node.id) and not self.get_edges_to_node(node.id) ]
[docs] def get_isolated_nodes_count(self) -> int: """ Get the total number of Isolated nodes in the graph. These are nodes that are not connected to any other nodes in the graph. Returns: - int: Number of Isolated nodes """ return len(self.get_isolated_nodes())
[docs] def remove_node_by_id(self, id: str) -> bool: """ Remove a node and all its associated edges from the graph. Args: - id (str): ID of the node to remove Returns: - bool: True if node was removed, False if node doesn't exist """ if id not in self.nodes: return False # Remove the node del self.nodes[id] # Remove all edges that reference this node edges_to_remove = [ key for key, edge in self.edges.items() if edge.start_node == id or edge.end_node == id ] for key in edges_to_remove: del self.edges[key] return True
# Paths methods
[docs] def find_paths( self, start_id: str, end_id: str, max_depth: int = 10 ) -> List[List[str]]: """ Find all paths between two nodes using BFS. Args: - start_id (str): Starting node ID - end_id (str): Target node ID - max_depth (int): Maximum path length to search Returns: - List[List[str]]: List of paths, where each path is a list of node IDs """ if start_id not in self.nodes or end_id not in self.nodes: return [] if start_id == end_id: return [[start_id]] paths = [] queue = [(start_id, [start_id])] while queue and len(queue[0][1]) <= max_depth: current_id, path = queue.pop(0) current_depth = len(path) # Only explore if we haven't reached max depth if current_depth >= max_depth: continue for edge in self.get_edges_from_node(current_id): next_id = edge.end_node # Check if next_id is not already in the current path (prevents cycles) if next_id not in path: new_path = path + [next_id] if next_id == end_id: paths.append(new_path) else: queue.append((next_id, new_path)) return paths
[docs] def get_connected_components(self) -> List[Set[str]]: """ Find all connected components in the graph. Returns: - List[Set[str]]: List of connected component sets """ visited = set() components = [] for node_id in self.nodes: if node_id not in visited: component = set() stack = [node_id] while stack: current = stack.pop() if current not in visited: visited.add(current) component.add(current) # Add all adjacent nodes for edge in self.get_edges_from_node(current): if edge.end_node not in visited: stack.append(edge.end_node) for edge in self.get_edges_to_node(current): if edge.start_node not in visited: stack.append(edge.start_node) components.append(component) return components
[docs] def validate_graph(self) -> tuple[bool, list[str]]: """ Validate the graph for common issues including node and edge validation. Validates: - All nodes using their individual validate() methods - All edges using their individual validate() methods - Graph structure issues (isolated nodes/edges) Returns: - tuple[bool, list[str]]: (is_valid, list_of_errors) """ errors = [] # Validate all nodes for node_id, node in self.nodes.items(): is_node_valid, node_errors = node.validate() if not is_node_valid: for error in node_errors: errors.append(f"Node '{node_id}': {error}") # Validate all edges for edge_key, edge in self.edges.items(): is_edge_valid, edge_errors = edge.validate() if not is_edge_valid: for error in edge_errors: errors.append( f"Edge {edge_key} ({edge.start_node}->{edge.end_node}): {error}" ) # Check for graph structure issues # Pre-compute edge mappings for O(1) lookups start_node_edges = {} end_node_edges = {} # Build edge mappings and check for isolated edges for edge_key, edge in self.edges.items(): # Check for isolated edges (edges referencing non-existent nodes) if edge.start_node not in self.nodes: errors.append( f"Edge {edge_key} ({edge.start_node}->{edge.end_node}): Start node '{edge.start_node}' does not exist" ) else: # Build start node mapping if edge.start_node not in start_node_edges: start_node_edges[edge.start_node] = [] start_node_edges[edge.start_node].append(edge) if edge.end_node not in self.nodes: errors.append( f"Edge {edge_key} ({edge.start_node}->{edge.end_node}): End node '{edge.end_node}' does not exist" ) else: # Build end node mapping if edge.end_node not in end_node_edges: end_node_edges[edge.end_node] = [] end_node_edges[edge.end_node].append(edge) # Check for isolated nodes using pre-computed mappings for node_id in self.nodes: # O(1) lookup instead of O(m) scan has_outgoing = node_id in start_node_edges has_incoming = node_id in end_node_edges if not has_outgoing and not has_incoming: errors.append( f"Node '{node_id}' is isolated (no incoming or outgoing edges)" ) return len(errors) == 0, errors
# Export methods
[docs] def export_json( self, include_metadata: bool = True, indent: None | int = None ) -> str: """ Export the graph to JSON format compatible with BloodHound OpenGraph. Args: - include_metadata (bool): Whether to include metadata in the export Returns: - str: JSON string representation of the graph """ graph_data = { "graph": { "nodes": [node.to_dict() for node in self.nodes.values()], "edges": [edge.to_dict() for edge in self.edges.values()], } } if include_metadata and self.source_kind: graph_data["metadata"] = {"source_kind": self.source_kind} return json.dumps(graph_data, indent=indent)
[docs] def export_to_file( self, filename: str, include_metadata: bool = True, indent: None | int = None ) -> bool: """ Export the graph to a JSON file. Args: - filename (str): Name of the file to write - include_metadata (bool): Whether to include metadata in the export Returns: - bool: True if export was successful, False otherwise """ try: json_data = self.export_json(include_metadata, indent) with open(filename, "w") as f: f.write(json_data) return True except (IOError, OSError, TypeError): return False
[docs] def export_to_dict(self) -> Dict: """ Export the graph to a dictionary. """ return { "graph": { "nodes": [node.to_dict() for node in self.nodes.values()], "edges": [edge.to_dict() for edge in self.edges.values()], }, "metadata": { "source_kind": (self.source_kind if self.source_kind else None) }, }
# Import methods
[docs] def import_from_json(self, json_data: str) -> bool: """ Load graph data from a JSON string. """ return self.import_from_dict(json.loads(json_data))
[docs] def import_from_file(self, filename: str) -> bool: """ Load graph data from a JSON file. Args: - filename (str): Name of the file to read Returns: - bool: True if load was successful, False otherwise """ try: with open(filename, "r") as f: data = json.load(f) return self.import_from_dict(data) except (IOError, OSError, json.JSONDecodeError): return False
[docs] def import_from_dict(self, data: Dict) -> bool: """ Load graph data from a dictionary (typically from JSON). Args: - data (Dict): Dictionary containing graph data Returns: - bool: True if load was successful, False otherwise """ try: if "graph" not in data: return False graph_data = data["graph"] # Load nodes if "nodes" in graph_data: for node_data in graph_data["nodes"]: node = Node.from_dict(node_data) if node: self.nodes[node.id] = node # Load edges if "edges" in graph_data: for edge_data in graph_data["edges"]: edge = Edge.from_dict(edge_data) if edge: edge_key = self._edge_key(edge) self.edges[edge_key] = edge # Load metadata if "metadata" in data and "source_kind" in data["metadata"]: self.source_kind = data["metadata"]["source_kind"] return True except (KeyError, TypeError, ValueError): return False
# Other methods
[docs] def clear(self) -> None: """ Clear all nodes and edges from the graph. """ self.nodes.clear() self.edges.clear()
[docs] def __len__(self) -> int: """ Return the total number of nodes and edges. Returns: - int: Total number of nodes and edges """ return len(self.nodes) + len(self.edges)
def __repr__(self) -> str: return f"OpenGraph(nodes={len(self.nodes)}, edges={len(self.edges)}, source_kind='{self.source_kind}')"