Skip to content

type_bridge.crud.typedb_manager

typedb_manager

Unified TypeDB Manager using AST and Strategies.

This module provides TypeDBManager, a unified CRUD manager that replaces the separate EntityManager and RelationManager with a single generic implementation using the Strategy pattern.

TypeDBManager

TypeDBManager(connection, model_class)

Unified CRUD manager for TypeDB entities and relations.

Source code in type_bridge/crud/typedb_manager.py
def __init__(self, connection: Connection, model_class: type[T]):
    """Bind the manager to a connection and a model class.

    Args:
        connection: Connection stored on the manager and wrapped in a
            ``ConnectionExecutor``.
        model_class: Model class to manage; must be an ``Entity`` or
            ``Relation`` subclass.

    Raises:
        TypeError: If ``model_class`` is neither an ``Entity`` nor a
            ``Relation`` subclass.
    """
    self._connection = connection
    self._executor = ConnectionExecutor(connection)
    self.model_class = model_class
    self.compiler = QueryCompiler()
    self._hook_runner = HookRunner()

    # Pick the strategy matching the model kind; anything else is rejected.
    is_entity = issubclass(model_class, Entity)
    if not is_entity and not issubclass(model_class, Relation):
        raise TypeError(f"Unsupported model type: {model_class}")
    self.strategy: ModelStrategy = EntityStrategy() if is_entity else RelationStrategy()

add_hook

add_hook(hook)

Register a lifecycle hook. Returns self for chaining.

Source code in type_bridge/crud/typedb_manager.py
def add_hook(self, hook: Any) -> Self:
    """Attach ``hook`` to this manager's hook runner.

    Returns:
        The manager itself, so registrations can be chained.
    """
    runner = self._hook_runner
    runner.add(hook)
    return self

remove_hook

remove_hook(hook)

Unregister a lifecycle hook.

Source code in type_bridge/crud/typedb_manager.py
def remove_hook(self, hook: Any) -> None:
    """Detach ``hook`` from this manager's hook runner."""
    runner = self._hook_runner
    runner.remove(hook)

insert

insert(instance)

Insert a new instance and populate _iid.

For entities, uses a single roundtrip (insert + fetch combined). For relations, uses two roundtrips (insert, then fetch) because TypeDB 3.x relation inserts don't bind the variable.

Source code in type_bridge/crud/typedb_manager.py
def insert(self, instance: T) -> T:
    """Insert ``instance`` and populate its ``_iid``.

    Entities go through a combined insert + IID fetch (one roundtrip), with
    a separate IID fetch as fallback when no IID comes back. Relations are
    inserted first and their IID is fetched in a second query, because
    TypeDB 3.x relation inserts don't bind the variable.

    PRE_INSERT / POST_INSERT hooks run around the operation when any hooks
    are registered.

    Args:
        instance: The entity or relation to insert.

    Returns:
        The same instance that was passed in.
    """
    runner = self._hook_runner
    if runner.has_hooks:
        runner.run_pre(CrudEvent.PRE_INSERT, self.model_class, instance)

    var = "$x"

    if isinstance(instance, Relation):
        # Two-query path: write the relation, then look its IID up.
        match_clause, insert_clause = self.strategy.build_insert(instance, var)
        compiled = [self.compiler.compile(match_clause)] if match_clause else []
        compiled.append(self.compiler.compile(insert_clause))
        self._execute("\n".join(compiled), TransactionType.WRITE)
        self._fetch_and_set_iid(instance, var)
    else:
        # Single-query path: insert and read the IID back together.
        iid = self._execute_insert_with_iid(instance, var, use_put=False)
        if not iid:
            # No IID returned (e.g. types without keys): fetch it separately.
            self._fetch_and_set_iid(instance, var)
        else:
            object.__setattr__(instance, "_iid", iid)
            logger.debug(f"Set _iid on instance: {iid}")

    if runner.has_hooks:
        runner.run_post(CrudEvent.POST_INSERT, self.model_class, instance)

    return instance

get

get(**filters)

Get instances matching filters.

Source code in type_bridge/crud/typedb_manager.py
def get(self, **filters) -> list[T]:
    """Get instances matching filters.

    Relation managers delegate to ``_get_relations``. Entity managers match
    with ``isa!`` against a type variable constrained to subtypes of the
    model's type, so every row can be hydrated as its concrete class.

    Args:
        **filters: Filters forwarded to the strategy's ``build_match_all``
            (entities) or to ``_get_relations`` (relations).

    Returns:
        The hydrated instances; ``_iid`` is set on each instance whose row
        carried one.

    Raises:
        HydrationError: If a fetched row cannot be turned into an instance.
            ``raw_data`` holds the row as it was returned by the query.
    """
    # Relations need special handling for role players.
    if isinstance(self.strategy, RelationStrategy):
        return self._get_relations("$rel", filters, [])

    from type_bridge.crud.role_players import resolve_entity_class_from_label
    from type_bridge.models.registry import ModelRegistry
    from type_bridge.query.ast import EntityPattern, SubTypePattern

    # Use descriptive variable name to avoid conflicts
    var = "$ent"
    base_type = self.model_class.get_type_name()

    match_clause: MatchClause = self.strategy.build_match_all(self.model_class, var, filters)

    # Rewrite the main entity pattern to "$ent isa! $t" and add
    # "$t sub <base_type>" so the concrete type label can be fetched.
    for pattern in match_clause.patterns:
        if (
            isinstance(pattern, EntityPattern)
            and pattern.variable == var
            and pattern.type_name == base_type
        ):
            pattern.type_name = "$t"
            pattern.is_strict = True
            match_clause.patterns.append(SubTypePattern(variable="$t", parent_type=base_type))
            break

    match_str = self.compiler.compile(match_clause)

    # Wildcard fetch so subtype-specific attributes are included as well.
    fetch_clause_str = self._build_wildcard_fetch(var, include_iid=True, include_type=True)
    query = match_str + "\n" + fetch_clause_str

    results = self._execute(query, TransactionType.READ)

    instances = []
    for result in results:
        # Snapshot the row first: the pops below strip "_iid", "_type" and
        # "attributes" from it, which would leave the error report empty.
        raw_row = dict(result)
        try:
            iid = result.pop("_iid", None)
            if isinstance(iid, dict) and "value" in iid:
                iid = iid["value"]

            type_label = result.pop("_type", None)

            # Wildcard fetch nests attributes under "attributes"; fall back
            # to the row itself when that key is absent.
            attrs = result.pop("attributes", result)

            if type_label and type_label != base_type:
                concrete_class = ModelRegistry.get(type_label)
                if concrete_class is None:
                    concrete_class = resolve_entity_class_from_label(
                        type_label,
                        cast(tuple[type[Entity], ...], (self.model_class,)),
                    )
            else:
                concrete_class = self.model_class

            assert concrete_class is not None, "Failed to resolve concrete class"
            entity_class = cast(type[Entity], concrete_class)
            instance = entity_class.from_dict(attrs, strict=False)
            if iid:
                object.__setattr__(instance, "_iid", iid)
            instances.append(instance)
        except Exception as e:
            from type_bridge.crud.exceptions import HydrationError

            raise HydrationError(
                model_type=self.model_class.__name__,
                raw_data=raw_row,
                cause=e,
            ) from e

    return instances

