Skip to content

type_bridge

type_bridge

TypeBridge - A Python ORM for TypeDB with Attribute-based API.

Attribute

Attribute(value=None)

Bases: ABC

Base class for TypeDB attributes.

Attributes in TypeDB are value types that can be owned by entities and relations.

Attribute instances can store values, allowing type-safe construction: `Name("Alice")` creates a Name instance with value "Alice", and `Age(30)` creates an Age instance with value 30.

Type name formatting

You can control how the class name is converted to TypeDB attribute name using the 'case' class variable or 'attr_name' for explicit control.

Example

class Name(String): pass # TypeDB attribute: "Name" (default CLASS_NAME)

class PersonName(String): case = TypeNameCase.SNAKE_CASE # TypeDB attribute: "person_name"

class PersonName(String): attr_name = "full_name" # Explicit override

class Age(Integer): pass

class Person(Entity): name: Name age: Age

Direct instantiation with wrapped types (best practice):

person = Person(name=Name("Alice"), age=Age(30))

Initialize attribute with a value.

Parameters:

Name Type Description Default
value Any

The value to store in this attribute instance

None
Source code in type_bridge/attribute/base.py
@abstractmethod
def __init__(self, value: Any = None):
    """Store `value` on this attribute instance.

    Args:
        value: The value this attribute instance holds (default: None)
    """
    self._value = value

value property

value

Get the stored value.

__init_subclass__

__init_subclass__(**kwargs)

Called when a subclass is created.

Source code in type_bridge/attribute/base.py
def __init_subclass__(cls, **kwargs):
    """Hook invoked whenever a new Attribute subclass is defined.

    Resolves and stores the TypeDB attribute name for the subclass, then
    validates it (user-defined types only). Resolution priority:
    flags.name > attr_name > flags.case > class-level case > CLASS_NAME.
    """
    super().__init_subclass__(**kwargs)

    # Local import to break the circular dependency with the flags module.
    from type_bridge.attribute.flags import (
        AttributeFlags,
        TypeNameCase,
        format_type_name,
    )

    flag_obj = getattr(cls, "flags", None)
    has_flags = isinstance(flag_obj, AttributeFlags)

    if has_flags and flag_obj.name is not None:
        # An explicit name on the flags object wins over everything else.
        resolved = flag_obj.name
    elif cls.attr_name is not None:
        # Next, an explicit attr_name declared on the class itself.
        resolved = cls.attr_name
    else:
        # Fall back to case-formatting the class name.
        # Priority for the case setting: flags.case > class case > CLASS_NAME.
        if has_flags and flag_obj.case is not None:
            chosen_case = flag_obj.case
        elif cls.case is not None:
            chosen_case = cls.case
        else:
            chosen_case = TypeNameCase.CLASS_NAME
        resolved = format_type_name(cls.__name__, chosen_case)

    # Assign on every new subclass explicitly so the value is never inherited:
    # Name(String) must compute _attr_name="name", not reuse "string".
    cls._attr_name = resolved

    # Framework-provided types (Boolean, Integer, String, ...) intentionally
    # use TypeQL reserved words, so only user-defined types are validated.
    if not cls.__module__.startswith("type_bridge.attribute"):
        _validate_attribute_name(cls._attr_name, cls.__name__)

__str__

__str__()

String representation returns the stored value.

Source code in type_bridge/attribute/base.py
def __str__(self) -> str:
    """Render the stored value as a string; empty string when value is None."""
    if self._value is None:
        return ""
    return str(self._value)

__repr__

__repr__()

Repr shows the attribute type and value.

Source code in type_bridge/attribute/base.py
def __repr__(self) -> str:
    """Debug representation: ClassName(<value repr>)."""
    return f"{type(self).__name__}({self._value!r})"

__eq__

__eq__(other)

Compare attribute with another attribute instance.

For strict type safety, Attribute instances do NOT compare equal to raw values. To access the raw value, use the .value property.

Examples:

`Age(20) == Age(20)` → True (same type, same value); `Age(20) == Id(20)` → False (different types!); `Age(20) == 20` → False (not equal to raw value!); `Age(20).value == 20` → True (access raw value explicitly).

Source code in type_bridge/attribute/base.py
def __eq__(self, other: object) -> bool:
    """Strict equality: same Attribute subclass AND same stored value.

    Raw (unwrapped) values never compare equal to an Attribute instance;
    use the `.value` property to compare against raw values.

    Examples:
        Age(20) == Age(20)  # True (same type, same value)
        Age(20) == Id(20)   # False (different types!)
        Age(20) == 20       # False (not equal to raw value!)
        Age(20).value == 20 # True (access raw value explicitly)
    """
    if not isinstance(other, Attribute):
        # Strict type safety: never equal to non-Attribute objects.
        return False
    # Both the concrete subclass and the stored value must match.
    same_type = type(self) is type(other)
    return same_type and self._value == other._value

__hash__

__hash__()

Make attribute hashable based on its type and value.

Source code in type_bridge/attribute/base.py
def __hash__(self) -> int:
    """Hash consistent with __eq__: derived from (subclass, stored value)."""
    key = (type(self), self._value)
    return hash(key)

get_attribute_name classmethod

get_attribute_name()

Get the TypeDB attribute name.

If attr_name is explicitly set, it is used as-is. Otherwise, the class name is formatted according to the case parameter. Default case is CLASS_NAME (preserves class name as-is).

Source code in type_bridge/attribute/base.py
@classmethod
def get_attribute_name(cls) -> str:
    """Return the TypeDB attribute name for this class.

    If attr_name is explicitly set, it is used as-is; otherwise the class
    name is formatted according to the case parameter (default CLASS_NAME,
    which preserves the class name as-is). Falls back to the raw class
    name when the computed name is unset or empty.
    """
    if cls._attr_name:
        return cls._attr_name
    return cls.__name__

get_value_type classmethod

get_value_type()

Get the TypeDB value type.

Source code in type_bridge/attribute/base.py
@classmethod
def get_value_type(cls) -> str:
    """Return the TypeDB value type string declared for this attribute."""
    declared = cls.value_type
    return declared

is_key classmethod

is_key()

Check if this attribute is a key.

Source code in type_bridge/attribute/base.py
@classmethod
def is_key(cls) -> bool:
    """Report whether this attribute type is marked as a key."""
    flag = cls._is_key
    return flag

is_abstract classmethod

is_abstract()

Check if this attribute is abstract.

Source code in type_bridge/attribute/base.py
@classmethod
def is_abstract(cls) -> bool:
    """Report whether this attribute type is declared abstract."""
    flag = cls.abstract
    return flag

is_independent classmethod

is_independent()

Check if this attribute is independent (can exist without owners).

Source code in type_bridge/attribute/base.py
@classmethod
def is_independent(cls) -> bool:
    """Report whether this attribute is independent (can exist without owners)."""
    flag = cls.independent
    return flag

get_owners classmethod

get_owners()

Get all Entity/Relation classes that own this attribute.

Returns:

Type Description
set[type[TypeDBType]]

Set of model classes that define this attribute as a field.

set[type[TypeDBType]]

Does not require a database connection (static discovery).

Source code in type_bridge/attribute/base.py
@classmethod
def get_owners(cls) -> "set[type[TypeDBType]]":
    """Return every Entity/Relation class that declares this attribute.

    Returns:
        Set of model classes that define this attribute as a field. The
        lookup is static (registry-based) and needs no database connection.
    """
    from type_bridge.models.registry import ModelRegistry

    owners = ModelRegistry.get_attribute_owners(cls)
    return owners

get_supertype classmethod

get_supertype()

Get the supertype if this attribute extends another.

Source code in type_bridge/attribute/base.py
@classmethod
def get_supertype(cls) -> str | None:
    """Return the parent attribute type name, or None if this is a root type."""
    parent = cls._supertype
    return parent

to_schema_definition classmethod

to_schema_definition()

Generate TypeQL schema definition for this attribute.

Includes support for TypeDB annotations: @abstract and @independent (come right after the attribute name; @independent allows standalone existence); @range(min..max) from the range_constraint ClassVar (after value type); @regex("pattern") from the regex ClassVar (after value type); @values("a", "b", ...) from the allowed_values ClassVar (after value type).

Returns:

Type Description
str

TypeQL schema definition string

Source code in type_bridge/attribute/base.py
@classmethod
def to_schema_definition(cls) -> str:
    """Generate the TypeQL schema definition string for this attribute.

    Includes support for TypeDB annotations:
    - @abstract (comes right after attribute name)
    - @independent (comes right after attribute name, allows standalone existence)
    - @range(min..max) from range_constraint ClassVar (after value type)
    - @regex("pattern") from regex_pattern ClassVar (after value type)
    - @values("a", "b", ...) from allowed_values ClassVar (after value type)

    Returns:
        TypeQL schema definition string, terminated with ';'
    """
    from type_bridge.typeql.annotations import format_type_annotations

    attr_name = cls.get_attribute_name()
    value_type = cls.get_value_type()

    # @abstract / @independent are placed immediately after the name.
    type_annotations = format_type_annotations(
        abstract=cls.abstract,
        independent=cls.independent,
    )

    # Shape: attribute <name> [@abstract @independent][, sub <parent>], value <type>
    head = f"attribute {attr_name}"
    if type_annotations:
        head += " " + " ".join(type_annotations)
        if cls._supertype:
            head += f", sub {cls._supertype}"
        definition = f"{head}, value {value_type}"
    elif cls._supertype:
        # NOTE: no comma between the name and 'sub' when there are no annotations.
        definition = f"{head} sub {cls._supertype}, value {value_type}"
    else:
        definition = f"{head}, value {value_type}"

    # @range(min..max) — either bound may be omitted: @range(min..) / @range(..max).
    range_constraint = getattr(cls, "range_constraint", None)
    if range_constraint is not None:
        lo, hi = range_constraint
        lo_text = lo if lo is not None else ""
        hi_text = hi if hi is not None else ""
        definition += f" @range({lo_text}..{hi_text})"

    # @regex — read from regex_pattern (plain 'regex' would clash with the
    # String.regex() query method). Double quotes in the pattern are escaped.
    pattern = getattr(cls, "regex_pattern", None)
    if isinstance(pattern, str):
        escaped = pattern.replace('"', '\\"')
        definition += f' @regex("{escaped}")'

    # @values("a", "b", ...) — only emitted when allowed_values is a tuple.
    values = getattr(cls, "allowed_values", None)
    if isinstance(values, tuple):
        joined = ", ".join(f'"{v}"' for v in values)
        definition += f" @values({joined})"

    return definition + ";"

gt classmethod

gt(value)

Create greater-than comparison expression.

Parameters:

Name Type Description Default
value Attribute

Value to compare against

required

Returns:

Type Description
ComparisonExpr

ComparisonExpr for attr > value

Example

Age.gt(Age(30)) # age > 30

Source code in type_bridge/attribute/base.py
@classmethod
def gt(cls, value: "Attribute") -> "ComparisonExpr":
    """Build a greater-than filter expression for this attribute type.

    Args:
        value: Attribute instance holding the comparison operand

    Returns:
        ComparisonExpr for attr > value

    Example:
        Age.gt(Age(30))  # age > 30
    """
    from type_bridge.expressions import ComparisonExpr

    expr = ComparisonExpr(attr_type=cls, operator=">", value=value)
    return expr

lt classmethod

lt(value)

Create less-than comparison expression.

Parameters:

Name Type Description Default
value Attribute

Value to compare against

required

Returns:

Type Description
ComparisonExpr

ComparisonExpr for attr < value

Example

Age.lt(Age(30)) # age < 30

Source code in type_bridge/attribute/base.py
@classmethod
def lt(cls, value: "Attribute") -> "ComparisonExpr":
    """Build a less-than filter expression for this attribute type.

    Args:
        value: Attribute instance holding the comparison operand

    Returns:
        ComparisonExpr for attr < value

    Example:
        Age.lt(Age(30))  # age < 30
    """
    from type_bridge.expressions import ComparisonExpr

    expr = ComparisonExpr(attr_type=cls, operator="<", value=value)
    return expr

gte classmethod

gte(value)

Create greater-than-or-equal comparison expression.

Parameters:

Name Type Description Default
value Attribute

Value to compare against

required

Returns:

Type Description
ComparisonExpr

ComparisonExpr for attr >= value

Example

Salary.gte(Salary(80000.0)) # salary >= 80000

Source code in type_bridge/attribute/base.py
@classmethod
def gte(cls, value: "Attribute") -> "ComparisonExpr":
    """Build a greater-than-or-equal filter expression for this attribute type.

    Args:
        value: Attribute instance holding the comparison operand

    Returns:
        ComparisonExpr for attr >= value

    Example:
        Salary.gte(Salary(80000.0))  # salary >= 80000
    """
    from type_bridge.expressions import ComparisonExpr

    expr = ComparisonExpr(attr_type=cls, operator=">=", value=value)
    return expr

lte classmethod

lte(value)

Create less-than-or-equal comparison expression.

Parameters:

Name Type Description Default
value Attribute

Value to compare against

required

Returns:

Type Description
ComparisonExpr

ComparisonExpr for attr <= value

Example

Age.lte(Age(65)) # age <= 65

Source code in type_bridge/attribute/base.py
@classmethod
def lte(cls, value: "Attribute") -> "ComparisonExpr":
    """Build a less-than-or-equal filter expression for this attribute type.

    Args:
        value: Attribute instance holding the comparison operand

    Returns:
        ComparisonExpr for attr <= value

    Example:
        Age.lte(Age(65))  # age <= 65
    """
    from type_bridge.expressions import ComparisonExpr

    expr = ComparisonExpr(attr_type=cls, operator="<=", value=value)
    return expr

eq classmethod

eq(value)

Create equality comparison expression.

Parameters:

Name Type Description Default
value Attribute

Value to compare against

required

Returns:

Type Description
ComparisonExpr

ComparisonExpr for attr == value

Example

Status.eq(Status("active")) # status == "active"

Source code in type_bridge/attribute/base.py
@classmethod
def eq(cls, value: "Attribute") -> "ComparisonExpr":
    """Build an equality filter expression for this attribute type.

    Args:
        value: Attribute instance holding the comparison operand

    Returns:
        ComparisonExpr for attr == value

    Example:
        Status.eq(Status("active"))  # status == "active"
    """
    from type_bridge.expressions import ComparisonExpr

    expr = ComparisonExpr(attr_type=cls, operator="==", value=value)
    return expr

neq classmethod

neq(value)

Create not-equal comparison expression.

Parameters:

Name Type Description Default
value Attribute

Value to compare against

required

Returns:

Type Description
ComparisonExpr

ComparisonExpr for attr != value

Example

Status.neq(Status("deleted")) # status != "deleted"

Source code in type_bridge/attribute/base.py
@classmethod
def neq(cls, value: "Attribute") -> "ComparisonExpr":
    """Build a not-equal filter expression for this attribute type.

    Args:
        value: Attribute instance holding the comparison operand

    Returns:
        ComparisonExpr for attr != value

    Example:
        Status.neq(Status("deleted"))  # status != "deleted"
    """
    from type_bridge.expressions import ComparisonExpr

    expr = ComparisonExpr(attr_type=cls, operator="!=", value=value)
    return expr

sum classmethod

sum()

Create sum aggregation expression.

Returns:

Type Description
AggregateExpr

AggregateExpr for sum(attr)

Example

Salary.sum() # sum of all salaries

Source code in type_bridge/attribute/base.py
@classmethod
def sum(cls) -> "AggregateExpr":
    """Build a sum aggregation expression over this attribute type.

    Returns:
        AggregateExpr for sum(attr)

    Example:
        Salary.sum()  # sum of all salaries
    """
    from type_bridge.expressions import AggregateExpr

    expr = AggregateExpr(attr_type=cls, function="sum")
    return expr

avg classmethod

avg()

Create average (mean) aggregation expression.

Note

Automatically converts to TypeQL 'mean' function. TypeDB 3.x uses 'mean' instead of 'avg'.

Returns:

Type Description
AggregateExpr

AggregateExpr for mean(attr)

Example

Age.avg() # Generates TypeQL: mean($age)

Source code in type_bridge/attribute/base.py
@classmethod
def avg(cls) -> "AggregateExpr":
    """Build an average aggregation expression over this attribute type.

    Note:
        Emitted as the TypeQL 'mean' function — TypeDB 3.x uses 'mean'
        rather than 'avg'.

    Returns:
        AggregateExpr for mean(attr)

    Example:
        Age.avg()  # Generates TypeQL: mean($age)
    """
    from type_bridge.expressions import AggregateExpr

    expr = AggregateExpr(attr_type=cls, function="mean")
    return expr

max classmethod

max()

Create maximum aggregation expression.

Returns:

Type Description
AggregateExpr

AggregateExpr for max(attr)

Example

Score.max() # maximum score

Source code in type_bridge/attribute/base.py
@classmethod
def max(cls) -> "AggregateExpr":
    """Build a maximum aggregation expression over this attribute type.

    Returns:
        AggregateExpr for max(attr)

    Example:
        Score.max()  # maximum score
    """
    from type_bridge.expressions import AggregateExpr

    expr = AggregateExpr(attr_type=cls, function="max")
    return expr

min classmethod

min()

Create minimum aggregation expression.

Returns:

Type Description
AggregateExpr

AggregateExpr for min(attr)

Example

Price.min() # minimum price

Source code in type_bridge/attribute/base.py
@classmethod
def min(cls) -> "AggregateExpr":
    """Build a minimum aggregation expression over this attribute type.

    Returns:
        AggregateExpr for min(attr)

    Example:
        Price.min()  # minimum price
    """
    from type_bridge.expressions import AggregateExpr

    expr = AggregateExpr(attr_type=cls, function="min")
    return expr

median classmethod

median()

Create median aggregation expression.

Returns:

Type Description
AggregateExpr

AggregateExpr for median(attr)

Example

Salary.median() # median salary

Source code in type_bridge/attribute/base.py
@classmethod
def median(cls) -> "AggregateExpr":
    """Build a median aggregation expression over this attribute type.

    Returns:
        AggregateExpr for median(attr)

    Example:
        Salary.median()  # median salary
    """
    from type_bridge.expressions import AggregateExpr

    expr = AggregateExpr(attr_type=cls, function="median")
    return expr

std classmethod

std()

Create standard deviation aggregation expression.

Returns:

Type Description
AggregateExpr

AggregateExpr for std(attr)

Example

Score.std() # standard deviation of scores

Source code in type_bridge/attribute/base.py
@classmethod
def std(cls) -> "AggregateExpr":
    """Build a standard-deviation aggregation expression over this attribute type.

    Returns:
        AggregateExpr for std(attr)

    Example:
        Score.std()  # standard deviation of scores
    """
    from type_bridge.expressions import AggregateExpr

    expr = AggregateExpr(attr_type=cls, function="std")
    return expr

__get_pydantic_core_schema__ classmethod

__get_pydantic_core_schema__(source_type, handler)

Unified Pydantic schema generation for all attribute types.

This base implementation handles the common patterns: - Serialization: extract _value from attribute instances - Validation: wrap raw values in attribute instances - Literal type support (for types that enable it)

Subclasses can override for completely custom behavior, or override the helper methods (_pydantic_serialize, _pydantic_validate, etc.) for targeted customization.

