Skip to content

type_bridge.crud

crud

CRUD operations for TypeDB entities and relations.

This module provides the unified TypeDBManager for performing CRUD (Create, Read, Update, Delete) operations on TypeDB entities and relations with type safety.

EntityNotFoundError

Bases: NotFoundError

Raised when an entity does not exist in the database.

This exception is raised during delete or update operations when the target entity cannot be found using its @key attributes or matched attributes.

Example

try: manager.delete(nonexistent_entity) except EntityNotFoundError: print("Entity was already deleted or never existed")

HydrationError

HydrationError(model_type, raw_data, cause)

Bases: RuntimeError

Raised when hydrating database results into model instances fails.

This exception is raised when the ORM cannot convert raw database results into Python model instances. This typically indicates: - Schema mismatch between database and Python models - Corrupt or unexpected data in the database - Missing or invalid type information

Attributes:

Name Type Description
model_type

Name of the model class being hydrated

raw_data

The raw database result that failed to hydrate

cause

The underlying exception that caused the failure

Example

try: results = manager.all() except HydrationError as e: print(f"Failed to hydrate {e.model_type}: {e.cause}") print(f"Raw data: {e.raw_data}")

Source code in type_bridge/crud/exceptions.py
def __init__(
    self,
    model_type: str,
    raw_data: object,
    cause: Exception,
):
    self.model_type = model_type
    self.raw_data = raw_data
    self.cause = cause

    message = (
        f"Failed to hydrate {model_type} from database result: {cause}. "
        f"This may indicate a schema mismatch or corrupt data. "
        f"Raw data: {raw_data!r}"
    )
    super().__init__(message)

KeyAttributeError

KeyAttributeError(entity_type, operation, field_name=None, all_fields=None)

Bases: ValueError

Raised when @key attribute validation fails during update/delete.

This exception is raised when: - A @key attribute has a None value - No @key attributes are defined on the entity

Attributes:

Name Type Description
entity_type

Name of the entity class

operation

The operation that failed ("update" or "delete")

field_name

The @key field that was None (if applicable)

all_fields

List of all defined fields (when no @key exists)

Example

try: manager.update(entity_with_none_key) except KeyAttributeError as e: print(f"Key validation failed: {e}") print(f"Entity type: {e.entity_type}") print(f"Operation: {e.operation}")

Source code in type_bridge/crud/exceptions.py
def __init__(
    self,
    entity_type: str,
    operation: str,
    field_name: str | None = None,
    all_fields: list[str] | None = None,
):
    self.entity_type = entity_type
    self.operation = operation
    self.field_name = field_name
    self.all_fields = all_fields

    if field_name is not None:
        # Key attribute is None
        message = (
            f"Cannot {operation} {entity_type}: "
            f"key attribute '{field_name}' is None. "
            f"Ensure the entity has a valid '{field_name}' value "
            f"before calling {operation}()."
        )
    else:
        # No @key attributes defined
        message = (
            f"Cannot {operation} {entity_type}: no @key attributes found. "
            f"The {operation}() method requires at least one @key attribute "
            f"to identify the entity. "
            f"Defined attributes: {all_fields} (none marked as @key). "
            f"Hint: Add Flag(Key) to an attribute, e.g., `id: Id = Flag(Key)`"
        )

    super().__init__(message)

NotFoundError

Bases: LookupError

Base class for not-found errors in CRUD operations.

Raised when an entity or relation that was expected to exist cannot be found in the database.

NotUniqueError

Bases: ValueError

Raised when an operation requires exactly one match but finds multiple.

This exception is raised when attempting to delete an entity without @key attributes and multiple matching records are found. Use filter().delete() for bulk deletion instead.

Example

try: manager.delete(keyless_entity) except NotUniqueError: print("Multiple entities matched - use filter().delete() for bulk deletion")

RelationNotFoundError

Bases: NotFoundError

Raised when a relation does not exist in the database.

This exception is raised during delete or update operations when the target relation cannot be found using its role players' @key attributes.

Example

try: manager.delete(nonexistent_relation) except RelationNotFoundError: print("Relation was already deleted or never existed")

CrudEvent

Bases: Enum

CRUD lifecycle events.

CrudHook

Bases: Protocol

Protocol for CRUD lifecycle hooks.

Implement only the methods you need. All methods are optional — HookRunner uses hasattr / getattr to discover them.

HookCancelled

HookCancelled(reason='', *, event=None, hook=None)

Bases: Exception

Raise in a pre-hook to abort the operation.

Attributes:

Name Type Description
reason

Human-readable explanation.

event

The event that was cancelled (set by HookRunner).

hook

The hook instance that raised the cancellation (set by HookRunner).

Source code in type_bridge/crud/hooks.py
def __init__(
    self,
    reason: str = "",
    *,
    event: CrudEvent | None = None,
    hook: Any = None,
):
    self.reason = reason
    self.event = event
    self.hook = hook
    super().__init__(reason)

EntityStrategy

Bases: ModelStrategy['Entity']

Strategy for handling Entity models.

identify

identify(instance)

Generate identification constraints for an entity.

Delegates to the entity's _build_identification_constraints() method to avoid duplicating the IID/key attribute logic.

Source code in type_bridge/crud/strategies.py
def identify(self, instance: Entity) -> list[Constraint]:
    """Generate identification constraints for an entity.

    Delegates to the entity's _build_identification_constraints() method
    to avoid duplicating the IID/key attribute logic.
    """
    return instance._build_identification_constraints()

ModelStrategy

Bases: ABC

Abstract strategy for handling model-specific logic.

identify abstractmethod

identify(instance)

Generate identification constraints (IID or keys/roles).

Source code in type_bridge/crud/strategies.py
@abstractmethod
def identify(self, instance: T) -> list[Constraint]:
    """Generate identification constraints (IID or keys/roles)."""
    pass

build_insert abstractmethod

build_insert(instance, var)

Generate insert AST and optional match prerequisites.

Source code in type_bridge/crud/strategies.py
@abstractmethod
def build_insert(self, instance: T, var: str) -> tuple[MatchClause | None, InsertClause]:
    """Generate insert AST and optional match prerequisites."""
    pass

build_match_all abstractmethod

build_match_all(model_class, var, filters)

Generate match AST for filtering.

Source code in type_bridge/crud/strategies.py
@abstractmethod
def build_match_all(
    self, model_class: type[T], var: str, filters: dict[str, Any]
) -> MatchClause:
    """Generate match AST for filtering."""
    pass

RelationStrategy

Bases: ModelStrategy['Relation']

Strategy for handling Relation models.

build_fetch_query

build_fetch_query(model_class, var, filters, expressions, order_by=None, limit=None, offset=None)

Build a complete fetch query for relations including role players.

Uses the TypeDB 3.x pattern with isa! for type variable binding to enable label() fetching for polymorphic type resolution.

Parameters:

Name Type Description Default
model_class type[Relation]

The relation class

required
var str

Variable name for the relation (e.g., "$r")

required
filters dict[str, Any]

Attribute and role player filters

required
expressions list[Any]

Expression objects for filtering

required
order_by list[tuple[str, bool]] | None

List of (field_name, descending) tuples for sorting

None
limit int | None

Maximum number of results

None
offset int | None

Number of results to skip

None

Returns:

Type Description
str

Complete TypeQL query string