update

update(instance)

Update an instance in the database.

Uses the Strategy pattern to identify the instance, then updates all non-key attributes to match the current state.

Parameters:

Name Type Description Default
instance T

Instance with updated values

required

Returns:

Type Description
T

The updated instance

Source code in type_bridge/crud/typedb_manager.py
def update(self, instance: T) -> T:
    """Write the instance's current non-key attribute values to the database.

    The instance is located through the strategy's ``identify`` constraints.
    Every non-key attribute is then reconciled in one match/delete/insert
    query:

    * ``None`` values: any stored value is deleted.
    * multi-value lists: stored values not in the list are deleted and all
      listed values are inserted.
    * everything else: the stored value is deleted and the new one inserted.

    PRE_UPDATE / POST_UPDATE hooks run around the operation when any hooks
    are registered.

    Args:
        instance: Instance with updated values

    Returns:
        The updated instance
    """
    runner = self._hook_runner
    if runner.has_hooks:
        runner.run_pre(CrudEvent.PRE_UPDATE, self.model_class, instance)

    var = "$x"
    constraints = self.strategy.identify(instance)
    all_attrs = self.model_class.get_all_attributes()

    from type_bridge.query.ast import EntityPattern, RelationPattern

    # Base pattern that pins down the instance being updated.
    if issubclass(self.model_class, Entity):
        pattern = EntityPattern(
            variable=var,
            type_name=self.model_class.get_type_name(),
            constraints=constraints,
        )
    else:
        pattern = RelationPattern(
            variable=var,
            type_name=self.model_class.get_type_name(),
            role_players=[],
            constraints=constraints,
        )

    # Drop the leading "match" keyword; it is re-added once the optional
    # try-blocks have been appended to the body.
    match_body = self.compiler.compile(MatchClause(patterns=[pattern])).removeprefix("match\n")

    # Sort the non-key attributes into the three handling categories.
    replacements: dict[str, Any] = {}
    multi_values: dict[str, list[Any]] = {}
    removals: list[str] = []

    for field_name, attr_info in all_attrs.items():
        if attr_info.flags.is_key:
            # Keys identify the instance and are never rewritten.
            continue

        value = getattr(instance, field_name, None)
        attr_name = attr_info.typ.get_attribute_name()
        is_multi = is_multi_value_attribute(attr_info.flags)

        if value is None:
            removals.append(attr_name)
        elif is_multi and isinstance(value, list):
            multi_values[attr_name] = value
        else:
            replacements[attr_name] = value

    # Optional bindings for the match stage and the matching delete
    # statements, built side by side in the same category order.
    match_lines: list[str] = [match_body]
    delete_lines: list[str] = []

    for attr_name, values in multi_values.items():
        # Bind only stored values that are NOT among the ones to keep.
        block = ["try {", f"  {var} has {attr_name} ${attr_name};"]
        block.extend(
            f"  not {{ ${attr_name} == {format_value(kept)}; }};"
            for kept in dict.fromkeys(values)
        )
        block.append("};")
        match_lines.append("\n".join(block))
        delete_lines.append(f"try {{ ${attr_name} of {var}; }};")

    for attr_name in replacements:
        match_lines.append(f"try {{ {var} has {attr_name} $old_{attr_name}; }};")
        delete_lines.append(f"try {{ $old_{attr_name} of {var}; }};")

    for attr_name in removals:
        match_lines.append(f"try {{ {var} has {attr_name} ${attr_name}; }};")
        delete_lines.append(f"try {{ ${attr_name} of {var}; }};")

    # Values to (re-)insert: every listed multi value, then each replacement.
    insert_lines = [
        f"{var} has {attr_name} {format_value(item)};"
        for attr_name, values in multi_values.items()
        for item in values
    ]
    insert_lines.extend(
        f"{var} has {attr_name} {format_value(value)};"
        for attr_name, value in replacements.items()
    )

    stages = ["match\n" + "\n".join(match_lines)]
    if delete_lines:
        stages.append("delete\n" + "\n".join(delete_lines))
    if insert_lines:
        stages.append("insert\n" + "\n".join(insert_lines))

    full_query = "\n".join(stages)
    logger.debug(f"Update query: {full_query}")

    self._execute(full_query, TransactionType.WRITE)
    logger.info(f"Updated: {self.model_class.__name__}")

    if runner.has_hooks:
        runner.run_post(CrudEvent.POST_UPDATE, self.model_class, instance)

    return instance

delete

delete(instance)

Delete an instance and return it.

Source code in type_bridge/crud/typedb_manager.py
def delete(self, instance: T) -> T:
    """Delete ``instance`` from the database and return it.

    The match patterns come from the instance itself
    (``get_match_pattern`` for entities, ``get_match_patterns`` for
    relations). PRE_DELETE / POST_DELETE hooks run around the operation when
    any hooks are registered.

    Raises:
        TypeError: If ``instance`` is neither an ``Entity`` nor a ``Relation``.
    """
    runner = self._hook_runner
    if runner.has_hooks:
        runner.run_pre(CrudEvent.PRE_DELETE, self.model_class, instance)

    var = "$x"

    if isinstance(instance, Entity):
        patterns: list[Pattern] = [instance.get_match_pattern(var)]
    elif isinstance(instance, Relation):
        patterns = instance.get_match_patterns(var)
    else:
        raise TypeError(f"Unexpected instance type: {type(instance)}")

    # Compile the match and delete stages, then run them as one query.
    match_str = self.compiler.compile(MatchClause(patterns=patterns))
    delete_str = self.compiler.compile(
        DeleteClause(statements=[DeleteThingStatement(variable=var)])
    )
    self._execute(f"{match_str}\n{delete_str}", TransactionType.WRITE)

    if runner.has_hooks:
        runner.run_post(CrudEvent.POST_DELETE, self.model_class, instance)

    return instance

all

all()

Fetch all instances of this type.

Source code in type_bridge/crud/typedb_manager.py
def all(self) -> list[T]:
    """Return every instance of the managed type (``get`` with no filters)."""
    everything = self.get()
    return everything

insert_many

insert_many(instances)

Insert multiple instances in a single query (batched).

For entities, combines all inserts into a single query for efficiency. For relations, falls back to individual inserts (due to match clause complexity).