Source code in type_bridge/attribute/base.py
@classmethod
def __get_pydantic_core_schema__(
    cls, source_type: type[Any], handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
    """Unified Pydantic schema generation for all attribute types.

    This base implementation handles the common patterns:
    - Serialization: extract _value from attribute instances
    - Validation: wrap raw values in attribute instances
    - Literal type support (for types that enable it)

    Subclasses can override for completely custom behavior, or override
    the helper methods (_pydantic_serialize, _pydantic_validate, etc.)
    for targeted customization.
    """
    # Serialization is identical in both branches: unwrap via _pydantic_serialize.
    serializer = core_schema.plain_serializer_function_ser_schema(
        cls._pydantic_serialize,
        return_schema=cls._get_pydantic_return_schema(),
    )

    if cls._supports_literal_types() and get_origin(source_type) is Literal:
        # Literal annotations need the raw value so the literal constraint
        # can be checked against it.
        def unwrap(v: Any, _info: Any) -> Any:
            return v._value if isinstance(v, cls) else v

        return core_schema.with_info_plain_validator_function(
            unwrap, serialization=serializer
        )

    # Default path: validate raw values and wrap them in attribute instances.
    return core_schema.with_info_plain_validator_function(
        lambda v, _info: cls._pydantic_validate(v), serialization=serializer
    )

__class_getitem__ classmethod

__class_getitem__(item)

Allow generic subscription for type checking (e.g., Integer[int]).

Source code in type_bridge/attribute/base.py
@classmethod
def __class_getitem__(cls, item: object) -> type[Self]:
    """Accept generic subscription (e.g. Integer[int]) for type checking.

    The subscript is ignored at runtime; the class itself is returned.
    """
    return cls

build_lookup classmethod

build_lookup(lookup, value)

Build an expression for a lookup operator.

This method centralizes the logic for converting lookup names (e.g., 'gt', 'in') into TypeQL expressions. Subclasses (like String) should override this to handle type-specific lookups.

Parameters:

Name Type Description Default
lookup str

The lookup operator name (e.g., 'exact', 'gt', 'contains')

required
value Any

The value to filter by

required

Returns:

Type Description
Expression

Expression object representing the filter

Raises:

Type Description
ValueError

If the lookup operator is not supported by this attribute type

Source code in type_bridge/attribute/base.py
@classmethod
def build_lookup(cls, lookup: str, value: Any) -> "Expression":
    """Translate a lookup operator name into a TypeQL filter expression.

    Centralizes the mapping from lookup names (e.g. 'gt', 'in') to
    Expression objects. Subclasses (like String) should override this to
    handle type-specific lookups.

    Args:
        lookup: The lookup operator name (e.g. 'exact', 'gt', 'contains')
        value: The value to filter by

    Returns:
        Expression object representing the filter

    Raises:
        ValueError: If the lookup operator is not supported by this attribute type
    """
    from type_bridge.expressions import AttributeExistsExpr, BooleanExpr, Expression

    def _coerce(raw: Any) -> Any:
        """Wrap a raw value in an instance of this attribute type if needed."""
        return raw if isinstance(raw, cls) else cls(raw)

    # Exact match.
    if lookup in ("exact", "eq"):
        return cls.eq(_coerce(value))

    # Ordering comparisons delegate to the matching classmethod.
    if lookup in ("gt", "gte", "lt", "lte"):
        if not hasattr(cls, lookup):
            raise ValueError(f"Lookup '{lookup}' not supported for {cls.__name__}")
        comparator = getattr(cls, lookup)
        return comparator(_coerce(value))

    # Membership: expand to an OR over equality expressions.
    if lookup == "in":
        if not isinstance(value, (list, tuple, set)):
            raise ValueError(f"'{lookup}' lookup requires an iterable of values")
        candidates = list(value)
        if not candidates:
            raise ValueError(f"'{lookup}' lookup requires a non-empty iterable")

        exprs: list[Expression] = [cls.eq(_coerce(item)) for item in candidates]
        return exprs[0] if len(exprs) == 1 else BooleanExpr("or", exprs)

    # Presence/absence check: isnull=True means the attribute must be absent.
    if lookup == "isnull":
        if not isinstance(value, bool):
            raise ValueError(f"'{lookup}' lookup expects a boolean")
        return AttributeExistsExpr(cls, present=not value)

    raise ValueError(f"Unsupported lookup operator '{lookup}' for {cls.__name__}")

AttributeFlags dataclass

AttributeFlags(is_key=False, is_unique=False, card_min=None, card_max=None, has_explicit_card=False, name=None, case=None)

Metadata for attribute ownership and type configuration.

Represents TypeDB ownership annotations like @key, @card(min..max), @unique, and allows overriding the attribute type name with explicit name or case formatting.

Example

In `class Person(Entity)`: `name: Name = Flag(Key)` gives @key (implies @card(1..1)); `email: Email = Flag(Unique)` gives @unique @card(1..1); `age: Optional[Age]` gives @card(0..1) with no Flag needed; `tags: list[Tag] = Flag(Card(min=2))` gives @card(2..); `jobs: list[Job] = Flag(Card(1, 5))` gives @card(1..5).

Override attribute type name explicitly

class Name(String): flags = AttributeFlags(name="name")

Or use case formatting

class PersonName(String): flags = AttributeFlags(case=TypeNameCase.SNAKE_CASE) # -> person_name

to_typeql_annotations

to_typeql_annotations()

Convert to TypeQL annotations like @key, @card(0..5).

Rules: - @key implies @card(1..1), so never output @card with @key - @unique with @card(1..1) is redundant, so omit @card in that case - Otherwise, always output @card if cardinality is specified

Returns:

Type Description
list[str]

List of TypeQL annotation strings

Source code in type_bridge/attribute/flags.py
def to_typeql_annotations(self) -> list[str]:
    """Convert these flags to TypeQL annotations like @key, @card(0..5).

    Rules:
    - @key implies @card(1..1), so @card is never emitted alongside @key
    - @unique with @card(1..1) is redundant, so @card is omitted there
    - Otherwise @card is emitted whenever a cardinality is specified

    Returns:
        List of TypeQL annotation strings
    """
    from type_bridge.typeql.annotations import format_card_annotation

    result: list[str] = []
    if self.is_key:
        result.append("@key")
    if self.is_unique:
        result.append("@unique")

    # @card only applies when some cardinality bound was given, and never
    # together with @key (which already implies @card(1..1)).
    has_card = self.card_min is not None or self.card_max is not None
    if has_card and not self.is_key:
        # @unique with the default (1..1) cardinality makes @card redundant.
        redundant = self.is_unique and self.card_min == 1 and self.card_max == 1
        if not redundant:
            card = format_card_annotation(self.card_min, self.card_max)
            if card:
                result.append(card)

    return result

Boolean

Boolean(value)

Bases: Attribute

Boolean attribute type that accepts bool values.

Example

class IsActive(Boolean): pass

class IsVerified(Boolean): pass

Initialize Boolean attribute with a bool value.

Parameters:

Name Type Description Default
value bool

The boolean value to store

required
Source code in type_bridge/attribute/boolean.py
def __init__(self, value: bool):
    """Create a Boolean attribute wrapping the given bool.

    Args:
        value: The boolean value to store
    """
    super().__init__(value)

value property

value

Get the stored boolean value.

__bool__

__bool__()

Convert to bool.

Source code in type_bridge/attribute/boolean.py
def __bool__(self) -> bool:
    """Truthiness mirrors the stored boolean value."""
    stored = self.value
    return bool(stored)

Card

Card(*args, min=None, max=None)

Cardinality marker for multi-value attribute ownership.

IMPORTANT: Card() should only be used with list[Type] annotations. For optional single values, use Optional[Type] instead.

Parameters:

Name Type Description Default
min int | None

Minimum cardinality (default: None, which means unspecified)

None
max int | None

Maximum cardinality (default: None, which means unbounded)

None

Examples:

tags: list[Tag] = Flag(Card(min=2)) # @card(2..) - at least two jobs: list[Job] = Flag(Card(1, 5)) # @card(1..5) - one to five ids: list[ID] = Flag(Key, Card(min=1)) # @key @card(1..)

INCORRECT - use Optional[Type] instead:

age: Age = Flag(Card(min=0, max=1)) # ❌ Wrong!

age: Optional[Age] # ✓ Correct

Initialize cardinality marker.

Supports both positional and keyword arguments: `Card(1, 5)` → min=1, max=5; `Card(min=2)` → min=2, max=None (unbounded); `Card(max=5)` → min=0, max=5 (defaults min to 0); `Card(min=0, max=10)` → min=0, max=10.

Source code in type_bridge/attribute/flags.py
def __init__(self, *args: int, min: int | None = None, max: int | None = None):
    """Initialize the cardinality marker.

    Supports both positional and keyword arguments:
    - Card(1, 5) → min=1, max=5
    - Card(min=2) → min=2, max=None (unbounded)
    - Card(max=5) → min=0, max=5 (defaults min to 0)
    - Card(min=0, max=10) → min=0, max=10
    """
    self.min: int | None = None
    self.max: int | None = None

    if args:
        if len(args) > 2:
            raise ValueError("Card accepts at most 2 positional arguments")
        if len(args) == 2:
            # Card(1, 5)
            self.min, self.max = args
        else:
            # Card(2): single positional sets min; max may still come
            # from the keyword argument.
            self.min = args[0]
            self.max = max
    elif min is None and max is not None:
        # Only max specified: the lower bound defaults to 0.
        self.min = 0
        self.max = max
    else:
        self.min = min
        self.max = max

Date

Date(value)

Bases: Attribute

Date attribute type that accepts date values (date only, no time).

This maps to TypeDB's 'date' type, which is an ISO 8601 compliant date without time information.

Range: January 1, 262144 BCE to December 31, 262142 CE

Example

from datetime import date

class PublishDate(Date): pass

class BirthDate(Date): pass

Usage with date values

published = PublishDate(date(2024, 3, 30)) birthday = BirthDate(date(1990, 5, 15))

Initialize Date attribute with a date value.

Parameters:

Name Type Description Default
value date | str

The date value to store. Can be: - datetime.date instance - str in ISO 8601 format (YYYY-MM-DD)

required
Example

from datetime import date

From date instance

publish_date = PublishDate(date(2024, 3, 30))

From ISO string

publish_date = PublishDate("2024-03-30")

Source code in type_bridge/attribute/date.py
def __init__(self, value: date_type | str):
    """Initialize Date attribute from a date or an ISO 8601 string.

    Args:
        value: The date value to store. Either a datetime.date instance
            or a str in ISO 8601 format (YYYY-MM-DD).

    Example:
        from datetime import date

        # From date instance
        publish_date = PublishDate(date(2024, 3, 30))

        # From ISO string
        publish_date = PublishDate("2024-03-30")
    """
    if isinstance(value, str):
        # Parse an ISO 8601 date string (YYYY-MM-DD).
        parsed = date_type.fromisoformat(value)
    elif isinstance(value, datetime_type):
        # A datetime was supplied; keep only its calendar-date part.
        parsed = value.date()
    else:
        parsed = value
    super().__init__(parsed)

value property

value

Get the stored date value.

__add__

__add__(other)

Add a Duration to this Date.

Parameters:

Name Type Description Default
other Any

A Duration to add to this date

required

Returns:

Type Description
Date

New Date with the duration added

Example

from type_bridge import Duration d = Date(date(2024, 1, 31)) duration = Duration("P1M") result = d + duration # Date(2024-02-29)

Source code in type_bridge/attribute/date.py
def __add__(self, other: Any) -> "Date":
    """Return a new Date advanced by a Duration.

    Args:
        other: The Duration to add to this date.

    Returns:
        New Date shifted forward by the duration.

    Example:
        from type_bridge import Duration
        d = Date(date(2024, 1, 31))
        result = d + Duration("P1M")  # Date(2024-02-29)
    """
    from type_bridge.attribute.duration import Duration

    if not isinstance(other, Duration):
        return NotImplemented
    shifted = self.value + other.value
    # Adding an isodate Duration to a date can produce a datetime;
    # collapse it back to a bare date in that case.
    if isinstance(shifted, datetime_type):
        shifted = shifted.date()
    return Date(shifted)

__radd__

__radd__(other)

Reverse addition for Duration + Date.

Source code in type_bridge/attribute/date.py
def __radd__(self, other: Any) -> "Date":
    """Support Duration + Date by delegating to __add__ (addition commutes)."""
    return self.__add__(other)

__sub__

__sub__(other)

Subtract a Duration from this Date.

Parameters:

Name Type Description Default
other Any

A Duration to subtract from this date

required

Returns:

Type Description
Date

New Date with the duration subtracted

Example

from type_bridge import Duration d = Date(date(2024, 3, 31)) duration = Duration("P1M") result = d - duration # Date(2024-02-29)

Source code in type_bridge/attribute/date.py
def __sub__(self, other: Any) -> "Date":
    """Return a new Date moved back by a Duration.

    Args:
        other: The Duration to subtract from this date.

    Returns:
        New Date shifted backward by the duration.

    Example:
        from type_bridge import Duration
        d = Date(date(2024, 3, 31))
        result = d - Duration("P1M")  # Date(2024-02-29)
    """
    from type_bridge.attribute.duration import Duration

    if not isinstance(other, Duration):
        return NotImplemented
    shifted = self.value - other.value
    # Subtracting an isodate Duration may yield a datetime; reduce to date.
    if isinstance(shifted, datetime_type):
        shifted = shifted.date()
    return Date(shifted)

DateTime

DateTime(value)

Bases: Attribute

DateTime attribute type that accepts naive datetime values.

This maps to TypeDB's 'datetime' type, which does not include timezone information.

Example

class CreatedAt(DateTime): pass

Usage with naive datetime

event = Event(created_at=CreatedAt(datetime(2024, 1, 15, 10, 30, 45)))

Convert to DateTimeTZ

aware_dt = created_at.add_timezone() # Implicit: add system timezone aware_dt_utc = created_at.add_timezone(timezone.utc) # Explicit: add UTC

Initialize DateTime attribute with a datetime value.

Parameters:

Name Type Description Default
value datetime

The datetime value to store

required
Source code in type_bridge/attribute/datetime.py
def __init__(self, value: datetime_type):
    """Initialize a DateTime attribute.

    Args:
        value: The naive datetime value to store (no tzinfo expected;
            TypeDB's 'datetime' type carries no timezone).
    """
    super().__init__(value)

value property

value

Get the stored datetime value.

add_timezone

add_timezone(tz=None)

Convert DateTime to DateTimeTZ by adding timezone information.

Implicit conversion (tz=None): Add system/local timezone Explicit conversion (tz provided): Add specified timezone

Parameters:

Name Type Description Default
tz timezone | None

Optional timezone to add to the naive datetime. If None, uses system local timezone (astimezone()). If provided, uses that specific timezone.

None

Returns:

Type Description
DateTimeTZ

DateTimeTZ instance with timezone-aware datetime

Example
Implicit: add system timezone

aware = naive_dt.add_timezone()

Explicit: add UTC timezone

from datetime import timezone aware_utc = naive_dt.add_timezone(timezone.utc)

Explicit: add JST (+9) timezone

from datetime import timezone, timedelta jst = timezone(timedelta(hours=9)) aware_jst = naive_dt.add_timezone(jst)

Source code in type_bridge/attribute/datetime.py
def add_timezone(self, tz: timezone_type | None = None) -> "DateTimeTZ":
    """Produce a timezone-aware DateTimeTZ from this naive DateTime.

    With tz=None the system local timezone is attached (implicit
    conversion via astimezone()); otherwise the given timezone is
    stamped onto the value as-is (explicit conversion).

    Args:
        tz: Timezone to attach. None means use the local system timezone.

    Returns:
        DateTimeTZ wrapping the timezone-aware datetime.

    Example:
        # Implicit: add system timezone
        aware = naive_dt.add_timezone()

        # Explicit: add UTC timezone
        from datetime import timezone
        aware_utc = naive_dt.add_timezone(timezone.utc)

        # Explicit: add JST (+9) timezone
        from datetime import timezone, timedelta
        jst = timezone(timedelta(hours=9))
        aware_jst = naive_dt.add_timezone(jst)
    """
    from type_bridge.attribute.datetimetz import DateTimeTZ

    naive = self.value
    # astimezone() on a naive datetime interprets it as local time,
    # while replace() stamps tzinfo without changing the clock fields.
    aware = naive.astimezone() if tz is None else naive.replace(tzinfo=tz)
    return DateTimeTZ(aware)

__add__

__add__(other)

Add a Duration to this DateTime.

Parameters:

Name Type Description Default
other Any

A Duration to add to this datetime

required

Returns:

Type Description
DateTime

New DateTime with the duration added

Example

from type_bridge import Duration dt = DateTime(datetime(2024, 1, 31, 14, 0, 0)) duration = Duration("P1M") result = dt + duration # DateTime(2024-02-29 14:00:00)

Source code in type_bridge/attribute/datetime.py
def __add__(self, other: Any) -> "DateTime":
    """Add a Duration to this DateTime.

    Args:
        other: A Duration to add to this datetime

    Returns:
        New DateTime with the duration added

    Example:
        from type_bridge import Duration
        dt = DateTime(datetime(2024, 1, 31, 14, 0, 0))
        duration = Duration("P1M")
        result = dt + duration  # DateTime(2024-02-29 14:00:00)
    """
    from type_bridge.attribute.duration import Duration

    if isinstance(other, Duration):
        # Add duration to datetime.
        # isodate's Duration addition handles month/day arithmetic and
        # clamps day-of-month overflow (Jan 31 + P1M -> last day of Feb,
        # which is Feb 29 in the 2024 leap year).

        new_dt = self.value + other.value
        return DateTime(new_dt)
    return NotImplemented

__radd__

__radd__(other)

Reverse addition for Duration + DateTime.

Source code in type_bridge/attribute/datetime.py
def __radd__(self, other: Any) -> "DateTime":
    """Support Duration + DateTime by delegating to __add__ (addition commutes)."""
    return self.__add__(other)

DateTimeTZ

DateTimeTZ(value)

Bases: Attribute

DateTimeTZ attribute type that accepts timezone-aware datetime values.

This maps to TypeDB's 'datetime-tz' type, which requires timezone information. The datetime must have tzinfo set (e.g., using datetime.timezone.utc or zoneinfo).

Example

from datetime import datetime, timezone

class CreatedAt(DateTimeTZ): pass

Usage with timezone

event = Event(created_at=CreatedAt(datetime(2024, 1, 15, 10, 30, 45, tzinfo=timezone.utc)))

Convert to DateTime

naive_dt = created_at.strip_timezone() # Implicit: just strip tz naive_dt_jst = created_at.strip_timezone(timezone(timedelta(hours=9))) # Explicit: convert to JST, then strip

Initialize DateTimeTZ attribute with a timezone-aware datetime value.

Parameters:

Name Type Description Default
value datetime

The timezone-aware datetime value to store

required

Raises:

Type Description
ValueError

If the datetime does not have timezone information

Source code in type_bridge/attribute/datetimetz.py
def __init__(self, value: datetime_type):
    """Initialize a DateTimeTZ attribute.

    Args:
        value: Timezone-aware datetime to store (tzinfo must be set).

    Raises:
        ValueError: If the datetime is naive (tzinfo is None).
    """
    # Reject naive datetimes up front: TypeDB's 'datetime-tz' type
    # requires timezone information.
    if value.tzinfo is None:
        raise ValueError(
            "DateTimeTZ requires timezone-aware datetime. "
            "Use DateTime for naive datetime or add tzinfo (e.g., datetime.timezone.utc)"
        )
    super().__init__(value)

value property

value

Get the stored datetime value.

strip_timezone

strip_timezone(tz=None)

Convert DateTimeTZ to DateTime by stripping timezone information.

Implicit conversion (tz=None): Just strip timezone as-is Explicit conversion (tz provided): Convert to specified timezone first, then strip

Parameters:

Name Type Description Default
tz timezone | None

Optional timezone to convert to before stripping. If None, strips timezone without conversion. If provided, converts to that timezone first.

None

Returns:

Type Description
DateTime

DateTime instance with naive datetime

Example
Implicit: strip timezone as-is

naive = dt_tz.strip_timezone()

Explicit: convert to JST (+9), then strip

from datetime import timezone, timedelta jst = timezone(timedelta(hours=9)) naive_jst = dt_tz.strip_timezone(jst)

Source code in type_bridge/attribute/datetimetz.py
def strip_timezone(self, tz: timezone_type | None = None) -> "DateTime":
    """Produce a naive DateTime from this timezone-aware value.

    With tz=None the tzinfo is simply removed (implicit conversion);
    with a timezone given, the value is first converted to that
    timezone and then stripped (explicit conversion).

    Args:
        tz: Optional target timezone to convert to before stripping.

    Returns:
        DateTime wrapping the naive datetime.

    Example:
        # Implicit: strip timezone as-is
        naive = dt_tz.strip_timezone()

        # Explicit: convert to JST (+9), then strip
        from datetime import timezone, timedelta
        jst = timezone(timedelta(hours=9))
        naive_jst = dt_tz.strip_timezone(jst)
    """
    from type_bridge.attribute.datetime import DateTime

    aware = self.value if tz is None else self.value.astimezone(tz)
    # Drop tzinfo; the clock fields are kept exactly as they are.
    return DateTime(aware.replace(tzinfo=None))

__add__

__add__(other)

Add a Duration to this DateTimeTZ.

Parameters:

Name Type Description Default
other Any

A Duration to add to this timezone-aware datetime

required

Returns:

Type Description
DateTimeTZ

New DateTimeTZ with the duration added

Note

Duration addition respects timezone changes (DST, etc.)

Example

from type_bridge import Duration from datetime import datetime, timezone dt = DateTimeTZ(datetime(2024, 1, 31, 14, 0, 0, tzinfo=timezone.utc)) duration = Duration("P1M") result = dt + duration # DateTimeTZ(2024-02-29 14:00:00+00:00)

Source code in type_bridge/attribute/datetimetz.py
def __add__(self, other: Any) -> "DateTimeTZ":
    """Add a Duration to this DateTimeTZ.

    Args:
        other: A Duration to add to this timezone-aware datetime

    Returns:
        New DateTimeTZ with the duration added

    Note:
        Duration addition respects timezone changes (DST, etc.)

    Example:
        from type_bridge import Duration
        from datetime import datetime, timezone
        dt = DateTimeTZ(datetime(2024, 1, 31, 14, 0, 0, tzinfo=timezone.utc))
        duration = Duration("P1M")
        result = dt + duration  # DateTimeTZ(2024-02-29 14:00:00+00:00)
    """
    from type_bridge.attribute.duration import Duration

    if isinstance(other, Duration):
        # Add duration to timezone-aware datetime.
        # isodate handles timezone-aware datetime + duration correctly and
        # clamps day-of-month overflow (Jan 31 + P1M -> Feb 29 in leap 2024).
        new_dt = self.value + other.value
        return DateTimeTZ(new_dt)
    return NotImplemented

__radd__

__radd__(other)

Reverse addition for Duration + DateTimeTZ.

Source code in type_bridge/attribute/datetimetz.py
def __radd__(self, other: Any) -> "DateTimeTZ":
    """Support Duration + DateTimeTZ by delegating to __add__ (addition commutes)."""
    return self.__add__(other)

Decimal

Decimal(value)

Bases: NumericAttribute

Decimal attribute type that accepts fixed-point decimal values.

This maps to TypeDB's 'decimal' type, which is a fixed-point signed decimal number with 64 bits to the left of the decimal point and 19 decimal digits of precision after the point.

Range: −2^63 to 2^63 − 10^−19 (inclusive)

Example

from decimal import Decimal as DecimalType

class AccountBalance(Decimal): pass

class Price(Decimal): pass

Usage with decimal values

balance = AccountBalance(DecimalType("1234.567890")) price = Price(DecimalType("0.02"))

Initialize Decimal attribute with a decimal value.

Parameters:

Name Type Description Default
value Decimal | str | int | float

The decimal value to store. Can be: - decimal.Decimal instance - str that can be parsed as decimal - int or float (will be converted to Decimal)

required
Example

from decimal import Decimal as DecimalType

From Decimal

balance = AccountBalance(DecimalType("123.45"))

From string (recommended for precision)

balance = AccountBalance("123.45")

From int or float (may lose precision)

balance = AccountBalance(123.45)

Source code in type_bridge/attribute/decimal.py
def __init__(self, value: DecimalType | str | int | float):
    """Initialize Decimal attribute with a fixed-point value.

    Args:
        value: The decimal value to store. Accepts a decimal.Decimal,
            a decimal string, or an int/float. Non-Decimal input is
            routed through str() before Decimal construction to avoid
            binary-float representation artifacts.

    Example:
        from decimal import Decimal as DecimalType

        # From Decimal
        balance = AccountBalance(DecimalType("123.45"))

        # From string (recommended for precision)
        balance = AccountBalance("123.45")

        # From int or float (may lose precision)
        balance = AccountBalance(123.45)
    """
    coerced = value if isinstance(value, DecimalType) else DecimalType(str(value))
    super().__init__(coerced)

value property

value

Get the stored decimal value.

Double

Double(value)

Bases: NumericAttribute

Double precision float attribute type that accepts float values.

Example

class Price(Double): pass

class Score(Double): pass

Initialize Double attribute with a float value.

Parameters:

Name Type Description Default
value float

The float value to store

required

Raises:

Type Description
ValueError

If value violates range_constraint

Source code in type_bridge/attribute/double.py
def __init__(self, value: float):
    """Initialize Double attribute with a float value.

    Args:
        value: The float value to store

    Raises:
        ValueError: If value falls outside the class's range_constraint
    """
    v = float(value)

    # Enforce the optional (min, max) bounds declared on the subclass.
    bounds = getattr(self.__class__, "range_constraint", None)
    if bounds is not None:
        lo, hi = bounds
        if lo is not None and v < float(lo):
            raise ValueError(
                f"{self.__class__.__name__} value {v} is below minimum {float(lo)}"
            )
        if hi is not None and v > float(hi):
            raise ValueError(
                f"{self.__class__.__name__} value {v} is above maximum {float(hi)}"
            )

    super().__init__(v)

value property

value

Get the stored float value.

__float__

__float__()

Convert to float.

Source code in type_bridge/attribute/double.py
def __float__(self) -> float:
    """Return the stored value as a plain float."""
    return float(self.value)

Duration

Duration(value)

Bases: Attribute

Duration attribute type that accepts ISO 8601 duration values.

This maps to TypeDB's 'duration' type, which represents calendar-aware time spans using months, days, and nanoseconds.

TypeDB duration format: ISO 8601 duration (e.g., P1Y2M3DT4H5M6.789S) Storage: 32-bit months, 32-bit days, 64-bit nanoseconds

Important notes: - Durations are partially ordered (P1M and P30D cannot be compared) - P1D ≠ PT24H (calendar day vs 24 hours) - P1M ≠ P30D (months vary in length) - Addition is not commutative with calendar components

Example

from datetime import timedelta

class SessionDuration(Duration): pass

class EventCadence(Duration): pass

From ISO 8601 string

cadence = EventCadence("P1M") # 1 month interval = SessionDuration("PT1H30M") # 1 hour 30 minutes

From timedelta (converted to Duration internally)

session = SessionDuration(timedelta(hours=2))

Complex duration

complex = EventCadence("P1Y2M3DT4H5M6.789S")

Initialize Duration attribute with a duration value.

Parameters:

Name Type Description Default
value str | timedelta | Duration

The duration value to store. Can be: - str: ISO 8601 duration string (e.g., "P1Y2M3DT4H5M6S") - timedelta: Python timedelta (converted to Duration) - isodate.Duration: Direct Duration object

required

Raises:

Type Description
ValueError

If duration components exceed storage limits

Example

From ISO string

duration1 = Duration("P1M") # 1 month duration2 = Duration("PT1H30M") # 1 hour 30 minutes

From timedelta

from datetime import timedelta duration3 = Duration(timedelta(hours=2, minutes=30))

Complex duration

duration4 = Duration("P1Y2M3DT4H5M6.789S")

Source code in type_bridge/attribute/duration.py
def __init__(self, value: str | timedelta | IsodateDuration):
    """Initialize Duration attribute with a duration value.

    Args:
        value: The duration value to store. Can be:
            - str: ISO 8601 duration string (e.g., "P1Y2M3DT4H5M6S")
            - timedelta: Python timedelta (converted to Duration)
            - isodate.Duration: Direct Duration object

    Raises:
        ValueError: If duration components exceed storage limits

    Example:
        # From ISO string
        duration1 = Duration("P1M")  # 1 month
        duration2 = Duration("PT1H30M")  # 1 hour 30 minutes

        # From timedelta
        from datetime import timedelta
        duration3 = Duration(timedelta(hours=2, minutes=30))

        # Complex duration
        duration4 = Duration("P1Y2M3DT4H5M6.789S")
    """
    if isinstance(value, str):
        value = isodate.parse_duration(value)

    # isodate.parse_duration returns a plain timedelta when the string
    # has no year/month component (e.g. "PT1H30M"). Normalize that case
    # as well as direct timedelta input to isodate.Duration, so every
    # stored value behaves consistently and is limit-validated below.
    # (Previously this branch was an elif, so parsed-from-string
    # timedeltas skipped both the conversion and the validation.)
    if isinstance(value, timedelta) and not isinstance(value, IsodateDuration):
        value = _timedelta_to_duration(value)

    # Validate storage limits (32-bit months/days, 64-bit nanoseconds)
    if isinstance(value, IsodateDuration):
        _validate_duration_limits(value)

    super().__init__(value)

value property

value

Get the stored duration value.

Returns:

Type Description
Duration

isodate.Duration instance (zero duration if None)

to_iso8601

to_iso8601()

Convert duration to ISO 8601 string format.

Returns:

Type Description
str

ISO 8601 duration string (e.g., "P1Y2M3DT4H5M6S")

Example

duration = Duration("P1M") assert duration.to_iso8601() == "P1M"

Source code in type_bridge/attribute/duration.py
def to_iso8601(self) -> str:
    """Render the stored duration as an ISO 8601 string.

    Returns:
        ISO 8601 duration string, e.g. "P1Y2M3DT4H5M6S".

    Example:
        assert Duration("P1M").to_iso8601() == "P1M"
    """
    return isodate.duration_isoformat(self.value)

__add__

__add__(other)

Add two durations.

Parameters:

Name Type Description Default
other Any

Another Duration to add

required

Returns:

Type Description
Duration

New Duration with sum

Example

d1 = Duration("P1M") d2 = Duration("P15D") result = d1 + d2 # P1M15D

Source code in type_bridge/attribute/duration.py
def __add__(self, other: Any) -> "Duration":
    """Return the component-wise sum of two durations.

    Args:
        other: Another Duration to add.

    Returns:
        New Duration holding the sum.

    Example:
        d1 = Duration("P1M")
        d2 = Duration("P15D")
        result = d1 + d2  # P1M15D
    """
    if not isinstance(other, Duration):
        return NotImplemented
    return Duration(self.value + other.value)

__radd__

__radd__(other)

Reverse addition for Duration.

Source code in type_bridge/attribute/duration.py
def __radd__(self, other: Any) -> "Duration":
    """Support reflected addition by delegating to __add__ (sum commutes)."""
    return self.__add__(other)

__sub__

__sub__(other)

Subtract two durations.

Parameters:

Name Type Description Default
other Any

Another Duration to subtract

required

Returns:

Type Description
Duration

New Duration with difference

Example

d1 = Duration("P1M") d2 = Duration("P15D") result = d1 - d2 # P1M-15D

Source code in type_bridge/attribute/duration.py
def __sub__(self, other: Any) -> "Duration":
    """Return the component-wise difference of two durations.

    Args:
        other: Another Duration to subtract.

    Returns:
        New Duration holding the difference.

    Example:
        d1 = Duration("P1M")
        d2 = Duration("P15D")
        result = d1 - d2  # P1M-15D
    """
    if not isinstance(other, Duration):
        return NotImplemented
    return Duration(self.value - other.value)

Integer

Integer(value)

Bases: NumericAttribute

Integer attribute type that accepts int values.

Example

class Age(Integer): pass

class Count(Integer): pass

With Literal for type safety

class Priority(Integer): pass

priority: Literal[1, 2, 3] | Priority

Initialize Integer attribute with an integer value.

Parameters:

Name Type Description Default
value int

The integer value to store

required

Raises:

Type Description
ValueError

If value violates range_constraint

Source code in type_bridge/attribute/integer.py
def __init__(self, value: int):
    """Initialize Integer attribute with an integer value.

    Args:
        value: The integer value to store

    Raises:
        ValueError: If value violates range_constraint
    """
    v = int(value)

    # Enforce the optional (min, max) bounds declared on the subclass.
    bounds = getattr(self.__class__, "range_constraint", None)
    if bounds is not None:
        lo, hi = bounds
        if lo is not None and v < int(lo):
            raise ValueError(
                f"{self.__class__.__name__} value {v} is below minimum {int(lo)}"
            )
        if hi is not None and v > int(hi):
            raise ValueError(
                f"{self.__class__.__name__} value {v} is above maximum {int(hi)}"
            )

    super().__init__(v)

value property

value

Get the stored integer value.

__int__

__int__()

Convert to int.

Source code in type_bridge/attribute/integer.py
def __int__(self) -> int:
    """Return the stored value as a plain int."""
    return int(self.value)

String

String(value)

Bases: Attribute

String attribute type that accepts str values.

Example

class Name(String): pass

class Email(String): pass

With Literal for type safety

class Status(String): pass

status: Literal["active", "inactive"] | Status

Initialize String attribute with a string value.

Parameters:

Name Type Description Default
value str

The string value to store

required
Source code in type_bridge/attribute/string.py
def __init__(self, value: str):
    """Initialize a String attribute.

    Args:
        value: The string value to store.
    """
    super().__init__(value)

value property

value

Get the stored string value.

__str__

__str__()

Convert to string.

Source code in type_bridge/attribute/string.py
def __str__(self) -> str:
    """Return the underlying string value."""
    return str(self.value)

__add__

__add__(other)

Concatenate strings.

Source code in type_bridge/attribute/string.py
def __add__(self, other: object) -> "String":
    """Concatenate with a str or another String, yielding a String."""
    if isinstance(other, str):
        return String(self.value + other)
    if isinstance(other, String):
        return String(self.value + other.value)
    return NotImplemented

__radd__

__radd__(other)

Right-hand string concatenation.

Source code in type_bridge/attribute/string.py
def __radd__(self, other: object) -> "String":
    """Right-hand concatenation: str + String -> String."""
    if not isinstance(other, str):
        return NotImplemented
    return String(other + self.value)

contains classmethod

contains(value)

Create contains string expression.

Parameters:

Name Type Description Default
value String

String value to search for

required

Returns:

Type Description
StringExpr

StringExpr for attr contains value

Example

Email.contains(Email("@company.com")) # email contains "@company.com"

Source code in type_bridge/attribute/string.py
@classmethod
def contains(cls, value: "String") -> "StringExpr":
    """Build a TypeQL `contains` expression for this attribute type.

    Args:
        value: Substring to search for.

    Returns:
        StringExpr representing `attr contains value`.

    Example:
        Email.contains(Email("@company.com"))  # email contains "@company.com"
    """
    from type_bridge.expressions import StringExpr

    return StringExpr(attr_type=cls, operation="contains", pattern=value)

like classmethod

like(pattern)

Create regex pattern matching expression.

Parameters:

Name Type Description Default
pattern String

Regex pattern to match

required

Returns:

Type Description
StringExpr

StringExpr for attr like pattern

Example

Name.like(Name("^A.*")) # name starts with 'A'

Source code in type_bridge/attribute/string.py
@classmethod
def like(cls, pattern: "String") -> "StringExpr":
    """Build a TypeQL `like` (regex) expression for this attribute type.

    Args:
        pattern: Regex pattern to match.

    Returns:
        StringExpr representing `attr like pattern`.

    Example:
        Name.like(Name("^A.*"))  # name starts with 'A'
    """
    from type_bridge.expressions import StringExpr

    return StringExpr(attr_type=cls, operation="like", pattern=pattern)

regex classmethod

regex(pattern)

Create regex pattern matching expression (alias for like).

Note

Automatically converts to TypeQL 'like' operator. Both 'like' and 'regex' perform regex pattern matching in TypeDB.

Parameters:

Name Type Description Default
pattern String

Regex pattern to match

required

Returns:

Type Description
StringExpr

StringExpr for attr like pattern

Example

Email.regex(Email(".*@gmail\.com")) # Generates TypeQL: $email like ".*@gmail\.com"

Source code in type_bridge/attribute/string.py
@classmethod
def regex(cls, pattern: "String") -> "StringExpr":
    """Build a regex pattern-matching expression (alias for like).

    Note:
        Emitted as the TypeQL 'like' operator; in TypeDB both 'like'
        and 'regex' perform regex pattern matching.

    Args:
        pattern: Regex pattern to match.

    Returns:
        StringExpr representing `attr like pattern`.

    Example:
        Email.regex(Email(".*@gmail\\.com"))  # Generates TypeQL: $email like ".*@gmail\\.com"
    """
    from type_bridge.expressions import StringExpr

    return StringExpr(attr_type=cls, operation="regex", pattern=pattern)

startswith classmethod

startswith(prefix)

Create startswith string expression.

Parameters:

Name Type Description Default
prefix String

Prefix string to check for

required

Returns:

Type Description
StringExpr

StringExpr for attr like "^prefix.*"

Source code in type_bridge/attribute/string.py
@classmethod
def startswith(cls, prefix: "String") -> "StringExpr":
    """Build an expression matching values that begin with *prefix*.

    Args:
        prefix: Prefix string to check for.

    Returns:
        StringExpr for `attr like "^prefix.*"`.
    """
    # The signature is typed as String, but plain strings are tolerated:
    # extract the raw str so the regex can be built with proper escaping.
    text = prefix.value if isinstance(prefix, String) else str(prefix)
    return cls.regex(cls(f"^{re.escape(text)}.*"))

endswith classmethod

endswith(suffix)

Create endswith string expression.

Parameters:

Name Type Description Default
suffix String

Suffix string to check for

required

Returns:

Type Description
StringExpr

StringExpr for attr like ".*suffix$"

Source code in type_bridge/attribute/string.py
@classmethod
def endswith(cls, suffix: "String") -> "StringExpr":
    """Build an expression matching values that end with *suffix*.

    Args:
        suffix: Suffix string to check for.

    Returns:
        StringExpr for `attr like ".*suffix$"`.
    """
    # The signature is typed as String, but plain strings are tolerated:
    # extract the raw str so the regex can be built with proper escaping.
    text = suffix.value if isinstance(suffix, String) else str(suffix)
    return cls.regex(cls(f".*{re.escape(text)}$"))

build_lookup classmethod

build_lookup(lookup, value)

Build an expression for string-specific lookups.

Overrides base method to handle contains, regex, startswith, endswith.

Source code in type_bridge/attribute/string.py
@classmethod
def build_lookup(cls, lookup: str, value: Any) -> "Expression":
    """Build an expression for string-specific lookups.

    Handles contains/regex/like/startswith/endswith here; every other
    lookup (eq, in, isnull, ...) is delegated to the base implementation.
    """
    dispatch = {
        "contains": cls.contains,
        "regex": cls.regex,
        "like": cls.regex,  # 'like' is the TypeQL spelling of regex
        "startswith": cls.startswith,
        "endswith": cls.endswith,
    }
    handler = dispatch.get(lookup)
    if handler is not None:
        # Wrap raw values in this String subclass before dispatching.
        wrapped = value if isinstance(value, cls) else cls(str(value))
        return handler(wrapped)

    return super().build_lookup(lookup, value)

TypeFlags dataclass

TypeFlags(name=None, abstract=False, base=False, case=CLASS_NAME)

Metadata flags for Entity and Relation classes.

Parameters:

Name Type Description Default
name str | None

TypeDB type name (if None, uses class name with case formatting)

None
abstract bool

Whether this is an abstract type

False
base bool

Whether this is a Python base class that should not appear in TypeDB schema

False
case TypeNameCase

Case formatting for auto-generated type names (default: CLASS_NAME)

CLASS_NAME
Example

class Person(Entity): flags = TypeFlags(name="person") name: Name

class PersonName(Entity): flags = TypeFlags() # → PersonName (default CLASS_NAME) name: Name

class PersonName(Entity): flags = TypeFlags(case=TypeNameCase.SNAKE_CASE) # → person_name name: Name

class AbstractPerson(Entity): flags = TypeFlags(abstract=True) name: Name

class BaseEntity(Entity): flags = TypeFlags(base=True) # Python base class only # Children skip this in TypeDB hierarchy

Initialize TypeFlags.

Parameters:

Name Type Description Default
name str | None

TypeDB type name (if None, uses class name with case formatting)

None
abstract bool

Whether this is an abstract type

False
base bool

Whether this is a Python base class that should not appear in TypeDB schema

False
case TypeNameCase

Case formatting for auto-generated type names (default: CLASS_NAME)

CLASS_NAME
Source code in type_bridge/attribute/flags.py
def __init__(
    self,
    name: str | None = None,
    abstract: bool = False,
    base: bool = False,
    case: TypeNameCase = TypeNameCase.CLASS_NAME,
):
    """Store the metadata flags that control TypeDB schema mapping.

    Args:
        name: Explicit TypeDB type name; None derives it from the class name.
        abstract: Mark the generated TypeDB type as abstract.
        base: Treat the class as a Python-only base omitted from the schema.
        case: Case formatting applied when the type name is auto-generated.
    """
    self.case = case
    self.base = base
    self.abstract = abstract
    self.name = name

TypeNameCase

Bases: Enum

Type name case formatting options for Entity and Relation types.

Options

LOWERCASE: Convert class name to lowercase Example: PersonName → personname CLASS_NAME: Keep class name as-is (PascalCase; default) Example: PersonName → PersonName SNAKE_CASE: Convert class name to snake_case Example: PersonName → person_name

CrudEvent

Bases: Enum

CRUD lifecycle events.

CrudHook

Bases: Protocol

Protocol for CRUD lifecycle hooks.

Implement only the methods you need. All methods are optional — HookRunner uses hasattr / getattr to discover them.

EntityNotFoundError

Bases: NotFoundError

Raised when an entity does not exist in the database.

This exception is raised during delete or update operations when the target entity cannot be found using its @key attributes or matched attributes.

Example

try: manager.delete(nonexistent_entity) except EntityNotFoundError: print("Entity was already deleted or never existed")

HookCancelled

HookCancelled(reason='', *, event=None, hook=None)

Bases: Exception

Raise in a pre-hook to abort the operation.

Attributes:

Name Type Description
reason

Human-readable explanation.

event

The event that was cancelled (set by HookRunner).

hook

The hook instance that raised the cancellation (set by HookRunner).

Source code in type_bridge/crud/hooks.py
def __init__(
    self,
    reason: str = "",
    *,
    event: CrudEvent | None = None,
    hook: Any = None,
):
    """Capture cancellation details, then initialize the base Exception.

    Args:
        reason: Human-readable explanation for the cancellation.
        event: The CRUD event being cancelled (filled in by HookRunner).
        hook: The hook that raised the cancellation (filled in by HookRunner).
    """
    self.hook = hook
    self.event = event
    self.reason = reason
    super().__init__(reason)

KeyAttributeError

KeyAttributeError(entity_type, operation, field_name=None, all_fields=None)

Bases: ValueError

Raised when @key attribute validation fails during update/delete.

This exception is raised when: - A @key attribute has a None value - No @key attributes are defined on the entity

Attributes:

Name Type Description
entity_type

Name of the entity class

operation

The operation that failed ("update" or "delete")

field_name

The @key field that was None (if applicable)

all_fields

List of all defined fields (when no @key exists)

Example

try: manager.update(entity_with_none_key) except KeyAttributeError as e: print(f"Key validation failed: {e}") print(f"Entity type: {e.entity_type}") print(f"Operation: {e.operation}")

Source code in type_bridge/crud/exceptions.py
def __init__(
    self,
    entity_type: str,
    operation: str,
    field_name: str | None = None,
    all_fields: list[str] | None = None,
):
    """Build a descriptive error for a failed @key validation.

    Args:
        entity_type: Name of the entity class being operated on.
        operation: The failing operation ("update" or "delete").
        field_name: The @key field whose value was None, when that is the cause.
        all_fields: All defined fields, used when no @key attribute exists.
    """
    self.entity_type = entity_type
    self.operation = operation
    self.field_name = field_name
    self.all_fields = all_fields

    if field_name is None:
        # No @key attributes are defined on the entity at all.
        message = (
            f"Cannot {operation} {entity_type}: no @key attributes found. "
            f"The {operation}() method requires at least one @key attribute "
            f"to identify the entity. "
            f"Defined attributes: {all_fields} (none marked as @key). "
            f"Hint: Add Flag(Key) to an attribute, e.g., `id: Id = Flag(Key)`"
        )
    else:
        # A @key attribute exists but its value is None.
        message = (
            f"Cannot {operation} {entity_type}: "
            f"key attribute '{field_name}' is None. "
            f"Ensure the entity has a valid '{field_name}' value "
            f"before calling {operation}()."
        )

    super().__init__(message)

NotUniqueError

Bases: ValueError

Raised when an operation requires exactly one match but finds multiple.

This exception is raised when attempting to delete an entity without @key attributes and multiple matching records are found. Use filter().delete() for bulk deletion instead.

Example

try: manager.delete(keyless_entity) except NotUniqueError: print("Multiple entities matched - use filter().delete() for bulk deletion")

RelationNotFoundError

Bases: NotFoundError

Raised when a relation does not exist in the database.

This exception is raised during delete or update operations when the target relation cannot be found using its role players' @key attributes.

Example

try: manager.delete(nonexistent_relation) except RelationNotFoundError: print("Relation was already deleted or never existed")

TypeDBManager

TypeDBManager(connection, model_class)

Unified CRUD manager for TypeDB entities and relations.

Source code in type_bridge/crud/typedb_manager.py
def __init__(self, connection: Connection, model_class: type[T]):
    """Set up query compilation, execution, hooks, and the CRUD strategy.

    Raises:
        TypeError: If model_class is neither an Entity nor a Relation subclass.
    """
    self.model_class = model_class
    self.compiler = QueryCompiler()
    self._hook_runner = HookRunner()
    self._connection = connection
    self._executor = ConnectionExecutor(connection)

    # Choose the strategy matching the kind of model being managed.
    if issubclass(model_class, Entity):
        self.strategy: ModelStrategy = EntityStrategy()
    elif issubclass(model_class, Relation):
        self.strategy = RelationStrategy()
    else:
        raise TypeError(f"Unsupported model type: {model_class}")

add_hook

add_hook(hook)

Register a lifecycle hook. Returns self for chaining.

Source code in type_bridge/crud/typedb_manager.py
def add_hook(self, hook: Any) -> Self:
    """Attach a lifecycle hook to this manager.

    Returns:
        This manager, so registrations can be chained fluently.
    """
    runner = self._hook_runner
    runner.add(hook)
    return self

remove_hook

remove_hook(hook)

Unregister a lifecycle hook.

Source code in type_bridge/crud/typedb_manager.py
def remove_hook(self, hook: Any) -> None:
    """Detach a previously registered lifecycle hook."""
    runner = self._hook_runner
    runner.remove(hook)

insert

insert(instance)

Insert a new instance and populate _iid.

For entities, uses a single roundtrip (insert + fetch combined). For relations, uses two roundtrips (insert, then fetch) because TypeDB 3.x relation inserts don't bind the variable.

Source code in type_bridge/crud/typedb_manager.py
def insert(self, instance: T) -> T:
    """Insert a new instance and populate _iid.

    For entities, uses a single roundtrip (insert + fetch combined).
    For relations, uses two roundtrips (insert, then fetch) because
    TypeDB 3.x relation inserts don't bind the variable.

    Args:
        instance: The entity or relation instance to persist.

    Returns:
        The same instance, with its internal ID (_iid) populated.
    """
    # Pre-hooks run first; a hook may raise HookCancelled to abort.
    if self._hook_runner.has_hooks:
        self._hook_runner.run_pre(CrudEvent.PRE_INSERT, self.model_class, instance)

    var = "$x"

    # Relations use include_variable=False in to_ast(), so $x isn't bound
    # after insert. Use the two-query approach for relations.
    if isinstance(instance, Relation):
        match_clause, insert_clause = self.strategy.build_insert(instance, var)
        query_parts = []
        if match_clause:
            # NOTE(review): a match clause presumably binds role players
            # before the relation insert — confirm in build_insert.
            query_parts.append(self.compiler.compile(match_clause))
        query_parts.append(self.compiler.compile(insert_clause))
        self._execute("\n".join(query_parts), TransactionType.WRITE)
        # Second roundtrip: look the relation back up to obtain its IID.
        self._fetch_and_set_iid(instance, var)

        if self._hook_runner.has_hooks:
            self._hook_runner.run_post(CrudEvent.POST_INSERT, self.model_class, instance)
        return instance

    # Entities: Combined insert + fetch IID in single query
    iid = self._execute_insert_with_iid(instance, var, use_put=False)
    if iid:
        # object.__setattr__ sidesteps the model's own __setattr__ —
        # presumably instances block normal assignment; confirm.
        object.__setattr__(instance, "_iid", iid)
        logger.debug(f"Set _iid on instance: {iid}")
    else:
        # Fallback to separate fetch (for edge cases like types without keys)
        self._fetch_and_set_iid(instance, var)

    if self._hook_runner.has_hooks:
        self._hook_runner.run_post(CrudEvent.POST_INSERT, self.model_class, instance)

    return instance

get

get(**filters)

Get instances matching filters.

Source code in type_bridge/crud/typedb_manager.py
def get(self, **filters) -> list[T]:
    """Get instances matching filters.

    Entities are fetched with polymorphic type resolution: the match binds
    the concrete type via `isa!` and a `sub` constraint, so rows hydrate
    into their actual subclass rather than the managed base class.
    Relations delegate to the role-player-aware path.

    Args:
        **filters: Exact-match attribute filters.

    Returns:
        Hydrated model instances (possibly subclasses) matching the filters.

    Raises:
        HydrationError: If a result row cannot be converted to a model.
    """
    from type_bridge.crud.role_players import resolve_entity_class_from_label
    from type_bridge.models.registry import ModelRegistry

    # Use descriptive variable name to avoid conflicts
    var = "$rel" if isinstance(self.strategy, RelationStrategy) else "$ent"

    # Check if this is a relation (needs special handling for role players)
    if isinstance(self.strategy, RelationStrategy):
        return self._get_relations(var, filters, [])

    # Entity path: fetch with polymorphic type resolution
    base_type = self.model_class.get_type_name()

    from type_bridge.query.ast import EntityPattern

    # Build match clause with isa! for type variable binding (enables polymorphic resolution)
    # This allows us to fetch the actual concrete type using label()
    match_clause: MatchClause = self.strategy.build_match_all(self.model_class, var, filters)

    # Modify the AST to use 'isa!' and capture type variable '$t'
    # We iterate through patterns to find the main entity pattern
    for pattern in match_clause.patterns:
        if (
            isinstance(pattern, EntityPattern)
            and pattern.variable == var
            and pattern.type_name == base_type
        ):
            # We found the main pattern.
            # Transform to: $ent isa! $t
            # And add: $t sub base_type

            pattern.type_name = "$t"
            pattern.is_strict = True

            # Add sub constraint as a new pattern
            from type_bridge.query.ast import SubTypePattern

            match_clause.patterns.append(SubTypePattern(variable="$t", parent_type=base_type))
            break

    match_str = self.compiler.compile(match_clause)

    # Build fetch clause using wildcard to get all attributes including subtype-specific ones
    fetch_clause_str = self._build_wildcard_fetch(var, include_iid=True, include_type=True)
    query = match_str + "\n" + fetch_clause_str

    results = self._execute(query, TransactionType.READ)

    # Hydrate entity instances with polymorphic type resolution
    instances = []
    for result in results:
        try:
            # The IID may arrive wrapped as {"value": ...}; unwrap it.
            iid = result.pop("_iid", None)
            if isinstance(iid, dict) and "value" in iid:
                iid = iid["value"]

            type_label = result.pop("_type", None)

            # Extract attributes from nested "attributes" key (wildcard fetch structure)
            attrs = result.pop("attributes", result)

            # Resolve the concrete class: registry lookup first, then the
            # label-based resolver as a fallback.
            if type_label and type_label != base_type:
                concrete_class = ModelRegistry.get(type_label)
                if concrete_class is None:
                    concrete_class = resolve_entity_class_from_label(
                        type_label,
                        cast(tuple[type[Entity], ...], (self.model_class,)),
                    )
            else:
                concrete_class = self.model_class

            assert concrete_class is not None, "Failed to resolve concrete class"
            entity_class = cast(type[Entity], concrete_class)
            instance = entity_class.from_dict(attrs, strict=False)
            if iid:
                # Bypass the model's __setattr__ to stash the internal ID.
                object.__setattr__(instance, "_iid", iid)
            instances.append(instance)
        except Exception as e:
            # Wrap any hydration failure with the raw row for debugging.
            from type_bridge.crud.exceptions import HydrationError

            raise HydrationError(
                model_type=self.model_class.__name__,
                raw_data=result,
                cause=e,
            ) from e

    return instances

update

update(instance)

Update an instance in the database.

Uses the Strategy pattern to identify the instance, then updates all non-key attributes to match the current state.

Parameters:

Name Type Description Default
instance T

Instance with updated values

required

Returns:

Type Description
T

The updated instance

Source code in type_bridge/crud/typedb_manager.py
def update(self, instance: T) -> T:
    """Update an instance in the database.

    Uses the Strategy pattern to identify the instance, then updates
    all non-key attributes to match the current state.

    The write is a single match/delete/insert query: stale attribute
    values are deleted (inside `try` blocks so absent attributes don't
    fail the match) and the instance's current values are inserted.

    Args:
        instance: Instance with updated values

    Returns:
        The updated instance
    """
    # Pre-hooks run before the query is built; a hook may abort the update.
    if self._hook_runner.has_hooks:
        self._hook_runner.run_pre(CrudEvent.PRE_UPDATE, self.model_class, instance)

    var = "$x"
    # Constraints that uniquely identify the instance (its @key attributes).
    constraints = self.strategy.identify(instance)
    all_attrs = self.model_class.get_all_attributes()

    # Build the base match clause using AST
    from type_bridge.query.ast import EntityPattern, RelationPattern

    if issubclass(self.model_class, Entity):
        pattern = EntityPattern(
            variable=var,
            type_name=self.model_class.get_type_name(),
            constraints=constraints,
        )
    else:
        pattern = RelationPattern(
            variable=var,
            type_name=self.model_class.get_type_name(),
            role_players=[],
            constraints=constraints,
        )

    base_match = self.compiler.compile(MatchClause(patterns=[pattern]))
    # Remove "match\n" prefix as we'll rebuild it
    # (len("match\n") == 6, hence the [6:] slice).
    base_match_body = base_match[6:] if base_match.startswith("match\n") else base_match

    # Separate single-value and multi-value attributes
    single_value_updates: dict[str, Any] = {}
    multi_value_updates: dict[str, list[Any]] = {}
    single_value_deletes: list[str] = []

    for field_name, attr_info in all_attrs.items():
        # Skip key attributes - they identify the instance, can't be changed
        if attr_info.flags.is_key:
            continue

        value = getattr(instance, field_name, None)
        attr_name = attr_info.typ.get_attribute_name()
        is_multi = is_multi_value_attribute(attr_info.flags)

        if value is None:
            # Mark for deletion (if attribute exists)
            single_value_deletes.append(attr_name)
        elif is_multi and isinstance(value, list):
            multi_value_updates[attr_name] = value
        else:
            single_value_updates[attr_name] = value

    # Build try blocks for match clause
    try_blocks: list[str] = []

    # Add bindings for multi-value attributes with guards.
    # The guards only bind values NOT in the keep-set, so the delete
    # clause removes stale values while kept values stay untouched.
    for attr_name, values in multi_value_updates.items():
        # dict.fromkeys deduplicates while preserving insertion order.
        keep_literals = [format_value(v) for v in dict.fromkeys(values)]
        guard_lines = [f"not {{ ${attr_name} == {lit}; }};" for lit in keep_literals]
        try_block = "\n".join(
            [
                "try {",
                f"  {var} has {attr_name} ${attr_name};",
                *[f"  {g}" for g in guard_lines],
                "};",
            ]
        )
        try_blocks.append(try_block)

    # Add bindings for single-value updates (delete old + insert new)
    for attr_name in single_value_updates:
        try_blocks.append(f"try {{ {var} has {attr_name} $old_{attr_name}; }};")

    # Add bindings for single-value deletes
    for attr_name in single_value_deletes:
        try_blocks.append(f"try {{ {var} has {attr_name} ${attr_name}; }};")

    # Combine base match with try blocks
    if try_blocks:
        match_clause_str = base_match_body + "\n" + "\n".join(try_blocks)
    else:
        match_clause_str = base_match_body
    query_parts = [f"match\n{match_clause_str}"]

    # Build delete clause.
    # Each deletion is wrapped in try so unbound optionals are no-ops.
    delete_parts = []
    for attr_name in multi_value_updates:
        delete_parts.append(f"try {{ ${attr_name} of {var}; }};")
    for attr_name in single_value_updates:
        delete_parts.append(f"try {{ $old_{attr_name} of {var}; }};")
    for attr_name in single_value_deletes:
        delete_parts.append(f"try {{ ${attr_name} of {var}; }};")

    if delete_parts:
        query_parts.append("delete\n" + "\n".join(delete_parts))

    # Build insert clause
    insert_parts = []
    for attr_name, values in multi_value_updates.items():
        for value in values:
            insert_parts.append(f"{var} has {attr_name} {format_value(value)};")
    for attr_name, value in single_value_updates.items():
        insert_parts.append(f"{var} has {attr_name} {format_value(value)};")

    if insert_parts:
        query_parts.append("insert\n" + "\n".join(insert_parts))

    full_query = "\n".join(query_parts)
    logger.debug(f"Update query: {full_query}")

    self._execute(full_query, TransactionType.WRITE)
    logger.info(f"Updated: {self.model_class.__name__}")

    if self._hook_runner.has_hooks:
        self._hook_runner.run_post(CrudEvent.POST_UPDATE, self.model_class, instance)

    return instance

delete

delete(instance)

Delete an instance and return it.

Source code in type_bridge/crud/typedb_manager.py
def delete(self, instance: T) -> T:
    """Remove the given instance from the database and return it.

    Matches the instance via its identifying AST patterns, then issues a
    combined match + delete query. PRE_DELETE / POST_DELETE hooks fire
    around the write.
    """
    runner = self._hook_runner
    if runner.has_hooks:
        runner.run_pre(CrudEvent.PRE_DELETE, self.model_class, instance)

    var = "$x"

    # Entities contribute one match pattern; relations contribute several.
    patterns: list[Pattern]
    if isinstance(instance, Entity):
        patterns = [instance.get_match_pattern(var)]
    elif isinstance(instance, Relation):
        patterns = instance.get_match_patterns(var)
    else:
        raise TypeError(f"Unexpected instance type: {type(instance)}")

    # Compile both clauses and run them in a single write transaction.
    compiled_match = self.compiler.compile(MatchClause(patterns=patterns))
    compiled_delete = self.compiler.compile(
        DeleteClause(statements=[DeleteThingStatement(variable=var)])
    )
    self._execute(f"{compiled_match}\n{compiled_delete}", TransactionType.WRITE)

    if runner.has_hooks:
        runner.run_post(CrudEvent.POST_DELETE, self.model_class, instance)

    return instance

all

all()

Fetch all instances of this type.

Source code in type_bridge/crud/typedb_manager.py
def all(self) -> list[T]:
    """Return every stored instance of the managed type (no filters)."""
    return self.get()

insert_many

insert_many(instances)

Insert multiple instances in a single query (batched).

For entities, combines all inserts into a single query for efficiency. For relations, falls back to individual inserts (due to match clause complexity).

Source code in type_bridge/crud/typedb_manager.py
def insert_many(self, instances: list[T]) -> list[T]:
    """Insert multiple instances, batching entities into a single query.

    Pure-entity lists are written with one combined insert for efficiency.
    Relation (or mixed) lists fall back to one insert() per instance,
    because role-player match clauses cannot be batched.
    """
    if not instances:
        return instances

    # Only pure-entity lists can share a single query.
    batchable = all(isinstance(inst, Entity) for inst in instances)
    if not batchable:
        # Hooks fire inside self.insert for each instance.
        for inst in instances:
            self.insert(inst)
        return instances

    runner = self._hook_runner
    if runner.has_hooks:
        for inst in instances:
            runner.run_pre(CrudEvent.PRE_INSERT, self.model_class, inst)

    inserted = self._batch_insert_entities(instances)

    if runner.has_hooks:
        for inst in inserted:
            runner.run_post(CrudEvent.POST_INSERT, self.model_class, inst)

    return inserted

put

put(instance)

Insert or update an instance (idempotent) and populate _iid.

Uses TypeQL's PUT clause for idempotent insertion. For entities, uses a single roundtrip. For relations, uses two.

Source code in type_bridge/crud/typedb_manager.py
def put(self, instance: T) -> T:
    """Insert or update an instance (idempotent) and populate _iid.

    Uses TypeQL's PUT clause for idempotent insertion.
    For entities, uses a single roundtrip. For relations, uses two.

    Args:
        instance: The entity or relation instance to put.

    Returns:
        The same instance, with its internal ID (_iid) populated.
    """
    # Pre-hooks run first; a hook may raise HookCancelled to abort.
    if self._hook_runner.has_hooks:
        self._hook_runner.run_pre(CrudEvent.PRE_PUT, self.model_class, instance)

    var = "$x"

    # Relations use include_variable=False in to_ast(), so $x isn't bound.
    # Use the two-query approach for relations.
    if isinstance(instance, Relation):
        match_clause, insert_clause = self.strategy.build_insert(instance, var)
        query_parts = []
        if match_clause:
            query_parts.append(self.compiler.compile(match_clause))
        insert_query = self.compiler.compile(insert_clause)
        # Reuse the compiled insert query, swapping only the leading
        # keyword so it becomes an idempotent PUT.
        put_query = insert_query.replace("insert\n", "put\n", 1)
        query_parts.append(put_query)
        self._execute("\n".join(query_parts), TransactionType.WRITE)
        # Second roundtrip: look the relation back up to obtain its IID.
        self._fetch_and_set_iid(instance, var)

        if self._hook_runner.has_hooks:
            self._hook_runner.run_post(CrudEvent.POST_PUT, self.model_class, instance)
        return instance

    # Entities: Combined put + fetch IID in single query
    iid = self._execute_insert_with_iid(instance, var, use_put=True)
    if iid:
        # object.__setattr__ sidesteps the model's own __setattr__ —
        # presumably instances block normal assignment; confirm.
        object.__setattr__(instance, "_iid", iid)
        logger.debug(f"Set _iid on instance: {iid}")
    else:
        # Fallback to separate fetch (for edge cases like types without keys)
        self._fetch_and_set_iid(instance, var)

    if self._hook_runner.has_hooks:
        self._hook_runner.run_post(CrudEvent.POST_PUT, self.model_class, instance)

    return instance

delete_many

delete_many(instances, *, strict=False)

Delete multiple instances.

Optimized for batch deletion: instances with IIDs are deleted in a single query using disjunctive matching (OR pattern), reducing N roundtrips to 1.

Parameters:

Name Type Description Default
instances list[T]

List of instances to delete

required
strict bool

If True, raise EntityNotFoundError if any entity doesn't exist. In strict mode, checks all entities first and raises before any deletion.

False

Returns:

Type Description
list[T]

List of actually-deleted entities (excludes those that didn't exist)

Source code in type_bridge/crud/typedb_manager.py
def delete_many(self, instances: list[T], *, strict: bool = False) -> list[T]:
    """Delete multiple instances.

    Optimized for batch deletion: instances with IIDs are deleted in a single
    query using disjunctive matching (OR pattern), reducing N roundtrips to 1.

    Args:
        instances: List of instances to delete
        strict: If True, raise EntityNotFoundError if any entity doesn't exist.
               In strict mode, checks all entities first and raises before any deletion.

    Returns:
        List of actually-deleted entities (excludes those that didn't exist)
    """
    if not instances:
        return instances

    from type_bridge.crud.exceptions import EntityNotFoundError

    # Separate instances by whether they have IIDs
    # (only IID-bearing instances can join the single batched query).
    with_iids: list[T] = []
    without_iids: list[T] = []

    for instance in instances:
        if getattr(instance, "_iid", None):
            with_iids.append(instance)
        else:
            without_iids.append(instance)

    has_hooks = self._hook_runner.has_hooks

    # For strict mode, we need to check existence before deleting
    if strict:
        # Batch check existence for instances with IIDs
        existing_iids = self._batch_check_existence_by_iid(with_iids) if with_iids else set()
        not_found_with_iid = [inst for inst in with_iids if inst._iid not in existing_iids]

        # Check existence individually for instances without IIDs
        not_found_without_iid = [inst for inst in without_iids if not self._entity_exists(inst)]

        not_found = not_found_with_iid + not_found_without_iid
        if not_found:
            # Raise before any deletion happens, so strict mode never
            # performs a partial delete.
            names = [str(e) for e in not_found]
            raise EntityNotFoundError(f"entity(ies) not found: {names}")

        # All exist - proceed with batch delete
        deleted: list[T] = []
        if with_iids:
            if has_hooks:
                for inst in with_iids:
                    self._hook_runner.run_pre(CrudEvent.PRE_DELETE, self.model_class, inst)
            self._batch_delete_by_iid(with_iids)
            if has_hooks:
                for inst in with_iids:
                    self._hook_runner.run_post(CrudEvent.POST_DELETE, self.model_class, inst)
            deleted.extend(with_iids)
        for inst in without_iids:
            self.delete(inst)  # hooks fire inside self.delete()
            deleted.append(inst)
        return deleted

    # Non-strict mode: batch delete instances with IIDs, individual for others
    deleted = []

    if with_iids:
        # Batch check which ones exist
        existing_iids = self._batch_check_existence_by_iid(with_iids)
        existing_instances = [inst for inst in with_iids if inst._iid in existing_iids]

        if existing_instances:
            if has_hooks:
                for inst in existing_instances:
                    self._hook_runner.run_pre(CrudEvent.PRE_DELETE, self.model_class, inst)
            self._batch_delete_by_iid(existing_instances)
            if has_hooks:
                for inst in existing_instances:
                    self._hook_runner.run_post(CrudEvent.POST_DELETE, self.model_class, inst)
            deleted.extend(existing_instances)

    # Handle instances without IIDs individually (hooks fire via self.delete)
    for inst in without_iids:
        if self._entity_exists(inst):
            self.delete(inst)
            deleted.append(inst)

    return deleted

put_many

put_many(instances)

Put multiple instances (idempotent insert/update).

Attempts batch operation first for efficiency. If a key constraint violation occurs (some entities exist with different data), falls back to individual operations which are idempotent.

For entities, uses batch PUT when possible (N→1 roundtrips). For relations, uses individual operations (match clause complexity).

Source code in type_bridge/crud/typedb_manager.py
def put_many(self, instances: list[T]) -> list[T]:
    """Put multiple instances (idempotent insert/update).

    Attempts batch operation first for efficiency. If a key constraint
    violation occurs (some entities exist with different data), falls back
    to individual operations which are idempotent.

    For entities, uses batch PUT when possible (N→1 roundtrips).
    For relations, uses individual operations (match clause complexity).

    Args:
        instances: The instances to insert or update.

    Returns:
        The persisted instances.
    """
    if not instances:
        return instances

    has_hooks = self._hook_runner.has_hooks

    # Check if all instances are entities (can attempt batch)
    if all(isinstance(inst, Entity) for inst in instances):
        if has_hooks:
            for instance in instances:
                self._hook_runner.run_pre(CrudEvent.PRE_PUT, self.model_class, instance)

        try:
            result = self._batch_insert_entities(instances, use_put=True)
        except Exception as e:
            # Check if this is a key constraint violation
            # NOTE(review): classifying by substring ("unique"/"constraint")
            # depends on the driver's error text — confirm against the
            # TypeDB driver's error codes.
            error_str = str(e)
            if "unique" in error_str.lower() or "constraint" in error_str.lower():
                logger.debug(
                    f"Batch put failed with constraint violation, falling back to individual: {e}"
                )
                # Fall back to individual operations.
                # Note: pre-hooks may fire again via self.put() — acceptable
                # since the batch operation was rolled back.
                for instance in instances:
                    self.put(instance)
                return instances
            # Re-raise other errors
            raise

        if has_hooks:
            for instance in result:
                self._hook_runner.run_post(CrudEvent.POST_PUT, self.model_class, instance)

        return result

    # Fallback for relations or mixed types (hooks fire via self.put)
    for instance in instances:
        self.put(instance)
    return instances

update_many

update_many(instances)

Update multiple instances.

Source code in type_bridge/crud/typedb_manager.py
def update_many(self, instances: list[T]) -> list[T]:
    """Apply update() to each instance in order and return the input list."""
    for item in instances:
        self.update(item)
    return instances

get_by_iid

get_by_iid(iid)

Fetch an instance by its internal ID with polymorphic type resolution.

Source code in type_bridge/crud/typedb_manager.py
def get_by_iid(self, iid: str) -> T | None:
    """Fetch an instance by its internal ID with polymorphic type resolution.

    Args:
        iid: TypeDB internal ID — a hexadecimal string starting with "0x".

    Returns:
        The hydrated instance (possibly a subclass of the managed type),
        or None when the IID is malformed or matches nothing.

    Raises:
        HydrationError: If the fetched row cannot be converted to a model.
    """
    import re

    from type_bridge.crud.role_players import resolve_entity_class_from_label
    from type_bridge.models.registry import ModelRegistry
    from type_bridge.query.ast import (
        EntityPattern,
        IidConstraint,
        MatchClause,
        SubTypePattern,
    )

    # Validate IID format (TypeDB IIDs are hexadecimal strings starting with 0x)
    # Return None for invalid IIDs (graceful handling - treat as "not found")
    if not iid or not re.match(r"^0x[0-9a-fA-F]+$", iid):
        return None

    var = "$x"
    base_type = self.model_class.get_type_name()

    if issubclass(self.model_class, Entity):
        # Entity path: Build match clause with isa! for polymorphic type resolution
        # $x isa! $t, iid <iid>; $t sub base_type;
        entity_pattern = EntityPattern(
            variable=var,
            type_name="$t",  # Type variable for polymorphic resolution
            constraints=[IidConstraint(iid=iid)],
            is_strict=True,  # Use isa! for strict type matching
        )
        subtype_pattern = SubTypePattern(variable="$t", parent_type=base_type)
        match_clause = MatchClause(patterns=[entity_pattern, subtype_pattern])
        match_str = self.compiler.compile(match_clause)

        # Build fetch clause using wildcard to get all attributes including subtype-specific ones
        # No IID needed in fetch - we already have it from input
        fetch_clause_str = self._build_wildcard_fetch(var, include_iid=False, include_type=True)
        query = match_str + "\n" + fetch_clause_str

        results = self._execute(query, TransactionType.READ)

        if not results:
            return None

        # An IID identifies at most one thing; only the first row matters.
        result = results[0]
        try:
            type_label = result.pop("_type", None)

            # Extract attributes from nested "attributes" key (wildcard fetch structure)
            attrs = result.pop("attributes", result)

            # Resolve concrete class
            # (registry lookup first, then the label-based resolver).
            if type_label and type_label != base_type:
                concrete_class = ModelRegistry.get(type_label)
                if concrete_class is None:
                    concrete_class = resolve_entity_class_from_label(
                        type_label,
                        cast(tuple[type[Entity], ...], (self.model_class,)),
                    )
            else:
                concrete_class = self.model_class

            assert concrete_class is not None, "Failed to resolve concrete class"
            entity_class = cast(type[Entity], concrete_class)
            instance = entity_class.from_dict(attrs, strict=False)
            # Bypass the model's __setattr__ to stash the internal ID.
            object.__setattr__(instance, "_iid", iid)
            return cast(T | None, instance)
        except Exception as e:
            from type_bridge.crud.exceptions import HydrationError

            raise HydrationError(
                model_type=self.model_class.__name__,
                raw_data=result,
                cause=e,
            ) from e
    else:
        # Relation path: Use _get_relations with IID filter
        # This properly handles role player hydration
        results = self._get_relations(var, {"_iid": iid}, [])
        if results:
            return results[0]
        return None

filter

filter(*expressions, **filters)

Create a chainable query with filters.

Parameters:

Name Type Description Default
*expressions Any

Expression objects (Person.age.gt(Age(30)), etc.)

()
**filters Any

Attribute filters (exact match) - age=30, name="Alice"

{}

Returns:

Type Description
TypeDBQuery[T]

TypeDBQuery for chaining

Raises:

Type Description
ValueError

If expressions reference attribute types not owned by the model

Source code in type_bridge/crud/typedb_manager.py
def filter(self, *expressions: Any, **filters: Any) -> TypeDBQuery[T]:
    """Create a chainable query with filters.

    Args:
        *expressions: Expression objects (Person.age.gt(Age(30)), etc.)
        **filters: Attribute filters (exact match) - age=30, name="Alice"

    Returns:
        TypeDBQuery for chaining

    Raises:
        ValueError: If expressions reference attribute types not owned by the model
    """
    if expressions:
        from type_bridge.expressions.role_player import RolePlayerExpr

        # Build the set of attribute types this model is allowed to filter on.
        attr_infos = self.model_class.get_all_attributes()
        allowed_types = {info.typ for info in attr_infos.values()}

        for expression in expressions:
            # Role-player expressions constrain the player's attributes,
            # not this model's own, so ownership checks don't apply to them.
            if isinstance(expression, RolePlayerExpr):
                continue

            # Reject the first attribute type not owned by the model.
            unknown = [
                t for t in expression.get_attribute_types() if t not in allowed_types
            ]
            if unknown:
                raise ValueError(
                    f"{self.model_class.__name__} does not own attribute type {unknown[0].__name__}. "
                    f"Available attribute types: {', '.join(t.__name__ for t in allowed_types)}"
                )

    return TypeDBQuery(self, filters, list(expressions))

count

count(**filters)

Count all instances of this type, optionally filtering.

Source code in type_bridge/crud/typedb_manager.py
def count(self, **filters) -> int:
    """Count instances of the managed type, optionally restricted by filters.

    Args:
        **filters: Attribute filters (exact match), e.g. age=30.

    Returns:
        Number of matching instances (0 when the query yields nothing).
    """
    var = "$x"
    match_str = self.compiler.compile(
        self.strategy.build_match_all(self.model_class, var, filters)
    )
    query = f"{match_str}\n{self._build_count_reduce(var)}"

    rows = self._execute(query, TransactionType.READ)
    if not rows or "count" not in rows[0]:
        return 0

    raw = rows[0]["count"]
    # The TypeDB driver may wrap scalars as {"value": ...}; unwrap first.
    if isinstance(raw, dict) and "value" in raw:
        return int(raw["value"])
    if isinstance(raw, (int, float)):
        return int(raw)
    # Last resort for driver-specific value objects: coerce through str.
    return int(str(raw))

group_by

group_by(*fields)

Group results by field values and compute aggregations.

Parameters:

Name Type Description Default
*fields Any

Field descriptors to group by (e.g., Person.department)

()

Returns:

Type Description
GroupByQuery[T]

GroupByQuery for chained aggregations

Example
Group by department, compute average age per department

result = manager.group_by(Person.department).aggregate(Person.age.avg())

Returns: {
"Engineering": {"avg_age": 35.5},
"Sales": {"avg_age": 28.3}
}
Source code in type_bridge/crud/typedb_manager.py
def group_by(self, *fields: Any) -> GroupByQuery[T]:
    """Build a grouped aggregation query over the given fields.

    Args:
        *fields: Field descriptors to group by (e.g., Person.department)

    Returns:
        GroupByQuery on which aggregations can be chained

    Example:
        # Average age per department:
        # manager.group_by(Person.department).aggregate(Person.age.avg())
        # -> {"Engineering": {"avg_age": 35.5}, "Sales": {"avg_age": 28.3}}
    """
    # Start with no exact-match filters and no expressions; only grouping fields.
    return GroupByQuery(self, {}, [], fields)

BreakingChangeAnalyzer

Analyzes schema diffs to classify changes by severity.

Classification rules: - SAFE: Adding new types, widening role player types - WARNING: Adding required attributes to existing types - BREAKING: Removing types, narrowing role player types, removing roles

Example

analyzer = BreakingChangeAnalyzer() diff = old_schema.compare(new_schema) changes = analyzer.analyze(diff)

for change in changes: print(f"[{change.category.value}] {change.description}") print(f" Recommendation: {change.recommendation}")

analyze

analyze(diff)

Classify all changes in the schema diff.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
list[ClassifiedChange]

List of classified changes with recommendations

Source code in type_bridge/migration/breaking.py
def analyze(self, diff: SchemaDiff) -> list[ClassifiedChange]:
    """Classify all changes in the schema diff.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        List of classified changes with recommendations
    """
    collected: list[ClassifiedChange] = []
    # Entity, relation and attribute changes are classified by dedicated
    # helpers; run them in order and flatten the results.
    for classify in (
        self._analyze_entity_changes,
        self._analyze_relation_changes,
        self._analyze_attribute_changes,
    ):
        collected.extend(classify(diff))
    return collected

has_breaking_changes

has_breaking_changes(diff)

Quick check for any breaking changes.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
bool

True if any breaking changes exist

Source code in type_bridge/migration/breaking.py
def has_breaking_changes(self, diff: SchemaDiff) -> bool:
    """Quick check for any breaking changes.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        True if any breaking changes exist
    """
    # Short-circuit on the first BREAKING classification.
    for change in self.analyze(diff):
        if change.category == ChangeCategory.BREAKING:
            return True
    return False

has_warnings

has_warnings(diff)

Quick check for any warning-level changes.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
bool

True if any warnings exist

Source code in type_bridge/migration/breaking.py
def has_warnings(self, diff: SchemaDiff) -> bool:
    """Quick check for any warning-level changes.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        True if any warnings exist
    """
    # Short-circuit on the first WARNING classification.
    for change in self.analyze(diff):
        if change.category == ChangeCategory.WARNING:
            return True
    return False

get_breaking_changes

get_breaking_changes(diff)

Get only breaking changes from the diff.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
list[ClassifiedChange]

List of breaking changes only

Source code in type_bridge/migration/breaking.py
def get_breaking_changes(self, diff: SchemaDiff) -> list[ClassifiedChange]:
    """Get only breaking changes from the diff.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        List of breaking changes only
    """
    breaking: list[ClassifiedChange] = []
    for change in self.analyze(diff):
        if change.category == ChangeCategory.BREAKING:
            breaking.append(change)
    return breaking

summary

summary(diff)

Generate a human-readable summary of classified changes.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
str

Formatted summary string

Source code in type_bridge/migration/breaking.py
def summary(self, diff: SchemaDiff) -> str:
    """Generate a human-readable summary of classified changes.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        Formatted summary string
    """
    classified = self.analyze(diff)
    if not classified:
        return "No schema changes detected."

    lines = ["Schema Change Analysis", "=" * 50]

    breaking = [c for c in classified if c.category == ChangeCategory.BREAKING]
    warnings = [c for c in classified if c.category == ChangeCategory.WARNING]
    safe = [c for c in classified if c.category == ChangeCategory.SAFE]

    # Render the three severity sections in fixed order; SAFE changes carry
    # no recommendation line, the other two do.
    for label, bucket, with_recommendation in (
        ("BREAKING", breaking, True),
        ("WARNING", warnings, True),
        ("SAFE", safe, False),
    ):
        if not bucket:
            continue
        lines.append(f"\n[{label}] ({len(bucket)} changes)")
        for change in bucket:
            lines.append(f"  - {change.description}")
            if with_recommendation:
                lines.append(f"    Recommendation: {change.recommendation}")

    return "\n".join(lines)

ChangeCategory

Bases: Enum

Classification of schema changes by severity.

SAFE class-attribute instance-attribute

SAFE = 'safe'

Backwards compatible change - no data loss or errors.

WARNING class-attribute instance-attribute

WARNING = 'warning'

May cause issues - review required.

BREAKING class-attribute instance-attribute

BREAKING = 'breaking'

Will cause data loss or errors - requires migration plan.

Migration

Base class for migration scripts.

Migrations define schema changes that can be applied to a TypeDB database. They can be either model-based (for initial migrations) or operation-based (for incremental changes).

Model-based Migration Example

class InitialMigration(Migration): dependencies = [] models = [Person, Company, Employment]

Operation-based Migration Example

class AddPhoneMigration(Migration): dependencies = [("myapp", "0001_initial")] operations = [ ops.AddAttribute(Phone), ops.AddOwnership(Person, Phone, optional=True), ]

Attributes:

Name Type Description
name str

Migration name (auto-populated from filename)

app_label str

Application label (auto-populated from directory)

dependencies list[tuple[str, str]]

List of (app_label, migration_name) tuples

models list[type[Entity | Relation]]

List of Entity/Relation classes for initial migrations

operations list[Operation]

List of Operation instances for incremental migrations

reversible bool

Whether the migration can be rolled back

get_dependencies

get_dependencies()

Get dependencies as MigrationDependency objects.

Returns:

Type Description
list[MigrationDependency]

List of MigrationDependency instances

Source code in type_bridge/migration/base.py
def get_dependencies(self) -> list[MigrationDependency]:
    """Get dependencies as MigrationDependency objects.

    Returns:
        List of MigrationDependency instances
    """
    # self.dependencies holds raw (app_label, migration_name) tuples.
    deps: list[MigrationDependency] = []
    for app_label, migration_name in self.dependencies:
        deps.append(MigrationDependency(app_label, migration_name))
    return deps

describe

describe()

Generate a human-readable description of this migration.

Returns:

Type Description
str

Description string

Source code in type_bridge/migration/base.py
def describe(self) -> str:
    """Generate a human-readable description of this migration.

    Returns:
        Description string
    """
    # Model-based (initial) migrations take precedence over operation lists.
    if self.models:
        names = ", ".join(model.__name__ for model in self.models)
        return f"Initial migration with models: {names}"
    if self.operations:
        return f"Migration with {len(self.operations)} operation(s)"
    return "Empty migration"

MigrationError

Bases: Exception

Error during migration execution.

MigrationExecutor

MigrationExecutor(db, migrations_dir, dry_run=False)

Executes migrations against a TypeDB database.

Handles: - Applying pending migrations - Rolling back applied migrations - Previewing migration TypeQL - Listing migration status

Example

executor = MigrationExecutor(db, Path("migrations"))

Apply all pending migrations

results = executor.migrate()

Migrate to specific version

results = executor.migrate(target="0002_add_company")

Show migration status

status = executor.showmigrations() for name, is_applied in status: print(f"[{'X' if is_applied else ' '}] {name}")

Preview TypeQL

typeql = executor.sqlmigrate("0002_add_company") print(typeql)

Initialize executor.

Parameters:

Name Type Description Default
db Database

Database connection

required
migrations_dir Path

Directory containing migration files

required
dry_run bool

If True, preview operations without executing

False
Source code in type_bridge/migration/executor.py
def __init__(
    self,
    db: Database,
    migrations_dir: Path,
    dry_run: bool = False,
):
    """Initialize executor.

    Args:
        db: Database connection
        migrations_dir: Directory containing migration files
        dry_run: If True, preview operations without executing
    """
    self.db = db
    self.migrations_dir = migrations_dir
    self.dry_run = dry_run
    # Loader discovers migration files on disk; state manager tracks which
    # migrations have been applied in the target database.
    self.loader = MigrationLoader(migrations_dir)
    self.state_manager = MigrationStateManager(db)

migrate

migrate(target=None)

Apply pending migrations.

Parameters:

Name Type Description Default
target str | None

Optional target migration name (e.g., "0002_add_company") If None, apply all pending migrations. If specified, migrate to that exact state (may rollback).

None

Returns:

Type Description
list[MigrationResult]

List of migration results

Raises:

Type Description
MigrationError

If migration fails

Source code in type_bridge/migration/executor.py
def migrate(self, target: str | None = None) -> list[MigrationResult]:
    """Apply pending migrations.

    Args:
        target: Optional target migration name (e.g., "0002_add_company")
               If None, apply all pending migrations.
               If specified, migrate to that exact state (may rollback).

    Returns:
        List of migration results

    Raises:
        MigrationError: If migration fails
    """
    plan = self._create_plan(
        self.state_manager.load_state(), self.loader.discover(), target
    )
    if plan.is_empty():
        logger.info("No migrations to apply")
        return []

    results: list[MigrationResult] = []

    # Roll back first when the target sits behind the current state.
    for loaded in plan.to_rollback:
        outcome = self._rollback_one(loaded)
        results.append(outcome)
        if not outcome.success:
            raise MigrationError(f"Rollback failed: {outcome.error}")

    # Then move forward through the pending migrations; stop on failure.
    for loaded in plan.to_apply:
        outcome = self._apply_one(loaded)
        results.append(outcome)
        if not outcome.success:
            raise MigrationError(f"Migration failed: {outcome.error}")

    return results

showmigrations

showmigrations()

List all migrations with their applied status.

Returns:

Type Description
list[tuple[str, bool]]

List of (migration_name, is_applied) tuples

Source code in type_bridge/migration/executor.py
def showmigrations(self) -> list[tuple[str, bool]]:
    """List all migrations with their applied status.

    Returns:
        List of (migration_name, is_applied) tuples
    """
    # Applied status is checked against the persisted migration state,
    # keyed by (app_label, name).
    state = self.state_manager.load_state()
    return [
        (
            loaded.migration.name,
            state.is_applied(loaded.migration.app_label, loaded.migration.name),
        )
        for loaded in self.loader.discover()
    ]

sqlmigrate

sqlmigrate(migration_name, reverse=False)

Preview TypeQL for a migration without executing.

Parameters:

Name Type Description Default
migration_name str

Name of the migration

required
reverse bool

If True, show rollback TypeQL

False

Returns:

Type Description
str

TypeQL string that would be executed

Raises:

Type Description
MigrationError

If migration not found or not reversible

Source code in type_bridge/migration/executor.py
def sqlmigrate(self, migration_name: str, reverse: bool = False) -> str:
    """Preview TypeQL for a migration without executing.

    Args:
        migration_name: Name of the migration
        reverse: If True, show rollback TypeQL

    Returns:
        TypeQL string that would be executed

    Raises:
        MigrationError: If migration not found or not reversible
    """
    loaded = self.loader.get_by_name(migration_name)
    if loaded is None:
        raise MigrationError(f"Migration not found: {migration_name}")

    if not reverse:
        return self._generate_apply_typeql(loaded.migration)

    # Rollback TypeQL is None for irreversible migrations.
    rollback = self._generate_rollback_typeql(loaded.migration)
    if rollback is None:
        raise MigrationError(f"Migration {migration_name} is not reversible")
    return rollback

plan

plan(target=None)

Get the migration plan without executing.

Parameters:

Name Type Description Default
target str | None

Optional target migration name

None

Returns:

Type Description
MigrationPlan

MigrationPlan showing what would be applied/rolled back

Source code in type_bridge/migration/executor.py
def plan(self, target: str | None = None) -> MigrationPlan:
    """Get the migration plan without executing.

    Args:
        target: Optional target migration name

    Returns:
        MigrationPlan showing what would be applied/rolled back
    """
    # State is loaded before discovery, matching the order used by migrate().
    return self._create_plan(
        self.state_manager.load_state(), self.loader.discover(), target
    )

ModelRegistry

Registry for tracking Entity/Relation models.

Models can be registered manually or auto-discovered from Python modules. The registry is used by the migration generator to determine which models should be tracked for schema changes.

Example - Manual registration

from type_bridge.migration import ModelRegistry from myapp.models import Person, Company

ModelRegistry.register(Person, Company)

Example - Auto-discovery

from type_bridge.migration import ModelRegistry

Discover all Entity/Relation classes in module

models = ModelRegistry.discover("myapp.models")

Example - In models.py (recommended pattern): from type_bridge import Entity, String, Flag, Key, TypeFlags from type_bridge.migration import ModelRegistry

class Name(String):
    pass

class Person(Entity):
    flags = TypeFlags(name="person")
    name: Name = Flag(Key)

# Register at module load time
ModelRegistry.register(Person)

register classmethod

register(*models)

Register models for migration tracking.

Parameters:

Name Type Description Default
models type[Entity | Relation]

Entity or Relation classes to register

()
Source code in type_bridge/migration/registry.py
@classmethod
def register(cls, *models: type[Entity | Relation]) -> None:
    """Register models for migration tracking.

    Args:
        models: Entity or Relation classes to register
    """
    # Imported lazily to avoid a circular import with type_bridge.models.
    from type_bridge.models import Entity, Relation

    for candidate in models:
        if not isinstance(candidate, type):
            logger.warning(f"Skipping non-class: {candidate}")
            continue

        if not issubclass(candidate, (Entity, Relation)):
            logger.warning(f"Skipping {candidate.__name__}: not an Entity or Relation subclass")
            continue

        # The abstract base classes themselves are never tracked.
        if candidate in (Entity, Relation):
            continue

        if candidate not in cls._models:
            cls._models.add(candidate)
            logger.debug(f"Registered model: {candidate.__name__}")

unregister classmethod

unregister(*models)

Unregister models from migration tracking.

Parameters:

Name Type Description Default
models type[Entity | Relation]

Entity or Relation classes to unregister

()
Source code in type_bridge/migration/registry.py
@classmethod
def unregister(cls, *models: type[Entity | Relation]) -> None:
    """Unregister models from migration tracking.

    Args:
        models: Entity or Relation classes to unregister
    """
    for candidate in models:
        # discard() tolerates models that were never registered.
        cls._models.discard(candidate)
        logger.debug(f"Unregistered model: {candidate.__name__}")

clear classmethod

clear()

Clear all registered models.

Source code in type_bridge/migration/registry.py
@classmethod
def clear(cls) -> None:
    """Clear all registered models."""
    # Empties the shared class-level registry in place so any existing
    # references to it remain valid.
    cls._models.clear()
    logger.debug("Cleared all registered models")

get_all classmethod

get_all()

Get all registered models.

Returns:

Type Description
list[type[Entity | Relation]]

List of registered Entity/Relation classes

Source code in type_bridge/migration/registry.py
@classmethod
def get_all(cls) -> list[type[Entity | Relation]]:
    """Get all registered models.

    Returns:
        List of registered Entity/Relation classes
    """
    # Copy into a fresh list so callers cannot mutate the registry itself.
    return [*cls._models]

is_registered classmethod

is_registered(model)

Check if a model is registered.

Parameters:

Name Type Description Default
model type

Model class to check

required

Returns:

Type Description
bool

True if model is registered

Source code in type_bridge/migration/registry.py
@classmethod
def is_registered(cls, model: type) -> bool:
    """Check if a model is registered.

    Args:
        model: Model class to check

    Returns:
        True if model is registered
    """
    # O(1) membership test against the class-level registry set.
    return model in cls._models

discover classmethod

discover(module_path, register=True)

Auto-discover Entity/Relation classes from a module.

Imports the module and finds all Entity/Relation subclasses defined in it.

Parameters:

Name Type Description Default
module_path str

Python module path (e.g., "myapp.models")

required
register bool

If True, also register discovered models

True

Returns:

Type Description
list[type[Entity | Relation]]

List of discovered Entity/Relation classes

Raises:

Type Description
ImportError

If module cannot be imported

Source code in type_bridge/migration/registry.py
@classmethod
def discover(cls, module_path: str, register: bool = True) -> list[type[Entity | Relation]]:
    """Auto-discover Entity/Relation classes from a module.

    Imports the module and finds all Entity/Relation subclasses defined in it.

    Args:
        module_path: Python module path (e.g., "myapp.models")
        register: If True, also register discovered models

    Returns:
        List of discovered Entity/Relation classes

    Raises:
        ImportError: If module cannot be imported
    """
    from type_bridge.models import Entity, Relation

    logger.info(f"Discovering models from: {module_path}")

    module = importlib.import_module(module_path)
    found: list[type[Entity | Relation]] = []

    for attr_name in dir(module):
        # Private/magic names are never models.
        if attr_name.startswith("_"):
            continue

        candidate = getattr(module, attr_name)

        # Only classes can be models.
        if not isinstance(candidate, type):
            continue

        # Skip classes merely imported into the module from elsewhere.
        if candidate.__module__ != module_path:
            continue

        # The abstract base classes themselves don't count.
        if candidate in (Entity, Relation):
            continue
        if not issubclass(candidate, (Entity, Relation)):
            continue

        found.append(candidate)
        logger.debug(f"Discovered model: {candidate.__name__}")
        if register:
            cls.register(candidate)

    logger.info(f"Discovered {len(found)} models from {module_path}")
    return found

discover_recursive classmethod

discover_recursive(package_path, register=True)

Recursively discover models from a package.

Imports all modules in the package and discovers Entity/Relation classes.

Parameters:

Name Type Description Default
package_path str

Python package path (e.g., "myapp")

required
register bool

If True, also register discovered models

True

Returns:

Type Description
list[type[Entity | Relation]]

List of discovered Entity/Relation classes

Raises:

Type Description
ImportError

If package cannot be imported

Source code in type_bridge/migration/registry.py
@classmethod
def discover_recursive(
    cls, package_path: str, register: bool = True
) -> list[type[Entity | Relation]]:
    """Recursively discover models from a package.

    Imports all modules in the package and discovers Entity/Relation classes.

    Args:
        package_path: Python package path (e.g., "myapp")
        register: If True, also register discovered models

    Returns:
        List of discovered Entity/Relation classes

    Raises:
        ImportError: If package cannot be imported
    """
    import pkgutil

    logger.info(f"Recursively discovering models from: {package_path}")

    package = importlib.import_module(package_path)

    # Start with models defined directly in the package's own module.
    found: list[type[Entity | Relation]] = list(
        cls.discover(package_path, register=register)
    )

    # Plain modules have no __path__; only real packages can be walked.
    if hasattr(package, "__path__"):
        for _importer, module_name, _is_pkg in pkgutil.walk_packages(
            package.__path__, prefix=f"{package_path}."
        ):
            try:
                found.extend(cls.discover(module_name, register=register))
            except ImportError as exc:
                # Best effort: a broken submodule must not abort discovery.
                logger.warning(f"Could not import {module_name}: {exc}")

    logger.info(f"Recursively discovered {len(found)} models from {package_path}")
    return found

RolePlayerChange dataclass

RolePlayerChange(role_name, added_player_types=list(), removed_player_types=list())

Represents a change in role player types.

Tracks when entity types are added to or removed from a role's allowed players.

Example

If a role changes from Role[Person] to Role[Person, Company]: - added_player_types = ["company"] - removed_player_types = []

has_changes

has_changes()

Check if there are any player type changes.

Source code in type_bridge/migration/diff.py
def has_changes(self) -> bool:
    """Report whether any player types were added or removed."""
    if self.added_player_types:
        return True
    return bool(self.removed_player_types)

SchemaConflictError

SchemaConflictError(diff, message=None)

Bases: Exception

Raised when there are conflicting schema changes during sync.

This exception is raised when attempting to sync a schema that has breaking changes (removed or modified types/attributes) compared to the existing database schema.

Initialize SchemaConflictError.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff containing the conflicting changes

required
message str | None

Optional custom error message

None
Source code in type_bridge/migration/exceptions.py
def __init__(self, diff: SchemaDiff, message: str | None = None):
    """Initialize SchemaConflictError.

    Args:
        diff: SchemaDiff containing the conflicting changes
        message: Optional custom error message
    """
    # Store the diff before building the default message, since
    # _build_default_message reads it.
    self.diff = diff
    super().__init__(
        message if message is not None else self._build_default_message()
    )

has_breaking_changes

has_breaking_changes()

Check if the diff contains breaking changes.

Breaking changes include removed or modified types/attributes.

Returns:

Type Description
bool

True if there are breaking changes

Source code in type_bridge/migration/exceptions.py
def has_breaking_changes(self) -> bool:
    """Check if the diff contains breaking changes.

    Breaking changes include removed or modified types/attributes.

    Returns:
        True if there are breaking changes
    """
    diff = self.diff
    # Any removal, or any modification of an existing type, is breaking.
    return any(
        (
            diff.removed_entities,
            diff.removed_relations,
            diff.removed_attributes,
            diff.modified_entities,
            diff.modified_relations,
        )
    )

SchemaInfo

SchemaInfo()

Container for organized schema information.

Initialize SchemaInfo with empty collections.

Source code in type_bridge/migration/info.py
def __init__(self):
    """Initialize SchemaInfo with empty collections."""
    # Entities and relations preserve insertion order; attribute classes are
    # kept in a set because many models can share the same attribute type.
    self.entities: list[type[Entity]] = []
    self.relations: list[type[Relation]] = []
    self.attribute_classes: set[type[Attribute]] = set()

get_entity_by_name

get_entity_by_name(name)

Get entity by type name.

Parameters:

Name Type Description Default
name str

Entity type name

required

Returns:

Type Description
type[Entity] | None

Entity class or None if not found

Source code in type_bridge/migration/info.py
def get_entity_by_name(self, name: str) -> type[Entity] | None:
    """Get entity by type name.

    Args:
        name: Entity type name

    Returns:
        Entity class or None if not found
    """
    # First match wins; None when no entity carries that type name.
    return next(
        (entity for entity in self.entities if entity.get_type_name() == name),
        None,
    )

get_relation_by_name

get_relation_by_name(name)

Get relation by type name.

Parameters:

Name Type Description Default
name str

Relation type name

required

Returns:

Type Description
type[Relation] | None

Relation class or None if not found

Source code in type_bridge/migration/info.py
def get_relation_by_name(self, name: str) -> type[Relation] | None:
    """Get relation by type name.

    Args:
        name: Relation type name

    Returns:
        Relation class or None if not found
    """
    # First match wins; None when no relation carries that type name.
    return next(
        (relation for relation in self.relations if relation.get_type_name() == name),
        None,
    )

validate

validate()

Validate schema definitions for TypeDB constraints.

Raises:

Type Description
SchemaValidationError

If schema violates TypeDB constraints

Source code in type_bridge/migration/info.py
def validate(self) -> None:
    """Validate schema definitions for TypeDB constraints.

    Raises:
        SchemaValidationError: If schema violates TypeDB constraints
    """
    # Entities and relations share the same per-type duplicate-attribute
    # check; entities are validated first, then relations.
    for model in (*self.entities, *self.relations):
        self._validate_no_duplicate_attribute_types(model, model.get_type_name())

to_typeql

to_typeql()

Generate TypeQL schema definition from collected schema information.

Base classes (with base=True) are skipped as they don't appear in TypeDB schema.

Validates the schema before generation.

Returns:

Type Description
str

TypeQL schema definition string

Raises:

Type Description
SchemaValidationError

If schema validation fails

Source code in type_bridge/migration/info.py
def to_typeql(self) -> str:
    """Generate TypeQL schema definition from collected schema information.

    Base classes (with base=True) are skipped as they don't appear in TypeDB schema.

    Validates the schema before generation.

    Returns:
        TypeQL schema definition string

    Raises:
        SchemaValidationError: If schema validation fails
    """
    # Fail fast on constraint violations before emitting anything.
    self.validate()

    parts: list[str] = ["define", ""]

    # Attributes come first, sorted by name for deterministic output.
    for attr_class in sorted(
        self.attribute_classes, key=lambda a: a.get_attribute_name()
    ):
        parts.append(attr_class.to_schema_definition())
    parts.append("")

    # Entities next; base classes return None and are skipped.
    for entity_model in self.entities:
        entity_def = entity_model.to_schema_definition()
        if entity_def is not None:
            parts.append(entity_def)
            parts.append("")

    # Relations last, each followed by its role-player "plays" statements.
    for relation_model in self.relations:
        relation_def = relation_model.to_schema_definition()
        if relation_def is None:
            continue
        parts.append(relation_def)
        for role_name, role in relation_model._roles.items():
            for player_type in role.player_types:
                parts.append(
                    f"{player_type} plays {relation_model.get_type_name()}:{role.role_name};"
                )
        parts.append("")

    return "\n".join(parts)

compare

compare(other)

Compare this schema with another schema.

Parameters:

Name Type Description Default
other SchemaInfo

Another SchemaInfo to compare against

required

Returns:

Type Description
SchemaDiff

SchemaDiff containing all differences between the schemas

Source code in type_bridge/migration/info.py
def compare(self, other: "SchemaInfo") -> SchemaDiff:
    """Compare this schema with another schema.

    Args:
        other: Another SchemaInfo to compare against

    Returns:
        SchemaDiff containing all differences between the schemas
    """
    diff = SchemaDiff()

    # Index both sides by TypeDB type name so comparison is independent
    # of Python object identity.
    mine_ents = {e.get_type_name(): e for e in self.entities}
    theirs_ents = {e.get_type_name(): e for e in other.entities}

    diff.added_entities = {theirs_ents[n] for n in theirs_ents.keys() - mine_ents.keys()}
    diff.removed_entities = {mine_ents[n] for n in mine_ents.keys() - theirs_ents.keys()}

    # Entities present on both sides are compared field by field.
    for name in mine_ents.keys() & theirs_ents.keys():
        changes = self._compare_entity(mine_ents[name], theirs_ents[name])
        if changes:
            diff.modified_entities[theirs_ents[name]] = changes

    # Same treatment for relations.
    mine_rels = {r.get_type_name(): r for r in self.relations}
    theirs_rels = {r.get_type_name(): r for r in other.relations}

    diff.added_relations = {theirs_rels[n] for n in theirs_rels.keys() - mine_rels.keys()}
    diff.removed_relations = {mine_rels[n] for n in mine_rels.keys() - theirs_rels.keys()}

    for name in mine_rels.keys() & theirs_rels.keys():
        changes = self._compare_relation(mine_rels[name], theirs_rels[name])
        if changes:
            diff.modified_relations[theirs_rels[name]] = changes

    # Attribute classes are plain sets, so set difference suffices.
    diff.added_attributes = other.attribute_classes - self.attribute_classes
    diff.removed_attributes = self.attribute_classes - other.attribute_classes

    return diff

SchemaIntrospector

SchemaIntrospector(db)

Introspects TypeDB database schema.

Queries the database to discover all types, attributes, ownerships, and relations defined in the schema.

Example

introspector = SchemaIntrospector(db) schema = introspector.introspect()

print(f"Found {len(schema.entities)} entities") print(f"Found {len(schema.relations)} relations") print(f"Found {len(schema.attributes)} attributes")

Initialize introspector.

Parameters:

Name Type Description Default
db Database

Database connection

required
Source code in type_bridge/migration/introspection.py
def __init__(self, db: Database):
    """Initialize introspector.

    Args:
        db: Database connection used for all subsequent schema queries
    """
    self.db = db

introspect_for_models

introspect_for_models(models)

Introspect database schema for specific model types.

This is the TypeDB 3.x compatible approach that checks each model type individually instead of enumerating all types.

Parameters:

Name Type Description Default
models list[type[Entity] | type[Relation]]

List of model classes to check

required

Returns:

Type Description
IntrospectedSchema

IntrospectedSchema with info about existing types

Source code in type_bridge/migration/introspection.py
def introspect_for_models(
    self, models: list[type[Entity] | type[Relation]]
) -> IntrospectedSchema:
    """Introspect database schema for specific model types.

    This is the TypeDB 3.x compatible approach that checks each
    model type individually instead of enumerating all types.

    Args:
        models: List of model classes to check

    Returns:
        IntrospectedSchema with info about existing types
    """
    # Local import — Entity/Relation are needed only for the isinstance-style
    # checks below.
    from type_bridge.models import Entity, Relation

    schema = IntrospectedSchema()

    # Nothing to introspect if the database has not been created yet.
    if not self.db.database_exists():
        logger.debug("Database does not exist, returning empty schema")
        return schema

    logger.info(f"Introspecting database schema for {len(models)} model types")

    # Collect unique attribute types from all models; a set de-duplicates
    # attribute classes shared by several models.
    attr_types: set[type[Attribute]] = set()
    for model in models:
        if hasattr(model, "get_owned_attributes"):
            for attr_info in model.get_owned_attributes().values():
                attr_types.add(attr_info.typ)

    # Check each attribute type by its TypeDB name.
    for attr_type in attr_types:
        attr_name = attr_type.get_attribute_name()
        if type_exists(self.db, attr_name):
            schema.attributes[attr_name] = IntrospectedAttribute(
                name=attr_name,
                # Falls back to "string" when the attribute class declares no
                # _value_type — presumably a safe default; verify against the
                # attribute base classes.
                value_type=getattr(attr_type, "_value_type", "string"),
            )
            logger.debug(f"Found existing attribute: {attr_name}")

    # Check each model type; the framework base classes themselves are skipped.
    for model in models:
        type_name = model.get_type_name()

        if issubclass(model, Entity) and model is not Entity:
            if type_exists(self.db, type_name):
                schema.entities[type_name] = IntrospectedEntity(name=type_name)
                logger.debug(f"Found existing entity: {type_name}")

                # Check ownerships (pass model for fallback)
                self._introspect_ownerships_for_type(schema, type_name, model)

        elif issubclass(model, Relation) and model is not Relation:
            if type_exists(self.db, type_name):
                schema.relations[type_name] = IntrospectedRelation(name=type_name)
                logger.debug(f"Found existing relation: {type_name}")

                # Check roles and role players
                self._introspect_roles_for_relation(schema, type_name, model)

    logger.info(
        f"Introspected: {len(schema.entities)} entities, "
        f"{len(schema.relations)} relations, "
        f"{len(schema.attributes)} attributes"
    )

    return schema

introspect

introspect()

Query TypeDB schema and return structured info.

Returns:

Type Description
IntrospectedSchema

IntrospectedSchema with all discovered types

Source code in type_bridge/migration/introspection.py
def introspect(self) -> IntrospectedSchema:
    """Query TypeDB schema and return structured info.

    Returns:
        IntrospectedSchema with all discovered types
    """
    schema = IntrospectedSchema()

    # Short-circuit: a missing database has no schema to discover.
    if not self.db.database_exists():
        logger.debug("Database does not exist, returning empty schema")
        return schema

    logger.info("Introspecting database schema")

    # Run the introspection pipeline; each step fills its own part of the
    # schema object in place.
    for step in (
        self._introspect_entities,
        self._introspect_relations,
        self._introspect_attributes,
        self._introspect_ownerships,
        self._introspect_role_players,
    ):
        step(schema)

    logger.info(
        f"Introspected: {len(schema.entities)} entities, "
        f"{len(schema.relations)} relations, "
        f"{len(schema.attributes)} attributes"
    )

    return schema

SchemaManager

SchemaManager(db)

Manager for database schema operations.

Initialize schema manager.

Parameters:

Name Type Description Default
db Database

Database connection

required
Source code in type_bridge/migration/schema_manager.py
def __init__(self, db: Database):
    """Initialize schema manager.

    Args:
        db: Database connection
    """
    self.db = db
    # Models added via register(); insertion order is preserved.
    self.registered_models: list[type[Entity | Relation]] = []

register

register(*models)

Register model classes for schema management.

Parameters:

Name Type Description Default
models type[Entity | Relation]

Model classes to register

()
Source code in type_bridge/migration/schema_manager.py
def register(self, *models: type[Entity | Relation]) -> None:
    """Register model classes for schema management.

    Duplicate registrations are ignored (logged at debug level).

    Args:
        models: Model classes to register
    """
    for model in models:
        if model in self.registered_models:
            logger.debug(f"Model already registered: {model.__name__}")
        else:
            logger.debug(f"Registering model: {model.__name__}")
            self.registered_models.append(model)

collect_schema_info

collect_schema_info()

Collect schema information from registered models.

Returns:

Type Description
SchemaInfo

SchemaInfo with entities, relations, and attributes

Source code in type_bridge/migration/schema_manager.py
def collect_schema_info(self) -> SchemaInfo:
    """Collect schema information from registered models.

    Returns:
        SchemaInfo with entities, relations, and attributes
    """
    logger.debug(f"Collecting schema info from {len(self.registered_models)} registered models")
    schema_info = SchemaInfo()

    for model in self.registered_models:
        # Sort the model into the entity or relation bucket; the framework
        # base classes themselves are never part of the schema.
        if issubclass(model, Entity) and model is not Entity:
            logger.debug(f"Adding entity to schema: {model.__name__}")
            schema_info.entities.append(model)
        elif issubclass(model, Relation) and model is not Relation:
            logger.debug(f"Adding relation to schema: {model.__name__}")
            schema_info.relations.append(model)

        # Every attribute class owned by the model joins the schema too
        # (attribute_classes is a set, so duplicates collapse).
        for attr_info in model.get_owned_attributes().values():
            logger.debug(
                f"Adding attribute class: {attr_info.typ.__name__} (owned by {model.__name__})"
            )
            schema_info.attribute_classes.add(attr_info.typ)

    logger.info(
        f"Schema info collected: {len(schema_info.entities)} entities, "
        f"{len(schema_info.relations)} relations, {len(schema_info.attribute_classes)} attributes"
    )
    return schema_info

generate_schema

generate_schema()

Generate complete TypeQL schema definition.

Returns:

Type Description
str

TypeQL schema definition string

Source code in type_bridge/migration/schema_manager.py
def generate_schema(self) -> str:
    """Generate complete TypeQL schema definition.

    Returns:
        TypeQL schema definition string
    """
    logger.debug("Generating TypeQL schema definition")
    # Collect model metadata, then render it as TypeQL in one pass.
    typeql = self.collect_schema_info().to_typeql()
    logger.debug(f"Generated TypeQL schema ({len(typeql)} chars)")
    logger.debug(f"Schema:\n{typeql}")
    return typeql

has_existing_schema

has_existing_schema()

Check if database has existing schema defined.

Returns:

Type Description
bool

True if database exists and has custom schema beyond built-in types

Source code in type_bridge/migration/schema_manager.py
def has_existing_schema(self) -> bool:
    """Check if database has existing schema defined.

    Returns:
        True if database exists and has custom schema beyond built-in types
    """
    logger.debug("Checking for existing schema in database")
    if not self.db.database_exists():
        logger.debug("Database does not exist, no existing schema")
        return False

    # Probe each registered type by name; finding any one of them means a
    # schema was already applied. This is the most reliable check in
    # TypeDB 3.x.
    for model in self.registered_models:
        if issubclass(model, Entity) and model is not Entity:
            kind = "entity"
        elif issubclass(model, Relation) and model is not Relation:
            kind = "relation"
        else:
            continue
        type_name = model.get_type_name()
        if type_exists(self.db, type_name):
            logger.debug(f"Found existing {kind} type: {type_name}")
            return True

    logger.debug("No existing schema found for registered models")
    return False

introspect_current_schema_info

introspect_current_schema_info()

Introspect current database schema and build SchemaInfo.

Note: This is a best-effort attempt. It cannot perfectly reconstruct Python class hierarchies from TypeDB schema.

Returns:

Type Description
SchemaInfo | None

SchemaInfo with current schema, or None if database doesn't exist

Source code in type_bridge/migration/schema_manager.py
def introspect_current_schema_info(self) -> SchemaInfo | None:
    """Introspect current database schema and build SchemaInfo.

    Note: This is a best-effort attempt. It cannot perfectly reconstruct
    Python class hierarchies from TypeDB schema.

    Returns:
        SchemaInfo with current schema, or None if database doesn't exist
    """
    if not self.db.database_exists():
        return None

    # For now, we return None and rely on has_existing_schema()
    # Full reconstruction would require complex TypeDB schema introspection
    return None

verify_compatibility

verify_compatibility(old_schema_info)

Verify that new schema is compatible with old schema.

Checks for breaking changes (removed or modified types/attributes) and raises SchemaConflictError if found.

Parameters:

Name Type Description Default
old_schema_info SchemaInfo

The previous schema to compare against

required

Raises:

Type Description
SchemaConflictError

If breaking changes are detected

Source code in type_bridge/migration/schema_manager.py
def verify_compatibility(self, old_schema_info: SchemaInfo) -> None:
    """Verify that new schema is compatible with old schema.

    Checks for breaking changes (removed or modified types/attributes)
    and raises SchemaConflictError if found.

    Args:
        old_schema_info: The previous schema to compare against

    Raises:
        SchemaConflictError: If breaking changes are detected
    """
    logger.debug("Verifying schema compatibility")
    diff = old_schema_info.compare(self.collect_schema_info())

    # Any removal or modification counts as breaking; pure additions are
    # always compatible.
    has_breaking_changes = any(
        (
            diff.removed_entities,
            diff.removed_relations,
            diff.removed_attributes,
            diff.modified_entities,
            diff.modified_relations,
        )
    )

    if has_breaking_changes:
        logger.warning(f"Breaking schema changes detected: {diff}")
        raise SchemaConflictError(diff)

    logger.debug("Schema compatibility verified - no breaking changes")

sync_schema

sync_schema(force=False, skip_if_exists=False)

Synchronize database schema with registered models.

Automatically checks for existing schema in the database and raises SchemaConflictError if schema already exists and might conflict.

Parameters:

Name Type Description Default
force bool

If True, recreate database from scratch, ignoring conflicts

False
skip_if_exists bool

If True, skip conflict checks when types already exist. Use this for idempotent deployments where you want to ensure the schema is in place without recreating the database. TypeDB 3.x's define statement is idempotent for identical definitions.

False

Raises:

Type Description
SchemaConflictError

If database has existing schema and force=False and skip_if_exists=False

Source code in type_bridge/migration/schema_manager.py
def sync_schema(self, force: bool = False, skip_if_exists: bool = False) -> None:
    """Synchronize database schema with registered models.

    Automatically checks for existing schema in the database and raises
    SchemaConflictError if schema already exists and might conflict.

    Args:
        force: If True, recreate database from scratch, ignoring conflicts
        skip_if_exists: If True, skip conflict checks when types already exist.
                       Use this for idempotent deployments where you want to ensure
                       the schema is in place without recreating the database.
                       TypeDB 3.x's define statement is idempotent for identical
                       definitions.

    Raises:
        SchemaConflictError: If database has existing schema and force=False
                            and skip_if_exists=False
    """
    logger.info(f"Syncing schema (force={force}, skip_if_exists={skip_if_exists})")
    # Check for existing schema before making changes
    if not force and not skip_if_exists and self.has_existing_schema():
        logger.debug("Existing schema detected, checking for conflicts")
        # In TypeDB 3.x, schema introspection is limited without instances
        # For safety, we treat any attempt to redefine existing types as a potential conflict
        existing_types = []
        for model in self.registered_models:
            if issubclass(model, Entity) and model is not Entity:
                type_name = model.get_type_name()
                if type_exists(self.db, type_name):
                    existing_types.append(f"entity '{type_name}'")
            elif issubclass(model, Relation) and model is not Relation:
                type_name = model.get_type_name()
                if type_exists(self.db, type_name):
                    existing_types.append(f"relation '{type_name}'")

        if existing_types:
            # Local import — presumably avoids a circular import; verify.
            from type_bridge.migration.diff import SchemaDiff

            types_str = ", ".join(existing_types)
            logger.error(f"Schema conflict: types already exist: {types_str}")
            # Raised with an empty diff: the conflict is detected by probing
            # type names, not by computing a full schema comparison.
            raise SchemaConflictError(
                SchemaDiff(),
                message=(
                    f"Schema conflict detected! The following types already exist in the database: {types_str}\n"
                    "\n"
                    "Redefining existing types may cause:\n"
                    "  - Data loss if attributes or roles are removed\n"
                    "  - Schema conflicts if types are modified\n"
                    "  - Undefined behavior if ownership changes\n"
                    "\n"
                    "Resolution options:\n"
                    "1. Use sync_schema(force=True) to recreate database from scratch (⚠️  DATA LOSS)\n"
                    "2. Manually drop the existing database first\n"
                    "3. Use MigrationManager for incremental schema changes\n"
                    "4. Ensure no conflicting types exist before syncing\n"
                ),
            )

    if force:
        # Delete and recreate database
        logger.info("Force mode: recreating database from scratch")
        if self.db.database_exists():
            logger.debug("Deleting existing database")
            self.db.delete_database()
        self.db.create_database()

    # Ensure database exists
    if not self.db.database_exists():
        logger.debug("Creating database")
        self.db.create_database()

    # Generate and apply schema
    schema = self.generate_schema()

    logger.debug("Applying schema to database")
    with self.db.transaction("schema") as tx:
        tx.execute(schema)
        tx.commit()
    logger.info("Schema synchronized successfully")

drop_schema

drop_schema()

Drop all schema definitions.

Source code in type_bridge/migration/schema_manager.py
def drop_schema(self) -> None:
    """Drop all schema definitions by deleting the underlying database."""
    logger.info("Dropping schema")
    if not self.db.database_exists():
        logger.debug("Database does not exist, nothing to drop")
        return
    self.db.delete_database()
    logger.info("Schema dropped (database deleted)")

introspect_schema

introspect_schema()

Introspect current database schema.

Returns:

Type Description
dict[str, list[str]]

Dictionary of schema information

Source code in type_bridge/migration/schema_manager.py
def introspect_schema(self) -> dict[str, list[str]]:
    """Introspect current database schema.

    Currently a stub: the query runs but results are not yet parsed, so
    the returned lists are always empty.

    Returns:
        Dictionary with "entities", "relations", and "attributes" keys
    """
    logger.debug("Introspecting database schema")
    # Query to get all types
    # NOTE(review): "$x sub thing" is TypeDB 2.x syntax — confirm it still
    # parses on the 3.x servers this module otherwise targets.
    query = """
    match
    $x sub thing;
    fetch
    $x: label;
    """

    with self.db.transaction("read") as tx:
        results = tx.execute(query)

    # NOTE(review): results are consumed after the transaction context has
    # exited — confirm the driver materializes them eagerly.
    schema_info: dict[str, list[str]] = {"entities": [], "relations": [], "attributes": []}

    for result in results:
        # Parse result to categorize types
        # This is a simplified implementation
        pass

    logger.debug(f"Schema introspection complete: {schema_info}")
    return schema_info

SchemaValidationError

Bases: Exception

Raised when schema validation fails during schema generation.

This exception is raised when the Python model definitions violate TypeDB constraints or best practices.

MigrationManager

MigrationManager(db)

Manager for schema migrations.

Initialize migration manager.

Parameters:

Name Type Description Default
db Database

Database connection

required
Source code in type_bridge/migration/simple_migration.py
def __init__(self, db: Database):
    """Initialize migration manager.

    Args:
        db: Database connection migrations are applied through
    """
    self.db = db
    # Pending migrations as (name, TypeQL schema) pairs, applied in order.
    self.migrations: list[tuple[str, str]] = []

add_migration

add_migration(name, schema)

Add a migration.

Parameters:

Name Type Description Default
name str

Migration name

required
schema str

TypeQL schema definition

required
Source code in type_bridge/migration/simple_migration.py
def add_migration(self, name: str, schema: str) -> None:
    """Add a migration.

    Args:
        name: Migration name
        schema: TypeQL schema definition
    """
    logger.debug(f"Adding migration: {name} ({len(schema)} chars)")
    # Migrations are stored in insertion order and applied FIFO by
    # apply_migrations(); duplicate names are not checked here.
    self.migrations.append((name, schema))

apply_migrations

apply_migrations()

Apply all pending migrations.

Source code in type_bridge/migration/simple_migration.py
def apply_migrations(self) -> None:
    """Apply all pending migrations, in the order they were added."""
    logger.info(f"Applying {len(self.migrations)} migration(s)")
    for name, schema in self.migrations:
        logger.info(f"Applying migration: {name}")
        logger.debug(f"Migration schema:\n{schema}")

        # Each migration runs in its own schema transaction, so a failure
        # leaves earlier migrations committed.
        with self.db.transaction("schema") as tx:
            tx.execute(schema)
            tx.commit()

        logger.info(f"Migration {name} applied successfully")
    # NOTE(review): the list is not cleared, so calling this twice re-executes
    # every migration — confirm intended (define is idempotent for identical
    # definitions).
    logger.info("All migrations applied")

create_attribute_migration

create_attribute_migration(attr_name, value_type)

Create a migration to add an attribute.

Parameters:

Name Type Description Default
attr_name str

Attribute name

required
value_type str

Value type

required

Returns:

Type Description
str

TypeQL migration

Source code in type_bridge/migration/simple_migration.py
def create_attribute_migration(self, attr_name: str, value_type: str) -> str:
    """Create a migration that defines a single attribute type.

    Args:
        attr_name: Attribute name
        value_type: Value type

    Returns:
        TypeQL migration
    """
    statement = f"attribute {attr_name}, value {value_type};"
    return "\n".join(["define", statement])

create_entity_migration

create_entity_migration(entity_name, attributes)

Create a migration to add an entity.

Parameters:

Name Type Description Default
entity_name str

Entity name

required
attributes list[str]

List of attribute names

required

Returns:

Type Description
str

TypeQL migration

Source code in type_bridge/migration/simple_migration.py
def create_entity_migration(self, entity_name: str, attributes: list[str]) -> str:
    """Create a migration to add an entity.

    Args:
        entity_name: Entity name
        attributes: List of attribute names

    Returns:
        TypeQL migration
    """
    # TypeQL separates a type's capability clauses (owns, relates, plays)
    # with commas and terminates the statement with ";". The previous
    # implementation joined clauses with bare newlines (no commas) and left
    # the ";" on its own line, producing invalid TypeQL — this mirrors the
    # ",\n".join used by Entity.to_schema_definition.
    clauses = [f"entity {entity_name}"]
    clauses.extend(f"    owns {attr}" for attr in attributes)
    return "define\n" + ",\n".join(clauses) + ";"

create_relation_migration

create_relation_migration(relation_name, roles, attributes=None)

Create a migration to add a relation.

Parameters:

Name Type Description Default
relation_name str

Relation name

required
roles list[tuple[str, str]]

List of (role_name, player_type) tuples

required
attributes list[str] | None

Optional list of attribute names

None

Returns:

Type Description
str

TypeQL migration

Source code in type_bridge/migration/simple_migration.py
def create_relation_migration(
    self, relation_name: str, roles: list[tuple[str, str]], attributes: list[str] | None = None
) -> str:
    """Create a migration to add a relation.

    Args:
        relation_name: Relation name
        roles: List of (role_name, player_type) tuples
        attributes: Optional list of attribute names

    Returns:
        TypeQL migration
    """
    lines = ["define", f"relation {relation_name}"]

    seen_roles: set[str] = set()
    for role_name, _ in roles:
        if role_name in seen_roles:
            continue
        seen_roles.add(role_name)
        lines.append(f"    relates {role_name}")

    if attributes:
        for attr in attributes:
            lines.append(f"    owns {attr}")

    lines.append(";")
    lines.append("")

    # Add role player definitions
    for role_name, player_type in roles:
        lines.append(f"{player_type} plays {relation_name}:{role_name};")

    return "\n".join(lines)

Entity

Bases: TypeDBType

Base class for TypeDB entities with Pydantic validation.

Entities own attributes defined as Attribute subclasses. Use TypeFlags to configure type name and abstract status. Supertype is determined automatically from Python inheritance.

This class inherits from TypeDBType and Pydantic's BaseModel, providing: - Automatic validation of attribute values - JSON serialization/deserialization - Type checking and coercion - Field metadata via Pydantic's Field()

Example

class Name(String): pass

class Age(Integer): pass

class Person(Entity): flags = TypeFlags(name="person") name: Name = Flag(Key) age: Age

Abstract entity

class AbstractPerson(Entity): flags = TypeFlags(abstract=True) name: Name

Inheritance (concrete-person sub abstract-person)

class ConcretePerson(AbstractPerson): age: Age

__init_subclass__

__init_subclass__()

Called when Entity subclass is created.

Source code in type_bridge/models/entity.py
def __init_subclass__(cls) -> None:
    """Hook run for each new Entity subclass: scan and register its attributes."""
    super().__init_subclass__()
    logger.debug(f"Initializing Entity subclass: {cls.__name__}")

    # Local imports to avoid circular dependencies at module load time.
    from type_bridge.models.schema_scanner import SchemaScanner
    from type_bridge.models.registry import ModelRegistry

    # Discover attributes declared on the subclass, then record it as an
    # owner in the global registry.
    cls._owned_attrs = SchemaScanner(cls).scan_attributes(is_relation=False)
    ModelRegistry.register_attribute_owners(cls)

to_schema_definition classmethod

to_schema_definition()

Generate TypeQL schema definition for this entity.

Returns:

Type Description
str | None

TypeQL schema definition string, or None if this is a base class

Source code in type_bridge/models/entity.py
@classmethod
def to_schema_definition(cls) -> str | None:
    """Generate TypeQL schema definition for this entity.

    Returns:
        TypeQL schema definition string, or None if this is a base class
        (base classes have no TypeDB counterpart)
    """
    from type_bridge.typeql.annotations import format_type_annotations

    # Base classes don't appear in TypeDB schema
    if cls.is_base():
        return None

    # TypeDB 3.x header syntax: entity name @abstract, sub parent,
    header = f"entity {cls.get_type_name()}"
    annotations = format_type_annotations(abstract=cls.is_abstract())
    if annotations:
        header = f"{header} {' '.join(annotations)}"
    parent = cls.get_supertype()
    if parent:
        header = f"{header}, sub {parent}"

    # Capability clauses are comma-separated; the statement ends with ";".
    clauses = [header, *cls._build_owns_lines()]
    return ",\n".join(clauses) + ";"

to_ast

to_ast(var='$e')

Generate AST InsertClause for this instance.

Parameters:

Name Type Description Default
var str

Variable name to use

'$e'

Returns:

Type Description
InsertClause

InsertClause containing statements

Source code in type_bridge/models/entity.py
def to_ast(self, var: str = "$e") -> InsertClause:
    """Build an AST InsertClause that inserts this entity instance.

    Args:
        var: Variable name to use

    Returns:
        InsertClause with an isa statement plus one statement per attribute
    """
    from type_bridge.query.ast import InsertClause, IsaStatement, Statement

    statements: list[Statement] = [
        IsaStatement(variable=var, type_name=self.get_type_name())
    ]
    # Attribute statements come from the shared TypeDBType helper.
    statements += self._build_attribute_statements(var)
    return InsertClause(statements=statements)

get_match_clause_info

get_match_clause_info(var_name='$e')

Get match clause info for this entity instance.

Prefers IID-based matching when available (most precise). Falls back to @key attribute matching.

Parameters:

Name Type Description Default
var_name str

Variable name to use in the match clause

'$e'

Returns:

Type Description
MatchClauseInfo

MatchClauseInfo with the match clause

Raises:

Type Description
ValueError

If entity has neither _iid nor key attributes

Source code in type_bridge/models/entity.py
def get_match_clause_info(self, var_name: str = "$e") -> MatchClauseInfo:
    """Get match clause info for this entity instance.

    Prefers IID-based matching when available (most precise).
    Falls back to @key attribute matching.

    Args:
        var_name: Variable name to use in the match clause

    Returns:
        MatchClauseInfo with the match clause

    Raises:
        KeyAttributeError: If a key attribute exists but has no value set
        ValueError: If entity has neither _iid nor key attributes
    """
    type_name = self.get_type_name()

    # Prefer IID-based matching when available: _iid is set only on
    # instances fetched from the database and identifies them uniquely.
    entity_iid = getattr(self, "_iid", None)
    if entity_iid:
        main_clause = f"{var_name} isa {type_name}, iid {entity_iid}"
        return MatchClauseInfo(main_clause=main_clause, extra_clauses=[], var_name=var_name)

    # Fall back to key attribute matching: collect only fields flagged @key.
    key_attrs = {
        field_name: attr_info
        for field_name, attr_info in self.get_all_attributes().items()
        if attr_info.flags.is_key
    }

    if key_attrs:
        parts = [f"{var_name} isa {type_name}"]
        for field_name, attr_info in key_attrs.items():
            value = getattr(self, field_name, None)
            # A key declared but unset cannot identify anything — fail loudly.
            if value is None:
                from type_bridge.crud.exceptions import KeyAttributeError

                raise KeyAttributeError(
                    entity_type=self.__class__.__name__,
                    operation="identify",
                    field_name=field_name,
                )
            attr_name = attr_info.typ.get_attribute_name()
            parts.append(f"has {attr_name} {self._format_value(value)}")
        main_clause = ", ".join(parts)
        return MatchClauseInfo(main_clause=main_clause, extra_clauses=[], var_name=var_name)

    # Neither IID nor key attributes available
    raise ValueError(
        f"Entity '{self.__class__.__name__}' cannot be identified: "
        f"no _iid set and no @key attributes defined. Either fetch the entity from the "
        f"database first (to populate _iid) or add Flag(Key) to an attribute."
    )

get_match_pattern

get_match_pattern(var_name='$e')

Get an AST EntityPattern for matching this entity instance.

Prefers IID-based matching when available (most precise). Falls back to @key attribute matching.

Parameters:

Name Type Description Default
var_name str

Variable name to use in the pattern

'$e'

Returns:

Type Description
EntityPattern

EntityPattern AST node

Raises:

Type Description
ValueError

If entity has neither _iid nor key attributes

Source code in type_bridge/models/entity.py
def get_match_pattern(self, var_name: str = "$e") -> EntityPattern:
    """Build an AST EntityPattern identifying this entity instance.

    Identification uses the IID when one is set (most precise) and
    otherwise falls back to the entity's @key attributes, via
    ``_build_identification_constraints``.

    Args:
        var_name: Variable name to use in the pattern

    Returns:
        EntityPattern AST node

    Raises:
        ValueError: If entity has neither _iid nor key attributes
    """
    from type_bridge.query.ast import EntityPattern

    return EntityPattern(
        variable=var_name,
        type_name=self.get_type_name(),
        constraints=self._build_identification_constraints(),
    )

to_dict

to_dict(*, include=None, exclude=None, by_alias=False, exclude_unset=False)

Serialize the entity to a primitive dict.

Parameters:

Name Type Description Default
include set[str] | None

Optional set of field names to include.

None
exclude set[str] | None

Optional set of field names to exclude.

None
by_alias bool

When True, use attribute TypeQL names instead of Python field names.

False
exclude_unset bool

When True, omit fields that were never explicitly set.

False
Source code in type_bridge/models/entity.py
def to_dict(
    self,
    *,
    include: set[str] | None = None,
    exclude: set[str] | None = None,
    by_alias: bool = False,
    exclude_unset: bool = False,
) -> dict[str, Any]:
    """Serialize the entity to a primitive dict.

    Args:
        include: Optional set of field names to include.
        exclude: Optional set of field names to exclude.
        by_alias: When True, use attribute TypeQL names instead of Python field names.
        exclude_unset: When True, omit fields that were never explicitly set.
    """
    # Let Pydantic handle include/exclude/exclude_unset, then unwrap Attribute values.
    dumped = self.model_dump(
        include=include,
        exclude=exclude,
        by_alias=False,
        exclude_unset=exclude_unset,
    )

    attrs = self.get_all_attributes()
    result: dict[str, Any] = {}

    for field_name, raw_value in dumped.items():
        attr_info = attrs[field_name]
        key = attr_info.typ.get_attribute_name() if by_alias else field_name
        if by_alias and key in result and key != field_name:
            # Avoid collisions when multiple fields share the same attribute type
            key = field_name
        result[key] = self._unwrap_value(raw_value)

    return result

from_dict classmethod

from_dict(data, *, field_mapping=None, strict=True)

Construct an Entity from a plain dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

External data to hydrate the Entity.

required
field_mapping dict[str, str] | None

Optional mapping of external keys to internal field names.

None
strict bool

When True, raise on unknown fields; otherwise ignore them.

True
Source code in type_bridge/models/entity.py
@classmethod
def from_dict(
    cls,
    data: dict[str, Any],
    *,
    field_mapping: dict[str, str] | None = None,
    strict: bool = True,
) -> Self:
    """Construct an Entity from a plain dictionary.

    Keys are resolved in order: explicit field_mapping, Python field
    name, then TypeQL attribute alias. None values are skipped.

    Args:
        data: External data to hydrate the Entity.
        field_mapping: Optional mapping of external keys to internal field names.
        strict: When True, raise on unknown fields; otherwise ignore them.

    Raises:
        ValueError: In strict mode, when a key matches no known field.
    """
    mapping = field_mapping if field_mapping is not None else {}
    attrs = cls.get_all_attributes()
    # Reverse lookup: TypeQL attribute alias -> Python field name.
    alias_map = {info.typ.get_attribute_name(): name for name, info in attrs.items()}
    kwargs: dict[str, Any] = {}

    for key, value in data.items():
        field = mapping.get(key, key)
        if field not in attrs and key in alias_map:
            field = alias_map[key]

        if field not in attrs:
            if strict:
                raise ValueError(f"Unknown field '{key}' for {cls.__name__}")
            continue

        if value is None:
            continue

        wrapped = cls._wrap_attribute_value(value, attrs[field])
        if wrapped is not None:
            kwargs[field] = wrapped

    return cls(**kwargs)

__repr__

__repr__()

Developer-friendly string representation of entity.

Source code in type_bridge/models/entity.py
def __repr__(self) -> str:
    """Developer-friendly string representation of entity.

    Shows only the owned attributes that are currently set.
    """
    shown = [
        f"{name}={value!r}"
        for name in self._owned_attrs
        if (value := getattr(self, name, None)) is not None
    ]
    return f"{self.__class__.__name__}({', '.join(shown)})"

__str__

__str__()

User-friendly string representation of entity.

Source code in type_bridge/models/entity.py
def __str__(self) -> str:
    """User-friendly string representation of entity.

    Key attributes are listed before the others; values are unwrapped
    from their Attribute instances for readability.
    """
    keys: list[str] = []
    others: list[str] = []

    for name, info in self._owned_attrs.items():
        value = getattr(self, name, None)
        if value is None:
            continue

        rendered = f"{name}={unwrap_attribute(value)}"
        # Key attributes lead so the most identifying fields appear first.
        (keys if info.flags.is_key else others).append(rendered)

    # ", ".join of an empty list is "", which yields "TypeName()".
    return f"{self.get_type_name()}({', '.join(keys + others)})"

Relation

Bases: TypeDBType

Base class for TypeDB relations with Pydantic validation.

Relations can own attributes and have role players. Use TypeFlags to configure type name and abstract status. Supertype is determined automatically from Python inheritance.

This class inherits from TypeDBType and Pydantic's BaseModel, providing: - Automatic validation of attribute values - JSON serialization/deserialization - Type checking and coercion - Field metadata via Pydantic's Field()

Example

class Position(String): pass

class Salary(Integer): pass

class Employment(Relation): flags = TypeFlags(name="employment")

employee: Role[Person] = Role("employee", Person)
employer: Role[Company] = Role("employer", Company)

position: Position
salary: Salary | None

__init_subclass__

__init_subclass__(**kwargs)

Initialize relation subclass.

Source code in type_bridge/models/relation.py
def __init_subclass__(cls, **kwargs: Any) -> None:
    """Initialize relation subclass.

    Scans the new subclass for Role descriptors and attribute annotations,
    caches them on the class (``_roles`` / ``_owned_attrs``), and registers
    the class as an owner of its attribute types in the model registry.
    """
    super().__init_subclass__(**kwargs)
    logger.debug(f"Initializing Relation subclass: {cls.__name__}")

    # Deferred import — presumably to avoid a circular import, matching
    # the pattern used elsewhere in the package.
    from type_bridge.models.schema_scanner import SchemaScanner

    scanner = SchemaScanner(cls)
    cls._roles = scanner.scan_roles()
    cls._owned_attrs = scanner.scan_attributes(is_relation=True)

    from type_bridge.models.registry import ModelRegistry

    ModelRegistry.register_attribute_owners(cls)

__pydantic_init_subclass__ classmethod

__pydantic_init_subclass__(**kwargs)

Called by Pydantic after model class initialization.

This is the right place to restore Role descriptors because: 1. `__init_subclass__` runs before Pydantic's metaclass finishes 2. Pydantic removes Role instances from the class `__dict__` during construction 3. `__pydantic_init_subclass__` runs after Pydantic's setup is complete

This restores Role descriptors so class-level access (Employment.employee) returns a RoleRef for type-safe query building.

Source code in type_bridge/models/relation.py
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
    """Called by Pydantic after model class initialization.

    This is the right place to restore Role descriptors because:
    1. __init_subclass__ runs before Pydantic's metaclass finishes
    2. Pydantic removes Role instances from class __dict__ during construction
    3. __pydantic_init_subclass__ runs after Pydantic's setup is complete

    This restores Role descriptors so class-level access (Employment.employee)
    returns a RoleRef for type-safe query building.
    """
    super().__pydantic_init_subclass__(**kwargs)

    # Restore Role descriptors using type.__setattr__ to bypass any Pydantic interception
    # (_roles was populated earlier by __init_subclass__ via SchemaScanner).
    for role_name, role in cls._roles.items():
        type.__setattr__(cls, role_name, role)

get_roles classmethod

get_roles()

Get all roles defined on this relation.

Returns:

Type Description
dict[str, Role]

Dictionary mapping role names to Role instances

Source code in type_bridge/models/relation.py
@classmethod
def get_roles(cls) -> dict[str, Role]:
    """Get all roles defined on this relation.

    Returns:
        Dictionary mapping role names to Role instances. The class-level
        mapping itself is returned (not a copy), so callers should treat
        it as read-only.
    """
    return cls._roles

to_ast

to_ast(var='$r')

Generate AST InsertClause for this relation instance.

Parameters:

Name Type Description Default
var str

Variable name to use

'$r'

Returns:

Type Description
InsertClause

InsertClause containing statements

Source code in type_bridge/models/relation.py
def to_ast(self, var: str = "$r") -> InsertClause:
    """Generate AST InsertClause for this relation instance.

    Builds a single RelationStatement carrying the role players (as
    variable references) plus this relation's own attribute assertions.

    Args:
        var: Variable name to use

    Returns:
        InsertClause containing statements

    Note:
        Role players are referenced by derived variable names
        (``$<role>`` or ``$<role>_<i>`` when a role has multiple
        players). The caller/manager must bind those variables in a
        match clause using the same naming convention.
    """
    from type_bridge.query.ast import InsertClause, RelationStatement, Statement
    from type_bridge.query.ast import RolePlayer as AstRolePlayer

    type_name = self.get_type_name()

    # Build role players
    role_players_ast = []
    for role_name, role in self.__class__._roles.items():
        # Get the entity from the instance; bypasses descriptor machinery
        # by reading instance storage directly.
        entity_or_list = self.__dict__.get(role_name)
        if entity_or_list is not None:
            # Normalize to list for uniform handling
            entities = entity_or_list if isinstance(entity_or_list, list) else [entity_or_list]
            for i, entity in enumerate(entities):
                # Generate unique variable name for each player
                # Note: This assumes the manager will bind these variables in a match clause
                # or they are already bound. For now, we generate the usage.
                var_name = f"{role_name}_{i}" if len(entities) > 1 else role_name

                # We assume variables are passed in or coordinated.
                # Since to_ast is called on the instance, we need a convention.
                # This implies the Manager/Compiler must coordinate variable names between Match and Insert.
                # For now, let's use a standard derived name format that the Manager can also replicate.
                player_var = f"${var_name}"

                role_players_ast.append(
                    AstRolePlayer(role=role.role_name, player_var=player_var)
                )

    # Collect attribute statements using shared helper from TypeDBType
    inline_attributes = self._build_attribute_statements(var)

    statements: list[Statement] = [
        RelationStatement(
            variable=var,
            type_name=type_name,
            role_players=role_players_ast,
            include_variable=False,  # TypeDB 3.x insert doesn't use variable for relations
            attributes=inline_attributes,
        )
    ]

    return InsertClause(statements=statements)

get_match_clause_info

get_match_clause_info(var_name='$r')

Get match clause info for this relation instance.

Prefers IID-based matching when available (most precise). Falls back to role player matching.

Parameters:

Name Type Description Default
var_name str

Variable name to use in the match clause

'$r'

Returns:

Type Description
MatchClauseInfo

MatchClauseInfo with the match clause and role player clauses

Raises:

Type Description
ValueError

If any role player cannot be identified

Source code in type_bridge/models/relation.py
def get_match_clause_info(self, var_name: str = "$r") -> MatchClauseInfo:
    """Get match clause info for this relation instance.

    Prefers IID-based matching when available (most precise).
    Falls back to role player matching.

    Args:
        var_name: Variable name to use in the match clause

    Returns:
        MatchClauseInfo with the match clause and role player clauses

    Raises:
        ValueError: If any role player cannot be identified
    """
    type_name = self.get_type_name()

    # An IID pins down the exact relation instance; no role players needed.
    iid = getattr(self, "_iid", None)
    if iid:
        return MatchClauseInfo(
            main_clause=f"{var_name} isa {type_name}, iid {iid}",
            extra_clauses=[],
            var_name=var_name,
        )

    # Otherwise identify the relation through its role players.
    role_parts: list[str] = []
    extra_clauses: list[str] = []

    for role_name, role in self.__class__._roles.items():
        assigned = self.__dict__.get(role_name)
        if assigned is None:
            raise ValueError(f"Role player '{role_name}' is required for matching")

        # A role may hold a single player or a list of them.
        players = assigned if isinstance(assigned, list) else [assigned]
        multi = len(players) > 1

        for i, player in enumerate(players):
            player_var = f"${role_name}_{i}" if multi else f"${role_name}"
            role_parts.append(f"{role.role_name}: {player_var}")

            # Each player contributes its own identifying clauses.
            info = player.get_match_clause_info(player_var)
            extra_clauses.append(info.main_clause)
            extra_clauses.extend(info.extra_clauses)

    main_clause = f"{var_name} isa {type_name} ({', '.join(role_parts)})"
    return MatchClauseInfo(
        main_clause=main_clause, extra_clauses=extra_clauses, var_name=var_name
    )

get_match_patterns

get_match_patterns(var_name='$r')

Get AST patterns for matching this relation instance.

Returns a list of patterns: the main RelationPattern plus EntityPatterns for each role player (when matching by role players, not IID).

Parameters:

Name Type Description Default
var_name str

Variable name to use in the pattern

'$r'

Returns:

Type Description
list[Pattern]

List of Pattern AST nodes

Raises:

Type Description
ValueError

If any role player cannot be identified

Source code in type_bridge/models/relation.py
def get_match_patterns(self, var_name: str = "$r") -> list[Pattern]:
    """Get AST patterns for matching this relation instance.

    Returns a list of patterns: the main RelationPattern plus EntityPatterns
    for each role player (when matching by role players, not IID).

    Role-player variables use the ``$<role>`` / ``$<role>_<i>`` naming
    convention shared with get_match_clause_info.

    Args:
        var_name: Variable name to use in the pattern

    Returns:
        List of Pattern AST nodes; the main relation pattern is always
        first, followed by each player's identification patterns.

    Raises:
        ValueError: If any role player cannot be identified
    """
    from type_bridge.query.ast import (
        IidConstraint,
        RelationPattern,
        RolePlayer,
    )

    type_name = self.get_type_name()
    patterns: list[Pattern] = []

    # Prefer IID-based matching when available
    relation_iid = getattr(self, "_iid", None)
    if relation_iid:
        main_pattern = RelationPattern(
            variable=var_name,
            type_name=type_name,
            role_players=[],
            constraints=[IidConstraint(iid=relation_iid)],
        )
        return [main_pattern]

    # Fall back to role player matching
    roles = self.__class__._roles
    role_player_nodes: list[RolePlayer] = []

    for role_name, role in roles.items():
        # Read instance storage directly (bypasses the Role descriptor).
        entity_or_list = self.__dict__.get(role_name)
        if entity_or_list is None:
            raise ValueError(f"Role player '{role_name}' is required for matching")

        # Normalize to list for uniform handling
        entities = entity_or_list if isinstance(entity_or_list, list) else [entity_or_list]

        for i, player in enumerate(entities):
            player_var = f"${role_name}_{i}" if len(entities) > 1 else f"${role_name}"
            role_player_nodes.append(RolePlayer(role=role.role_name, player_var=player_var))

            # Get AST pattern for the role player (could be Entity or Relation)
            # Duck-typed dispatch: entities expose get_match_pattern,
            # relations expose get_match_patterns.
            if hasattr(player, "get_match_pattern"):
                # Entity: returns single pattern
                patterns.append(player.get_match_pattern(player_var))
            else:
                # Relation: returns list of patterns
                patterns.extend(player.get_match_patterns(player_var))

    # Build main relation pattern
    main_pattern = RelationPattern(
        variable=var_name,
        type_name=type_name,
        role_players=role_player_nodes,
        constraints=[],
    )
    # Insert main pattern first, then role player patterns
    patterns.insert(0, main_pattern)

    return patterns

to_schema_definition classmethod

to_schema_definition()

Generate TypeQL schema definition for this relation.

Returns:

Type Description
str | None

TypeQL schema definition string, or None if this is a base class

Source code in type_bridge/models/relation.py
@classmethod
def to_schema_definition(cls) -> str | None:
    """Generate TypeQL schema definition for this relation.

    Returns:
        TypeQL schema definition string, or None if this is a base class
    """
    from type_bridge.typeql.annotations import (
        format_card_annotation,
        format_type_annotations,
    )

    # Base classes don't appear in TypeDB schema
    if cls.is_base():
        return None

    # TypeDB 3.x syntax: relation name @abstract, sub parent,
    header = f"relation {cls.get_type_name()}"
    annotations = format_type_annotations(abstract=cls.is_abstract())
    if annotations:
        header += " " + " ".join(annotations)
    supertype = cls.get_supertype()
    if supertype:
        header += f", sub {supertype}"

    lines = [header]

    # Each role becomes a `relates` clause, annotated with cardinality
    # when one was declared (the default 1..1 emits no annotation).
    for role in cls._roles.values():
        clause = f"    relates {role.role_name}"
        if role.cardinality is not None:
            card = format_card_annotation(role.cardinality.min, role.cardinality.max)
            if card:
                clause += f" {card}"
        lines.append(clause)

    # Add attribute ownerships using shared helper
    lines.extend(cls._build_owns_lines())

    # Join with commas, but end with semicolon (no comma before semicolon)
    return ",\n".join(lines) + ";"

__repr__

__repr__()

Developer-friendly string representation of relation.

Source code in type_bridge/models/relation.py
def __repr__(self) -> str:
    """Developer-friendly string representation of relation.

    Shows set role players first, then set attributes.
    """
    rendered = [
        f"{name}={player!r}"
        for name in self._roles
        if (player := getattr(self, name, None)) is not None
    ]
    rendered += [
        f"{name}={value!r}"
        for name in self._owned_attrs
        if (value := getattr(self, name, None)) is not None
    ]
    return f"{self.__class__.__name__}({', '.join(rendered)})"

__str__

__str__()

User-friendly string representation of relation.

Source code in type_bridge/models/relation.py
def __str__(self) -> str:
    """User-friendly string representation of relation.

    Role players appear first in parentheses (identified by their key
    attribute), followed by the relation's own attributes in brackets.
    """
    segments: list[str] = []

    # Role players lead: only hydrated entity instances (which expose
    # _owned_attrs) with a discoverable key are shown.
    role_bits = []
    for role_name in self._roles:
        player = getattr(self, role_name, None)
        if player is None or not hasattr(player, "_owned_attrs"):
            continue
        key_info = extract_entity_key(player)
        if key_info:
            _, _, raw_value = key_info
            role_bits.append(f"{role_name}={raw_value}")

    if role_bits:
        segments.append("(" + ", ".join(role_bits) + ")")

    # Then the relation's own attributes, unwrapped to raw values.
    attr_bits = []
    for field_name in self._owned_attrs:
        value = getattr(self, field_name, None)
        if value is not None:
            attr_bits.append(f"{field_name}={unwrap_attribute(value)}")

    if attr_bits:
        segments.append("[" + ", ".join(attr_bits) + "]")

    if segments:
        return f"{self.get_type_name()}{' '.join(segments)}"
    return f"{self.get_type_name()}()"

Role

Role(role_name, player_type, *additional_player_types, cardinality=None)

Descriptor for relation role players with type safety.

Generic type T represents the type (Entity or Relation) that can play this role. TypeDB supports both entities and relations as role players.

Example

Entity as role player

class Employment(Relation): employee: Role[Person] = Role("employee", Person) employer: Role[Company] = Role("employer", Company)

Relation as role player

class Permission(Relation): permitted_subject: Role[Subject] = Role("permitted_subject", Subject) permitted_access: Role[Access] = Role("permitted_access", Access) # Access is a Relation

Initialize a role.

Parameters:

Name Type Description Default
role_name str

The name of the role in TypeDB

required
player_type type[T]

The type (Entity or Relation) that can play this role

required
additional_player_types type[T]

Optional additional types allowed to play this role

()
cardinality Card | None

Optional cardinality constraint for the role (e.g., Card(2, 2) for exactly 2)

None

Raises:

Type Description
ReservedWordError

If role_name is a TypeQL reserved word

TypeError

If player type is a library base class (Entity, Relation, TypeDBType)

Source code in type_bridge/models/role.py
def __init__(
    self,
    role_name: str,
    player_type: type[T],
    *additional_player_types: type[T],
    cardinality: Card | None = None,
):
    """Initialize a role.

    Args:
        role_name: The name of the role in TypeDB
        player_type: The type (Entity or Relation) that can play this role
        additional_player_types: Optional additional types allowed to play this role
        cardinality: Optional cardinality constraint for the role (e.g., Card(2, 2) for exactly 2)

    Raises:
        ReservedWordError: If role_name is a TypeQL reserved word
        TypeError: If player type is a library base class (Entity, Relation, TypeDBType)
    """
    # Role names must not collide with TypeQL reserved words.
    validate_reserved_word(role_name, "role")

    self.role_name = role_name
    self.cardinality = cardinality

    # Validate every player type and de-duplicate while preserving order.
    deduped: list[type[T]] = []
    for candidate in (player_type, *additional_player_types):
        self._validate_player_type(candidate)
        if candidate not in deduped:
            deduped.append(candidate)

    if not deduped:
        # Should be impossible because player_type is required, but keeps type checkers happy
        raise ValueError("Role requires at least one player type")

    self.player_entity_types: tuple[type[T], ...] = tuple(deduped)
    primary = deduped[0]
    self.player_entity_type = primary
    # TypeDB type names derived from the Python classes.
    self.player_types = tuple(pt.get_type_name() for pt in self.player_entity_types)
    self.player_type = primary.get_type_name()
    self.attr_name: str | None = None

is_multi_player property

is_multi_player

Check if this role allows multiple players.

Returns True if cardinality allows more than one player (max > 1 or unbounded).

__set_name__

__set_name__(owner, name)

Called when role is assigned to a class.

Source code in type_bridge/models/role.py
def __set_name__(self, owner: type, name: str) -> None:
    """Called when role is assigned to a class.

    Records the Python attribute name the descriptor was bound under,
    which __get__/__set__ later use as the key into obj.__dict__.
    """
    self.attr_name = name

__get__

__get__(obj: None, objtype: type) -> RoleRef[T]
__get__(obj: Any, objtype: type) -> T
__get__(obj, objtype)

Get role player from instance or RoleRef from class.

When accessed from the class (obj is None), returns RoleRef for type-safe query building (e.g., Employment.employee.age.gt(Age(30))). When accessed from an instance, returns the entity playing the role.

Source code in type_bridge/models/role.py
def __get__(self, obj: Any, objtype: type) -> T | RoleRef[T]:
    """Get role player from instance or RoleRef from class.

    Instance access returns the entity currently playing the role (or
    None). Class-level access (obj is None) returns a RoleRef for
    type-safe query building (e.g., Employment.employee.age.gt(Age(30))).
    """
    if obj is not None:
        return obj.__dict__.get(self.attr_name)

    # Deferred import, matching the package's pattern for breaking cycles.
    from type_bridge.fields.role import RoleRef

    return RoleRef(
        role_name=self.role_name,
        player_types=self.player_entity_types,
    )

__set__

__set__(obj, value)

Set role player(s) on instance.

For roles with cardinality > 1, accepts a list of entities. For single-player roles, accepts a single entity.

Source code in type_bridge/models/role.py
def __set__(self, obj: Any, value: T | list[T]) -> None:
    """Set role player(s) on instance.

    For roles with cardinality > 1, accepts a list of entities.
    For single-player roles, accepts a single entity.

    Raises:
        TypeError: If a list is given for a single-player role, or a
            player is not one of the declared player types.
    """
    if not isinstance(value, list):
        # Single player: must be one of the declared player types.
        if not isinstance(value, self.player_entity_types):
            allowed = ", ".join(pt.__name__ for pt in self.player_entity_types)
            raise TypeError(
                f"Role '{self.role_name}' expects types ({allowed}), got {type(value).__name__}"
            )
        obj.__dict__[self.attr_name] = value
        return

    # List form is only valid for multi-player roles.
    if not self.is_multi_player:
        raise TypeError(
            f"Role '{self.role_name}' does not allow multiple players. "
            f"Use cardinality=Card(...) to enable multi-player roles."
        )
    for item in value:
        if not isinstance(item, self.player_entity_types):
            allowed = ", ".join(pt.__name__ for pt in self.player_entity_types)
            raise TypeError(
                f"Role '{self.role_name}' expects types ({allowed}), "
                f"got {type(item).__name__} in list"
            )
    obj.__dict__[self.attr_name] = value

multi classmethod

multi(role_name, player_type, *additional_player_types, cardinality=None)

Define a role playable by multiple entity types.

Parameters:

Name Type Description Default
role_name str

The name of the role in TypeDB

required
player_type type[T]

The first entity type that can play this role

required
additional_player_types type[T]

Additional entity types allowed to play this role

()
cardinality Card | None

Optional cardinality constraint for the role

None
Source code in type_bridge/models/role.py
@classmethod
def multi(
    cls,
    role_name: str,
    player_type: type[T],
    *additional_player_types: type[T],
    cardinality: Card | None = None,
) -> Role[T]:
    """Define a role playable by multiple entity types.

    Args:
        role_name: The name of the role in TypeDB
        player_type: The first entity type that can play this role
        additional_player_types: Additional entity types allowed to play this role
        cardinality: Optional cardinality constraint for the role

    Returns:
        A Role accepting any of the given player types.

    Raises:
        ValueError: If fewer than two player types are supplied.
    """
    # player_type is always present, so at least one *additional* type
    # means two-or-more total (no throwaway tuple needed just to count).
    if not additional_player_types:
        raise ValueError("Role.multi requires at least two player types")
    return cls(role_name, player_type, *additional_player_types, cardinality=cardinality)

__get_pydantic_core_schema__ classmethod

__get_pydantic_core_schema__(source_type, handler)

Define how Pydantic should validate Role fields.

Accepts either: - A single entity instance for single-player roles - A list of entity instances for multi-player roles (cardinality > 1)

Uses a custom validator that checks class names instead of isinstance, to handle generated code in different modules where the same class name exists but as a different Python object.

Source code in type_bridge/models/role.py
@classmethod
def __get_pydantic_core_schema__(
    cls, source_type: Any, handler: GetCoreSchemaHandler
) -> CoreSchema:
    """Define how Pydantic should validate Role fields.

    Accepts either:
    - A single entity instance for single-player roles
    - A list of entity instances for multi-player roles (cardinality > 1)

    Uses a custom validator that checks class names instead of isinstance,
    to handle generated code in different modules where the same class name
    exists but as a different Python object.
    """
    import types

    from pydantic_core import core_schema

    # Extract the entity type(s) from Role[T]
    # Handle both Role[Entity] and Role[Entity1 | Entity2] unions
    allowed_names: set[str] = set()

    if hasattr(source_type, "__args__") and source_type.__args__:
        for arg in source_type.__args__:
            # Check if it's a union type (e.g., Document | Email)
            # NOTE(review): `type(int | str)` is types.UnionType, so the
            # second branch appears redundant with the isinstance check —
            # presumably kept for typing.Union-style aliases; confirm.
            if isinstance(arg, types.UnionType) or (
                hasattr(arg, "__origin__") and arg.__origin__ is type(int | str)
            ):
                # It's a union - get the individual types
                if hasattr(arg, "__args__"):
                    for union_arg in arg.__args__:
                        if hasattr(union_arg, "__name__"):
                            allowed_names.add(union_arg.__name__)
            elif hasattr(arg, "__name__"):
                allowed_names.add(arg.__name__)

    def validate_role_player(value: Any) -> Any:
        """Validate that value is an allowed entity type by class name.

        Checks the full inheritance chain (MRO) to support subclasses.
        E.g., if Document is allowed and Report is a subclass of Document,
        Report instances are accepted.
        """
        if not allowed_names:
            # No type constraints - allow anything
            return value

        def is_allowed_type(obj: Any) -> bool:
            """Check if obj's class or any base class matches allowed names."""
            # Check entire MRO (Method Resolution Order) for inheritance support
            for cls in type(obj).__mro__:
                if cls.__name__ in allowed_names:
                    return True
            return False

        if isinstance(value, list):
            # List of entities for multi-player roles
            for item in value:
                if not is_allowed_type(item):
                    raise ValueError(
                        f"Expected one of {allowed_names}, got {type(item).__name__}"
                    )
            return value
        else:
            # Single entity
            if not is_allowed_type(value):
                raise ValueError(f"Expected one of {allowed_names}, got {type(value).__name__}")
            return value

    return core_schema.no_info_plain_validator_function(validate_role_player)

TypeDBType

Bases: BaseModel, ABC

Abstract base class for TypeDB entities and relations.

This class provides common functionality for both Entity and Relation types, including type name management, abstract/base flags, and attribute ownership.

Subclasses must implement: - get_supertype(): Get parent type in TypeDB hierarchy - to_schema_definition(): Generate TypeQL schema definition - to_insert_query(): Generate TypeQL insert query for instances

manager classmethod

manager(connection)

Create a CRUD manager for this type.

Parameters:

Name Type Description Default
connection Connection

Database, Transaction, or TransactionContext

required

Returns:

Type Description
TypeDBManager[Self]

Manager instance for this type

Source code in type_bridge/models/base.py
@classmethod
def manager(cls, connection: Connection) -> TypeDBManager[Self]:
    """Build a CRUD manager bound to this model class.

    Args:
        connection: Database, Transaction, or TransactionContext

    Returns:
        A manager instance operating on this type
    """
    mgr_cls = cls._get_manager_class()
    mgr = mgr_cls(connection, cls)
    return cast("TypeDBManager[Self]", mgr)

insert

insert(connection)

Insert this instance into the database.

Parameters:

Name Type Description Default
connection Connection

Database, Transaction, or TransactionContext

required

Returns:

Type Description
Self

Self for chaining

Source code in type_bridge/models/base.py
def insert(self, connection: Connection) -> Self:
    """Persist this instance through its type's manager.

    Args:
        connection: Database, Transaction, or TransactionContext

    Returns:
        This instance, to allow call chaining
    """
    mgr = self.manager(connection)
    mgr.insert(self)
    return self

delete

delete(connection)

Delete this instance from the database.

Parameters:

Name Type Description Default
connection Connection

Database, Transaction, or TransactionContext

required

Returns:

Type Description
Self

Self for chaining

Source code in type_bridge/models/base.py
def delete(self, connection: Connection) -> Self:
    """Remove this instance from the database through its manager.

    Args:
        connection: Database, Transaction, or TransactionContext

    Returns:
        This instance, to allow call chaining
    """
    mgr = self.manager(connection)
    mgr.delete(self)
    return self

has classmethod

has(connection, attr_class, value=None)

Find all instances of this class (and its subtypes) that own attr_class.

Behaviour depends on the receiver:

  • Entity.has(...) / Relation.has(...): cross-type lookup — returns instances across all concrete types of that kind.
  • <ConcreteType>.has(...) / <AbstractBase>.has(...): narrowed lookup — restricted to that type and its TypeDB subtypes via isa polymorphism.

Returned relation instances always have their role players hydrated (in addition to attributes). This is implemented by re-fetching each relation through concrete_class.manager(connection).get(_iid=...) after the initial wildcard query, so the relation path is N+1 in the number of returned relations. Entity lookups remain single-query.

Parameters:

Name Type Description Default
connection Connection

Database, Transaction, or TransactionContext.

required
attr_class type[Attribute]

Attribute type to search for (e.g. Name).

required
value Any | None

Optional filter — raw value, Attribute instance, or Expression (e.g. Name.gt(Name("B"))).

None

Returns:

Type Description
list[TypeDBType]

List of hydrated model instances (may contain mixed concrete types

list[TypeDBType]

when called on the base Entity / Relation class or an

list[TypeDBType]

abstract base subclass).

Raises:

Type Description
TypeError

If called directly on :class:TypeDBType (use :class:Entity or :class:Relation instead).

Source code in type_bridge/models/base.py
@classmethod
def has(
    cls,
    connection: Connection,
    attr_class: type[Attribute],
    value: Any | None = None,
) -> list[TypeDBType]:
    """Look up instances of this class (and its TypeDB subtypes) owning *attr_class*.

    The receiver determines the scope:

    * ``Entity.has(...)`` / ``Relation.has(...)``: cross-type lookup over
      **all** concrete types of that kind.
    * ``<ConcreteType>.has(...)`` / ``<AbstractBase>.has(...)``: lookup
      narrowed to that type and its TypeDB subtypes (``isa`` is
      polymorphic, so subtypes match automatically).

    Relation results come back with role players hydrated in addition to
    attributes; each relation is re-fetched by IID after the initial
    wildcard query, so the relation path is N+1 in the number of returned
    relations. Entity lookups remain single-query.

    Args:
        connection: Database, Transaction, or TransactionContext.
        attr_class: Attribute type to search for (e.g. ``Name``).
        value: Optional filter — raw value, Attribute instance,
               or Expression (e.g. ``Name.gt(Name("B"))``).

    Returns:
        Hydrated model instances; may contain mixed concrete types when
        called on the base ``Entity`` / ``Relation`` class or an abstract
        base subclass.

    Raises:
        TypeError: If called directly on :class:`TypeDBType`, or on a
            class that is neither an Entity nor a Relation.
    """
    from type_bridge.crud.has_lookup import has_lookup
    from type_bridge.models.entity import Entity
    from type_bridge.models.relation import Relation

    if cls is TypeDBType:
        raise TypeError("has() must be called on Entity or Relation, not TypeDBType directly")

    kind: Literal["entity", "relation"]
    if issubclass(cls, Entity):
        kind, root = "entity", Entity
    elif issubclass(cls, Relation):
        kind, root = "relation", Relation
    else:
        raise TypeError(f"has() requires an Entity or Relation class, got {cls.__name__}")

    # The bare Entity / Relation classes mean "no narrowing"; any other
    # receiver narrows to its own type name. TypeDB's `isa` is
    # polymorphic, so subtypes of an abstract base match automatically.
    narrowed = None if cls is root else cls.get_type_name()

    return has_lookup(connection, attr_class, value, kind=kind, type_name=narrowed)

__init_subclass__

__init_subclass__()

Called when a TypeDBType subclass is created.

Source code in type_bridge/models/base.py
def __init_subclass__(cls) -> None:
    """Called when a TypeDBType subclass is created.

    Performs three jobs: resolves the per-class TypeFlags (explicit
    ``flags`` attribute wins, otherwise a fresh default instance),
    validates the derived TypeDB type name against reserved built-ins,
    and registers the class in the central ModelRegistry.
    """
    super().__init_subclass__()

    # Get TypeFlags if defined, otherwise create new default flags
    # Check if flags is defined directly on this class (not inherited)
    if "flags" in cls.__dict__ and isinstance(cls.__dict__["flags"], TypeFlags):
        # Explicitly set flags on this class
        cls._flags = cls.__dict__["flags"]
    else:
        # No explicit flags on this class - create new default flags
        # This ensures each subclass gets its own flags instance
        # (inherited flags would otherwise be shared with the parent)
        cls._flags = TypeFlags()

    # Validate type name doesn't conflict with TypeDB built-ins
    # Skip validation for:
    # 1. Base classes that won't appear in schema (base=True)
    # 2. The abstract base Entity and Relation classes themselves
    #    (identified by class name AND defining module to avoid matching
    #    user classes that merely happen to be named Entity/Relation)
    is_base_entity_or_relation = cls.__name__ in ("Entity", "Relation") and cls.__module__ in (
        "type_bridge.models",
        "type_bridge.models.entity",
        "type_bridge.models.relation",
    )
    if not cls._flags.base and not is_base_entity_or_relation:
        type_name = cls._flags.name or format_type_name(cls.__name__, cls._flags.case)
        validate_type_name(type_name, cls.__name__, cls._type_context)

    # Register model in the central registry
    ModelRegistry.register(cls)

__pydantic_init_subclass__ classmethod

__pydantic_init_subclass__(**kwargs)

Called by Pydantic after model class initialization.

Injects FieldDescriptor instances for class-level query access. This runs after Pydantic's setup is complete, so descriptors won't be removed.

Example

Person.age # Returns FieldRef for query building (class-level access) person.age # Returns attribute value (instance-level access)

Source code in type_bridge/models/base.py
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
    """Pydantic post-initialization hook for model subclasses.

    Installs a FieldDescriptor for every owned attribute so that
    class-level access (``Person.age``) yields a query-building FieldRef
    while instance-level access (``person.age``) still returns the stored
    attribute value. Runs after Pydantic has finished its own class
    setup, so the injected descriptors are not removed.
    """
    super().__pydantic_init_subclass__(**kwargs)

    from type_bridge.fields import FieldDescriptor

    for name, info in cls._owned_attrs.items():
        # type.__setattr__ bypasses Pydantic's own attribute handling.
        type.__setattr__(cls, name, FieldDescriptor(field_name=name, attr_type=info.typ))

model_copy

model_copy(*, update=None, deep=False)

Override model_copy to ensure raw values are wrapped in Attribute instances.

Pydantic's model_copy bypasses validators even with revalidate_instances='always', so we pre-wrap values in the update dict before copying. Also preserves _iid from original using Pydantic's pydantic_private.

Source code in type_bridge/models/base.py
def model_copy(self, *, update: Mapping[str, Any] | None = None, deep: bool = False) -> Self:
    """Override model_copy to ensure raw values are wrapped in Attribute instances.

    Pydantic's model_copy bypasses validators even with revalidate_instances='always',
    so we pre-wrap values in the update dict before copying.
    Also preserves _iid from original using Pydantic's __pydantic_private__.

    Args:
        update: Field values to change on the copy. Raw values (or lists
            of raw values) for owned attributes are wrapped in their
            Attribute classes first; other keys pass through untouched.
        deep: Forwarded to Pydantic's model_copy.

    Returns:
        The copied instance, with _iid carried over when the copy lost it.
    """
    # Preserve _iid before copy
    preserved_iid = getattr(self, "_iid", None)

    # Pre-wrap values in update dict before calling super()
    wrapped_update: dict[str, Any] | None = None
    if update:
        wrapped_update = {}
        owned_attrs = self.__class__.get_owned_attributes()
        for key, value in update.items():
            if key in owned_attrs and value is not None:
                attr_info = owned_attrs[key]
                attr_class = attr_info.typ
                if isinstance(value, list):
                    # Multi-valued field: wrap each raw item individually,
                    # leaving items that are already Attribute instances alone.
                    wrapped_update[key] = [
                        item if isinstance(item, attr_class) else attr_class(item)
                        for item in value
                    ]
                elif not isinstance(value, attr_class):
                    wrapped_update[key] = attr_class(value)
                else:
                    wrapped_update[key] = value
            else:
                # Non-attribute field, unknown key, or explicit None: pass through.
                wrapped_update[key] = value

    # Call parent model_copy with pre-wrapped update
    copied = super().model_copy(update=wrapped_update, deep=deep)

    # Restore _iid using Pydantic's official private attribute storage,
    # but only when the copy did not already carry its own _iid.
    private = copied.__pydantic_private__
    if preserved_iid is not None and private is not None and private.get("_iid") is None:
        private["_iid"] = preserved_iid

    return copied

get_type_name classmethod

get_type_name()

Get the TypeDB type name for this type.

If name is explicitly set in TypeFlags, it is used as-is. Otherwise, the class name is formatted according to the case parameter.

Source code in type_bridge/models/base.py
@classmethod
def get_type_name(cls) -> str:
    """Resolve the TypeDB type name for this class.

    An explicit ``name`` set in TypeFlags wins; otherwise the Python
    class name is converted according to the configured case style.
    """
    explicit = cls._flags.name
    if explicit:
        return explicit
    return format_type_name(cls.__name__, cls._flags.case)

get_supertype classmethod

get_supertype()

Get the supertype from Python inheritance, skipping base classes.

Base classes (with base=True) are Python-only and don't appear in TypeDB schema. This method skips them when determining the TypeDB supertype.

Returns:

Type Description
str | None

Type name of the parent class, or None if direct subclass

Source code in type_bridge/models/base.py
@classmethod
def get_supertype(cls) -> str | None:
    """Get the supertype from Python inheritance, skipping base classes.

    Base classes (with base=True) are Python-only and don't appear in TypeDB schema.
    This method skips them when determining the TypeDB supertype.

    Returns:
        Type name of the parent class, or None if direct subclass
    """
    base_class = cls._get_base_type_class()
    for base in cls.__bases__:
        if base is not base_class and issubclass(base, base_class):
            # Skip base classes - they don't appear in TypeDB schema
            if base.is_base():
                # Recursively find the first non-base parent
                return base.get_supertype()
            return base.get_type_name()
    return None

is_abstract classmethod

is_abstract()

Check if this is an abstract type.

Source code in type_bridge/models/base.py
@classmethod
def is_abstract(cls) -> bool:
    """Report whether this type is declared abstract in its TypeFlags."""
    flags = cls._flags
    return flags.abstract

is_base classmethod

is_base()

Check if this is a Python base class (not in TypeDB schema).

Source code in type_bridge/models/base.py
@classmethod
def is_base(cls) -> bool:
    """Report whether this is a Python-only base class (absent from the TypeDB schema)."""
    flags = cls._flags
    return flags.base

get_owned_attributes classmethod

get_owned_attributes()

Get attributes owned directly by this type (not inherited).

Returns:

Type Description
dict[str, ModelAttrInfo]

Dictionary mapping field names to ModelAttrInfo (typ + flags)

Source code in type_bridge/models/base.py
@classmethod
def get_owned_attributes(cls) -> dict[str, ModelAttrInfo]:
    """Return the attributes declared directly on this type (not inherited).

    Returns:
        A shallow copy mapping field names to ModelAttrInfo (typ + flags),
        so callers cannot mutate the class-level registry.
    """
    return dict(cls._owned_attrs)

get_all_attributes classmethod

get_all_attributes()

Get all attributes including inherited ones.

Traverses the class hierarchy to collect all owned attributes, including those from parent Entity/Relation classes.

Returns:

Type Description
dict[str, ModelAttrInfo]

Dictionary mapping field names to ModelAttrInfo (typ + flags)

Source code in type_bridge/models/base.py
@classmethod
def get_all_attributes(cls) -> dict[str, ModelAttrInfo]:
    """Collect owned attributes from this class and all ancestors.

    Walks the MRO from the most distant ancestor down to this class,
    so a child redefining a field name overrides the parent's entry.

    Returns:
        Dictionary mapping field names to ModelAttrInfo (typ + flags)
    """
    merged: dict[str, ModelAttrInfo] = {}
    for klass in reversed(cls.__mro__):
        owned = getattr(klass, "_owned_attrs", None)
        if isinstance(owned, dict):
            merged.update(owned)
    return merged

get_polymorphic_attributes classmethod

get_polymorphic_attributes()

Get all attributes including those from registered subtypes.

For polymorphic queries where the base class is used but concrete subtypes may be returned, this method collects attributes from all known subtypes so the query can fetch all possible attributes.

Returns:

Type Description
dict[str, ModelAttrInfo]

Dictionary mapping field names to ModelAttrInfo, including

dict[str, ModelAttrInfo]

attributes from all registered subtypes.

Source code in type_bridge/models/base.py
@classmethod
def get_polymorphic_attributes(cls) -> dict[str, ModelAttrInfo]:
    """Collect attributes of this class plus every registered subtype.

    For polymorphic queries issued against a base class, the result
    covers every field any concrete subtype might carry, so the query
    can fetch all possible attributes. This class's own (and inherited)
    fields take precedence over subtype fields of the same name.

    Returns:
        Dictionary mapping field names to ModelAttrInfo, including
        attributes from all registered subtypes.
    """
    result = cls.get_all_attributes()

    def walk(klass: type[TypeDBType]) -> None:
        """Depth-first merge of attributes from concrete subclasses."""
        for sub in klass.__subclasses__():
            # Python-only base classes never appear in the schema; skip
            # them (and their subtree) entirely.
            if hasattr(sub, "is_base") and sub.is_base():
                continue
            if hasattr(sub, "get_all_attributes"):
                for field, info in sub.get_all_attributes().items():
                    # First writer wins: never overwrite an existing field.
                    result.setdefault(field, info)
            walk(sub)

    walk(cls)
    return result

to_schema_definition abstractmethod classmethod

to_schema_definition()

Generate TypeQL schema definition for this type.

Returns:

Type Description
str | None

TypeQL schema definition string, or None if this is a base class

Source code in type_bridge/models/base.py
@classmethod
@abstractmethod
def to_schema_definition(cls) -> str | None:
    """Generate TypeQL schema definition for this type.

    Subclasses (Entity and Relation) must provide the concrete
    implementation.

    Returns:
        TypeQL schema definition string, or None if this is a base class
        (base classes are Python-only and have no TypeDB schema entry)
    """
    ...

get_match_clause_info abstractmethod

get_match_clause_info(var_name='$x')

Get information to build a TypeQL match clause for this instance.

Used by TypeDBManager for delete/update operations. Returns IID-based matching when available, otherwise falls back to type-specific identification (key attributes for entities, role players for relations).

Parameters:

Name Type Description Default
var_name str

Variable name to use in the match clause

'$x'

Returns:

Type Description
MatchClauseInfo

MatchClauseInfo with main clause, extra clauses, and variable name

Raises:

Type Description
ValueError

If instance cannot be identified (no IID and no keys/role players)

Source code in type_bridge/models/base.py
@abstractmethod
def get_match_clause_info(self, var_name: str = "$x") -> MatchClauseInfo:
    """Get information to build a TypeQL match clause for this instance.

    Used by TypeDBManager for delete/update operations. Returns IID-based
    matching when available, otherwise falls back to type-specific
    identification (key attributes for entities, role players for relations).
    Subclasses implement the concrete identification strategy.

    Args:
        var_name: Variable name to use in the match clause

    Returns:
        MatchClauseInfo with main clause, extra clauses, and variable name

    Raises:
        ValueError: If instance cannot be identified (no IID and no keys/role players)
    """
    ...

to_ast abstractmethod

to_ast(var='$x')

Generate AST InsertClause for this instance.

Parameters:

Name Type Description Default
var str

Variable name to use

'$x'

Returns:

Type Description
Any

InsertClause containing statements

Source code in type_bridge/models/base.py
@abstractmethod
def to_ast(self, var: str = "$x") -> Any:
    """Generate AST InsertClause for this instance.

    Subclasses must provide the concrete implementation.

    Args:
        var: Variable name to use

    Returns:
        InsertClause containing statements (annotated ``Any`` —
        presumably to avoid importing the AST types here; confirm)
    """
    ...

to_insert_query

to_insert_query(var='$e')

Generate TypeQL insert query string for this instance.

This is a convenience method that uses the AST-based generation internally and compiles it to a string.

Parameters:

Name Type Description Default
var str

Variable name to use (default: "$e")

'$e'

Returns:

Type Description
str

TypeQL insert query string

Source code in type_bridge/models/base.py
def to_insert_query(self, var: str = "$e") -> str:
    """Compile this instance's insert AST into a TypeQL string.

    Convenience wrapper: builds the AST via to_ast() and runs it
    through the query compiler.

    Args:
        var: Variable name to use (default: "$e")

    Returns:
        TypeQL insert query string
    """
    from type_bridge.query.compiler import QueryCompiler

    ast = self.to_ast(var=var)
    return QueryCompiler().compile(ast)

ProxyDatabase

ProxyDatabase(proxy_url='http://localhost:8080', database='typedb', timeout=30)

Drop-in replacement for Database that routes queries through a type-bridge proxy server.

Instead of connecting directly to TypeDB, all queries are sent as HTTP requests to the proxy server's REST API. The proxy handles validation, interceptors (audit log, etc.), and forwarding to TypeDB.

Source code in type_bridge/proxy.py
def __init__(
    self,
    proxy_url: str = "http://localhost:8080",
    database: str = "typedb",
    timeout: int = 30,
):
    """Store proxy endpoint settings; no network activity happens here.

    Args:
        proxy_url: Base URL of the proxy server (trailing slash removed).
        database: Target database name.
        timeout: Request timeout in seconds.
    """
    # Normalize so later path joins don't produce a double slash.
    self.proxy_url = proxy_url.rstrip("/")
    self.database_name = database
    self.timeout = timeout
    self._connected = False

connect

connect()

Verify the proxy server is reachable via health check.

Source code in type_bridge/proxy.py
def connect(self) -> None:
    """Verify the proxy server is reachable via health check.

    Raises:
        ConnectionError: If the health-check request fails.
    """
    # Keep the try body to the I/O call only: a failure while logging or
    # while reading the (possibly malformed) health payload should not be
    # misreported as a connection failure.
    try:
        health = self._http_get("/health")
    except Exception as e:
        raise ConnectionError(f"Failed to connect to proxy at {self.proxy_url}: {e}") from e

    self._connected = True
    # health is expected to be a dict-like payload; "version" is optional.
    logger.info(
        "Connected to proxy at %s (version: %s)",
        self.proxy_url,
        health.get("version", "unknown"),
    )

close

close()

Close the proxy connection (clears connected state).

Source code in type_bridge/proxy.py
def close(self) -> None:
    """Mark the proxy connection as closed (no network call is made)."""
    self._connected = False
    logger.debug("Proxy connection closed: %s", self.proxy_url)

transaction

transaction(transaction_type='read')

Create a proxy transaction context.

Parameters:

Name Type Description Default
transaction_type Any

Transaction type string ("read", "write", "schema") or TransactionType enum value.

'read'
Source code in type_bridge/proxy.py
def transaction(self, transaction_type: Any = "read") -> ProxyTransactionContext:
    """Create a proxy transaction context.

    Args:
        transaction_type: "read", "write", or "schema" as a string, or a
            TransactionType enum value (its name is lowercased).

    Returns:
        A ProxyTransactionContext bound to this database.
    """
    if isinstance(transaction_type, str):
        tx_kind = transaction_type
    else:
        # typedb.driver TransactionType enum: derive from its name.
        tx_kind = getattr(transaction_type, "name", str(transaction_type)).lower()
    return ProxyTransactionContext(self, tx_kind)

execute_query

execute_query(query, transaction_type='read')

Execute a query through the proxy and return results.

Source code in type_bridge/proxy.py
def execute_query(self, query: str, transaction_type: str = "read") -> list[dict[str, Any]]:
    """Execute a query through the proxy and return results as a list.

    A non-list response from the proxy is wrapped in a single-element list.
    """
    logger.debug("Executing query via proxy (type=%s, %d chars)", transaction_type, len(query))
    raw = self._send_raw_query(query, transaction_type)
    if isinstance(raw, list):
        return raw
    return [raw]

get_schema

get_schema()

Fetch the loaded schema from the proxy server.

Source code in type_bridge/proxy.py
def get_schema(self) -> str:
    """Fetch the loaded schema from the proxy server.

    Returns:
        The schema payload: JSON-encoded when the proxy responds with a
        dict, otherwise its plain string form.
    """
    payload = self._http_get("/schema")
    if isinstance(payload, dict):
        return json.dumps(payload)
    return str(payload)

ProxyError

ProxyError(message, code='UNKNOWN', details=None)

Bases: Exception

Error returned by the proxy server.

Source code in type_bridge/proxy.py
def __init__(self, message: str, code: str = "UNKNOWN", details: Any = None):
    """Record the proxy error message along with its machine-readable code and details."""
    super().__init__(message)
    self.code = code
    self.details = details

Query

Query()

Builder for TypeQL queries.

Initialize query builder.

Source code in type_bridge/query/__init__.py
def __init__(self):
    """Create an empty query builder with no clauses or modifiers."""
    # Clause containers, populated by match()/delete()/insert()/fetch().
    self.match_clause = MatchClause(patterns=[])
    self.insert_clause = InsertClause(statements=[])
    self.delete_clause = DeleteClause(statements=[])
    self.fetch_clause = FetchClause(items=[])

    # Result modifiers; emitted between the write clauses and fetch.
    self.sort_clauses: list[tuple[str, str]] = []
    self.limit_val: int | None = None
    self.offset_val: int | None = None

    # Shared compiler used by build().
    self.compiler = QueryCompiler()

match

match(pattern)

Add a match clause.

Parameters:

Name Type Description Default
pattern str

TypeQL match pattern

required

Returns:

Type Description
Query

Self for chaining

Source code in type_bridge/query/__init__.py
def match(self, pattern: str) -> Query:
    """Append a raw match pattern.

    Args:
        pattern: TypeQL match pattern; surrounding whitespace and a
            trailing semicolon are stripped before storing.

    Returns:
        Self for chaining
    """
    if pattern:
        # Normalize the raw string; the compiler adds its own terminator.
        normalized = pattern.strip().rstrip(";")
        self.match_clause.patterns.append(RawPattern(content=normalized))
    return self

fetch

fetch(variable, *attributes)

Add variables and attributes to fetch.

In TypeQL 3.x, fetch uses the syntax: fetch { $e.* } (fetch all attributes)

Parameters:

Name Type Description Default
variable str

Variable name to fetch (e.g., "$e")

required
attributes str

Not used in TypeQL 3.x (kept for API compatibility)

()

Returns:

Type Description
Query

Self for chaining

Example

query.fetch("$e") # Fetches all attributes

Source code in type_bridge/query/__init__.py
def fetch(self, variable: str, *attributes: str) -> Query:
    """Add a wildcard fetch for *variable* (TypeQL 3.x: ``fetch { $e.* }``).

    Args:
        variable: Variable name to fetch (e.g., "$e")
        attributes: Unused in TypeQL 3.x; accepted for API compatibility

    Returns:
        Self for chaining

    Example:
        query.fetch("$e")  # Fetches all attributes
    """
    # The fetch item is keyed by the variable name without its "$" prefix.
    fetch_key = variable.lstrip("$")
    item = FetchWildcard(key=fetch_key, var=variable)
    self.fetch_clause.items.append(item)
    return self

delete

delete(pattern)

Add a delete clause.

Parameters:

Name Type Description Default
pattern str

TypeQL delete pattern

required

Returns:

Type Description
Query

Self for chaining

Source code in type_bridge/query/__init__.py
def delete(self, pattern: str) -> Query:
    """Append a raw delete pattern.

    Args:
        pattern: TypeQL delete pattern; surrounding whitespace and a
            trailing semicolon are stripped before storing.

    Returns:
        Self for chaining
    """
    if pattern:
        normalized = pattern.strip().rstrip(";")
        self.delete_clause.statements.append(RawStatement(content=normalized))
    return self

insert

insert(pattern)

Add an insert clause.

Parameters:

Name Type Description Default
pattern str

TypeQL insert pattern

required

Returns:

Type Description
Query

Self for chaining

Source code in type_bridge/query/__init__.py
def insert(self, pattern: str) -> Query:
    """Append a raw insert pattern.

    Args:
        pattern: TypeQL insert pattern; surrounding whitespace and a
            trailing semicolon are stripped before storing.

    Returns:
        Self for chaining
    """
    if pattern:
        normalized = pattern.strip().rstrip(";")
        self.insert_clause.statements.append(RawStatement(content=normalized))
    return self

limit

limit(limit)

Set query limit.

Parameters:

Name Type Description Default
limit int

Maximum number of results

required

Returns:

Type Description
Query

Self for chaining

Source code in type_bridge/query/__init__.py
def limit(self, limit: int) -> Query:
    """Cap the number of results returned by the query.

    Args:
        limit: Maximum number of results

    Returns:
        Self for chaining
    """
    self.limit_val = limit
    return self

offset

offset(offset)

Set query offset.

Parameters:

Name Type Description Default
offset int

Number of results to skip

required

Returns:

Type Description
Query

Self for chaining

Source code in type_bridge/query/__init__.py
def offset(self, offset: int) -> Query:
    """Skip the first *offset* results of the query.

    Args:
        offset: Number of results to skip

    Returns:
        Self for chaining
    """
    self.offset_val = offset
    return self

sort

sort(variable, direction='asc')

Add sorting to the query.

Parameters:

Name Type Description Default
variable str

Variable to sort by

required
direction str

Sort direction ("asc" or "desc")

'asc'

Returns:

Type Description
Query

Self for chaining

Example

Query().match("$p isa person").fetch("$p").sort("$p", "asc")

Source code in type_bridge/query/__init__.py
def sort(self, variable: str, direction: str = "asc") -> Query:
    """Append a sort key to the query.

    Args:
        variable: Variable to sort by (e.g., "$p")
        direction: Sort direction, "asc" or "desc"

    Returns:
        Self for chaining

    Raises:
        ValueError: If direction is not "asc" or "desc"

    Example:
        Query().match("$p isa person").fetch("$p").sort("$p", "asc")
    """
    if direction not in {"asc", "desc"}:
        raise ValueError(f"Invalid sort direction: {direction}")
    self.sort_clauses.append((variable, direction))
    return self

build

build()

Build the final TypeQL query string.

Returns:

Type Description
str

Complete TypeQL query

Source code in type_bridge/query/__init__.py
def build(self) -> str:
    """Build the final TypeQL query string.

    Assembles clauses in TypeQL 3.x order: match, delete, insert,
    modifiers (sort, then offset, then limit), and finally fetch.
    Empty clauses are skipped.

    Returns:
        Complete TypeQL query
    """
    logger.debug("Building TypeQL query")
    parts: list[str] = []

    # Match clause
    if self.match_clause.patterns:
        parts.append(self.compiler.compile(self.match_clause))

    # Delete clause
    if self.delete_clause.statements:
        parts.append(self.compiler.compile(self.delete_clause))

    # Insert clause
    if self.insert_clause.statements:
        parts.append(self.compiler.compile(self.insert_clause))

    # Sort, offset, and limit modifiers (must come BEFORE fetch in TypeQL 3.x)
    modifier_parts: list[str] = []
    if self.sort_clauses:
        # Loop variable renamed from `dir` to avoid shadowing the builtin.
        sort_items = [f"{var} {direction}" for var, direction in self.sort_clauses]
        modifier_parts.append(f"sort {', '.join(sort_items)};")

    # Order matters for modifiers: offset must precede limit.
    if self.offset_val is not None:
        modifier_parts.append(f"offset {self.offset_val};")
    if self.limit_val is not None:
        modifier_parts.append(f"limit {self.limit_val};")

    if modifier_parts:
        parts.append("\n".join(modifier_parts))

    # Fetch clause
    if self.fetch_clause.items:
        parts.append(self.compiler.compile(self.fetch_clause))

    query = "\n".join(parts)
    # Lazy %-formatting so the message is only built when DEBUG is enabled.
    logger.debug("Built query: %s", query)
    return query

__str__

__str__()

String representation of query.

Source code in type_bridge/query/__init__.py
def __str__(self) -> str:
    """Return the compiled TypeQL query string (equivalent to build())."""
    return self.build()

QueryBuilder

Helper class for building queries with model classes.

match_entity staticmethod

match_entity(model_class, var='$e', **filters)

Create a match query for an entity.

Parameters:

Name Type Description Default
model_class type[Entity]

The entity model class

required
var str

Variable name to use

'$e'
filters Any

Attribute filters (field_name: value)

{}

Returns:

Type Description
Query

Query object

Source code in type_bridge/query/__init__.py
@staticmethod
def match_entity(model_class: type[Entity], var: str = "$e", **filters: Any) -> Query:
    """Build a match query for an entity type.

    Args:
        model_class: The entity model class
        var: Variable name to use
        filters: Attribute filters (field_name: value)

    Returns:
        Query object with the entity match pattern applied
    """
    from type_bridge.crud.patterns import build_entity_match_pattern

    logger.debug(
        f"QueryBuilder.match_entity: {model_class.__name__}, var={var}, filters={filters}"
    )
    # An empty filters dict is passed as None to the pattern builder.
    pattern = build_entity_match_pattern(model_class, var, filters or None)
    return Query().match(pattern)

insert_entity staticmethod

insert_entity(instance, var='$e')

Create an insert query for an entity instance.

Parameters:

Name Type Description Default
instance Entity

Entity instance

required
var str

Variable name to use

'$e'

Returns:

Type Description
Query

Query object

Source code in type_bridge/query/__init__.py
@staticmethod
def insert_entity(instance: Entity, var: str = "$e") -> Query:
    """Build an insert query from an entity instance.

    Args:
        instance: Entity instance whose attributes form the insert pattern
        var: Variable name to use

    Returns:
        Query object with the insert pattern applied
    """
    logger.debug(f"QueryBuilder.insert_entity: {instance.__class__.__name__}, var={var}")
    pattern = instance.to_insert_query(var)
    return Query().insert(pattern)

match_relation staticmethod

match_relation(model_class, var='$r', role_players=None)

Create a match query for a relation.

Parameters:

Name Type Description Default
model_class type[Relation]

The relation model class

required
var str

Variable name to use

'$r'
role_players dict[str, str] | None

Dict mapping role names to player variables

None

Returns:

Type Description
Query

Query object

Raises:

Type Description
ValueError

If a role name is not defined in the model

Source code in type_bridge/query/__init__.py
@staticmethod
def match_relation(
    model_class: type[Relation], var: str = "$r", role_players: dict[str, str] | None = None
) -> Query:
    """Build a match query targeting a relation type.

    Args:
        model_class: Relation model class to match against
        var: TypeQL variable name bound to the matched relation
        role_players: Optional mapping of role names to player variables

    Returns:
        A Query with its match clause populated

    Raises:
        ValueError: If a role name is not defined on the model
    """
    from type_bridge.crud.patterns import build_relation_match_pattern

    logger.debug(
        f"QueryBuilder.match_relation: {model_class.__name__}, var={var}, "
        f"role_players={role_players}"
    )
    relation_pattern = build_relation_match_pattern(model_class, var, role_players)
    result = Query()
    result.match(relation_pattern)
    return result

Database

Database(address='localhost:1729', database='typedb', username=None, password=None, driver=None)

Main database connection and session manager.

Initialize database connection.

Parameters:

Name Type Description Default
address str

TypeDB server address

'localhost:1729'
database str

Database name

'typedb'
username str | None

Optional username for authentication

None
password str | None

Optional password for authentication

None
driver Driver | None

Optional pre-existing Driver instance to use. If provided, the Database will use this driver instead of creating a new one. The caller retains ownership and is responsible for closing it.

None
Source code in type_bridge/session.py
def __init__(
    self,
    address: str = "localhost:1729",
    database: str = "typedb",
    username: str | None = None,
    password: str | None = None,
    driver: Driver | None = None,
):
    """Set up connection parameters without connecting yet.

    Args:
        address: TypeDB server address
        database: Database name
        username: Optional username for authentication
        password: Optional password for authentication
        driver: Optional pre-existing Driver instance to use. When given,
            this Database reuses it instead of creating its own; the
            caller retains ownership and must close it.
    """
    self.address = address
    self.database_name = database
    self.username = username
    self.password = password
    # An injected driver stays owned by the caller; one created later by
    # connect() is owned (and closed) by this Database.
    self._owns_driver: bool = driver is None
    self._driver: Driver | None = driver

driver property

driver

Get the TypeDB driver, connecting if necessary.

connect

connect()

Connect to TypeDB server.

If a driver was injected via `__init__`, this method does nothing (the driver is already connected). Otherwise, creates a new driver.

Source code in type_bridge/session.py
def connect(self) -> None:
    """Establish a connection to the TypeDB server.

    A no-op when a driver is already present (e.g. injected via
    __init__); otherwise a fresh driver is created and owned by this
    Database.
    """
    if self._driver is not None:
        return

    logger.debug(f"Connecting to TypeDB at {self.address} (database: {self.database_name})")

    # Only build explicit credentials when both pieces were supplied.
    credentials = None
    if self.username and self.password:
        credentials = Credentials(self.username, self.password)

    # TLS is enabled only for https:// addresses; plain local addresses skip it.
    is_tls_enabled = self.address.startswith("https://")
    logger.debug(f"TLS enabled: {is_tls_enabled}")
    driver_options = DriverOptions(is_tls_enabled=is_tls_enabled)

    try:
        if credentials is not None:
            logger.debug("Using provided credentials for authentication")
            self._driver = TypeDB.driver(self.address, credentials, driver_options)
        else:
            # Local TypeDB Core without explicit auth: use the default account.
            logger.debug("Using default credentials for local connection")
            self._driver = TypeDB.driver(
                self.address, Credentials("admin", "password"), driver_options
            )
    except Exception as e:
        logger.error(f"Failed to connect to TypeDB at {self.address}: {e}")
        raise
    self._owns_driver = True
    logger.info(f"Connected to TypeDB at {self.address}")

close

close()

Close connection to TypeDB server.

If the driver was injected via `__init__`, this method only clears the reference without closing the driver (the caller retains ownership). If the driver was created internally, it will be closed.

Source code in type_bridge/session.py
def close(self) -> None:
    """Release the TypeDB connection.

    A driver created by this Database is closed; an injected driver is
    merely dereferenced, since its lifecycle belongs to the caller.
    """
    if not self._driver:
        return
    if self._owns_driver:
        logger.debug(f"Closing connection to TypeDB at {self.address}")
        self._driver.close()
        logger.info(f"Disconnected from TypeDB at {self.address}")
    else:
        logger.debug("Clearing driver reference (external driver, not closing)")
    self._driver = None

__enter__

__enter__()

Context manager entry.

Source code in type_bridge/session.py
def __enter__(self) -> "Database":
    """Context manager entry."""
    self.connect()
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in type_bridge/session.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Context manager exit."""
    del exc_type, exc_val, exc_tb  # unused
    self.close()

__del__

__del__()

Destructor that warns if driver was not properly closed.

Source code in type_bridge/session.py
def __del__(self) -> None:
    """Destructor that warns if driver was not properly closed."""
    import warnings

    if self._driver is not None and self._owns_driver:
        warnings.warn(
            f"Database connection to {self.address} was not closed. "
            "Use 'with Database(...) as db:' or call db.close() explicitly.",
            ResourceWarning,
            stacklevel=2,
        )
        # Attempt to close to prevent resource leak
        try:
            self._driver.close()
        except Exception:
            pass  # Ignore errors during cleanup

create_database

create_database()

Create the database if it doesn't exist.

Source code in type_bridge/session.py
def create_database(self) -> None:
    """Ensure the configured database exists on the server."""
    if self.driver.databases.contains(self.database_name):
        logger.debug(f"Database already exists: {self.database_name}")
        return
    logger.debug(f"Creating database: {self.database_name}")
    self.driver.databases.create(self.database_name)
    logger.info(f"Database created: {self.database_name}")

delete_database

delete_database()

Delete the database.

Source code in type_bridge/session.py
def delete_database(self) -> None:
    """Delete the configured database if it exists (no-op otherwise)."""
    if not self.driver.databases.contains(self.database_name):
        logger.debug(f"Database does not exist, skipping delete: {self.database_name}")
        return
    logger.debug(f"Deleting database: {self.database_name}")
    self.driver.databases.get(self.database_name).delete()
    logger.info(f"Database deleted: {self.database_name}")

database_exists

database_exists()

Check if database exists.

Source code in type_bridge/session.py
def database_exists(self) -> bool:
    """Report whether the configured database is present on the server."""
    present = self.driver.databases.contains(self.database_name)
    logger.debug(f"Database exists check for '{self.database_name}': {present}")
    return present

transaction

transaction(transaction_type: TransactionType) -> TransactionContext
transaction(transaction_type: str = 'read') -> TransactionContext
transaction(transaction_type='read')

Create a transaction context.

Parameters:

Name Type Description Default
transaction_type TransactionType | str

TransactionType or string ("read", "write", "schema")

'read'

Returns:

Type Description
TransactionContext

TransactionContext for use as a context manager

Source code in type_bridge/session.py
def transaction(self, transaction_type: TransactionType | str = "read") -> "TransactionContext":
    """Create a transaction context.

    Args:
        transaction_type: TransactionType or string ("read", "write", "schema")

    Returns:
        TransactionContext for use as a context manager

    Raises:
        ValueError: If a string transaction type is not one of
            "read", "write", or "schema".
    """
    tx_type_map: dict[str, TransactionType] = {
        "read": TransactionType.READ,
        "write": TransactionType.WRITE,
        "schema": TransactionType.SCHEMA,
    }

    if isinstance(transaction_type, str):
        try:
            tx_type = tx_type_map[transaction_type]
        except KeyError:
            # Bug fix: unknown strings previously fell back silently to READ,
            # so a typo like "wirte" produced a read-only transaction and
            # writes failed far from the cause. Fail loudly instead.
            raise ValueError(
                f"Unknown transaction type: {transaction_type!r}; "
                "expected 'read', 'write', or 'schema'"
            ) from None
    else:
        tx_type = transaction_type

    logger.debug(
        f"Creating {_tx_type_name(tx_type)} transaction for database: {self.database_name}"
    )
    return TransactionContext(self, tx_type)

execute_query

execute_query(query, transaction_type='read')

Execute a query and return results.

Parameters:

Name Type Description Default
query str

TypeQL query string

required
transaction_type str

Type of transaction ("read", "write", or "schema")

'read'

Returns:

Type Description
list[dict[str, Any]]

List of result dictionaries

Source code in type_bridge/session.py
def execute_query(self, query: str, transaction_type: str = "read") -> list[dict[str, Any]]:
    """Execute a query in a fresh transaction and return its results.

    Args:
        query: TypeQL query string
        transaction_type: Type of transaction ("read", "write", or "schema")

    Returns:
        List of result dictionaries
    """
    logger.debug(f"Executing query (type={transaction_type}, {len(query)} chars)")
    logger.debug(f"Query: {query}")
    with self.transaction(transaction_type) as tx:
        rows = tx.execute(query)
        # Defensive: accept a TransactionType as well, even though the
        # annotation only advertises strings.
        if isinstance(transaction_type, str):
            mutating = transaction_type in ("write", "schema")
        else:
            mutating = transaction_type in (TransactionType.WRITE, TransactionType.SCHEMA)
        # Only mutating transactions need an explicit commit.
        if mutating:
            tx.commit()
        logger.debug(f"Query returned {len(rows)} results")
        return rows

get_schema

get_schema()

Get the schema definition for this database.

Source code in type_bridge/session.py
def get_schema(self) -> str:
    """Return the TypeQL schema definition of this database."""
    logger.debug(f"Fetching schema for database: {self.database_name}")
    schema = self.driver.databases.get(self.database_name).schema()
    logger.debug(f"Schema fetched ({len(schema)} chars)")
    return schema

TransactionContext

TransactionContext(db, tx_type)

Context manager for sharing a TypeDB transaction across operations.

Source code in type_bridge/session.py
def __init__(self, db: Database, tx_type: TransactionType):
    """Bind this context to a database and the transaction type to open.

    Args:
        db: Database whose driver will back the transaction.
        tx_type: Kind of transaction (read/write/schema) to open.
    """
    self.db = db
    self.tx_type = tx_type
    # Underlying transaction handle; None until the context is opened
    # (presumably populated on __enter__ — not shown in this chunk).
    self._tx: Transaction | None = None

transaction property

transaction

Underlying transaction wrapper.

database property

database

Database backing this transaction.

execute

execute(query)

Execute a query within the active transaction.

Source code in type_bridge/session.py
def execute(self, query: str) -> list[dict[str, Any]]:
    """Run a TypeQL query on the active transaction and return its rows."""
    active = self.transaction
    return active.execute(query)

commit

commit()

Commit the active transaction.

Source code in type_bridge/session.py
def commit(self) -> None:
    """Persist all pending changes in the active transaction."""
    active = self.transaction
    active.commit()

rollback

rollback()

Rollback the active transaction.

Source code in type_bridge/session.py
def rollback(self) -> None:
    """Discard all pending changes in the active transaction."""
    active = self.transaction
    active.rollback()

manager

manager(model_cls)

Get a TypeDBManager bound to this transaction.

Source code in type_bridge/session.py
def manager(self, model_cls: Any):
    """Return a TypeDBManager for ``model_cls`` bound to this transaction.

    Raises:
        TypeError: If ``model_cls`` is not an Entity or Relation subclass.
    """
    from type_bridge.crud import TypeDBManager
    from type_bridge.models import Entity, Relation

    if not issubclass(model_cls, (Entity, Relation)):
        raise TypeError("manager() expects an Entity or Relation subclass")
    return TypeDBManager(self.transaction, model_cls)

Flag

Flag(*annotations)

Create attribute flags for Key, Unique, and Card markers.

Usage

field: Type = Flag(Key)                    # @key (implies @card(1..1))
field: Type = Flag(Unique)                 # @unique @card(1..1)
field: list[Type] = Flag(Card(min=2))      # @card(2..)
field: list[Type] = Flag(Card(1, 5))       # @card(1..5)
field: Type = Flag(Key, Unique)            # @key @unique
field: list[Type] = Flag(Key, Card(min=1)) # @key @card(1..)

For optional single values, use Optional[Type] instead: field: Optional[Type] # @card(0..1) - no Flag needed

Parameters:

Name Type Description Default
*annotations Any

Variable number of Key, Unique, or Card marker instances

()

Returns:

Type Description
Annotated[Any, AttributeFlags]

AttributeFlags instance with the specified flags

Example

class Person(Entity):
    flags = TypeFlags(name="person")
    name: Name = Flag(Key)                    # @key (implies @card(1..1))
    email: Email = Flag(Key, Unique)          # @key @unique
    age: Optional[Age]                        # @card(0..1)
    tags: list[Tag] = Flag(Card(min=2))       # @card(2..)
    jobs: list[Job] = Flag(Card(1, 5))        # @card(1..5)

Source code in type_bridge/attribute/flags.py
def Flag(*annotations: Any) -> Annotated[Any, AttributeFlags]:
    """Combine Key, Unique, and Card markers into an AttributeFlags value.

    Usage:
        field: Type = Flag(Key)                    # @key (implies @card(1..1))
        field: Type = Flag(Unique)                 # @unique @card(1..1)
        field: list[Type] = Flag(Card(min=2))      # @card(2..)
        field: list[Type] = Flag(Card(1, 5))       # @card(1..5)
        field: Type = Flag(Key, Unique)            # @key @unique
        field: list[Type] = Flag(Key, Card(min=1)) # @key @card(1..)

    For an optional single value no Flag is needed; annotate the field as
    Optional[Type] instead (maps to @card(0..1)).

    Args:
        *annotations: Any number of Key, Unique, or Card marker instances.
            Markers of other kinds are silently ignored.

    Returns:
        AttributeFlags carrying the combined flag state

    Example:
        class Person(Entity):
            flags = TypeFlags(name="person")
            name: Name = Flag(Key)                    # @key (implies @card(1..1))
            email: Email = Flag(Key, Unique)          # @key @unique
            age: Optional[Age]                        # @card(0..1)
            tags: list[Tag] = Flag(Card(min=2))       # @card(2..)
            jobs: list[Job] = Flag(Card(1, 5))        # @card(1..5)
    """
    flags = AttributeFlags()
    explicit_card = False

    for marker in annotations:
        if marker is Key:
            flags.is_key = True
        elif marker is Unique:
            flags.is_unique = True
        elif isinstance(marker, Card):
            # Copy the cardinality range from the Card marker.
            flags.card_min = marker.min
            flags.card_max = marker.max
            flags.has_explicit_card = True
            explicit_card = True

    # Key without an explicit Card implies exactly-one cardinality.
    if flags.is_key and not explicit_card:
        flags.card_min = 1
        flags.card_max = 1

    return flags