Source code in type_bridge/crud/strategies.py
def build_fetch_query(
    self,
    model_class: type[Relation],
    var: str,
    filters: dict[str, Any],
    expressions: list[Any],
    order_by: list[tuple[str, bool]] | None = None,
    limit: int | None = None,
    offset: int | None = None,
) -> str:
    """Build a complete fetch query for relations including role players.

    Uses the TypeDB 3.x pattern with isa! for type variable binding to enable
    label() fetching for polymorphic type resolution.

    Args:
        model_class: The relation class
        var: Variable name for the relation (e.g., "$r")
        filters: Attribute and role player filters
        expressions: Expression objects for filtering
        order_by: List of (field_name, descending) tuples for sorting
        limit: Maximum number of results
        offset: Number of results to skip

    Returns:
        Complete TypeQL query string
    """
    from type_bridge.crud.formatting import format_value
    from type_bridge.crud.types import is_multi_value_attribute
    from type_bridge.crud.utils import build_role_player_fetch_items

    roles = getattr(model_class, "_roles", {})
    owned_attrs = model_class.get_all_attributes()
    base_type = model_class.get_type_name()

    # Build role player variables
    role_vars: dict[str, str] = {}  # role_name -> var
    role_info: dict[str, tuple[str, tuple[type, ...]]] = {}  # for fetch items

    for role_name, role in roles.items():
        role_var = f"${role_name}"
        role_vars[role_name] = role_var
        role_info[role_name] = (role_var, role.player_entity_types)

    # Build match clause using isa! pattern for type variable binding
    # This enables label($t) to fetch the actual type name
    role_player_parts = [f"{roles[rn].role_name}: {rv}" for rn, rv in role_vars.items()]
    roles_str = ", ".join(role_player_parts)

    # Use isa! to bind exact type to $t for label() function
    match_clauses = [f"{var} isa! $t ({roles_str})", f"$t sub {base_type}"]

    # Add type variable bindings for each role player to enable label() fetch
    for role_name in roles:
        role_var = f"${role_name}"
        type_var = f"{role_var}_type"
        match_clauses.append(f"{role_var} isa! {type_var}")

    # Add attribute filter clauses to relation match
    for field_name, value in filters.items():
        if field_name in owned_attrs:
            attr_info = owned_attrs[field_name]
            attr_name = attr_info.typ.get_attribute_name()
            raw_val = value.value if hasattr(value, "value") else value
            match_clauses.append(f"{var} has {attr_name} {format_value(raw_val)}")

    # Add role player filter clauses (handles both Entity and Relation role players)
    for role_name, value in filters.items():
        if role_name in roles and hasattr(value, "get_type_name"):
            player = value
            player_type = player.get_type_name()
            role_var = f"${role_name}"

            # Get player identification (IID or keys for entities)
            if player._iid:
                match_clauses.append(f"{role_var} isa {player_type}, iid {player._iid}")
            elif hasattr(player, "_build_identification_constraints"):
                # Entity: use key attributes
                key_match = f"{role_var} isa {player_type}"
                for field_name, attr_info in player.get_all_attributes().items():
                    if attr_info.flags.is_key:
                        attr_value = getattr(player, field_name, None)
                        if attr_value is not None:
                            attr_name = attr_info.typ.get_attribute_name()
                            raw_val = (
                                attr_value.value if hasattr(attr_value, "value") else attr_value
                            )
                            key_match += f", has {attr_name} {format_value(raw_val)}"
                match_clauses.append(key_match)
            else:
                # Relation as role player without IID - can't identify
                raise ValueError(
                    f"Relation role player '{role_name}' cannot be identified: "
                    f"no _iid set. Fetch the relation from the database first."
                )

    match_clause = "match\n" + ";\n".join(match_clauses) + ";"

    # Add expression patterns
    if expressions:
        from type_bridge.expressions.role_player import RolePlayerExpr

        match_clause = match_clause.rstrip(";")
        for expr in expressions:
            # For RolePlayerExpr, use the role player variable instead of relation variable
            if isinstance(expr, RolePlayerExpr):
                role_var = role_vars.get(expr.role_name, f"${expr.role_name}")
                pattern = expr.to_typeql(role_var)
            else:
                pattern = expr.to_typeql(var)
            match_clause += f";\n{pattern}"
        match_clause += ";"

    # Build fetch clause with type label and role player fetch items
    fetch_items = [f'"_iid": iid({var})', '"_type": label($t)']

    # Add relation attributes
    for field_name, attr_info in owned_attrs.items():
        attr_name = attr_info.typ.get_attribute_name()
        if is_multi_value_attribute(attr_info.flags):
            fetch_items.append(f'"{attr_name}": [{var}.{attr_name}]')
        else:
            fetch_items.append(f'"{attr_name}": {var}.{attr_name}')

    # Add role player fetch items (IID, type label, and attributes)
    fetch_items.extend(build_role_player_fetch_items(role_info))

    # Build modifiers (sort, offset, limit)
    # For relations, we need to bind sort attributes to variables in the match clause
    modifier_clauses = []
    sort_var_bindings = []
    if order_by:
        sort_parts = []
        for i, (field_name, desc) in enumerate(order_by):
            sort_var = f"$sort_{i}"
            direction = "desc" if desc else "asc"

            # Check for role player attribute syntax: role__attribute
            if "__" in field_name:
                role_name, attr_field = field_name.split("__", 1)
                if role_name in roles:
                    role_var = role_vars.get(role_name, f"${role_name}")
                    # Get the role player entity types to find the attribute
                    role = roles[role_name]
                    player_types = role.player_entity_types
                    # Look up the attribute in the player entity types
                    for player_type in player_types:
                        player_attrs = player_type.get_all_attributes()
                        if attr_field in player_attrs:
                            attr_name = player_attrs[attr_field].typ.get_attribute_name()
                            sort_var_bindings.append(f"{role_var} has {attr_name} {sort_var}")
                            sort_parts.append(f"{sort_var} {direction}")
                            break
            # Direct relation attribute
            elif field_name in owned_attrs:
                attr_name = owned_attrs[field_name].typ.get_attribute_name()
                # Add binding to match clause
                sort_var_bindings.append(f"{var} has {attr_name} {sort_var}")
                sort_parts.append(f"{sort_var} {direction}")
        if sort_parts:
            modifier_clauses.append(f"sort {', '.join(sort_parts)};")

    # Add sort variable bindings to match clause
    if sort_var_bindings:
        match_clause = match_clause.rstrip(";") + ";\n" + ";\n".join(sort_var_bindings) + ";"

    if offset is not None:
        modifier_clauses.append(f"offset {offset};")
    if limit is not None:
        modifier_clauses.append(f"limit {limit};")

    modifier_str = "\n".join(modifier_clauses)

    fetch_clause = "fetch {\n  " + ",\n  ".join(fetch_items) + "\n}"

    # Build query: match + modifiers + fetch
    if modifier_str:
        return match_clause + "\n" + modifier_str + "\n" + fetch_clause + ";"
    return match_clause + "\n" + fetch_clause + ";"

GroupByQuery

GroupByQuery(manager, filters, expressions, group_fields)

Query for grouped aggregations.

Allows grouping entities by field values and computing aggregations per group.

Initialize grouped query.

Parameters:

Name Type Description Default
manager TypeDBManager[T]

TypeDBManager instance

required
filters dict[str, Any]

Dict-based filters

required
expressions list[Any]

Expression-based filters

required
group_fields tuple[Any, ...]

Fields to group by (FieldRef or field descriptors)

required
Source code in type_bridge/crud/typedb_manager.py
def __init__(
    self,
    manager: TypeDBManager[T],
    filters: dict[str, Any],
    expressions: list[Any],
    group_fields: tuple[Any, ...],
):
    """Initialize grouped query.

    Args:
        manager: TypeDBManager instance
        filters: Dict-based filters
        expressions: Expression-based filters
        group_fields: Fields to group by (FieldRef or field descriptors)
    """
    self._manager = manager
    self._filters = filters
    self._expressions = expressions
    self._group_fields = group_fields

aggregate

aggregate(*aggregates)

Execute grouped aggregation.

Parameters:

Name Type Description Default
*aggregates Any

AggregateExpr objects

()

Returns:

