Source code for bhopengraph.Edge

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File name          : Edge.py
# Author             : Remi Gascou (@podalirius_)
# Date created       : 12 Aug 2025

from bhopengraph.Properties import Properties

# https://bloodhound.specterops.io/opengraph/schema#edge-json
EDGE_SCHEMA = {
    "title": "Generic Ingest Edge",
    "description": "Defines an edge between two nodes in a generic graph ingestion system. Each edge specifies a start and end node using either a unique identifier (id) or a name-based lookup. A kind is required to indicate the relationship type. Optional properties may include custom attributes. You may optionally constrain the start or end node to a specific kind using the kind field inside each reference.",
    "type": "object",
    "properties": {
        "start": {
            "type": "object",
            "properties": {
                "match_by": {
                    "type": "string",
                    "enum": ["id", "name"],
                    "default": "id",
                    "description": "Whether to match the start node by its unique object ID or by its name property.",
                },
                "value": {
                    "type": "string",
                    "description": "The value used for matching — either an object ID or a name, depending on match_by.",
                },
                "kind": {
                    "type": "string",
                    "description": "Optional kind filter; the referenced node must have this kind.",
                },
            },
            "required": ["value"],
        },
        "end": {
            "type": "object",
            "properties": {
                "match_by": {
                    "type": "string",
                    "enum": ["id", "name"],
                    "default": "id",
                    "description": "Whether to match the end node by its unique object ID or by its name property.",
                },
                "value": {
                    "type": "string",
                    "description": "The value used for matching — either an object ID or a name, depending on match_by.",
                },
                "kind": {
                    "type": "string",
                    "description": "Optional kind filter; the referenced node must have this kind.",
                },
            },
            "required": ["value"],
        },
        "kind": {"type": "string"},
        "properties": {
            "type": ["object", "null"],
            "description": "A key-value map of edge attributes. Values must not be objects. If a value is an array, it must contain only primitive types (e.g., strings, numbers, booleans) and must be homogeneous (all items must be of the same type).",
            "additionalProperties": {
                "type": ["string", "number", "boolean", "array"],
                "items": {"not": {"type": "object"}},
            },
        },
    },
    "required": ["start", "end", "kind"],
    "examples": [
        {
            "start": {"match_by": "id", "value": "user-1234"},
            "end": {"match_by": "id", "value": "server-5678"},
            "kind": "HasSession",
            "properties": {"timestamp": "2025-04-16T12:00:00Z", "duration_minutes": 45},
        },
        {
            "start": {"match_by": "name", "value": "alice", "kind": "User"},
            "end": {"match_by": "name", "value": "file-server-1", "kind": "Server"},
            "kind": "AccessedResource",
            "properties": {"via": "SMB", "sensitive": True},
        },
        {
            "start": {"value": "admin-1"},
            "end": {"value": "domain-controller-9"},
            "kind": "AdminTo",
            "properties": {"reason": "elevated_permissions", "confirmed": False},
        },
        {
            "start": {"match_by": "name", "value": "Printer-007"},
            "end": {"match_by": "id", "value": "network-42"},
            "kind": "ConnectedTo",
            "properties": None,
        },
    ],
}