Source code in type_bridge/crud/typedb_manager.py
def insert_many(self, instances: list[T]) -> list[T]:
    """Insert several instances, batching when every one is an entity.

    An all-entity list is handed to ``_batch_insert_entities`` in one call,
    with PRE_INSERT hooks fired for each instance beforehand and POST_INSERT
    hooks for each returned instance afterwards. Any list containing a
    non-entity is inserted one by one through ``insert`` (which fires the
    hooks itself).

    Args:
        instances: Instances to insert.

    Returns:
        The batch helper's result for all-entity input; otherwise the input
        list (also returned unchanged when it is empty).
    """
    if not instances:
        return instances

    # Relations need individual handling due to role player match clauses.
    if not all(isinstance(inst, Entity) for inst in instances):
        for instance in instances:
            self.insert(instance)
        return instances

    runner = self._hook_runner
    if runner.has_hooks:
        for instance in instances:
            runner.run_pre(CrudEvent.PRE_INSERT, self.model_class, instance)

    result = self._batch_insert_entities(instances)

    if runner.has_hooks:
        for instance in result:
            runner.run_post(CrudEvent.POST_INSERT, self.model_class, instance)

    return result

put

put(instance)

Insert or update an instance (idempotent) and populate _iid.

Uses TypeQL's PUT clause for idempotent insertion. For entities, uses a single roundtrip. For relations, uses two.

Source code in type_bridge/crud/typedb_manager.py
def put(self, instance: T) -> T:
    """Idempotently write ``instance`` with a PUT stage and populate ``_iid``.

    Entities go through a combined put + IID fetch (one roundtrip), with a
    separate IID fetch as fallback when no IID comes back. For relations the
    compiled insert stage has its leading ``insert`` keyword swapped for
    ``put`` and the IID is fetched in a second query.

    PRE_PUT / POST_PUT hooks run around the operation when any hooks are
    registered.

    Args:
        instance: The entity or relation to put.

    Returns:
        The same instance that was passed in.
    """
    runner = self._hook_runner
    if runner.has_hooks:
        runner.run_pre(CrudEvent.PRE_PUT, self.model_class, instance)

    var = "$x"

    if isinstance(instance, Relation):
        # Two-query path: relation writes don't bind $x, so fetch the IID after.
        match_clause, insert_clause = self.strategy.build_insert(instance, var)
        compiled = [self.compiler.compile(match_clause)] if match_clause else []
        put_stage = self.compiler.compile(insert_clause).replace("insert\n", "put\n", 1)
        compiled.append(put_stage)
        self._execute("\n".join(compiled), TransactionType.WRITE)
        self._fetch_and_set_iid(instance, var)
    else:
        # Single-query path: put and read the IID back together.
        iid = self._execute_insert_with_iid(instance, var, use_put=True)
        if not iid:
            # No IID returned (e.g. types without keys): fetch it separately.
            self._fetch_and_set_iid(instance, var)
        else:
            object.__setattr__(instance, "_iid", iid)
            logger.debug(f"Set _iid on instance: {iid}")

    if runner.has_hooks:
        runner.run_post(CrudEvent.POST_PUT, self.model_class, instance)

    return instance

delete_many

delete_many(instances, *, strict=False)

Delete multiple instances.

Optimized for batch deletion: instances with IIDs are deleted in a single query using disjunctive matching (OR pattern), reducing N roundtrips to 1.

Parameters:

Name Type Description Default
instances list[T]

List of instances to delete

required
strict bool

If True, raise EntityNotFoundError if any entity doesn't exist. In strict mode, checks all entities first and raises before any deletion.

False

Returns:

Type Description
list[T]