Type Description
dict[Any, dict[str, Any]]

Dictionary mapping group values to aggregation results

Example
Group by department, compute average age per department

result = manager.group_by(Person.department).aggregate(Person.age.avg())

Returns: {
"Engineering": {"avg_age": 35.5},
"Sales":
}
Source code in type_bridge/crud/typedb_manager.py
def aggregate(self, *aggregates: Any) -> dict[Any, dict[str, Any]]:
    """Execute grouped aggregation.

    Args:
        *aggregates: AggregateExpr objects

    Returns:
        Dictionary mapping group values to aggregation results

    Example:
        # Group by department, compute average age per department
        result = manager.group_by(Person.department).aggregate(Person.age.avg())
        # Returns: {
        #   "Engineering": {"avg_age": 35.5},
        #   "Sales": {"avg_age": 28.3}
        # }
    """
    from type_bridge.crud.aggregates import parse_grouped_aggregate_results
    from type_bridge.expressions import AggregateExpr
    from type_bridge.expressions.utils import generate_attr_var
    from type_bridge.fields.base import FieldRef
    from type_bridge.query.ast import HasPattern, ReduceAssignment, ReduceClause

    if not aggregates:
        raise ValueError("At least one aggregation expression required")

    model_class = self._manager.model_class
    var = "$e"

    # Build base match clause with filters
    match_clause = self._manager.strategy.build_match_all(model_class, var, self._filters)

    # Add expression-based filters as AST patterns
    for expr in self._expressions:
        match_clause.patterns.extend(expr.to_ast(var))

    # Add group-by fields to match clause as AST patterns
    group_vars = []
    for i, field in enumerate(self._group_fields):
        var_name = f"$group{i}"
        # Field can be a FieldRef or a field descriptor
        if isinstance(field, FieldRef):
            attr_name = field.attr_type.get_attribute_name()
        else:
            # Assume it's a field descriptor with attr_type attribute
            attr_name = field.attr_type.get_attribute_name()
        match_clause.patterns.append(
            HasPattern(thing_var=var, attr_type=attr_name, attr_var=var_name)
        )
        group_vars.append(var_name)

    # Build reduce assignments for each aggregation
    reduce_assignments = []
    for agg in aggregates:
        if not isinstance(agg, AggregateExpr):
            raise TypeError(f"Expected AggregateExpr, got {type(agg).__name__}")

        # If this aggregation is on a specific attr_type (not count), add binding pattern
        if agg.attr_type is not None:
            attr_name = agg.attr_type.get_attribute_name()
            attr_var = generate_attr_var(var, agg.attr_type)
            match_clause.patterns.append(
                HasPattern(thing_var=var, attr_type=attr_name, attr_var=attr_var)
            )

        # Build reduce assignment
        result_var = f"${agg.get_fetch_key()}"
        reduce_assignments.append(
            ReduceAssignment(variable=result_var, expression=agg.to_typeql(var))
        )

    # Compile and execute with group-by
    match_str = self._manager.compiler.compile(match_clause)
    group_clause = ", ".join(group_vars)
    reduce_clause = ReduceClause(assignments=reduce_assignments, group_by=group_clause)
    reduce_str = self._manager.compiler.compile(reduce_clause)
    query = f"{match_str}\n{reduce_str}"

    results = self._manager._execute(query, TransactionType.READ)
    return parse_grouped_aggregate_results(results, group_vars)

TypeDBManager

TypeDBManager(connection, model_class)

Unified CRUD manager for TypeDB entities and relations.

Source code in type_bridge/crud/typedb_manager.py
def __init__(self, connection: Connection, model_class: type[T]):
    self._connection = connection
    self._executor = ConnectionExecutor(connection)
    self.model_class = model_class
    self.compiler = QueryCompiler()

    self._hook_runner = HookRunner()

    # Select strategy
    if issubclass(model_class, Entity):
        self.strategy: ModelStrategy = EntityStrategy()
    elif issubclass(model_class, Relation):
        self.strategy = RelationStrategy()
    else:
        raise TypeError(f"Unsupported model type: {model_class}")

add_hook

add_hook(hook)

Register a lifecycle hook. Returns self for chaining.

Source code in type_bridge/crud/typedb_manager.py
def add_hook(self, hook: Any) -> Self:
    """Register a lifecycle hook. Returns self for chaining."""
    self._hook_runner.add(hook)
    return self

remove_hook

remove_hook(hook)

Unregister a lifecycle hook.

Source code in type_bridge/crud/typedb_manager.py
def remove_hook(self, hook: Any) -> None:
    """Unregister a lifecycle hook."""
    self._hook_runner.remove(hook)

insert

insert(instance)

Insert a new instance and populate _iid.

For entities, uses a single roundtrip (insert + fetch combined). For relations, uses two roundtrips (insert, then fetch) because TypeDB 3.x relation inserts don't bind the variable.

Source code in type_bridge/crud/typedb_manager.py
def insert(self, instance: T) -> T:
    """Insert a new instance and populate _iid.

    For entities, uses a single roundtrip (insert + fetch combined).
    For relations, uses two roundtrips (insert, then fetch) because
    TypeDB 3.x relation inserts don't bind the variable.
    """
    if self._hook_runner.has_hooks:
        self._hook_runner.run_pre(CrudEvent.PRE_INSERT, self.model_class, instance)

    var = "$x"

    # Relations use include_variable=False in to_ast(), so $x isn't bound
    # after insert. Use the two-query approach for relations.
    if isinstance(instance, Relation):
        match_clause, insert_clause = self.strategy.build_insert(instance, var)
        query_parts = []
        if match_clause:
            query_parts.append(self.compiler.compile(match_clause))
        query_parts.append(self.compiler.compile(insert_clause))
        self._execute("\n".join(query_parts), TransactionType.WRITE)
        self._fetch_and_set_iid(instance, var)

        if self._hook_runner.has_hooks:
            self._hook_runner.run_post(CrudEvent.POST_INSERT, self.model_class, instance)
        return instance

    # Entities: Combined insert + fetch IID in single query
    iid = self._execute_insert_with_iid(instance, var, use_put=False)
    if iid:
        object.__setattr__(instance, "_iid", iid)
        logger.debug(f"Set _iid on instance: {iid}")
    else:
        # Fallback to separate fetch (for edge cases like types without keys)
        self._fetch_and_set_iid(instance, var)

    if self._hook_runner.has_hooks:
        self._hook_runner.run_post(CrudEvent.POST_INSERT, self.model_class, instance)

    return instance

get

get(**filters)

Get instances matching filters.