[docs] class Edge(object): """ Edge class representing a directed edge in the OpenGraph. Follows BloodHound OpenGraph schema requirements with start/end nodes, kind, and properties. All edges are directed and one-way as per BloodHound requirements. Sources: - https://bloodhound.specterops.io/opengraph/schema#edges - https://bloodhound.specterops.io/opengraph/schema#minimal-working-json """
[docs] def __init__( self, start_node: str, end_node: str, kind: str, properties: Properties = None, start_match_by: str = "id", end_match_by: str = "id", ): """ Initialize an Edge. Args: - start_node (str): ID of the source node - end_node (str): ID of the destination node - kind (str): Type/class of the edge relationship - properties (Properties): Edge properties """ if not start_node: raise ValueError("Start node ID cannot be empty") if not end_node: raise ValueError("End node ID cannot be empty") if not kind: raise ValueError("Edge kind cannot be empty") self.start_node = start_node self.end_node = end_node self.kind = kind self.properties = properties or Properties() self.start_match_by = start_match_by self.end_match_by = end_match_by
[docs] def set_property(self, key: str, value): """ Set a property on the edge. Args: - key (str): Property name - value: Property value """ self.properties[key] = value
[docs] def get_property(self, key: str, default=None): """ Get a property from the edge. Args: - key (str): Property name - default: Default value if property doesn't exist Returns: - Property value or default """ return self.properties.get_property(key, default)
[docs] def remove_property(self, key: str): """ Remove a property from the edge. Args: - key (str): Property name to remove """ self.properties.remove_property(key)
[docs] def to_dict(self) -> dict: """ Convert edge to dictionary for JSON serialization. Returns: - dict: Edge as dictionary following BloodHound OpenGraph schema """ edge_dict = { "kind": self.kind, "start": {"value": self.start_node, "match_by": self.start_match_by}, "end": {"value": self.end_node, "match_by": self.end_match_by}, } # Only include properties if they exist and are not empty if self.properties and len(self.properties) > 0: edge_dict["properties"] = self.properties.to_dict() return edge_dict
[docs] @classmethod def from_dict(cls, edge_data: dict): """ Create an Edge instance from a dictionary. Args: - edge_data (dict): Dictionary containing edge data Returns: - Edge: Edge instance or None if data is invalid """ try: if "kind" not in edge_data: return None kind = edge_data["kind"] # Handle different edge data formats start_node = None end_node = None start_match_by = None end_match_by = None if "start" in edge_data and "end" in edge_data: # Direct format: {"start": "id", "end": "id"} start_node = edge_data["start"]["value"] start_match_by = edge_data["start"]["match_by"] end_node = edge_data["end"]["value"] end_match_by = edge_data["end"]["match_by"] if not start_node or not end_node: return None properties_data = edge_data.get("properties", {}) # Create Properties instance if properties data exists properties = None if properties_data: properties = Properties() for key, value in properties_data.items(): properties[key] = value return cls( start_node, end_node, kind, properties, start_match_by, end_match_by ) except (KeyError, TypeError, ValueError): return None
[docs] def get_start_node(self) -> str: """ Get the start node ID. Returns: - str: Start node ID """ return self.start_node
[docs] def get_end_node(self) -> str: """ Get the end node ID. Returns: - str: End node ID """ return self.end_node
[docs] def get_kind(self) -> str: """ Get the edge kind/type. Returns: - str: Edge kind """ return self.kind
[docs] def get_unique_id(self) -> str: """ Get a unique ID for the edge. Returns: - str: Unique ID for the edge """ return f"[{self.start_match_by}:{self.start_node}]-({self.kind})->[{self.end_match_by}:{self.end_node}]"
[docs] def __eq__(self, other): """ Check if two edges are equal based on their start, end, and kind. Args: - other (Edge): The other edge to compare to Returns: - bool: True if the edges are equal, False otherwise """ if isinstance(other, Edge): return ( self.start_node == other.start_node and self.end_node == other.end_node and self.kind == other.kind ) return False
[docs] def __hash__(self): """ Hash based on start, end, and kind for use in sets and as dictionary keys. Returns: - int: Hash of the start, end, and kind """ return hash((self.start_node, self.end_node, self.kind))
[docs] def validate(self) -> tuple[bool, list[str]]: """ Validate the edge against the EDGE_SCHEMA. Returns: - tuple[bool, list[str]]: (is_valid, list_of_errors) """ errors = [] # Validate required fields if not self.start_node or self.start_node is None: errors.append("Start node cannot be empty") elif not isinstance(self.start_node, str): errors.append("Start node must be a string") if not self.end_node or self.end_node is None: errors.append("End node cannot be empty") elif not isinstance(self.end_node, str): errors.append("End node must be a string") if not self.kind or self.kind is None: errors.append("Edge kind cannot be empty") elif not isinstance(self.kind, str): errors.append("Edge kind must be a string") # Validate match_by values if not isinstance(self.start_match_by, str): errors.append("Start match_by must be a string") elif self.start_match_by not in ["id", "name"]: errors.append("Start match_by must be either 'id' or 'name'") if not isinstance(self.end_match_by, str): errors.append("End match_by must be a string") elif self.end_match_by not in ["id", "name"]: errors.append("End match_by must be either 'id' or 'name'") # Validate properties if they exist if self.properties is not None: if not isinstance(self.properties, Properties): errors.append("Properties must be a Properties instance") else: is_props_valid, prop_errors = self.properties.validate() if not is_props_valid: errors.extend(prop_errors) return len(errors) == 0, errors
def __repr__(self) -> str: return f"Edge(start='{self.start_node}', end='{self.end_node}', kind='{self.kind}', properties={self.properties})"