List of actually-deleted entities (excludes those that didn't exist)

Source code in type_bridge/crud/typedb_manager.py
def delete_many(self, instances: list[T], *, strict: bool = False) -> list[T]:
    """Delete several instances, batching those that already carry an IID.

    Instances with a truthy ``_iid`` are existence-checked and deleted
    through the batch helpers; the rest are checked with ``_entity_exists``
    and removed one at a time through ``delete``.

    Args:
        instances: List of instances to delete
        strict: If True, every instance is existence-checked up front and
            ``EntityNotFoundError`` is raised before anything is deleted when
            at least one is missing. If False, missing instances are skipped.

    Returns:
        The instances that were deleted (IID-bearing ones first). An empty
        input is returned unchanged.

    Raises:
        EntityNotFoundError: In strict mode, when any instance does not exist.
    """
    if not instances:
        return instances

    from type_bridge.crud.exceptions import EntityNotFoundError

    # Partition in a single pass by whether an IID is already known.
    buckets: dict[bool, list[T]] = {True: [], False: []}
    for instance in instances:
        buckets[bool(getattr(instance, "_iid", None))].append(instance)
    with_iids, without_iids = buckets[True], buckets[False]

    has_hooks = self._hook_runner.has_hooks

    def _delete_batch(batch: list[T]) -> None:
        # Batch delete by IID, firing the per-instance hooks around it.
        if has_hooks:
            for inst in batch:
                self._hook_runner.run_pre(CrudEvent.PRE_DELETE, self.model_class, inst)
        self._batch_delete_by_iid(batch)
        if has_hooks:
            for inst in batch:
                self._hook_runner.run_post(CrudEvent.POST_DELETE, self.model_class, inst)

    deleted: list[T] = []

    if strict:
        # Verify everything exists before touching the database.
        existing_iids = self._batch_check_existence_by_iid(with_iids) if with_iids else set()
        not_found = [inst for inst in with_iids if inst._iid not in existing_iids]
        not_found += [inst for inst in without_iids if not self._entity_exists(inst)]
        if not_found:
            names = [str(e) for e in not_found]
            raise EntityNotFoundError(f"entity(ies) not found: {names}")

        if with_iids:
            _delete_batch(with_iids)
            deleted.extend(with_iids)
        for inst in without_iids:
            self.delete(inst)  # hooks fire inside self.delete()
            deleted.append(inst)
        return deleted

    # Non-strict: silently skip whatever no longer exists.
    if with_iids:
        existing_iids = self._batch_check_existence_by_iid(with_iids)
        existing_instances = [inst for inst in with_iids if inst._iid in existing_iids]
        if existing_instances:
            _delete_batch(existing_instances)
            deleted.extend(existing_instances)

    for inst in without_iids:
        if self._entity_exists(inst):
            self.delete(inst)  # hooks fire inside self.delete()
            deleted.append(inst)

    return deleted

put_many

put_many(instances)

Put multiple instances (idempotent insert/update).

Attempts batch operation first for efficiency. If a key constraint violation occurs (some entities exist with different data), falls back to individual operations which are idempotent.

For entities, uses batch PUT when possible (N→1 roundtrips). For relations, uses individual operations (match clause complexity).

Source code in type_bridge/crud/typedb_manager.py
def put_many(self, instances: list[T]) -> list[T]:
    """Put several instances (idempotent), batching when all are entities.

    An all-entity list is first attempted as one batch PUT. If that raises
    an error whose message mentions "unique" or "constraint", the instances
    are put one by one through ``put`` instead; any other error propagates.
    Lists containing a non-entity always go through ``put`` individually.

    Args:
        instances: Instances to put.

    Returns:
        The batch helper's result when the batch succeeds; otherwise the
        input list (also returned unchanged when it is empty).
    """
    if not instances:
        return instances

    has_hooks = self._hook_runner.has_hooks

    # Relations or mixed input: individual puts (hooks fire via self.put).
    if not all(isinstance(inst, Entity) for inst in instances):
        for instance in instances:
            self.put(instance)
        return instances

    if has_hooks:
        for instance in instances:
            self._hook_runner.run_pre(CrudEvent.PRE_PUT, self.model_class, instance)

    try:
        result = self._batch_insert_entities(instances, use_put=True)
    except Exception as e:
        message = str(e).lower()
        if "unique" not in message and "constraint" not in message:
            # Not a constraint violation: let it propagate.
            raise
        logger.debug(
            f"Batch put failed with constraint violation, falling back to individual: {e}"
        )
        # Pre-hooks may fire a second time via self.put(); the original
        # author accepted this because the failed batch was rolled back.
        for instance in instances:
            self.put(instance)
        return instances

    if has_hooks:
        for instance in result:
            self._hook_runner.run_post(CrudEvent.POST_PUT, self.model_class, instance)

    return result

update_many

update_many(instances)

Update multiple instances.

Source code in type_bridge/crud/typedb_manager.py
def update_many(self, instances: list[T]) -> list[T]:
    """Run ``update`` on each instance in order and return the input list."""
    apply_update = self.update
    for item in instances:
        apply_update(item)
    return instances

get_by_iid

get_by_iid(iid)

Fetch an instance by its internal ID with polymorphic type resolution.

Source code in type_bridge/crud/typedb_manager.py
def get_by_iid(self, iid: str) -> T | None:
    """Fetch an instance by its internal ID with polymorphic type resolution.

    Args:
        iid: Internal ID as a hexadecimal string starting with ``0x``.

    Returns:
        The hydrated instance with ``_iid`` set, or ``None`` when ``iid`` is
        empty or malformed, or when no matching row is found.

    Raises:
        HydrationError: If the fetched entity row cannot be turned into an
            instance. ``raw_data`` holds the row as returned by the query.
    """
    import re

    # Validate before anything else. fullmatch is required here: with
    # re.match and a trailing "$", an IID followed by a newline would pass
    # validation and end up inside the query text.
    # Invalid IIDs are treated as "not found" rather than as an error.
    if not iid or not re.fullmatch(r"0x[0-9a-fA-F]+", iid):
        return None

    from type_bridge.crud.role_players import resolve_entity_class_from_label
    from type_bridge.models.registry import ModelRegistry
    from type_bridge.query.ast import (
        EntityPattern,
        IidConstraint,
        MatchClause,
        SubTypePattern,
    )

    var = "$x"
    base_type = self.model_class.get_type_name()

    if not issubclass(self.model_class, Entity):
        # Relation path: _get_relations handles role player hydration.
        results = self._get_relations(var, {"_iid": iid}, [])
        if results:
            return results[0]
        return None

    # Entity path: $x isa! $t, iid <iid>; $t sub base_type;
    entity_pattern = EntityPattern(
        variable=var,
        type_name="$t",  # Type variable for polymorphic resolution
        constraints=[IidConstraint(iid=iid)],
        is_strict=True,  # Use isa! for strict type matching
    )
    subtype_pattern = SubTypePattern(variable="$t", parent_type=base_type)
    match_clause = MatchClause(patterns=[entity_pattern, subtype_pattern])
    match_str = self.compiler.compile(match_clause)

    # Wildcard fetch so subtype-specific attributes are included.
    # The IID is not fetched - it is already known from the argument.
    fetch_clause_str = self._build_wildcard_fetch(var, include_iid=False, include_type=True)
    query = match_str + "\n" + fetch_clause_str

    results = self._execute(query, TransactionType.READ)

    if not results:
        return None

    result = results[0]
    # Snapshot the row first: the pops below strip keys from it, which
    # would otherwise leave the error report without the failing data.
    raw_row = dict(result)
    try:
        type_label = result.pop("_type", None)

        # Wildcard fetch nests attributes under "attributes"; fall back to
        # the row itself when that key is absent.
        attrs = result.pop("attributes", result)

        # Resolve concrete class
        if type_label and type_label != base_type:
            concrete_class = ModelRegistry.get(type_label)
            if concrete_class is None:
                concrete_class = resolve_entity_class_from_label(
                    type_label,
                    cast(tuple[type[Entity], ...], (self.model_class,)),
                )
        else:
            concrete_class = self.model_class

        assert concrete_class is not None, "Failed to resolve concrete class"
        entity_class = cast(type[Entity], concrete_class)
        instance = entity_class.from_dict(attrs, strict=False)
        object.__setattr__(instance, "_iid", iid)
        return cast(T | None, instance)
    except Exception as e:
        from type_bridge.crud.exceptions import HydrationError

        raise HydrationError(
            model_type=self.model_class.__name__,
            raw_data=raw_row,
            cause=e,
        ) from e

filter

filter(*expressions, **filters)

Create a chainable query with filters.

Parameters:

Name Type Description Default
*expressions Any

Expression objects (Person.age.gt(Age(30)), etc.)

()
**filters Any

Attribute filters (exact match) - age=30, name="Alice"

{}

Returns:

Type Description
TypeDBQuery[T]

TypeDBQuery for chaining

Raises:

Type Description
ValueError

If expressions reference attribute types not owned by the model

Source code in type_bridge/crud/typedb_manager.py
def filter(self, *expressions: Any, **filters: Any) -> TypeDBQuery[T]:
    """Start a chainable query from expressions and exact-match filters.

    Args:
        *expressions: Expression objects (Person.age.gt(Age(30)), etc.)
        **filters: Attribute filters (exact match) - age=30, name="Alice"

    Returns:
        A new TypeDBQuery bound to this manager.

    Raises:
        ValueError: If a non-role-player expression references an attribute
            type the model does not own.
    """
    if expressions:
        owned_attr_types = {
            attr_info.typ for attr_info in self.model_class.get_all_attributes().values()
        }

        from type_bridge.expressions.role_player import RolePlayerExpr

        for expr in expressions:
            # Role-player expressions target the players' attributes, not
            # the model's own, so they are not checked here.
            if isinstance(expr, RolePlayerExpr):
                continue

            for attr_type in expr.get_attribute_types():
                if attr_type in owned_attr_types:
                    continue
                raise ValueError(
                    f"{self.model_class.__name__} does not own attribute type {attr_type.__name__}. "
                    f"Available attribute types: {', '.join(t.__name__ for t in owned_attr_types)}"
                )

    return TypeDBQuery(self, filters, list(expressions))

count

count(**filters)

Count all instances of this type, optionally filtering.

Source code in type_bridge/crud/typedb_manager.py
def count(self, **filters) -> int:
    """Count instances of this type, optionally restricted by filters.

    Args:
        **filters: Filters forwarded to the strategy's ``build_match_all``.

    Returns:
        The count as an int; 0 when the query yields no rows or the first
        row has no "count" entry.
    """
    var = "$x"
    match_str = self.compiler.compile(
        self.strategy.build_match_all(self.model_class, var, filters)
    )
    query = match_str + "\n" + self._build_count_reduce(var)

    rows = self._execute(query, TransactionType.READ)
    if not rows or "count" not in rows[0]:
        return 0

    raw = rows[0]["count"]
    # The value may arrive wrapped as {"value": ...}.
    if isinstance(raw, dict) and "value" in raw:
        return int(raw["value"])
    # Numbers convert directly; anything else goes through its string form.
    return int(raw) if isinstance(raw, (int, float)) else int(str(raw))

group_by

group_by(*fields)

Group results by field values and compute aggregations.

Parameters:

Name Type Description Default
*fields Any

Field descriptors to group by (e.g., Person.department)

()

Returns:

Type Description
GroupByQuery[T]

GroupByQuery for chained aggregations

Example
Group by department, compute average age per department

result = manager.group_by(Person.department).aggregate(Person.age.avg())

Returns: {
"Engineering": {"avg_age": 35.5},
"Sales": {"avg_age": 28.3}
}
Source code in type_bridge/crud/typedb_manager.py
def group_by(self, *fields: Any) -> GroupByQuery[T]:
    """Start a grouped aggregation over the given fields.

    The returned query is created with no filters and no expressions.

    Args:
        *fields: Field descriptors to group by (e.g., Person.department)

    Returns:
        GroupByQuery for chained aggregations

    Example:
        # Group by department, compute average age per department
        result = manager.group_by(Person.department).aggregate(Person.age.avg())
        # Returns: {
        #   "Engineering": {"avg_age": 35.5},
        #   "Sales": {"avg_age": 28.3}
        # }
    """
    no_filters: dict[str, Any] = {}
    no_expressions: list[Any] = []
    return GroupByQuery(self, no_filters, no_expressions, fields)

TypeDBQuery

TypeDBQuery(manager, filters, expressions=None)

Chainable query builder for TypeDBManager.

Source code in type_bridge/crud/typedb_manager.py
def __init__(
    self,
    manager: TypeDBManager[T],
    filters: dict[str, Any],
    expressions: list[Any] | None = None,
):
    """Set up an unordered, unpaginated query bound to ``manager``.

    Args:
        manager: The manager this query is bound to.
        filters: Keyword filters; the dict is stored as-is (not copied).
        expressions: Optional expression objects; a missing or empty value
            becomes a fresh empty list.
    """
    self._manager = manager
    self._filters = filters
    self._expressions: list[Any] = expressions if expressions else []
    # Ordering entries are (field, descending) pairs.
    self._order_fields: list[tuple[str, bool]] = []
    # No limit or offset until one is set.
    self._limit_val: int | None = None
    self._offset_val: int | None = None

filter

filter(*expressions, **filters)

Add additional filters to the query.

Validates that expression attribute types are owned by the model class.

Source code in type_bridge/crud/typedb_manager.py
def filter(self, *expressions: Any, **filters: Any) -> TypeDBQuery[T]:
    """Narrow the query with extra expressions and/or keyword filters.

    Each positional expression (other than a RolePlayerExpr) is checked:
    every attribute type it references must be one of the attribute types
    owned by the manager's model class.

    Args:
        *expressions: Expression objects to append to the query.
        **filters: Keyword filters merged into the existing dict filters.

    Returns:
        This query, to allow chaining.

    Raises:
        ValueError: If an expression references an attribute type the model
            class does not own.
    """
    if expressions:
        owner = self._manager.model_class
        attribute_infos = owner.get_all_attributes()
        allowed_types = {info.typ for info in attribute_infos.values()}

        # RolePlayerExpr targets attributes of a role player rather than of
        # the relation itself, so it is exempt from the ownership check.
        from type_bridge.expressions.role_player import RolePlayerExpr

        for candidate in expressions:
            if isinstance(candidate, RolePlayerExpr):
                continue

            for referenced in candidate.get_attribute_types():
                if referenced in allowed_types:
                    continue
                raise ValueError(
                    f"{owner.__name__} does not own attribute type {referenced.__name__}. "
                    f"Available attribute types: {', '.join(t.__name__ for t in allowed_types)}"
                )

    self._expressions.extend(expressions)
    self._filters.update(filters)
    return self

order_by

order_by(*fields)

Order results by fields. Prefix with '-' for descending.

Source code in type_bridge/crud/typedb_manager.py
def order_by(self, *fields: str) -> TypeDBQuery[T]:
    """Append sort fields to the query.

    A leading '-' marks a field as descending; the prefix is stripped before
    the name is stored. Repeated calls accumulate fields in call order.

    Returns:
        This query, to allow chaining.
    """
    for spec in fields:
        descending = spec.startswith("-")
        name = spec[1:] if descending else spec
        self._order_fields.append((name, descending))
    return self

limit

limit(n)

Limit number of results.

Source code in type_bridge/crud/typedb_manager.py
def limit(self, n: int) -> TypeDBQuery[T]:
    """Cap the number of results the query returns.

    Args:
        n: Maximum number of results; replaces any previously set limit.

    Returns:
        This query, to allow chaining.
    """
    self._limit_val = n
    return self

offset

offset(n)

Skip first n results.

Source code in type_bridge/crud/typedb_manager.py
def offset(self, n: int) -> TypeDBQuery[T]:
    """Skip the first ``n`` results of the query.

    Args:
        n: Number of results to skip; replaces any previously set offset.

    Returns:
        This query, to allow chaining.
    """
    self._offset_val = n
    return self

execute

execute()

Execute the query and return results.

Source code in type_bridge/crud/typedb_manager.py
def execute(self) -> list[T]:
    """Execute the query and return the matching instances.

    Relation managers delegate to the manager's ``_get_relations`` helper,
    passing along filters, expressions, ordering, limit and offset.

    Entity managers build a single query of the form match / modifiers /
    fetch: Django-style lookup filters are parsed into expressions, the base
    ``isa`` pattern is rewritten so the concrete type is bound to ``$t``, and
    each fetched row is hydrated into the class resolved from its type label
    (or the manager's model class when the label is missing or is the base
    type).

    Returns:
        The hydrated instances, in the order the rows were returned.

    Raises:
        HydrationError: If any exception occurs while turning a fetched row
            into a model instance.
    """
    from type_bridge.crud.role_players import resolve_entity_class_from_label
    from type_bridge.models.registry import ModelRegistry

    model_class = self._manager.model_class

    # Relations need special handling for role player fetching
    if isinstance(self._manager.strategy, RelationStrategy):
        return self._manager._get_relations(
            "$r",
            self._filters,
            self._expressions,
            order_by=self._order_fields if self._order_fields else None,
            limit=self._limit_val,
            offset=self._offset_val,
        )

    # Entity path with polymorphic type resolution
    var = "$x"
    base_type = model_class.get_type_name()

    # Parse Django-style lookup filters (e.g., name__startswith="Al")
    base_filters, lookup_expressions = self._parse_entity_lookup_filters(
        model_class, self._filters
    )
    # Copy so the query's own expression list is not modified
    all_expressions = list(self._expressions) + lookup_expressions

    from type_bridge.query.ast import EntityPattern, SubTypePattern

    # Build base match clause using parsed filters (exact match only)
    match_clause = self._manager.strategy.build_match_all(model_class, var, base_filters)

    # Add expression patterns to match clause (includes parsed lookup expressions)
    if all_expressions:
        for expr in all_expressions:
            match_clause.patterns.extend(expr.to_ast(var))

    # Add type variable binding for polymorphic resolution
    # Transform "$x isa type" to "$x isa! $t; $t sub type"

    # Only the first matching pattern is rewritten; the break also makes it
    # safe to append to the list being iterated.
    for pattern in match_clause.patterns:
        if (
            isinstance(pattern, EntityPattern)
            and pattern.variable == var
            and pattern.type_name == base_type
        ):
            pattern.type_name = "$t"
            pattern.is_strict = True
            match_clause.patterns.append(SubTypePattern(variable="$t", parent_type=base_type))
            break

    # Add sort variable bindings to match clause (TypeDB 3.x requirement)
    # These must be added to the match clause BEFORE compilation
    modifier_clauses = []
    sort_parts = []
    all_attrs = model_class.get_all_attributes()
    if self._order_fields:
        for i, (field_name, desc) in enumerate(self._order_fields):
            # Order fields that are not attributes of the model are skipped
            # without raising.
            if field_name in all_attrs:
                attr_name = all_attrs[field_name].typ.get_attribute_name()
                sort_var = f"$sort_{i}"
                # Bind attribute to variable in match clause
                from type_bridge.query.ast import HasPattern

                match_clause.patterns.append(
                    HasPattern(thing_var=var, attr_type=attr_name, attr_var=sort_var)
                )
                direction = "desc" if desc else "asc"
                sort_parts.append(f"{sort_var} {direction}")
        if sort_parts:
            modifier_clauses.append(f"sort {', '.join(sort_parts)};")

    match_str = self._manager.compiler.compile(match_clause)

    # Build fetch clause using wildcard to get all attributes including subtype-specific ones
    fetch_clause_str = self._manager._build_wildcard_fetch(
        var, include_iid=True, include_type=True
    )

    # offset must come BEFORE limit
    if self._offset_val is not None:
        modifier_clauses.append(f"offset {self._offset_val};")
    if self._limit_val is not None:
        modifier_clauses.append(f"limit {self._limit_val};")

    modifier_str = "\n".join(modifier_clauses)

    # Build final query: match ; modifiers ; fetch
    if modifier_str:
        query = match_str + "\n" + modifier_str + "\n" + fetch_clause_str
    else:
        query = match_str + "\n" + fetch_clause_str

    # Execute
    results = self._manager._execute(query, TransactionType.READ)

    # Hydrate objects with polymorphic type resolution
    instances = []
    entity_model = cast(type[Entity], model_class)
    for result in results:
        try:
            # NOTE: the pops below mutate ``result``, so the raw_data attached
            # to a HydrationError no longer contains the keys already removed.
            iid = result.pop("_iid", None)
            # Handle wrapped IID format from TypeDB driver
            if isinstance(iid, dict) and "value" in iid:
                iid = iid["value"]

            # Get actual type label for polymorphic resolution
            type_label = result.pop("_type", None)

            # Extract attributes from nested "attributes" key (wildcard fetch structure);
            # fall back to the remaining row itself when that key is absent
            attrs = result.pop("attributes", result)

            # Resolve concrete class from type label: registry first, then
            # label-based resolution against the manager's model class
            if type_label and type_label != base_type:
                concrete_class = ModelRegistry.get(type_label)
                if concrete_class is None:
                    concrete_class = resolve_entity_class_from_label(
                        type_label, (entity_model,)
                    )
            else:
                concrete_class = entity_model

            assert concrete_class is not None, "Failed to resolve concrete class"
            entity_class = cast(type[Entity], concrete_class)
            instance = entity_class.from_dict(attrs, strict=False)
            if iid:
                # Set via object.__setattr__ rather than normal attribute
                # assignment (presumably to bypass model validation — confirm)
                object.__setattr__(instance, "_iid", iid)
            instances.append(instance)
        except Exception as e:
            from type_bridge.crud.exceptions import HydrationError

            raise HydrationError(
                model_type=model_class.__name__,
                raw_data=result,
                cause=e,
            ) from e

    return instances

all

all()

Alias for execute().

Source code in type_bridge/crud/typedb_manager.py
def all(self) -> list[T]:
    """Return every matching instance.

    Equivalent to calling ``execute()``.
    """
    matches = self.execute()
    return matches

first

first()

Get the first matching instance.

Source code in type_bridge/crud/typedb_manager.py
def first(self) -> T | None:
    """Get the first matching instance.

    Temporarily sets the query limit to 1 so only one row is fetched, then
    restores the previous limit, even if ``execute()`` raises.

    Returns:
        The first matching instance, or None when nothing matches.
    """
    original_limit = self._limit_val
    self._limit_val = 1
    try:
        results = self.execute()
    finally:
        # Restore unconditionally so a failed execute() does not leave the
        # query permanently limited to one result.
        self._limit_val = original_limit
    return results[0] if results else None

count

count()

Count matching instances.

Note: For relation queries with role player filters, TypeDB 3.x's reduce count may not work as expected. In those cases, we fall back to fetching IIDs and counting unique results.

Source code in type_bridge/crud/typedb_manager.py
def count(self) -> int:
    """Count matching instances.

    Relation queries that have dict filters are counted by fetching the
    matching relations and returning how many came back, because TypeDB 3.x's
    reduce count may not work as expected with role player filters. All other
    queries use a ``reduce`` count.

    For entities, Django-style lookup filters (e.g., name__startswith="Al")
    are parsed the same way ``execute()`` parses them, so ``count()`` and
    ``execute()`` agree on which instances match.

    Ordering, limit and offset are not applied when counting.

    Returns:
        Number of matching instances; 0 when the query returns no count.
    """
    model_class = self._manager.model_class

    # For relations with filters, fetch the matching relations and count them
    # to avoid TypeDB count limitations
    if isinstance(self._manager.strategy, RelationStrategy) and self._filters:
        results = self._manager._get_relations("$r", self._filters, self._expressions)
        return len(results)

    # Entity path or relation without filters - use reduce count
    var = "$x"
    lookup_expressions: list[Any] = []

    # Build match clause with filters and expressions
    # Strategy is generic, so we need to cast appropriately based on strategy type
    if isinstance(self._manager.strategy, EntityStrategy):
        # Split the filters into exact-match filters and lookup expressions,
        # mirroring execute(); build_match_all only receives exact matches.
        base_filters, lookup_expressions = self._parse_entity_lookup_filters(
            model_class, self._filters
        )
        match_clause = self._manager.strategy.build_match_all(
            cast(type[Entity], model_class), var, base_filters
        )
    else:
        # RelationStrategy without filters
        assert isinstance(self._manager.strategy, RelationStrategy)
        match_clause = self._manager.strategy.build_match_all(
            cast(type[Relation], model_class), var, self._filters
        )

    if self._expressions:
        from type_bridge.query.ast import RawPattern

        for expr in self._expressions:
            # Add expressions as RawPatterns to the AST
            pattern_str = expr.to_typeql(var)
            match_clause.patterns.append(RawPattern(content=pattern_str))

    # Parsed lookup filters are added as AST patterns, as execute() does
    for lookup_expr in lookup_expressions:
        match_clause.patterns.extend(lookup_expr.to_ast(var))

    match_str = self._manager.compiler.compile(match_clause)

    # Explicitly count the variable to avoid counting joins
    reduce_str = self._manager._build_count_reduce(var)
    query = match_str + "\n" + reduce_str
    results = self._manager._execute(query, TransactionType.READ)
    if results and "count" in results[0]:
        count_val = results[0]["count"]
        # Handle wrapped value format from TypeDB driver
        if isinstance(count_val, dict) and "value" in count_val:
            return int(count_val["value"])
        if isinstance(count_val, (int, float)):
            return int(count_val)
        return int(str(count_val))
    return 0

delete

delete()

Delete all matching instances and return count.

Source code in type_bridge/crud/typedb_manager.py
def delete(self) -> int:
    """Delete every instance matched by this query.

    The matches are fetched with ``execute()`` and then removed one at a
    time through the manager's ``delete``.

    Returns:
        Number of instances that were fetched and passed to ``delete``.
    """
    targets = self.execute()
    for target in targets:
        self._manager.delete(target)
    return len(targets)

exists

exists()

Check if any matching instances exist.

Source code in type_bridge/crud/typedb_manager.py
def exists(self) -> bool:
    """Report whether at least one instance matches the query.

    Implemented on top of ``count()``.
    """
    matches = self.count()
    return matches > 0

update_with

update_with(func)

Update instances by applying a function to each.

Uses atomic transaction semantics: if the function fails on any entity, no updates are persisted (all or nothing).

Note: _iid is preserved during attribute modification by the wrap validator in TypeDBType base class.

Source code in type_bridge/crud/typedb_manager.py
def update_with(self, func: Any) -> list[T]:
    """Apply ``func`` to each matching instance, then persist the changes.

    ``func`` is called on every instance before any write is issued, so if
    it raises for any instance nothing has been persisted yet. Afterwards
    each instance is saved through the manager's ``update``.

    NOTE(review): the writes are issued one ``update`` call per instance;
    whether those calls share a single transaction is not visible here.

    Args:
        func: Callable invoked with each instance; expected to modify it
            in place (its return value is ignored).

    Returns:
        The updated instances, or an empty list when nothing matched.
    """
    targets = self.execute()
    if targets:
        # Phase 1: mutate everything in memory first
        for target in targets:
            func(target)

        # Phase 2: every call succeeded, so write the instances back
        for target in targets:
            self._manager.update(target)

        return targets
    return []

aggregate

aggregate(*aggregates)

Execute aggregation queries.

Performs database-side aggregations for efficiency.

Parameters:

Name Type Description Default
*aggregates Any

AggregateExpr objects (Person.age.avg(), Person.score.sum(), etc.)

()

Returns:

Type Description
dict[str, Any]

Dictionary mapping aggregate keys to results

Examples:

Single aggregation

result = manager.filter().aggregate(Person.age.avg())
avg_age = result['avg_age']

Multiple aggregations

result = manager.filter(Person.city.eq(City("NYC"))).aggregate( Person.age.avg(), Person.score.sum(), Person.salary.max() )

Source code in type_bridge/crud/typedb_manager.py
def aggregate(self, *aggregates: Any) -> dict[str, Any]:
    """Run one or more aggregations over the instances matched by this query.

    The query's filters and expressions form the match clause; a reduce
    clause with one assignment per aggregate is appended, and the result is
    executed in a READ transaction.

    Args:
        *aggregates: AggregateExpr objects (Person.age.avg(), Person.score.sum(), etc.)

    Returns:
        Dictionary mapping aggregate keys to results

    Raises:
        ValueError: If no aggregate expression is given.
        TypeError: If an argument is not an AggregateExpr.

    Examples:
        # Single aggregation
        result = manager.filter().aggregate(Person.age.avg())
        avg_age = result['avg_age']

        # Multiple aggregations
        result = manager.filter(Person.city.eq(City("NYC"))).aggregate(
            Person.age.avg(),
            Person.score.sum(),
            Person.salary.max()
        )
    """
    from type_bridge.crud.aggregates import parse_aggregate_results
    from type_bridge.expressions import AggregateExpr
    from type_bridge.expressions.utils import generate_attr_var
    from type_bridge.query.ast import HasPattern, ReduceAssignment, ReduceClause

    if len(aggregates) == 0:
        raise ValueError("At least one aggregation expression required")

    target_model = self._manager.model_class
    subject = "$e"

    # Match clause: dict filters first, then expression-based filters
    pattern_clause = self._manager.strategy.build_match_all(
        target_model, subject, self._filters
    )
    for condition in self._expressions:
        pattern_clause.patterns.extend(condition.to_ast(subject))

    # One reduce assignment per requested aggregate
    assignments = []
    for aggregate_expr in aggregates:
        if not isinstance(aggregate_expr, AggregateExpr):
            raise TypeError(f"Expected AggregateExpr, got {type(aggregate_expr).__name__}")

        # Aggregates over a specific attribute need that attribute bound to a
        # variable in the match clause; aggregates without attr_type do not.
        if aggregate_expr.attr_type is not None:
            bound_name = aggregate_expr.attr_type.get_attribute_name()
            bound_var = generate_attr_var(subject, aggregate_expr.attr_type)
            binding = HasPattern(thing_var=subject, attr_type=bound_name, attr_var=bound_var)
            pattern_clause.patterns.append(binding)

        # The aggregate's TypeQL text (e.g. "mean($e_age)") is used as the expression
        output_var = f"${aggregate_expr.get_fetch_key()}"
        assignment = ReduceAssignment(
            variable=output_var, expression=aggregate_expr.to_typeql(subject)
        )
        assignments.append(assignment)

    # Compile both clauses and run them as one query
    compiler = self._manager.compiler
    compiled_match = compiler.compile(pattern_clause)
    compiled_reduce = compiler.compile(ReduceClause(assignments=assignments))
    query = f"{compiled_match}\n{compiled_reduce}"

    rows = self._manager._execute(query, TransactionType.READ)
    return parse_aggregate_results(rows)

group_by

group_by(*fields)

Group results by field values.

Parameters:

Name Type Description Default
*fields Any

FieldRef objects or field descriptors to group by (e.g., Person.department)

()

Returns:

Type Description
GroupByQuery[T]

GroupByQuery for chained aggregations

Example

result = manager.group_by(Person.department).aggregate(Person.age.avg())

Source code in type_bridge/crud/typedb_manager.py
def group_by(self, *fields: Any) -> GroupByQuery[T]:
    """Turn this query into a grouped query over the given fields.

    The grouped query receives this query's manager, dict filters and
    expression filters.

    Args:
        *fields: FieldRef objects or field descriptors to group by (e.g., Person.department)

    Returns:
        GroupByQuery for chained aggregations

    Example:
        result = manager.group_by(Person.department).aggregate(Person.age.avg())
    """
    grouped = GroupByQuery(self._manager, self._filters, self._expressions, fields)
    return grouped

GroupByQuery

GroupByQuery(manager, filters, expressions, group_fields)

Query for grouped aggregations.

Allows grouping entities by field values and computing aggregations per group.

Initialize grouped query.

Parameters:

Name Type Description Default
manager TypeDBManager[T]

TypeDBManager instance

required
filters dict[str, Any]

Dict-based filters

required
expressions list[Any]

Expression-based filters

required
group_fields tuple[Any, ...]

Fields to group by (FieldRef or field descriptors)

required
Source code in type_bridge/crud/typedb_manager.py
def __init__(
    self,
    manager: TypeDBManager[T],
    filters: dict[str, Any],
    expressions: list[Any],
    group_fields: tuple[Any, ...],
):
    """Set up a grouped query.

    The arguments are stored as given (no copies are made).

    Args:
        manager: TypeDBManager instance the query runs against
        filters: Dict-based filters
        expressions: Expression-based filters
        group_fields: Fields to group by (FieldRef or field descriptors)
    """
    # What to group on
    self._group_fields = group_fields
    # Which instances to consider
    self._expressions = expressions
    self._filters = filters
    # Where to run the query
    self._manager = manager

aggregate

aggregate(*aggregates)

Execute grouped aggregation.

Parameters:

Name Type Description Default
*aggregates Any

AggregateExpr objects

()

Returns:

Type Description
dict[Any, dict[str, Any]]

Dictionary mapping group values to aggregation results

Example
Group by department, compute average age per department

result = manager.group_by(Person.department).aggregate(Person.age.avg())

Returns: {
"Engineering": {"avg_age": 35.5},
"Sales": {"avg_age": 28.3}
}
Source code in type_bridge/crud/typedb_manager.py
def aggregate(self, *aggregates: Any) -> dict[Any, dict[str, Any]]:
    """Execute grouped aggregation.

    Builds a match clause from the stored filters and expressions, binds each
    group field to a ``$group<i>`` variable, and runs a reduce clause grouped
    by those variables in a READ transaction.

    Args:
        *aggregates: AggregateExpr objects

    Returns:
        Dictionary mapping group values to aggregation results

    Raises:
        ValueError: If no aggregate expression is given.
        TypeError: If an argument is not an AggregateExpr.

    Example:
        # Group by department, compute average age per department
        result = manager.group_by(Person.department).aggregate(Person.age.avg())
        # Returns: {
        #   "Engineering": {"avg_age": 35.5},
        #   "Sales": {"avg_age": 28.3}
        # }
    """
    from type_bridge.crud.aggregates import parse_grouped_aggregate_results
    from type_bridge.expressions import AggregateExpr
    from type_bridge.expressions.utils import generate_attr_var
    from type_bridge.query.ast import HasPattern, ReduceAssignment, ReduceClause

    if not aggregates:
        raise ValueError("At least one aggregation expression required")

    model_class = self._manager.model_class
    var = "$e"

    # Build base match clause with filters
    match_clause = self._manager.strategy.build_match_all(model_class, var, self._filters)

    # Add expression-based filters as AST patterns
    for expr in self._expressions:
        match_clause.patterns.extend(expr.to_ast(var))

    # Add group-by fields to match clause as AST patterns
    group_vars = []
    for i, field in enumerate(self._group_fields):
        var_name = f"$group{i}"
        # FieldRef objects and field descriptors are handled identically:
        # both are read through their ``attr_type`` attribute.
        attr_name = field.attr_type.get_attribute_name()
        match_clause.patterns.append(
            HasPattern(thing_var=var, attr_type=attr_name, attr_var=var_name)
        )
        group_vars.append(var_name)

    # Build reduce assignments for each aggregation
    reduce_assignments = []
    for agg in aggregates:
        if not isinstance(agg, AggregateExpr):
            raise TypeError(f"Expected AggregateExpr, got {type(agg).__name__}")

        # If this aggregation is on a specific attr_type (not count), add binding pattern
        if agg.attr_type is not None:
            attr_name = agg.attr_type.get_attribute_name()
            attr_var = generate_attr_var(var, agg.attr_type)
            match_clause.patterns.append(
                HasPattern(thing_var=var, attr_type=attr_name, attr_var=attr_var)
            )

        # Build reduce assignment
        result_var = f"${agg.get_fetch_key()}"
        reduce_assignments.append(
            ReduceAssignment(variable=result_var, expression=agg.to_typeql(var))
        )

    # Compile and execute with group-by
    match_str = self._manager.compiler.compile(match_clause)
    group_clause = ", ".join(group_vars)
    reduce_clause = ReduceClause(assignments=reduce_assignments, group_by=group_clause)
    reduce_str = self._manager.compiler.compile(reduce_clause)
    query = f"{match_str}\n{reduce_str}"

    results = self._manager._execute(query, TransactionType.READ)
    return parse_grouped_aggregate_results(results, group_vars)