Source code in type_bridge/crud/typedb_manager.py
def get(self, **filters) -> list[T]:
    """Get instances matching filters."""
    from type_bridge.crud.role_players import resolve_entity_class_from_label
    from type_bridge.models.registry import ModelRegistry

    # Use descriptive variable name to avoid conflicts
    var = "$rel" if isinstance(self.strategy, RelationStrategy) else "$ent"

    # Check if this is a relation (needs special handling for role players)
    if isinstance(self.strategy, RelationStrategy):
        return self._get_relations(var, filters, [])

    # Entity path: fetch with polymorphic type resolution
    base_type = self.model_class.get_type_name()

    from type_bridge.query.ast import EntityPattern

    # Build match clause with isa! for type variable binding (enables polymorphic resolution)
    # This allows us to fetch the actual concrete type using label()
    match_clause: MatchClause = self.strategy.build_match_all(self.model_class, var, filters)

    # Modify the AST to use 'isa!' and capture type variable '$t'
    # We iterate through patterns to find the main entity pattern
    for pattern in match_clause.patterns:
        if (
            isinstance(pattern, EntityPattern)
            and pattern.variable == var
            and pattern.type_name == base_type
        ):
            # We found the main pattern.
            # Transform to: $ent isa! $t
            # And add: $t sub base_type

            pattern.type_name = "$t"
            pattern.is_strict = True

            # Add sub constraint as a new pattern
            from type_bridge.query.ast import SubTypePattern

            match_clause.patterns.append(SubTypePattern(variable="$t", parent_type=base_type))
            break

    match_str = self.compiler.compile(match_clause)

    # Build fetch clause using wildcard to get all attributes including subtype-specific ones
    fetch_clause_str = self._build_wildcard_fetch(var, include_iid=True, include_type=True)
    query = match_str + "\n" + fetch_clause_str

    results = self._execute(query, TransactionType.READ)

    # Hydrate entity instances with polymorphic type resolution
    instances = []
    for result in results:
        try:
            iid = result.pop("_iid", None)
            if isinstance(iid, dict) and "value" in iid:
                iid = iid["value"]

            type_label = result.pop("_type", None)

            # Extract attributes from nested "attributes" key (wildcard fetch structure)
            attrs = result.pop("attributes", result)

            if type_label and type_label != base_type:
                concrete_class = ModelRegistry.get(type_label)
                if concrete_class is None:
                    concrete_class = resolve_entity_class_from_label(
                        type_label,
                        cast(tuple[type[Entity], ...], (self.model_class,)),
                    )
            else:
                concrete_class = self.model_class

            assert concrete_class is not None, "Failed to resolve concrete class"
            entity_class = cast(type[Entity], concrete_class)
            instance = entity_class.from_dict(attrs, strict=False)
            if iid:
                object.__setattr__(instance, "_iid", iid)
            instances.append(instance)
        except Exception as e:
            from type_bridge.crud.exceptions import HydrationError

            raise HydrationError(
                model_type=self.model_class.__name__,
                raw_data=result,
                cause=e,
            ) from e

    return instances

update

update(instance)

Update an instance in the database.

Uses the Strategy pattern to identify the instance, then updates all non-key attributes to match the current state.

Parameters:

Name Type Description Default
instance T

Instance with updated values

required

Returns:

Type Description
T

The updated instance

Source code in type_bridge/crud/typedb_manager.py
def update(self, instance: T) -> T:
    """Update an instance in the database.

    Uses the Strategy pattern to identify the instance, then updates
    all non-key attributes to match the current state.

    Args:
        instance: Instance with updated values

    Returns:
        The updated instance
    """
    if self._hook_runner.has_hooks:
        self._hook_runner.run_pre(CrudEvent.PRE_UPDATE, self.model_class, instance)

    var = "$x"
    constraints = self.strategy.identify(instance)
    all_attrs = self.model_class.get_all_attributes()

    # Build the base match clause using AST
    from type_bridge.query.ast import EntityPattern, RelationPattern

    if issubclass(self.model_class, Entity):
        pattern = EntityPattern(
            variable=var,
            type_name=self.model_class.get_type_name(),
            constraints=constraints,
        )
    else:
        pattern = RelationPattern(
            variable=var,
            type_name=self.model_class.get_type_name(),
            role_players=[],
            constraints=constraints,
        )

    base_match = self.compiler.compile(MatchClause(patterns=[pattern]))
    # Remove "match\n" prefix as we'll rebuild it
    base_match_body = base_match[6:] if base_match.startswith("match\n") else base_match

    # Separate single-value and multi-value attributes
    single_value_updates: dict[str, Any] = {}
    multi_value_updates: dict[str, list[Any]] = {}
    single_value_deletes: list[str] = []

    for field_name, attr_info in all_attrs.items():
        # Skip key attributes - they identify the instance, can't be changed
        if attr_info.flags.is_key:
            continue

        value = getattr(instance, field_name, None)
        attr_name = attr_info.typ.get_attribute_name()
        is_multi = is_multi_value_attribute(attr_info.flags)

        if value is None:
            # Mark for deletion (if attribute exists)
            single_value_deletes.append(attr_name)
        elif is_multi and isinstance(value, list):
            multi_value_updates[attr_name] = value
        else:
            single_value_updates[attr_name] = value

    # Build try blocks for match clause
    try_blocks: list[str] = []

    # Add bindings for multi-value attributes with guards
    for attr_name, values in multi_value_updates.items():
        keep_literals = [format_value(v) for v in dict.fromkeys(values)]
        guard_lines = [f"not {{ ${attr_name} == {lit}; }};" for lit in keep_literals]
        try_block = "\n".join(
            [
                "try {",
                f"  {var} has {attr_name} ${attr_name};",
                *[f"  {g}" for g in guard_lines],
                "};",
            ]
        )
        try_blocks.append(try_block)

    # Add bindings for single-value updates (delete old + insert new)
    for attr_name in single_value_updates:
        try_blocks.append(f"try {{ {var} has {attr_name} $old_{attr_name}; }};")

    # Add bindings for single-value deletes
    for attr_name in single_value_deletes:
        try_blocks.append(f"try {{ {var} has {attr_name} ${attr_name}; }};")

    # Combine base match with try blocks
    if try_blocks:
        match_clause_str = base_match_body + "\n" + "\n".join(try_blocks)
    else:
        match_clause_str = base_match_body
    query_parts = [f"match\n{match_clause_str}"]

    # Build delete clause
    delete_parts = []
    for attr_name in multi_value_updates:
        delete_parts.append(f"try {{ ${attr_name} of {var}; }};")
    for attr_name in single_value_updates:
        delete_parts.append(f"try {{ $old_{attr_name} of {var}; }};")
    for attr_name in single_value_deletes:
        delete_parts.append(f"try {{ ${attr_name} of {var}; }};")

    if delete_parts:
        query_parts.append("delete\n" + "\n".join(delete_parts))

    # Build insert clause
    insert_parts = []
    for attr_name, values in multi_value_updates.items():
        for value in values:
            insert_parts.append(f"{var} has {attr_name} {format_value(value)};")
    for attr_name, value in single_value_updates.items():
        insert_parts.append(f"{var} has {attr_name} {format_value(value)};")

    if insert_parts:
        query_parts.append("insert\n" + "\n".join(insert_parts))

    full_query = "\n".join(query_parts)
    logger.debug(f"Update query: {full_query}")

    self._execute(full_query, TransactionType.WRITE)
    logger.info(f"Updated: {self.model_class.__name__}")

    if self._hook_runner.has_hooks:
        self._hook_runner.run_post(CrudEvent.POST_UPDATE, self.model_class, instance)

    return instance

delete

delete(instance)

Delete an instance and return it.

Source code in type_bridge/crud/typedb_manager.py
def delete(self, instance: T) -> T:
    """Delete an instance and return it."""
    if self._hook_runner.has_hooks:
        self._hook_runner.run_pre(CrudEvent.PRE_DELETE, self.model_class, instance)

    var = "$x"

    # Build AST-based match clause
    patterns: list[Pattern]
    if isinstance(instance, Entity):
        patterns = [instance.get_match_pattern(var)]
    elif isinstance(instance, Relation):
        patterns = instance.get_match_patterns(var)
    else:
        raise TypeError(f"Unexpected instance type: {type(instance)}")

    match_clause = MatchClause(patterns=patterns)
    delete_clause = DeleteClause(statements=[DeleteThingStatement(variable=var)])

    # Compile and execute
    match_str = self.compiler.compile(match_clause)
    delete_str = self.compiler.compile(delete_clause)
    query = f"{match_str}\n{delete_str}"

    self._execute(query, TransactionType.WRITE)

    if self._hook_runner.has_hooks:
        self._hook_runner.run_post(CrudEvent.POST_DELETE, self.model_class, instance)

    return instance

all

all()

Fetch all instances of this type.

Source code in type_bridge/crud/typedb_manager.py
def all(self) -> list[T]:
    """Fetch all instances of this type."""
    return self.get()

insert_many

insert_many(instances)

