def build_fetch_query(
    self,
    model_class: type[Relation],
    var: str,
    filters: dict[str, Any],
    expressions: list[Any],
    order_by: list[tuple[str, bool]] | None = None,
    limit: int | None = None,
    offset: int | None = None,
) -> str:
    """Build a complete fetch query for relations including role players.

    Uses the TypeDB 3.x pattern with ``isa!`` for type variable binding to
    enable ``label()`` fetching for polymorphic type resolution.

    Roles flagged optional are matched inside ``try`` blocks so relations
    without a player for that role stay visible. An optional role is promoted
    into the primary relation pattern only when something actually constrains
    it: a filter keyed by the role name, a ``RolePlayerExpr`` targeting it, or
    an ``order_by`` entry that resolves to one of its players' attributes.

    Args:
        model_class: The relation class.
        var: Variable name for the relation (e.g., "$r").
        filters: Attribute and role player filters, keyed by field/role name.
        expressions: Expression objects for filtering.
        order_by: List of (field_name, descending) tuples for sorting. A
            field is either a relation attribute name or ``role__attribute``
            for a role player attribute. Entries that resolve to neither are
            ignored.
        limit: Maximum number of results.
        offset: Number of results to skip.

    Returns:
        Complete TypeQL query string.

    Raises:
        ValueError: If a role player filter value has no ``_iid`` and does
            not expose ``_build_identification_constraints`` (a relation used
            as a role player that was never fetched from the database).
    """
    from type_bridge.crud.formatting import format_value
    from type_bridge.crud.types import is_multi_value_attribute
    from type_bridge.crud.utils import build_role_player_fetch_items

    def unwrap(candidate: Any) -> Any:
        # Attribute wrappers expose the raw Python value via `.value`.
        return candidate.value if hasattr(candidate, "value") else candidate

    def extend_match(clause_text: str, extra: list[str]) -> str:
        # Re-open the terminated match clause and append further statements.
        return clause_text.rstrip(";") + ";\n" + ";\n".join(extra) + ";"

    roles = getattr(model_class, "_roles", {})
    owned_attrs = model_class.get_all_attributes()
    base_type = model_class.get_type_name()

    # One variable per role; role_info feeds the role player fetch items.
    role_vars: dict[str, str] = {}
    role_info: dict[str, tuple[str, tuple[type, ...]]] = {}
    for role_name, role in roles.items():
        role_vars[role_name] = f"${role_name}"
        role_info[role_name] = (role_vars[role_name], role.player_entity_types)

    # Resolve sort keys up front so that only roles which really contribute a
    # sort binding are treated as constrained. An unresolvable sort field must
    # not pull an optional role out of its try block (that would hide
    # relations lacking a player for the role without sorting anything).
    sort_var_bindings: list[str] = []
    sort_parts: list[str] = []
    sorted_roles: set[str] = set()
    for index, (field_name, descending) in enumerate(order_by or []):
        sort_var = f"$sort_{index}"
        direction = "desc" if descending else "asc"
        if "__" in field_name:
            # Role player attribute syntax: role__attribute
            role_name, attr_field = field_name.split("__", 1)
            if role_name not in roles:
                continue
            role_var = role_vars[role_name]
            # The first player type declaring the attribute decides its name.
            for player_type in roles[role_name].player_entity_types:
                player_attrs = player_type.get_all_attributes()
                if attr_field in player_attrs:
                    attr_name = player_attrs[attr_field].typ.get_attribute_name()
                    sort_var_bindings.append(f"{role_var} has {attr_name} {sort_var}")
                    sort_parts.append(f"{sort_var} {direction}")
                    sorted_roles.add(role_name)
                    break
        elif field_name in owned_attrs:
            # Direct relation attribute
            attr_name = owned_attrs[field_name].typ.get_attribute_name()
            sort_var_bindings.append(f"{var} has {attr_name} {sort_var}")
            sort_parts.append(f"{sort_var} {direction}")

    # Pair every expression with the variable it must be rendered against:
    # role player expressions target the player variable, others the relation.
    expression_targets: list[tuple[Any, str]] = []
    constrained_roles = {name for name in filters if name in roles}
    constrained_roles |= sorted_roles
    if expressions:
        from type_bridge.expressions.role_player import RolePlayerExpr

        for expr in expressions:
            if isinstance(expr, RolePlayerExpr):
                constrained_roles.add(expr.role_name)
                target = role_vars.get(expr.role_name, f"${expr.role_name}")
            else:
                target = var
            expression_targets.append((expr, target))

    # A role stays optional only if it is flagged optional AND unconstrained.
    required_role_names: list[str] = []
    optional_role_names: list[str] = []
    for role_name, role in roles.items():
        if role.is_optional and role_name not in constrained_roles:
            optional_role_names.append(role_name)
        else:
            required_role_names.append(role_name)

    # Primary relation pattern: isa! binds the exact type to $t so that
    # label($t) can report the concrete type name in the fetch clause.
    roles_str = ", ".join(
        f"{roles[role_name].role_name}: {role_vars[role_name]}"
        for role_name in required_role_names
    )
    relation_pattern = f"{var} isa! $t"
    if roles_str:
        relation_pattern += f" ({roles_str})"
    match_clauses = [relation_pattern, f"$t sub {base_type}"]

    # Type variable bindings for required role players (label() fetch).
    for role_name in required_role_names:
        role_var = role_vars[role_name]
        type_var = f"{role_var}_type"
        match_clauses.append(f"{role_var} isa! {type_var}")

    # Optional roles must not participate in the primary relation pattern.
    # Matching them inside try blocks lets relation instances with no player
    # for that role remain visible while still fetching players when present.
    for role_name in optional_role_names:
        role_var = role_vars[role_name]
        type_var = f"{role_var}_type"
        match_clauses.append(
            "try {\n"
            f" {var} links ({roles[role_name].role_name}: {role_var});\n"
            f" {role_var} isa! {type_var};\n"
            "}"
        )

    # Attribute filters on the relation itself.
    for field_name, value in filters.items():
        if field_name in owned_attrs:
            attr_name = owned_attrs[field_name].typ.get_attribute_name()
            match_clauses.append(f"{var} has {attr_name} {format_value(unwrap(value))}")

    # Role player filters (handles both Entity and Relation role players).
    for role_name, player in filters.items():
        if role_name not in roles or not hasattr(player, "get_type_name"):
            continue
        player_type = player.get_type_name()
        role_var = role_vars[role_name]
        if player._iid:
            # A known IID identifies the player unambiguously.
            match_clauses.append(f"{role_var} isa {player_type}, iid {player._iid}")
        elif hasattr(player, "_build_identification_constraints"):
            # Entity: identify through every key attribute that has a value.
            key_match = f"{role_var} isa {player_type}"
            for key_field, key_info in player.get_all_attributes().items():
                if not key_info.flags.is_key:
                    continue
                key_value = getattr(player, key_field, None)
                if key_value is None:
                    continue
                key_attr_name = key_info.typ.get_attribute_name()
                key_match += f", has {key_attr_name} {format_value(unwrap(key_value))}"
            match_clauses.append(key_match)
        else:
            # Relation as role player without IID - can't identify
            raise ValueError(
                f"Relation role player '{role_name}' cannot be identified: "
                f"no _iid set. Fetch the relation from the database first."
            )

    match_clause = "match\n" + ";\n".join(match_clauses) + ";"

    # Expression patterns follow the structural clauses.
    if expression_targets:
        match_clause = extend_match(
            match_clause,
            [expr.to_typeql(target) for expr, target in expression_targets],
        )

    # Sort variables have to be bound in the match clause before `sort`.
    if sort_var_bindings:
        match_clause = extend_match(match_clause, sort_var_bindings)

    # Fetch clause: relation IID and type label, relation attributes, then the
    # role player items (IID, type label, and attributes).
    fetch_items = [f'"_iid": iid({var})', '"_type": label($t)']
    for attr_info in owned_attrs.values():
        attr_name = attr_info.typ.get_attribute_name()
        if is_multi_value_attribute(attr_info.flags):
            fetch_items.append(f'"{attr_name}": [{var}.{attr_name}]')
        else:
            fetch_items.append(f'"{attr_name}": {var}.{attr_name}')
    fetch_items.extend(build_role_player_fetch_items(role_info))
    fetch_clause = "fetch {\n " + ",\n ".join(fetch_items) + "\n}"

    # Stream modifiers are emitted in the order sort, offset, limit.
    modifier_clauses: list[str] = []
    if sort_parts:
        modifier_clauses.append(f"sort {', '.join(sort_parts)};")
    if offset is not None:
        modifier_clauses.append(f"offset {offset};")
    if limit is not None:
        modifier_clauses.append(f"limit {limit};")

    # Build query: match + modifiers + fetch
    sections = [match_clause]
    if modifier_clauses:
        sections.append("\n".join(modifier_clauses))
    sections.append(fetch_clause + ";")
    return "\n".join(sections)