Insert multiple instances in a single query (batched).

For entities, combines all inserts into a single query for efficiency. For relations, falls back to individual inserts (due to match clause complexity).

Source code in type_bridge/crud/typedb_manager.py
def insert_many(self, instances: list[T]) -> list[T]:
    """Insert multiple instances in a single query (batched).

    For entities, combines all inserts into a single query for efficiency.
    For relations, falls back to individual inserts (due to match clause complexity).
    """
    if not instances:
        return instances

    # Check if all instances are entities (can be batched)
    # Relations need individual handling due to role player match clauses
    if all(isinstance(inst, Entity) for inst in instances):
        if self._hook_runner.has_hooks:
            for instance in instances:
                self._hook_runner.run_pre(CrudEvent.PRE_INSERT, self.model_class, instance)

        result = self._batch_insert_entities(instances)

        if self._hook_runner.has_hooks:
            for instance in result:
                self._hook_runner.run_post(CrudEvent.POST_INSERT, self.model_class, instance)

        return result

    # Fallback for relations or mixed types (hooks fire via self.insert)
    for instance in instances:
        self.insert(instance)
    return instances

put

put(instance)

Insert or update an instance (idempotent) and populate _iid.

Uses TypeQL's PUT clause for idempotent insertion. For entities, uses a single roundtrip. For relations, uses two.

Source code in type_bridge/crud/typedb_manager.py
def put(self, instance: T) -> T:
    """Insert or update an instance (idempotent) and populate _iid.

    Uses TypeQL's PUT clause for idempotent insertion.
    For entities, uses a single roundtrip. For relations, uses two.
    """
    if self._hook_runner.has_hooks:
        self._hook_runner.run_pre(CrudEvent.PRE_PUT, self.model_class, instance)

    var = "$x"

    # Relations use include_variable=False in to_ast(), so $x isn't bound.
    # Use the two-query approach for relations.
    if isinstance(instance, Relation):
        match_clause, insert_clause = self.strategy.build_insert(instance, var)
        query_parts = []
        if match_clause:
            query_parts.append(self.compiler.compile(match_clause))
        insert_query = self.compiler.compile(insert_clause)
        put_query = insert_query.replace("insert\n", "put\n", 1)
        query_parts.append(put_query)
        self._execute("\n".join(query_parts), TransactionType.WRITE)
        self._fetch_and_set_iid(instance, var)

        if self._hook_runner.has_hooks:
            self._hook_runner.run_post(CrudEvent.POST_PUT, self.model_class, instance)
        return instance

    # Entities: Combined put + fetch IID in single query
    iid = self._execute_insert_with_iid(instance, var, use_put=True)
    if iid:
        object.__setattr__(instance, "_iid", iid)
        logger.debug(f"Set _iid on instance: {iid}")
    else:
        # Fallback to separate fetch (for edge cases like types without keys)
        self._fetch_and_set_iid(instance, var)

    if self._hook_runner.has_hooks:
        self._hook_runner.run_post(CrudEvent.POST_PUT, self.model_class, instance)

    return instance

delete_many

delete_many(instances, *, strict=False)

Delete multiple instances.

Optimized for batch deletion: instances with IIDs are deleted in a single query using disjunctive matching (OR pattern), reducing N roundtrips to 1.

Parameters:

Name Type Description Default
instances list[T]

List of instances to delete

required
strict bool

If True, raise EntityNotFoundError if any entity doesn't exist. In strict mode, checks all entities first and raises before any deletion.

False

Returns:

Type Description
list[T]

List of actually-deleted entities (excludes those that didn't exist)

Source code in type_bridge/crud/typedb_manager.py
def delete_many(self, instances: list[T], *, strict: bool = False) -> list[T]:
    """Delete multiple instances.

    Optimized for batch deletion: instances with IIDs are deleted in a single
    query using disjunctive matching (OR pattern), reducing N roundtrips to 1.

    Args:
        instances: List of instances to delete
        strict: If True, raise EntityNotFoundError if any entity doesn't exist.
               In strict mode, checks all entities first and raises before any deletion.

    Returns:
        List of actually-deleted entities (excludes those that didn't exist)
    """
    if not instances:
        return instances

    from type_bridge.crud.exceptions import EntityNotFoundError

    # Separate instances by whether they have IIDs
    with_iids: list[T] = []
    without_iids: list[T] = []

    for instance in instances:
        if getattr(instance, "_iid", None):
            with_iids.append(instance)
        else:
            without_iids.append(instance)

    has_hooks = self._hook_runner.has_hooks

    # For strict mode, we need to check existence before deleting
    if strict:
        # Batch check existence for instances with IIDs
        existing_iids = self._batch_check_existence_by_iid(with_iids) if with_iids else set()
        not_found_with_iid = [inst for inst in with_iids if inst._iid not in existing_iids]

        # Check existence individually for instances without IIDs
        not_found_without_iid = [inst for inst in without_iids if not self._entity_exists(inst)]

        not_found = not_found_with_iid + not_found_without_iid
        if not_found:
            names = [str(e) for e in not_found]
            raise EntityNotFoundError(f"entity(ies) not found: {names}")

        # All exist - proceed with batch delete
        deleted: list[T] = []
        if with_iids:
            if has_hooks:
                for inst in with_iids:
                    self._hook_runner.run_pre(CrudEvent.PRE_DELETE, self.model_class, inst)
            self._batch_delete_by_iid(with_iids)
            if has_hooks:
                for inst in with_iids:
                    self._hook_runner.run_post(CrudEvent.POST_DELETE, self.model_class, inst)
            deleted.extend(with_iids)
        for inst in without_iids:
            self.delete(inst)  # hooks fire inside self.delete()
            deleted.append(inst)
        return deleted

    # Non-strict mode: batch delete instances with IIDs, individual for others
    deleted = []

    if with_iids:
        # Batch check which ones exist
        existing_iids = self._batch_check_existence_by_iid(with_iids)
        existing_instances = [inst for inst in with_iids if inst._iid in existing_iids]

        if existing_instances:
            if has_hooks:
                for inst in existing_instances:
                    self._hook_runner.run_pre(CrudEvent.PRE_DELETE, self.model_class, inst)
            self._batch_delete_by_iid(existing_instances)
            if has_hooks:
                for inst in existing_instances:
                    self._hook_runner.run_post(CrudEvent.POST_DELETE, self.model_class, inst)
            deleted.extend(existing_instances)

    # Handle instances without IIDs individually (hooks fire via self.delete)
    for inst in without_iids:
        if self._entity_exists(inst):
            self.delete(inst)
            deleted.append(inst)

    return deleted

put_many

put_many(instances)

Put multiple instances (idempotent insert/update).

Attempts batch operation first for efficiency. If a key constraint violation occurs (some entities exist with different data), falls back to individual operations which are idempotent.

For entities, uses batch PUT when possible (N→1 roundtrips). For relations, uses individual operations (match clause complexity).

Source code in type_bridge/crud/typedb_manager.py
def put_many(self, instances: list[T]) -> list[T]:
    """Put multiple instances (idempotent insert/update).

    Attempts batch operation first for efficiency. If a key constraint
    violation occurs (some entities exist with different data), falls back
    to individual operations which are idempotent.

    For entities, uses batch PUT when possible (N→1 roundtrips).
    For relations, uses individual operations (match clause complexity).
    """
    if not instances:
        return instances

    has_hooks = self._hook_runner.has_hooks

    # Check if all instances are entities (can attempt batch)
    if all(isinstance(inst, Entity) for inst in instances):
        if has_hooks:
            for instance in instances:
                self._hook_runner.run_pre(CrudEvent.PRE_PUT, self.model_class, instance)

        try:
            result = self._batch_insert_entities(instances, use_put=True)
        except Exception as e:
            # Check if this is a key constraint violation
            error_str = str(e)
            if "unique" in error_str.lower() or "constraint" in error_str.lower():
                logger.debug(
                    f"Batch put failed with constraint violation, falling back to individual: {e}"
                )
                # Fall back to individual operations.
                # Note: pre-hooks may fire again via self.put() — acceptable
                # since the batch operation was rolled back.
                for instance in instances:
                    self.put(instance)
                return instances
            # Re-raise other errors
            raise

        if has_hooks:
            for instance in result:
                self._hook_runner.run_post(CrudEvent.POST_PUT, self.model_class, instance)

        return result

    # Fallback for relations or mixed types (hooks fire via self.put)
    for instance in instances:
        self.put(instance)
    return instances

update_many

update_many(instances)

Update multiple instances.

Source code in type_bridge/crud/typedb_manager.py
def update_many(self, instances: list[T]) -> list[T]:
    """Update multiple instances."""
    for instance in instances:
        self.update(instance)
    return instances

get_by_iid

get_by_iid(iid)

Fetch an instance by its internal ID with polymorphic type resolution.

Source code in type_bridge/crud/typedb_manager.py
def get_by_iid(self, iid: str) -> T | None:
    """Fetch an instance by its internal ID with polymorphic type resolution."""
    import re

    from type_bridge.crud.role_players import resolve_entity_class_from_label
    from type_bridge.models.registry import ModelRegistry
    from type_bridge.query.ast import (
        EntityPattern,
        IidConstraint,
        MatchClause,
        SubTypePattern,
    )

    # Validate IID format (TypeDB IIDs are hexadecimal strings starting with 0x)
    # Return None for invalid IIDs (graceful handling - treat as "not found")
    if not iid or not re.match(r"^0x[0-9a-fA-F]+$", iid):
        return None

    var = "$x"
    base_type = self.model_class.get_type_name()

    if issubclass(self.model_class, Entity):
        # Entity path: Build match clause with isa! for polymorphic type resolution
        # $x isa! $t, iid <iid>; $t sub base_type;
        entity_pattern = EntityPattern(
            variable=var,
            type_name="$t",  # Type variable for polymorphic resolution
            constraints=[IidConstraint(iid=iid)],
            is_strict=True,  # Use isa! for strict type matching
        )
        subtype_pattern = SubTypePattern(variable="$t", parent_type=base_type)
        match_clause = MatchClause(patterns=[entity_pattern, subtype_pattern])
        match_str = self.compiler.compile(match_clause)

        # Build fetch clause using wildcard to get all attributes including subtype-specific ones
        # No IID needed in fetch - we already have it from input
        fetch_clause_str = self._build_wildcard_fetch(var, include_iid=False, include_type=True)
        query = match_str + "\n" + fetch_clause_str

        results = self._execute(query, TransactionType.READ)

        if not results:
            return None

        result = results[0]
        try:
            type_label = result.pop("_type", None)

            # Extract attributes from nested "attributes" key (wildcard fetch structure)
            attrs = result.pop("attributes", result)

            # Resolve concrete class
            if type_label and type_label != base_type:
                concrete_class = ModelRegistry.get(type_label)
                if concrete_class is None:
                    concrete_class = resolve_entity_class_from_label(
                        type_label,
                        cast(tuple[type[Entity], ...], (self.model_class,)),
                    )
            else:
                concrete_class = self.model_class

            assert concrete_class is not None, "Failed to resolve concrete class"
            entity_class = cast(type[Entity], concrete_class)
            instance = entity_class.from_dict(attrs, strict=False)
            object.__setattr__(instance, "_iid", iid)
            return cast(T | None, instance)
        except Exception as e:
            from type_bridge.crud.exceptions import HydrationError

            raise HydrationError(
                model_type=self.model_class.__name__,
                raw_data=result,
                cause=e,
            ) from e
    else:
        # Relation path: Use _get_relations with IID filter
        # This properly handles role player hydration
        results = self._get_relations(var, {"_iid": iid}, [])
        if results:
            return results[0]
        return None

filter

filter(*expressions, **filters)

Create a chainable query with filters.

Parameters:

Name Type Description Default
*expressions Any

Expression objects (Person.age.gt(Age(30)), etc.)

()
**filters Any

Attribute filters (exact match) - age=30, name="Alice"

{}

Returns:

Type Description
TypeDBQuery[T]

TypeDBQuery for chaining

Raises:

Type Description
ValueError

If expressions reference attribute types not owned by the model

Source code in type_bridge/crud/typedb_manager.py
def filter(self, *expressions: Any, **filters: Any) -> TypeDBQuery[T]:
    """Create a chainable query with filters.

    Args:
        *expressions: Expression objects (Person.age.gt(Age(30)), etc.)
        **filters: Attribute filters (exact match) - age=30, name="Alice"

    Returns:
        TypeDBQuery for chaining

    Raises:
        ValueError: If expressions reference attribute types not owned by the model
    """
    # Validate expressions reference owned attribute types
    if expressions:
        owned_attrs = self.model_class.get_all_attributes()
        owned_attr_types = {attr_info.typ for attr_info in owned_attrs.values()}

        from type_bridge.expressions.role_player import RolePlayerExpr

        for expr in expressions:
            # Skip RolePlayerExpr - they reference player attributes, not relation attributes
            if isinstance(expr, RolePlayerExpr):
                continue

            # Get attribute types from expression
            expr_attr_types = expr.get_attribute_types()

            # Check if all attribute types are owned by the model
            for attr_type in expr_attr_types:
                if attr_type not in owned_attr_types:
                    raise ValueError(
                        f"{self.model_class.__name__} does not own attribute type {attr_type.__name__}. "
                        f"Available attribute types: {', '.join(t.__name__ for t in owned_attr_types)}"
                    )

    return TypeDBQuery(self, filters, list(expressions))

count

count(**filters)

Count all instances of this type, optionally filtering.

Source code in type_bridge/crud/typedb_manager.py
def count(self, **filters) -> int:
    """Count all instances of this type, optionally filtering."""
    var = "$x"
    match_clause = self.strategy.build_match_all(self.model_class, var, filters)
    match_str = self.compiler.compile(match_clause)
    reduce_str = self._build_count_reduce(var)
    query = match_str + "\n" + reduce_str

    results = self._execute(query, TransactionType.READ)
    if results and "count" in results[0]:
        count_val = results[0]["count"]
        # Handle wrapped value format from TypeDB driver
        if isinstance(count_val, dict) and "value" in count_val:
            return int(count_val["value"])
        if isinstance(count_val, (int, float)):
            return int(count_val)
        return int(str(count_val))
    return 0

group_by

group_by(*fields)

Group results by field values and compute aggregations.

Parameters:

Name Type Description Default
*fields Any

Field descriptors to group by (e.g., Person.department)

()

Returns:

Type Description
GroupByQuery[T]

GroupByQuery for chained aggregations

Example
Group by department, compute average age per department

result = manager.group_by(Person.department).aggregate(Person.age.avg())

Returns: {
"Engineering": {"avg_age": 35.5},
"Sales":
}
Source code in type_bridge/crud/typedb_manager.py
def group_by(self, *fields: Any) -> GroupByQuery[T]:
    """Group results by field values and compute aggregations.

    Args:
        *fields: Field descriptors to group by (e.g., Person.department)

    Returns:
        GroupByQuery for chained aggregations

    Example:
        # Group by department, compute average age per department
        result = manager.group_by(Person.department).aggregate(Person.age.avg())
        # Returns: {
        #   "Engineering": {"avg_age": 35.5},
        #   "Sales": {"avg_age": 28.3}
        # }
    """
    return GroupByQuery(self, {}, [], fields)

TypeDBQuery

TypeDBQuery(manager, filters, expressions=None)

Chainable query builder for TypeDBManager.

Source code in type_bridge/crud/typedb_manager.py
def __init__(
    self,
    manager: TypeDBManager[T],
    filters: dict[str, Any],
    expressions: list[Any] | None = None,
):
    self._manager = manager
    self._filters = filters
    self._expressions: list[Any] = expressions or []
    self._order_fields: list[tuple[str, bool]] = []  # (field, descending)
    self._limit_val: int | None = None
    self._offset_val: int | None = None

filter

filter(*expressions, **filters)

Add additional filters to the query.

Validates that expression attribute types are owned by the model class.

Source code in type_bridge/crud/typedb_manager.py
def filter(self, *expressions: Any, **filters: Any) -> TypeDBQuery[T]:
    """Add additional filters to the query.

    Validates that expression attribute types are owned by the model class.
    """
    # Validate expressions reference owned attribute types
    if expressions:
        model_class = self._manager.model_class
        owned_attrs = model_class.get_all_attributes()
        owned_attr_types = {attr_info.typ for attr_info in owned_attrs.values()}

        # For relations, also include role player attribute types
        # (RolePlayerExpr inner expressions will be validated separately)
        from type_bridge.expressions.role_player import RolePlayerExpr

        for expr in expressions:
            # Skip RolePlayerExpr - they reference player attributes, not relation attributes
            if isinstance(expr, RolePlayerExpr):
                continue

            # Get attribute types from expression
            expr_attr_types = expr.get_attribute_types()

            # Check if all attribute types are owned by the model
            for attr_type in expr_attr_types:
                if attr_type not in owned_attr_types:
                    raise ValueError(
                        f"{model_class.__name__} does not own attribute type {attr_type.__name__}. "
                        f"Available attribute types: {', '.join(t.__name__ for t in owned_attr_types)}"
                    )

    self._expressions.extend(expressions)
    self._filters.update(filters)
    return self

order_by

order_by(*fields)

Order results by fields. Prefix with '-' for descending.

Source code in type_bridge/crud/typedb_manager.py
def order_by(self, *fields: str) -> TypeDBQuery[T]:
    """Order results by fields. Prefix with '-' for descending."""
    for field in fields:
        if field.startswith("-"):
            self._order_fields.append((field[1:], True))
        else:
            self._order_fields.append((field, False))
    return self

limit

limit(n)

Limit number of results.

Source code in type_bridge/crud/typedb_manager.py
def limit(self, n: int) -> TypeDBQuery[T]:
    """Limit number of results."""
    self._limit_val = n
    return self

offset

offset(n)

Skip first n results.

Source code in type_bridge/crud/typedb_manager.py
def offset(self, n: int) -> TypeDBQuery[T]:
    """Skip first n results."""
    self._offset_val = n
    return self

execute

execute()

Execute the query and return results.

Source code in type_bridge/crud/typedb_manager.py
def execute(self) -> list[T]:
    """Execute the query and return results."""
    from type_bridge.crud.role_players import resolve_entity_class_from_label
    from type_bridge.models.registry import ModelRegistry

    model_class = self._manager.model_class

    # Relations need special handling for role player fetching
    if isinstance(self._manager.strategy, RelationStrategy):
        return self._manager._get_relations(
            "$r",
            self._filters,
            self._expressions,
            order_by=self._order_fields if self._order_fields else None,
            limit=self._limit_val,
            offset=self._offset_val,
        )

    # Entity path with polymorphic type resolution
    var = "$x"
    base_type = model_class.get_type_name()

    # Parse Django-style lookup filters (e.g., name__startswith="Al")
    base_filters, lookup_expressions = self._parse_entity_lookup_filters(
        model_class, self._filters
    )
    all_expressions = list(self._expressions) + lookup_expressions

    from type_bridge.query.ast import EntityPattern, SubTypePattern

    # Build base match clause using parsed filters (exact match only)
    match_clause = self._manager.strategy.build_match_all(model_class, var, base_filters)

    # Add expression patterns to match clause (includes parsed lookup expressions)
    if all_expressions:
        for expr in all_expressions:
            match_clause.patterns.extend(expr.to_ast(var))

    # Add type variable binding for polymorphic resolution
    # Transform "$x isa type" to "$x isa! $t; $t sub type"

    for pattern in match_clause.patterns:
        if (
            isinstance(pattern, EntityPattern)
            and pattern.variable == var
            and pattern.type_name == base_type
        ):
            pattern.type_name = "$t"
            pattern.is_strict = True
            match_clause.patterns.append(SubTypePattern(variable="$t", parent_type=base_type))
            break

    # Add sort variable bindings to match clause (TypeDB 3.x requirement)
    # These must be added to the match clause BEFORE compilation
    modifier_clauses = []
    sort_parts = []
    all_attrs = model_class.get_all_attributes()
    if self._order_fields:
        for i, (field_name, desc) in enumerate(self._order_fields):
            if field_name in all_attrs:
                attr_name = all_attrs[field_name].typ.get_attribute_name()
                sort_var = f"$sort_{i}"
                # Bind attribute to variable in match clause
                from type_bridge.query.ast import HasPattern

                match_clause.patterns.append(
                    HasPattern(thing_var=var, attr_type=attr_name, attr_var=sort_var)
                )
                direction = "desc" if desc else "asc"
                sort_parts.append(f"{sort_var} {direction}")
        if sort_parts:
            modifier_clauses.append(f"sort {', '.join(sort_parts)};")

    match_str = self._manager.compiler.compile(match_clause)

    # Build fetch clause using wildcard to get all attributes including subtype-specific ones
    fetch_clause_str = self._manager._build_wildcard_fetch(
        var, include_iid=True, include_type=True
    )

    # offset must come BEFORE limit
    if self._offset_val is not None:
        modifier_clauses.append(f"offset {self._offset_val};")
    if self._limit_val is not None:
        modifier_clauses.append(f"limit {self._limit_val};")

    modifier_str = "\n".join(modifier_clauses)

    # Build final query: match ; modifiers ; fetch
    if modifier_str:
        query = match_str + "\n" + modifier_str + "\n" + fetch_clause_str
    else:
        query = match_str + "\n" + fetch_clause_str

    # Execute
    results = self._manager._execute(query, TransactionType.READ)

    # Hydrate objects with polymorphic type resolution
    instances = []
    entity_model = cast(type[Entity], model_class)
    for result in results:
        try:
            iid = result.pop("_iid", None)
            # Handle wrapped IID format from TypeDB driver
            if isinstance(iid, dict) and "value" in iid:
                iid = iid["value"]

            # Get actual type label for polymorphic resolution
            type_label = result.pop("_type", None)

            # Extract attributes from nested "attributes" key (wildcard fetch structure)
            attrs = result.pop("attributes", result)

            # Resolve concrete class from type label
            if type_label and type_label != base_type:
                concrete_class = ModelRegistry.get(type_label)
                if concrete_class is None:
                    concrete_class = resolve_entity_class_from_label(
                        type_label, (entity_model,)
                    )
            else:
                concrete_class = entity_model

            assert concrete_class is not None, "Failed to resolve concrete class"
            entity_class = cast(type[Entity], concrete_class)
            instance = entity_class.from_dict(attrs, strict=False)
            if iid:
                object.__setattr__(instance, "_iid", iid)
            instances.append(instance)
        except Exception as e:
            from type_bridge.crud.exceptions import HydrationError

            raise HydrationError(
                model_type=model_class.__name__,
                raw_data=result,
                cause=e,
            ) from e

    return instances

all

all()

Alias for execute().

Source code in type_bridge/crud/typedb_manager.py
def all(self) -> list[T]:
    """Alias for execute()."""
    return self.execute()

first

first()

Get the first matching instance.

Source code in type_bridge/crud/typedb_manager.py
def first(self) -> T | None:
    """Get the first matching instance."""
    # Optimize by limiting to 1
    original_limit = self._limit_val
    self._limit_val = 1
    results = self.execute()
    self._limit_val = original_limit
    return results[0] if results else None

count

count()

Count matching instances.

Note: For relation queries with role player filters, TypeDB 3.x's reduce count may not work as expected. In those cases, we fall back to fetching IIDs and counting unique results.

Source code in type_bridge/crud/typedb_manager.py
def count(self) -> int:
    """Count matching instances.

    Note: For relation queries with role player filters, TypeDB 3.x's reduce count
    may not work as expected. In those cases, we fall back to fetching IIDs and
    counting unique results.
    """
    model_class = self._manager.model_class

    # For relations with filters, fetch and count to avoid TypeDB count limitations
    if isinstance(self._manager.strategy, RelationStrategy) and self._filters:
        # Fetch only IIDs for efficiency
        results = self._manager._get_relations("$r", self._filters, self._expressions)
        return len(results)

    # Entity path or relation without filters - use reduce count
    var = "$x"

    # Build match clause with filters and expressions
    # Strategy is generic, so we need to cast appropriately based on strategy type
    if isinstance(self._manager.strategy, EntityStrategy):
        match_clause = self._manager.strategy.build_match_all(
            cast(type[Entity], model_class), var, self._filters
        )
    else:
        # RelationStrategy without filters
        assert isinstance(self._manager.strategy, RelationStrategy)
        match_clause = self._manager.strategy.build_match_all(
            cast(type[Relation], model_class), var, self._filters
        )

    if self._expressions:
        from type_bridge.query.ast import RawPattern

        for expr in self._expressions:
            # Add expressions as RawPatterns to the AST
            pattern_str = expr.to_typeql(var)
            match_clause.patterns.append(RawPattern(content=pattern_str))

    match_str = self._manager.compiler.compile(match_clause)

    # Explicitly count the variable to avoid counting joins
    reduce_str = self._manager._build_count_reduce(var)
    query = match_str + "\n" + reduce_str
    results = self._manager._execute(query, TransactionType.READ)
    if results and "count" in results[0]:
        count_val = results[0]["count"]
        # Handle wrapped value format from TypeDB driver
        if isinstance(count_val, dict) and "value" in count_val:
            return int(count_val["value"])
        if isinstance(count_val, (int, float)):
            return int(count_val)
        return int(str(count_val))
    return 0

delete

delete()

Delete all matching instances and return count.

Source code in type_bridge/crud/typedb_manager.py
def delete(self) -> int:
    """Delete all matching instances and return count."""
    instances = self.execute()
    for instance in instances:
        self._manager.delete(instance)
    return len(instances)

exists

exists()

Check if any matching instances exist.

Source code in type_bridge/crud/typedb_manager.py
def exists(self) -> bool:
    """Check if any matching instances exist."""
    return self.count() > 0

update_with

update_with(func)

Update instances by applying a function to each.

Uses atomic transaction semantics: if the function fails on any entity, no updates are persisted (all or nothing).

Note: _iid is preserved during attribute modification by the wrap validator in TypeDBType base class.

Source code in type_bridge/crud/typedb_manager.py
def update_with(self, func: Any) -> list[T]:
    """Update instances by applying a function to each.

    Uses atomic transaction semantics: if the function fails on any entity,
    no updates are persisted (all or nothing).

    Note: _iid is preserved during attribute modification by the wrap validator
    in TypeDBType base class.
    """
    instances = self.execute()
    if not instances:
        return []

    # Phase 1: Apply function to all instances FIRST
    # If any function call fails, no writes have happened yet
    for instance in instances:
        func(instance)

    # Phase 2: All functions succeeded, now persist all updates
    for instance in instances:
        self._manager.update(instance)

    return instances

aggregate

aggregate(*aggregates)

Execute aggregation queries.

Performs database-side aggregations for efficiency.

Parameters:

Name Type Description Default
*aggregates Any

AggregateExpr objects (Person.age.avg(), Person.score.sum(), etc.)

()

Returns:

Type Description
dict[str, Any]

Dictionary mapping aggregate keys to results

Examples:

Single aggregation

result = manager.filter().aggregate(Person.age.avg()) avg_age = result['avg_age']

Multiple aggregations

result = manager.filter(Person.city.eq(City("NYC"))).aggregate( Person.age.avg(), Person.score.sum(), Person.salary.max() )

Source code in type_bridge/crud/typedb_manager.py
def aggregate(self, *aggregates: Any) -> dict[str, Any]:
    """Execute aggregation queries.

    Performs database-side aggregations for efficiency.

    Args:
        *aggregates: AggregateExpr objects (Person.age.avg(), Person.score.sum(), etc.)

    Returns:
        Dictionary mapping aggregate keys to results

    Examples:
        # Single aggregation
        result = manager.filter().aggregate(Person.age.avg())
        avg_age = result['avg_age']

        # Multiple aggregations
        result = manager.filter(Person.city.eq(City("NYC"))).aggregate(
            Person.age.avg(),
            Person.score.sum(),
            Person.salary.max()
        )
    """
    from type_bridge.crud.aggregates import parse_aggregate_results
    from type_bridge.expressions import AggregateExpr
    from type_bridge.expressions.utils import generate_attr_var
    from type_bridge.query.ast import HasPattern, ReduceAssignment, ReduceClause

    if not aggregates:
        raise ValueError("At least one aggregation expression required")

    model_class = self._manager.model_class
    var = "$e"

    # Build base match clause with filters
    match_clause = self._manager.strategy.build_match_all(model_class, var, self._filters)

    # Add expression-based filters as AST patterns
    for expr in self._expressions:
        match_clause.patterns.extend(expr.to_ast(var))

    # Build reduce assignments for each aggregation
    reduce_assignments = []
    for agg in aggregates:
        if not isinstance(agg, AggregateExpr):
            raise TypeError(f"Expected AggregateExpr, got {type(agg).__name__}")

        # If this aggregation is on a specific attr_type (not count), add binding pattern
        if agg.attr_type is not None:
            attr_name = agg.attr_type.get_attribute_name()
            attr_var = generate_attr_var(var, agg.attr_type)
            match_clause.patterns.append(
                HasPattern(thing_var=var, attr_type=attr_name, attr_var=attr_var)
            )

        # Build reduce assignment using FunctionCallValue
        result_var = f"${agg.get_fetch_key()}"
        # agg.to_typeql produces something like "mean($e_age)" - use it as expression string
        reduce_assignments.append(
            ReduceAssignment(variable=result_var, expression=agg.to_typeql(var))
        )

    # Compile and execute
    match_str = self._manager.compiler.compile(match_clause)
    reduce_clause = ReduceClause(assignments=reduce_assignments)
    reduce_str = self._manager.compiler.compile(reduce_clause)
    query = f"{match_str}\n{reduce_str}"

    results = self._manager._execute(query, TransactionType.READ)
    return parse_aggregate_results(results)

group_by

group_by(*fields)

Group results by field values.

Parameters:

Name Type Description Default
*fields Any

FieldRef objects or field descriptors to group by (e.g., Person.department)

()

Returns:

Type Description
GroupByQuery[T]

GroupByQuery for chained aggregations

Example

result = manager.group_by(Person.department).aggregate(Person.age.avg())

Source code in type_bridge/crud/typedb_manager.py
def group_by(self, *fields: Any) -> GroupByQuery[T]:
    """Group results by field values.

    Args:
        *fields: FieldRef objects or field descriptors to group by (e.g., Person.department)

    Returns:
        GroupByQuery for chained aggregations

    Example:
        result = manager.group_by(Person.department).aggregate(Person.age.avg())
    """
    return GroupByQuery(
        self._manager,
        self._filters,
        self._expressions,
        fields,
    )