type_bridge.migration

migration

TypeBridge Migration System.

A Django-style migration framework for TypeDB schemas with state tracking, rollback support, and CLI tools.

Example - Migration file (migrations/0001_initial.py):

from typing import ClassVar

from type_bridge.migration import Migration
from type_bridge.models import Entity, Relation
from myapp.models import Person, Company, Employment

class InitialMigration(Migration):
    dependencies: ClassVar[list[tuple[str, str]]] = []
    models: ClassVar[list[type[Entity | Relation]]] = [Person, Company, Employment]

Example - Operations-based migration (migrations/0002_add_phone.py):

from typing import ClassVar

from type_bridge.migration import Migration, operations as ops
from type_bridge.migration.operations import Operation
from myapp.models import Person, Phone

class AddPhoneMigration(Migration):
    dependencies: ClassVar[list[tuple[str, str]]] = [("migrations", "0001_initial")]
    operations: ClassVar[list[Operation]] = [
        ops.AddAttribute(Phone),
        ops.AddOwnership(Person, Phone, optional=True),
    ]

Example - Programmatic API:

from pathlib import Path

from type_bridge.migration import MigrationExecutor
from type_bridge.session import Database

db = Database(address="localhost:1729", database="mydb")
db.connect()

executor = MigrationExecutor(db, Path("migrations"))

# Apply all pending migrations
executor.migrate()

# Migrate to specific version
executor.migrate(target="0002_add_phone")

# Show migration status
for name, is_applied in executor.showmigrations():
    print(f"[{'X' if is_applied else ' '}] {name}")

# Preview TypeQL
print(executor.sqlmigrate("0002_add_phone"))

CLI Usage

# Apply all pending migrations
python -m type_bridge.migration migrate

# Migrate to specific version
python -m type_bridge.migration migrate 0002_add_phone

# Show migration status
python -m type_bridge.migration showmigrations

# Preview TypeQL
python -m type_bridge.migration sqlmigrate 0002_add_phone

# Generate migration from model changes
python -m type_bridge.migration makemigrations --name add_phone --models myapp.models

Migration

Base class for migration scripts.

Migrations define schema changes that can be applied to a TypeDB database. They can be either model-based (for initial migrations) or operation-based (for incremental changes).

Model-based Migration Example

class InitialMigration(Migration):
    dependencies = []
    models = [Person, Company, Employment]

Operation-based Migration Example

class AddPhoneMigration(Migration):
    dependencies = [("myapp", "0001_initial")]
    operations = [
        ops.AddAttribute(Phone),
        ops.AddOwnership(Person, Phone, optional=True),
    ]

Attributes:

name (str): Migration name (auto-populated from filename)
app_label (str): Application label (auto-populated from directory)
dependencies (list[tuple[str, str]]): List of (app_label, migration_name) tuples
models (list[type[Entity | Relation]]): List of Entity/Relation classes for initial migrations
operations (list[Operation]): List of Operation instances for incremental migrations
reversible (bool): Whether the migration can be rolled back

get_dependencies

get_dependencies()

Get dependencies as MigrationDependency objects.

Returns:

Type Description
list[MigrationDependency]

List of MigrationDependency instances

Source code in type_bridge/migration/base.py
def get_dependencies(self) -> list[MigrationDependency]:
    """Get dependencies as MigrationDependency objects.

    Returns:
        List of MigrationDependency instances
    """
    return [MigrationDependency(app, name) for app, name in self.dependencies]

describe

describe()

Generate a human-readable description of this migration.

Returns:

Type Description
str

Description string

Source code in type_bridge/migration/base.py
def describe(self) -> str:
    """Generate a human-readable description of this migration.

    Returns:
        Description string
    """
    if self.models:
        model_names = [m.__name__ for m in self.models]
        return f"Initial migration with models: {', '.join(model_names)}"
    elif self.operations:
        op_count = len(self.operations)
        return f"Migration with {op_count} operation(s)"
    else:
        return "Empty migration"

MigrationDependency dataclass

MigrationDependency(app_label, migration_name)

Reference to another migration.

Example

dep = MigrationDependency("myapp", "0001_initial")
str(dep)  # "myapp.0001_initial"
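
The conversion from (app_label, migration_name) tuples to dependency objects can be sketched without the library. The class below is a stand-in written for illustration, not the type_bridge implementation; its fields follow the documented signature, and `to_dependencies` is a hypothetical helper that mirrors what get_dependencies() is shown to do.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MigrationDependency:
    """Stand-in for the documented dataclass (fields from its signature)."""

    app_label: str
    migration_name: str

    def __str__(self) -> str:
        # Same "app.migration" form as the documented example.
        return f"{self.app_label}.{self.migration_name}"


def to_dependencies(pairs: list[tuple[str, str]]) -> list[MigrationDependency]:
    """Hypothetical helper: turn dependency tuples into objects."""
    return [MigrationDependency(app, name) for app, name in pairs]


deps = to_dependencies([("myapp", "0001_initial"), ("myapp", "0002_add_phone")])
print([str(d) for d in deps])
```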

BreakingChangeAnalyzer

Analyzes schema diffs to classify changes by severity.

Classification rules:
- SAFE: Adding new types, widening role player types
- WARNING: Adding required attributes to existing types
- BREAKING: Removing types, narrowing role player types, removing roles

Example

analyzer = BreakingChangeAnalyzer()
diff = old_schema.compare(new_schema)
changes = analyzer.analyze(diff)

for change in changes:
    print(f"[{change.category.value}] {change.description}")
    print(f" Recommendation: {change.recommendation}")
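
To make the classification rules concrete, here is a minimal sketch that maps change kinds to categories. It assumes a simplified model: the kind names in `RULES`, the `classify` function, and the recommendation texts are hypothetical and only illustrate the rules listed above; the real analyzer works on a SchemaDiff.

```python
from dataclasses import dataclass
from enum import Enum


class ChangeCategory(Enum):
    SAFE = "safe"
    WARNING = "warning"
    BREAKING = "breaking"


@dataclass
class ClassifiedChange:
    description: str
    category: ChangeCategory
    recommendation: str


# Hypothetical change kinds, one per documented rule.
RULES = {
    "add_type": ChangeCategory.SAFE,
    "widen_role_players": ChangeCategory.SAFE,
    "add_required_attribute": ChangeCategory.WARNING,
    "remove_type": ChangeCategory.BREAKING,
    "narrow_role_players": ChangeCategory.BREAKING,
    "remove_role": ChangeCategory.BREAKING,
}

# Illustrative recommendation texts (not taken from the library).
ADVICE = {
    ChangeCategory.SAFE: "No action needed.",
    ChangeCategory.WARNING: "Review existing data before applying.",
    ChangeCategory.BREAKING: "Write a migration plan first.",
}


def classify(kind: str, subject: str) -> ClassifiedChange:
    """Look up the category for a change kind and attach advice."""
    category = RULES[kind]
    return ClassifiedChange(f"{kind}: {subject}", category, ADVICE[category])


change = classify("remove_role", "employment:employer")
print(f"[{change.category.value}] {change.description}")
```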

analyze

analyze(diff)

Classify all changes in the schema diff.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
list[ClassifiedChange]

List of classified changes with recommendations

Source code in type_bridge/migration/breaking.py
def analyze(self, diff: SchemaDiff) -> list[ClassifiedChange]:
    """Classify all changes in the schema diff.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        List of classified changes with recommendations
    """
    changes: list[ClassifiedChange] = []

    # Analyze entity changes
    changes.extend(self._analyze_entity_changes(diff))

    # Analyze relation changes
    changes.extend(self._analyze_relation_changes(diff))

    # Analyze attribute changes
    changes.extend(self._analyze_attribute_changes(diff))

    return changes

has_breaking_changes

has_breaking_changes(diff)

Quick check for any breaking changes.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
bool

True if any breaking changes exist

Source code in type_bridge/migration/breaking.py
def has_breaking_changes(self, diff: SchemaDiff) -> bool:
    """Quick check for any breaking changes.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        True if any breaking changes exist
    """
    classified = self.analyze(diff)
    return any(c.category == ChangeCategory.BREAKING for c in classified)

has_warnings

has_warnings(diff)

Quick check for any warning-level changes.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
bool

True if any warnings exist

Source code in type_bridge/migration/breaking.py
def has_warnings(self, diff: SchemaDiff) -> bool:
    """Quick check for any warning-level changes.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        True if any warnings exist
    """
    classified = self.analyze(diff)
    return any(c.category == ChangeCategory.WARNING for c in classified)

get_breaking_changes

get_breaking_changes(diff)

Get only breaking changes from the diff.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
list[ClassifiedChange]

List of breaking changes only

Source code in type_bridge/migration/breaking.py
def get_breaking_changes(self, diff: SchemaDiff) -> list[ClassifiedChange]:
    """Get only breaking changes from the diff.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        List of breaking changes only
    """
    return [c for c in self.analyze(diff) if c.category == ChangeCategory.BREAKING]
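
The category filters above lend themselves to a deployment gate. The sketch below is an assumption about how one might use them, with stand-in types and a hypothetical `gate` function; it is not part of type_bridge.

```python
from dataclasses import dataclass
from enum import Enum


class ChangeCategory(Enum):
    SAFE = "safe"
    WARNING = "warning"
    BREAKING = "breaking"


@dataclass
class ClassifiedChange:
    description: str
    category: ChangeCategory
    recommendation: str


def gate(changes: list[ClassifiedChange], allow_warnings: bool = True) -> int:
    """Return a process-style exit code: 0 to proceed, 1 to block."""
    blocking = {ChangeCategory.BREAKING}
    if not allow_warnings:
        blocking.add(ChangeCategory.WARNING)
    return 1 if any(c.category in blocking for c in changes) else 0


sample = [
    ClassifiedChange("Added entity Phone", ChangeCategory.SAFE, ""),
    ClassifiedChange("Added required attribute", ChangeCategory.WARNING, "Review"),
]
print(gate(sample), gate(sample, allow_warnings=False))
```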

summary

summary(diff)

Generate a human-readable summary of classified changes.

Parameters:

Name Type Description Default
diff SchemaDiff

SchemaDiff from SchemaInfo.compare()

required

Returns:

Type Description
str

Formatted summary string

Source code in type_bridge/migration/breaking.py
def summary(self, diff: SchemaDiff) -> str:
    """Generate a human-readable summary of classified changes.

    Args:
        diff: SchemaDiff from SchemaInfo.compare()

    Returns:
        Formatted summary string
    """
    classified = self.analyze(diff)

    if not classified:
        return "No schema changes detected."

    lines = ["Schema Change Analysis", "=" * 50]

    # Group by category
    breaking = [c for c in classified if c.category == ChangeCategory.BREAKING]
    warnings = [c for c in classified if c.category == ChangeCategory.WARNING]
    safe = [c for c in classified if c.category == ChangeCategory.SAFE]

    if breaking:
        lines.append(f"\n[BREAKING] ({len(breaking)} changes)")
        for change in breaking:
            lines.append(f"  - {change.description}")
            lines.append(f"    Recommendation: {change.recommendation}")

    if warnings:
        lines.append(f"\n[WARNING] ({len(warnings)} changes)")
        for change in warnings:
            lines.append(f"  - {change.description}")
            lines.append(f"    Recommendation: {change.recommendation}")

    if safe:
        lines.append(f"\n[SAFE] ({len(safe)} changes)")
        for change in safe:
            lines.append(f"  - {change.description}")

    return "\n".join(lines)

ChangeCategory

Bases: Enum

Classification of schema changes by severity.

SAFE class-attribute instance-attribute

SAFE = 'safe'

Backwards compatible change - no data loss or errors.

WARNING class-attribute instance-attribute

WARNING = 'warning'

May cause issues - review required.

BREAKING class-attribute instance-attribute

BREAKING = 'breaking'

Will cause data loss or errors - requires migration plan.

ClassifiedChange dataclass

ClassifiedChange(description, category, recommendation)

A schema change with its classification and recommendation.

AttributeFlagChange dataclass

AttributeFlagChange(name, old_flags, new_flags)

Represents a change in attribute flags (e.g., cardinality change).

EntityChanges dataclass

EntityChanges(added_attributes=list(), removed_attributes=list(), modified_attributes=list())

Represents changes to an entity type.

has_changes

has_changes()

Check if there are any changes.

Source code in type_bridge/migration/diff.py
def has_changes(self) -> bool:
    """Check if there are any changes."""
    return bool(self.added_attributes or self.removed_attributes or self.modified_attributes)

RelationChanges dataclass

RelationChanges(added_roles=list(), removed_roles=list(), modified_role_players=list(), modified_role_cardinality=list(), added_attributes=list(), removed_attributes=list(), modified_attributes=list())

Represents changes to a relation type.

Tracks:
- Role additions/removals
- Role player type changes (which entities can play each role)
- Role cardinality changes
- Attribute additions/removals/modifications

has_changes

has_changes()

Check if there are any changes.

Source code in type_bridge/migration/diff.py
def has_changes(self) -> bool:
    """Check if there are any changes."""
    return bool(
        self.added_roles
        or self.removed_roles
        or self.modified_role_players
        or self.modified_role_cardinality
        or self.added_attributes
        or self.removed_attributes
        or self.modified_attributes
    )

RoleCardinalityChange dataclass

RoleCardinalityChange(role_name, old_cardinality, new_cardinality)

Represents a change in role cardinality constraints.

Tracks cardinality (min, max) changes on roles. None values indicate unbounded.
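
One use of a (min, max) pair is to decide whether a change tightens the constraint. The helper below is a hypothetical sketch, not a library function. It assumes that a None minimum means "no lower bound" (treated as 0) and a None maximum means "no upper bound".

```python
from typing import Optional

Cardinality = tuple[Optional[int], Optional[int]]


def _lower(value: Optional[int]) -> int:
    # Assumption: a missing minimum behaves like 0.
    return 0 if value is None else value


def _upper(value: Optional[int]) -> float:
    # Assumption: a missing maximum behaves like infinity.
    return float("inf") if value is None else value


def is_narrowing(old: Cardinality, new: Cardinality) -> bool:
    """True if the new bounds reject counts that the old bounds accepted."""
    return _lower(new[0]) > _lower(old[0]) or _upper(new[1]) < _upper(old[1])


print(is_narrowing((0, None), (1, 1)))
print(is_narrowing((1, 1), (0, None)))
```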

RolePlayerChange dataclass

RolePlayerChange(role_name, added_player_types=list(), removed_player_types=list())

Represents a change in role player types.

Tracks when entity types are added to or removed from a role's allowed players.

Example

If a role changes from Role[Person] to Role[Person, Company]:
- added_player_types = ["company"]
- removed_player_types = []

has_changes

has_changes()

Check if there are any player type changes.

Source code in type_bridge/migration/diff.py
def has_changes(self) -> bool:
    """Check if there are any player type changes."""
    return bool(self.added_player_types or self.removed_player_types)
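
The Role[Person] to Role[Person, Company] example can be reproduced with plain set arithmetic. This is a sketch under assumptions: the dataclass is a stand-in with the documented fields, and `diff_players` is a hypothetical helper operating on type names.

```python
from dataclasses import dataclass, field


@dataclass
class RolePlayerChange:
    """Stand-in with the fields from the documented signature."""

    role_name: str
    added_player_types: list[str] = field(default_factory=list)
    removed_player_types: list[str] = field(default_factory=list)

    def has_changes(self) -> bool:
        return bool(self.added_player_types or self.removed_player_types)


def diff_players(role_name: str, old: set[str], new: set[str]) -> RolePlayerChange:
    """Hypothetical helper: compare the allowed player types of one role."""
    return RolePlayerChange(
        role_name,
        added_player_types=sorted(new - old),
        removed_player_types=sorted(old - new),
    )


change = diff_players("employer", {"person"}, {"person", "company"})
print(change.added_player_types, change.removed_player_types)
```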

SchemaDiff dataclass

SchemaDiff(added_entities=set(), removed_entities=set(), added_relations=set(), removed_relations=set(), added_attributes=set(), removed_attributes=set(), modified_entities=dict(), modified_relations=dict())

Container for schema comparison results.

Represents the differences between two schemas for migration planning.

has_changes

has_changes()

Check if there are any schema differences.

Returns:

Type Description
bool

True if any changes exist, False otherwise

Source code in type_bridge/migration/diff.py
def has_changes(self) -> bool:
    """Check if there are any schema differences.

    Returns:
        True if any changes exist, False otherwise
    """
    return bool(
        self.added_entities
        or self.removed_entities
        or self.added_relations
        or self.removed_relations
        or self.added_attributes
        or self.removed_attributes
        or self.modified_entities
        or self.modified_relations
    )
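
The added/removed fields can be understood as set differences between two schemas. The sketch below works on type names rather than model classes and uses a hypothetical `NameDiff` container, so it only illustrates the idea behind SchemaDiff, not its implementation.

```python
from dataclasses import dataclass, field


@dataclass
class NameDiff:
    """Simplified, name-based stand-in for a schema diff."""

    added: set[str] = field(default_factory=set)
    removed: set[str] = field(default_factory=set)

    def has_changes(self) -> bool:
        return bool(self.added or self.removed)


def compare(old: set[str], new: set[str]) -> NameDiff:
    """Names only in `new` were added; names only in `old` were removed."""
    return NameDiff(added=new - old, removed=old - new)


diff = compare({"person", "company"}, {"person", "phone"})
print(sorted(diff.added), sorted(diff.removed))
```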

summary

summary()

Generate a human-readable summary of changes.

Returns:

Type Description
str

Formatted summary string

Source code in type_bridge/migration/diff.py
def summary(self) -> str:
    """Generate a human-readable summary of changes.

    Returns:
        Formatted summary string
    """
    lines = []
    lines.append("Schema Comparison Summary")
    lines.append("=" * 50)

    if not self.has_changes():
        lines.append("No schema changes detected.")
        return "\n".join(lines)

    if self.added_entities:
        lines.append(f"\nAdded Entities ({len(self.added_entities)}):")
        for entity in sorted(self.added_entities, key=lambda e: e.__name__):
            lines.append(f"  + {entity.__name__}")

    if self.removed_entities:
        lines.append(f"\nRemoved Entities ({len(self.removed_entities)}):")
        for entity in sorted(self.removed_entities, key=lambda e: e.__name__):
            lines.append(f"  - {entity.__name__}")

    if self.added_relations:
        lines.append(f"\nAdded Relations ({len(self.added_relations)}):")
        for relation in sorted(self.added_relations, key=lambda r: r.__name__):
            lines.append(f"  + {relation.__name__}")

    if self.removed_relations:
        lines.append(f"\nRemoved Relations ({len(self.removed_relations)}):")
        for relation in sorted(self.removed_relations, key=lambda r: r.__name__):
            lines.append(f"  - {relation.__name__}")

    if self.added_attributes:
        lines.append(f"\nAdded Attributes ({len(self.added_attributes)}):")
        for attr in sorted(self.added_attributes, key=lambda a: a.get_attribute_name()):
            lines.append(f"  + {attr.get_attribute_name()}")

    if self.removed_attributes:
        lines.append(f"\nRemoved Attributes ({len(self.removed_attributes)}):")
        for attr in sorted(self.removed_attributes, key=lambda a: a.get_attribute_name()):
            lines.append(f"  - {attr.get_attribute_name()}")

    if self.modified_entities:
        lines.append(f"\nModified Entities ({len(self.modified_entities)}):")
        for entity, changes in self.modified_entities.items():
            lines.append(f"  ~ {entity.__name__}")
            if changes.added_attributes:
                lines.append(f"    added_attributes: {changes.added_attributes}")
            if changes.removed_attributes:
                lines.append(f"    removed_attributes: {changes.removed_attributes}")
            if changes.modified_attributes:
                lines.append("    modified_attributes:")
                for attr_change in changes.modified_attributes:
                    lines.append(f"      - {attr_change.name}:")
                    lines.append(f"          old: {attr_change.old_flags}")
                    lines.append(f"          new: {attr_change.new_flags}")

    if self.modified_relations:
        lines.append(f"\nModified Relations ({len(self.modified_relations)}):")
        for relation, rel_changes in self.modified_relations.items():
            relation_changes: RelationChanges = rel_changes
            lines.append(f"  ~ {relation.__name__}")
            if relation_changes.added_roles:
                lines.append(f"    added_roles: {relation_changes.added_roles}")
            if relation_changes.removed_roles:
                lines.append(f"    removed_roles: {relation_changes.removed_roles}")
            if relation_changes.modified_role_players:
                lines.append("    modified_role_players:")
                for rpc in relation_changes.modified_role_players:
                    lines.append(f"      - {rpc.role_name}:")
                    if rpc.added_player_types:
                        lines.append(f"          added: {rpc.added_player_types}")
                    if rpc.removed_player_types:
                        lines.append(f"          removed: {rpc.removed_player_types}")
            if relation_changes.modified_role_cardinality:
                lines.append("    modified_role_cardinality:")
                for rcc in relation_changes.modified_role_cardinality:
                    lines.append(f"      - {rcc.role_name}:")
                    lines.append(f"          old: {rcc.old_cardinality}")
                    lines.append(f"          new: {rcc.new_cardinality}")
            if relation_changes.added_attributes:
                lines.append(f"    added_attributes: {relation_changes.added_attributes}")
            if relation_changes.removed_attributes:
                lines.append(f"    removed_attributes: {relation_changes.removed_attributes}")
            if relation_changes.modified_attributes:
                lines.append("    modified_attributes:")
                for attr_change in relation_changes.modified_attributes:
                    lines.append(f"      - {attr_change.name}:")
                    lines.append(f"          old: {attr_change.old_flags}")
                    lines.append(f"          new: {attr_change.new_flags}")

    return "\n".join(lines)

SchemaConflictError

SchemaConflictError(diff, message=None)

Bases: Exception

Raised when there are conflicting schema changes during sync.

This exception is raised when attempting to sync a schema that has breaking changes (removed or modified types/attributes) compared to the existing database schema.

Initialize SchemaConflictError.

Parameters:

diff (SchemaDiff, required): SchemaDiff containing the conflicting changes
message (str | None, default: None): Optional custom error message

Source code in type_bridge/migration/exceptions.py
def __init__(self, diff: SchemaDiff, message: str | None = None):
    """Initialize SchemaConflictError.

    Args:
        diff: SchemaDiff containing the conflicting changes
        message: Optional custom error message
    """
    self.diff = diff

    if message is None:
        message = self._build_default_message()

    super().__init__(message)

has_breaking_changes

has_breaking_changes()

Check if the diff contains breaking changes.

Breaking changes include removed or modified types/attributes.

Returns:

Type Description
bool

True if there are breaking changes

Source code in type_bridge/migration/exceptions.py
def has_breaking_changes(self) -> bool:
    """Check if the diff contains breaking changes.

    Breaking changes include removed or modified types/attributes.

    Returns:
        True if there are breaking changes
    """
    return bool(
        self.diff.removed_entities
        or self.diff.removed_relations
        or self.diff.removed_attributes
        or self.diff.modified_entities
        or self.diff.modified_relations
    )
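
An exception that carries the diff lets the caller inspect what conflicted. The following sketch uses a stand-in exception and a plain dict in place of SchemaDiff; the default message format is invented for illustration, since the output of _build_default_message() is not shown here.

```python
class SchemaConflictError(Exception):
    """Stand-in: keeps the diff on the exception for later inspection."""

    def __init__(self, diff: dict, message=None):
        self.diff = diff
        if message is None:
            # Hypothetical default message, not the library's wording.
            message = f"Conflicting schema changes: {sorted(diff['removed'])} removed"
        super().__init__(message)

    def has_breaking_changes(self) -> bool:
        return bool(self.diff["removed"] or self.diff["modified"])


def sync(diff: dict) -> str:
    """Hypothetical sync step that refuses removals and modifications."""
    if diff["removed"] or diff["modified"]:
        raise SchemaConflictError(diff)
    return "synced"


try:
    sync({"removed": {"phone"}, "modified": set()})
except SchemaConflictError as exc:
    caught = exc
    print(exc, exc.has_breaking_changes())
```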

SchemaValidationError

Bases: Exception

Raised when schema validation fails during schema generation.

This exception is raised when the Python model definitions violate TypeDB constraints or best practices.

MigrationError

Bases: Exception

Error during migration execution.

MigrationExecutor

MigrationExecutor(db, migrations_dir, dry_run=False)

Executes migrations against a TypeDB database.

Handles:
- Applying pending migrations
- Rolling back applied migrations
- Previewing migration TypeQL
- Listing migration status

Example

executor = MigrationExecutor(db, Path("migrations"))

# Apply all pending migrations
results = executor.migrate()

# Migrate to specific version
results = executor.migrate(target="0002_add_company")

# Show migration status
status = executor.showmigrations()
for name, is_applied in status:
    print(f"[{'X' if is_applied else ' '}] {name}")

# Preview TypeQL
typeql = executor.sqlmigrate("0002_add_company")
print(typeql)

Initialize executor.

Parameters:

db (Database, required): Database connection
migrations_dir (Path, required): Directory containing migration files
dry_run (bool, default: False): If True, preview operations without executing

Source code in type_bridge/migration/executor.py
def __init__(
    self,
    db: Database,
    migrations_dir: Path,
    dry_run: bool = False,
):
    """Initialize executor.

    Args:
        db: Database connection
        migrations_dir: Directory containing migration files
        dry_run: If True, preview operations without executing
    """
    self.db = db
    self.migrations_dir = migrations_dir
    self.dry_run = dry_run
    self.loader = MigrationLoader(migrations_dir)
    self.state_manager = MigrationStateManager(db)

migrate

migrate(target=None)

Apply pending migrations.

Parameters:

Name Type Description Default
target str | None

Optional target migration name (e.g., "0002_add_company"). If None, apply all pending migrations. If specified, migrate to exactly that state, which may require rolling back.

None

Returns:

Type Description
list[MigrationResult]

List of migration results

Raises:

Type Description
MigrationError

If migration fails

Source code in type_bridge/migration/executor.py
def migrate(self, target: str | None = None) -> list[MigrationResult]:
    """Apply pending migrations.

    Args:
        target: Optional target migration name (e.g., "0002_add_company")
               If None, apply all pending migrations.
               If specified, migrate to that exact state (may rollback).

    Returns:
        List of migration results

    Raises:
        MigrationError: If migration fails
    """
    state = self.state_manager.load_state()
    all_migrations = self.loader.discover()
    plan = self._create_plan(state, all_migrations, target)

    if plan.is_empty():
        logger.info("No migrations to apply")
        return []

    results: list[MigrationResult] = []

    # Rollback if needed (migrating backwards)
    for loaded in plan.to_rollback:
        result = self._rollback_one(loaded)
        results.append(result)
        if not result.success:
            raise MigrationError(f"Rollback failed: {result.error}")

    # Apply forward migrations
    for loaded in plan.to_apply:
        result = self._apply_one(loaded)
        results.append(result)
        if not result.success:
            raise MigrationError(f"Migration failed: {result.error}")

    return results

showmigrations

showmigrations()

List all migrations with their applied status.

Returns:

Type Description
list[tuple[str, bool]]

List of (migration_name, is_applied) tuples

Source code in type_bridge/migration/executor.py
def showmigrations(self) -> list[tuple[str, bool]]:
    """List all migrations with their applied status.

    Returns:
        List of (migration_name, is_applied) tuples
    """
    state = self.state_manager.load_state()
    all_migrations = self.loader.discover()

    result: list[tuple[str, bool]] = []
    for loaded in all_migrations:
        is_applied = state.is_applied(loaded.migration.app_label, loaded.migration.name)
        result.append((loaded.migration.name, is_applied))

    return result
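
The (migration_name, is_applied) tuples are easy to render as a checklist. The helper below is a small sketch with made-up sample data; `format_status` is a hypothetical name and uses the same "[X]" layout as the examples on this page.

```python
def format_status(status: list[tuple[str, bool]]) -> str:
    """Render one "[X] name" or "[ ] name" line per migration."""
    return "\n".join(
        f"[{'X' if is_applied else ' '}] {name}" for name, is_applied in status
    )


# Sample data shaped like the return value of showmigrations().
sample_status = [("0001_initial", True), ("0002_add_phone", False)]
print(format_status(sample_status))
```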

sqlmigrate

sqlmigrate(migration_name, reverse=False)

Preview TypeQL for a migration without executing.

Parameters:

migration_name (str, required): Name of the migration
reverse (bool, default: False): If True, show rollback TypeQL

Returns:

Type Description
str

TypeQL string that would be executed

Raises:

Type Description
MigrationError

If migration not found or not reversible

Source code in type_bridge/migration/executor.py
def sqlmigrate(self, migration_name: str, reverse: bool = False) -> str:
    """Preview TypeQL for a migration without executing.

    Args:
        migration_name: Name of the migration
        reverse: If True, show rollback TypeQL

    Returns:
        TypeQL string that would be executed

    Raises:
        MigrationError: If migration not found or not reversible
    """
    loaded = self.loader.get_by_name(migration_name)
    if loaded is None:
        raise MigrationError(f"Migration not found: {migration_name}")

    if reverse:
        typeql = self._generate_rollback_typeql(loaded.migration)
        if typeql is None:
            raise MigrationError(f"Migration {migration_name} is not reversible")
        return typeql
    else:
        return self._generate_apply_typeql(loaded.migration)

plan

plan(target=None)

Get the migration plan without executing.

Parameters:

Name Type Description Default
target str | None

Optional target migration name

None

Returns:

Type Description
MigrationPlan

MigrationPlan showing what would be applied/rolled back

Source code in type_bridge/migration/executor.py
def plan(self, target: str | None = None) -> MigrationPlan:
    """Get the migration plan without executing.

    Args:
        target: Optional target migration name

    Returns:
        MigrationPlan showing what would be applied/rolled back
    """
    state = self.state_manager.load_state()
    all_migrations = self.loader.discover()
    return self._create_plan(state, all_migrations, target)

MigrationPlan dataclass

MigrationPlan(to_apply, to_rollback)

Plan for migration execution.

Attributes:

to_apply (list[LoadedMigration]): Migrations to apply (forward)
to_rollback (list[LoadedMigration]): Migrations to rollback (reverse)

is_empty

is_empty()

Check if plan has no operations.

Source code in type_bridge/migration/executor.py
def is_empty(self) -> bool:
    """Check if plan has no operations."""
    return not self.to_apply and not self.to_rollback
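
How a plan might be derived from the applied state and an optional target can be sketched as follows. This is an assumption-laden illustration, not the logic of _create_plan(): it treats migrations as a single ordered list of names, and it assumes rollbacks run newest first.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Plan:
    """Name-based stand-in for MigrationPlan."""

    to_apply: list[str] = field(default_factory=list)
    to_rollback: list[str] = field(default_factory=list)

    def is_empty(self) -> bool:
        return not self.to_apply and not self.to_rollback


def create_plan(ordered: list[str], applied: set[str], target: Optional[str] = None) -> Plan:
    """Hypothetical planner over a linear migration history."""
    if target is None:
        return Plan(to_apply=[m for m in ordered if m not in applied])
    if target not in ordered:
        raise ValueError(f"Unknown migration: {target}")
    cutoff = ordered.index(target)
    # Everything up to and including the target must be applied.
    to_apply = [m for m in ordered[: cutoff + 1] if m not in applied]
    # Everything after the target is rolled back, newest first (assumption).
    to_rollback = [m for m in reversed(ordered[cutoff + 1 :]) if m in applied]
    return Plan(to_apply, to_rollback)


history = ["0001_initial", "0002_add_phone", "0003_add_email"]
plan = create_plan(history, set(history), target="0001_initial")
print(plan.to_apply, plan.to_rollback)
```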

MigrationResult dataclass

MigrationResult(name, action, success, error=None)

Result of a migration operation.

Attributes:

name (str): Migration name
action (str): "applied" or "rolled_back"
success (bool): Whether the operation succeeded
error (str | None): Error message if failed

MigrationGenerator

MigrationGenerator(db, migrations_dir)

Generates migration files from model changes.

Compares current models against the last migration state and generates appropriate operations for the detected changes.

Example

generator = MigrationGenerator(db, Path("migrations"))

# Generate migration from models
path = generator.generate([Person, Company, Employment], name="initial")
# Creates: migrations/0001_initial.py

# Generate empty migration for manual editing
path = generator.generate([], name="custom_changes", empty=True)

Initialize generator.

Parameters:

db (Database, required): Database connection
migrations_dir (Path, required): Directory to write migration files

Source code in type_bridge/migration/generator.py
def __init__(self, db: Database, migrations_dir: Path):
    """Initialize generator.

    Args:
        db: Database connection
        migrations_dir: Directory to write migration files
    """
    self.db = db
    self.migrations_dir = migrations_dir
    self.loader = MigrationLoader(migrations_dir)

generate

generate(models, name='auto', empty=False)

Generate a migration file.

Parameters:

models (list[type[Entity | Relation]], required): Model classes to check for changes
name (str, default: 'auto'): Migration name suffix (e.g., "initial", "add_company")
empty (bool, default: False): Create empty migration for manual editing

Returns:

Type Description
Path | None

Path to created file, or None if no changes detected

Source code in type_bridge/migration/generator.py
def generate(
    self,
    models: list[type[Entity | Relation]],
    name: str = "auto",
    empty: bool = False,
) -> Path | None:
    """Generate a migration file.

    Args:
        models: Model classes to check for changes
        name: Migration name suffix (e.g., "initial", "add_company")
        empty: Create empty migration for manual editing

    Returns:
        Path to created file, or None if no changes detected
    """
    # Get current state
    existing = self.loader.discover()

    # Determine next migration number
    next_num = self.loader.get_next_number()

    # Determine dependencies
    dependencies: list[tuple[str, str]] = []
    if existing:
        last = existing[-1]
        dependencies.append((last.migration.app_label, last.migration.name))

    if empty:
        operations_code = "    operations: ClassVar[list[Operation]] = []"
        models_code = ""
        imports_code = self._generate_empty_imports()
        description = "empty migration"
    else:
        # Detect changes - now always returns operations
        operations, _ = self._detect_changes(models, existing)

        if not operations:
            logger.info("No changes detected")
            return None

        # Always use operations-based migration
        operations_code = self._render_operations(operations)
        models_code = ""
        imports_code = self._generate_operations_imports(operations)
        description = self._describe_operations(operations)

    # Generate filename
    migration_name = f"{next_num:04d}_{name}"
    filename = f"{migration_name}.py"
    filepath = self.migrations_dir / filename

    # Ensure directory exists
    self.migrations_dir.mkdir(parents=True, exist_ok=True)

    # Generate content
    content = self._render_migration(
        class_name=self._to_class_name(name),
        dependencies=dependencies,
        operations_code=operations_code,
        models_code=models_code,
        imports_code=imports_code,
        description=description,
    )

    filepath.write_text(content)
    logger.info(f"Created migration: {filepath}")

    return filepath
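
The naming scheme used by generate() (a zero-padded number, an underscore, then the name) can be tried in isolation. In the sketch below only the `f"{number:04d}_{name}"` pattern comes from the source above; `next_number` and `to_class_name` are guesses at what get_next_number() and _to_class_name() might do, based on the example names on this page.

```python
import re


def next_number(existing_names: list[str]) -> int:
    """Assumed behavior: one more than the highest numeric prefix, or 1."""
    numbers = [
        int(match.group(1))
        for name in existing_names
        if (match := re.match(r"(\d+)_", name))
    ]
    return max(numbers, default=0) + 1


def migration_filename(number: int, name: str) -> str:
    """Same pattern as generate(): four-digit prefix plus the name."""
    return f"{number:04d}_{name}.py"


def to_class_name(name: str) -> str:
    """Assumed convention: add_phone -> AddPhoneMigration."""
    parts = [part.capitalize() for part in name.split("_") if part]
    return "".join(parts) + "Migration"


number = next_number(["0001_initial", "0002_add_phone"])
print(migration_filename(number, "add_email"), to_class_name("add_email"))
```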

SchemaInfo

SchemaInfo()

Container for organized schema information.

Initialize SchemaInfo with empty collections.

Source code in type_bridge/migration/info.py
def __init__(self):
    """Initialize SchemaInfo with empty collections."""
    self.entities: list[type[Entity]] = []
    self.relations: list[type[Relation]] = []
    self.attribute_classes: set[type[Attribute]] = set()

get_entity_by_name

get_entity_by_name(name)

Get entity by type name.

Parameters:

Name Type Description Default
name str

Entity type name

required

Returns:

Type Description
type[Entity] | None

Entity class or None if not found

Source code in type_bridge/migration/info.py
def get_entity_by_name(self, name: str) -> type[Entity] | None:
    """Get entity by type name.

    Args:
        name: Entity type name

    Returns:
        Entity class or None if not found
    """
    for entity in self.entities:
        if entity.get_type_name() == name:
            return entity
    return None

get_relation_by_name

get_relation_by_name(name)

Get relation by type name.

Parameters:

Name Type Description Default
name str

Relation type name

required

Returns:

Type Description
type[Relation] | None

Relation class or None if not found

Source code in type_bridge/migration/info.py
def get_relation_by_name(self, name: str) -> type[Relation] | None:
    """Get relation by type name.

    Args:
        name: Relation type name

    Returns:
        Relation class or None if not found
    """
    for relation in self.relations:
        if relation.get_type_name() == name:
            return relation
    return None

validate

validate()

Validate schema definitions for TypeDB constraints.

Raises:

Type Description
SchemaValidationError

If schema violates TypeDB constraints

Source code in type_bridge/migration/info.py
def validate(self) -> None:
    """Validate schema definitions for TypeDB constraints.

    Raises:
        SchemaValidationError: If schema violates TypeDB constraints
    """
    # Validate entities
    for entity_model in self.entities:
        self._validate_no_duplicate_attribute_types(entity_model, entity_model.get_type_name())

    # Validate relations
    for relation_model in self.relations:
        self._validate_no_duplicate_attribute_types(
            relation_model, relation_model.get_type_name()
        )

to_typeql

to_typeql()

Generate TypeQL schema definition from collected schema information.

Base classes (with base=True) are skipped as they don't appear in TypeDB schema.

Validates the schema before generation.

Returns:

Type Description
str

TypeQL schema definition string

Raises:

Type Description
SchemaValidationError

If schema validation fails

Source code in type_bridge/migration/info.py
def to_typeql(self) -> str:
    """Generate TypeQL schema definition from collected schema information.

    Base classes (with base=True) are skipped as they don't appear in TypeDB schema.

    Validates the schema before generation.

    Returns:
        TypeQL schema definition string

    Raises:
        SchemaValidationError: If schema validation fails
    """
    # Validate schema before generation
    self.validate()

    lines = []

    # Define attributes first
    lines.append("define")
    lines.append("")

    # Sort attributes by name for consistent output
    sorted_attrs = sorted(self.attribute_classes, key=lambda x: x.get_attribute_name())
    for attr_class in sorted_attrs:
        lines.append(attr_class.to_schema_definition())

    lines.append("")

    # Define entities (skip base classes)
    for entity_model in self.entities:
        schema_def = entity_model.to_schema_definition()
        if schema_def is not None:  # Skip base classes
            lines.append(schema_def)
            lines.append("")

    # Define relations (skip base classes)
    for relation_model in self.relations:
        schema_def = relation_model.to_schema_definition()
        if schema_def is not None:  # Skip base classes
            lines.append(schema_def)

            # Add role player definitions
            for role_name, role in relation_model._roles.items():
                for player_type in role.player_types:
                    lines.append(
                        f"{player_type} plays {relation_model.get_type_name()}:{role.role_name};"
                    )
            lines.append("")

    return "\n".join(lines)
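The ordering of the generated text can be illustrated without real models. The sketch below assumes each definition has already been rendered to a string; the helper name `assemble_schema` and the TypeQL strings are illustrative only and are not taken from the library's output.

```python
def assemble_schema(attribute_defs, entity_defs, relation_defs):
    """Join pre-rendered definitions in the order to_typeql uses.

    attribute_defs: dict mapping attribute name -> definition string
    entity_defs:    list of definition strings, None marks a base class
    relation_defs:  list of (definition or None, list of "plays" statements)
    """
    lines = ["define", ""]

    # Attributes first, sorted by name for stable output
    for name in sorted(attribute_defs):
        lines.append(attribute_defs[name])
    lines.append("")

    # Entities, skipping base classes (rendered as None)
    for definition in entity_defs:
        if definition is not None:
            lines.append(definition)
            lines.append("")

    # Relations, each followed by its role-player statements
    for definition, plays in relation_defs:
        if definition is not None:
            lines.append(definition)
            lines.extend(plays)
            lines.append("")

    return "\n".join(lines)


schema_text = assemble_schema(
    {"name": "attribute name, value string;", "email": "attribute email, value string;"},
    [None, "entity person, owns name @key, owns email;"],
    [("relation friendship, relates friend;", ["person plays friendship:friend;"])],
)
```

Sorting only the attributes means entity and relation order follows registration order, so the output is deterministic as long as models are registered in a fixed order.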

compare

compare(other)

Compare this schema with another schema.

Parameters:

Name Type Description Default
other SchemaInfo

Another SchemaInfo to compare against

required

Returns:

Type Description
SchemaDiff

SchemaDiff containing all differences between the schemas

Source code in type_bridge/migration/info.py
def compare(self, other: "SchemaInfo") -> SchemaDiff:
    """Compare this schema with another schema.

    Args:
        other: Another SchemaInfo to compare against

    Returns:
        SchemaDiff containing all differences between the schemas
    """
    diff = SchemaDiff()

    # Compare entities by type name (not Python object identity)
    self_entity_by_name = {e.get_type_name(): e for e in self.entities}
    other_entity_by_name = {e.get_type_name(): e for e in other.entities}

    self_ent_names = set(self_entity_by_name.keys())
    other_ent_names = set(other_entity_by_name.keys())

    diff.added_entities = {other_entity_by_name[n] for n in other_ent_names - self_ent_names}
    diff.removed_entities = {self_entity_by_name[n] for n in self_ent_names - other_ent_names}

    # Compare entities that exist in both (by type name)
    for type_name in self_ent_names & other_ent_names:
        self_ent = self_entity_by_name[type_name]
        other_ent = other_entity_by_name[type_name]
        entity_changes = self._compare_entity(self_ent, other_ent)
        if entity_changes:
            diff.modified_entities[other_ent] = entity_changes

    # Compare relations by type name (not Python object identity)
    self_relation_by_name = {r.get_type_name(): r for r in self.relations}
    other_relation_by_name = {r.get_type_name(): r for r in other.relations}

    self_rel_names = set(self_relation_by_name.keys())
    other_rel_names = set(other_relation_by_name.keys())

    diff.added_relations = {other_relation_by_name[n] for n in other_rel_names - self_rel_names}
    diff.removed_relations = {
        self_relation_by_name[n] for n in self_rel_names - other_rel_names
    }

    # Compare relations that exist in both (by type name)
    for type_name in self_rel_names & other_rel_names:
        self_rel = self_relation_by_name[type_name]
        other_rel = other_relation_by_name[type_name]
        relation_changes = self._compare_relation(self_rel, other_rel)
        if relation_changes:
            diff.modified_relations[other_rel] = relation_changes

    # Compare attributes
    diff.added_attributes = other.attribute_classes - self.attribute_classes
    diff.removed_attributes = self.attribute_classes - other.attribute_classes

    return diff
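The entity and relation comparison reduces to set arithmetic over type names, with `self` treated as the old schema and `other` as the new one. A minimal sketch of that step, where `diff_by_name` is a hypothetical helper and plain strings stand in for model classes:

```python
def diff_by_name(old: dict, new: dict):
    """Return (added, removed, common) for two name -> model mappings."""
    old_names, new_names = set(old), set(new)
    added = {new[n] for n in new_names - old_names}      # only in the new schema
    removed = {old[n] for n in old_names - new_names}    # only in the old schema
    common = old_names & new_names                       # candidates for "modified"
    return added, removed, common


# Strings stand in for model classes to keep the sketch self-contained.
old_schema = {"person": "PersonV1", "company": "CompanyV1"}
new_schema = {"person": "PersonV2", "employment": "EmploymentV1"}

added, removed, common = diff_by_name(old_schema, new_schema)
```

Note that in the source above, attributes are compared differently: `attribute_classes` are diffed as sets of class objects rather than by name.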

IntrospectedAttribute dataclass

IntrospectedAttribute(name, value_type)

An attribute type from the database schema.

IntrospectedEntity dataclass

IntrospectedEntity(name, supertype=None)

An entity type from the database schema.

IntrospectedOwnership dataclass

IntrospectedOwnership(owner_name, attribute_name, annotations=list())

An ownership relationship between a type and an attribute.

IntrospectedRelation dataclass

IntrospectedRelation(name, roles=dict())

A relation type from the database schema.

IntrospectedRole dataclass

IntrospectedRole(name, player_types=list())

A role in a relation.

IntrospectedSchema dataclass

IntrospectedSchema(entities=dict(), relations=dict(), attributes=dict(), ownerships=list())

Complete introspected schema from TypeDB database.

This is a database-centric view of the schema that can be compared against Python model definitions.

is_empty

is_empty()

Check if the schema is empty (no custom types).

Source code in type_bridge/migration/introspection.py
def is_empty(self) -> bool:
    """Check if the schema is empty (no custom types)."""
    # Filter out built-in types
    custom_entities = {k: v for k, v in self.entities.items() if k not in ("entity",)}
    custom_relations = {k: v for k, v in self.relations.items() if k not in ("relation",)}
    custom_attrs = {k: v for k, v in self.attributes.items() if k not in ("attribute",)}

    return not (custom_entities or custom_relations or custom_attrs)
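A short sketch of the emptiness check, using a hypothetical `MiniSchema` dataclass that keeps only the three mappings `is_empty` reads. The root types `entity`, `relation`, and `attribute` are ignored, so a schema containing nothing else counts as empty.

```python
from dataclasses import dataclass, field


@dataclass
class MiniSchema:
    """Stand-in for IntrospectedSchema, reduced to the fields is_empty reads."""

    entities: dict = field(default_factory=dict)
    relations: dict = field(default_factory=dict)
    attributes: dict = field(default_factory=dict)

    def is_empty(self) -> bool:
        # Drop the built-in root types before checking for content
        custom_entities = {k for k in self.entities if k != "entity"}
        custom_relations = {k for k in self.relations if k != "relation"}
        custom_attrs = {k for k in self.attributes if k != "attribute"}
        return not (custom_entities or custom_relations or custom_attrs)


only_roots = MiniSchema(entities={"entity": object()}, relations={"relation": object()})
with_person = MiniSchema(entities={"entity": object(), "person": object()})
```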

get_entity_names

get_entity_names()

Get names of all custom entity types.

Source code in type_bridge/migration/introspection.py
def get_entity_names(self) -> set[str]:
    """Get names of all custom entity types."""
    return {k for k in self.entities.keys() if k != "entity"}

get_relation_names

get_relation_names()

Get names of all custom relation types.

Source code in type_bridge/migration/introspection.py
def get_relation_names(self) -> set[str]:
    """Get names of all custom relation types."""
    return {k for k in self.relations.keys() if k != "relation"}

get_attribute_names

get_attribute_names()

Get names of all custom attribute types.

Source code in type_bridge/migration/introspection.py
def get_attribute_names(self) -> set[str]:
    """Get names of all custom attribute types."""
    return {k for k in self.attributes.keys() if k != "attribute"}

get_ownerships_for

get_ownerships_for(owner_name)

Get all ownerships for a specific owner type.

Source code in type_bridge/migration/introspection.py
def get_ownerships_for(self, owner_name: str) -> list[IntrospectedOwnership]:
    """Get all ownerships for a specific owner type."""
    return [o for o in self.ownerships if o.owner_name == owner_name]

SchemaIntrospector

SchemaIntrospector(db)

Introspects TypeDB database schema.

Queries the database to discover all types, attributes, ownerships, and relations defined in the schema.

Example

introspector = SchemaIntrospector(db)
schema = introspector.introspect()

print(f"Found {len(schema.entities)} entities")
print(f"Found {len(schema.relations)} relations")
print(f"Found {len(schema.attributes)} attributes")

Initialize introspector.

Parameters:

Name Type Description Default
db Database

Database connection

required

Source code in type_bridge/migration/introspection.py
def __init__(self, db: Database):
    """Initialize introspector.

    Args:
        db: Database connection
    """
    self.db = db

introspect_for_models

introspect_for_models(models)

Introspect database schema for specific model types.

This is the TypeDB 3.x compatible approach that checks each model type individually instead of enumerating all types.

Parameters:

Name Type Description Default
models list[type[Entity] | type[Relation]]

List of model classes to check

required

Returns:

Type Description
IntrospectedSchema

IntrospectedSchema with info about existing types

Source code in type_bridge/migration/introspection.py
def introspect_for_models(
    self, models: list[type[Entity] | type[Relation]]
) -> IntrospectedSchema:
    """Introspect database schema for specific model types.

    This is the TypeDB 3.x compatible approach that checks each
    model type individually instead of enumerating all types.

    Args:
        models: List of model classes to check

    Returns:
        IntrospectedSchema with info about existing types
    """
    from type_bridge.models import Entity, Relation

    schema = IntrospectedSchema()

    if not self.db.database_exists():
        logger.debug("Database does not exist, returning empty schema")
        return schema

    logger.info(f"Introspecting database schema for {len(models)} model types")

    # Collect unique attribute types from all models
    attr_types: set[type[Attribute]] = set()
    for model in models:
        if hasattr(model, "get_owned_attributes"):
            for attr_info in model.get_owned_attributes().values():
                attr_types.add(attr_info.typ)

    # Check each attribute type
    for attr_type in attr_types:
        attr_name = attr_type.get_attribute_name()
        if type_exists(self.db, attr_name):
            schema.attributes[attr_name] = IntrospectedAttribute(
                name=attr_name,
                value_type=getattr(attr_type, "_value_type", "string"),
            )
            logger.debug(f"Found existing attribute: {attr_name}")

    # Check each model type
    for model in models:
        type_name = model.get_type_name()

        if issubclass(model, Entity) and model is not Entity:
            if type_exists(self.db, type_name):
                schema.entities[type_name] = IntrospectedEntity(name=type_name)
                logger.debug(f"Found existing entity: {type_name}")

                # Check ownerships (pass model for fallback)
                self._introspect_ownerships_for_type(schema, type_name, model)

        elif issubclass(model, Relation) and model is not Relation:
            if type_exists(self.db, type_name):
                schema.relations[type_name] = IntrospectedRelation(name=type_name)
                logger.debug(f"Found existing relation: {type_name}")

                # Check roles and role players
                self._introspect_roles_for_relation(schema, type_name, model)

    logger.info(
        f"Introspected: {len(schema.entities)} entities, "
        f"{len(schema.relations)} relations, "
        f"{len(schema.attributes)} attributes"
    )

    return schema

introspect

introspect()

Query TypeDB schema and return structured info.

Returns:

Type Description
IntrospectedSchema

IntrospectedSchema with all discovered types

Source code in type_bridge/migration/introspection.py
def introspect(self) -> IntrospectedSchema:
    """Query TypeDB schema and return structured info.

    Returns:
        IntrospectedSchema with all discovered types
    """
    schema = IntrospectedSchema()

    if not self.db.database_exists():
        logger.debug("Database does not exist, returning empty schema")
        return schema

    logger.info("Introspecting database schema")

    # Query all schema information
    self._introspect_entities(schema)
    self._introspect_relations(schema)
    self._introspect_attributes(schema)
    self._introspect_ownerships(schema)
    self._introspect_role_players(schema)

    logger.info(
        f"Introspected: {len(schema.entities)} entities, "
        f"{len(schema.relations)} relations, "
        f"{len(schema.attributes)} attributes"
    )

    return schema

LoadedMigration dataclass

LoadedMigration(migration, path, checksum)

A migration loaded from a file.

Attributes:

Name Type Description
migration Migration

The Migration instance

path Path

Path to the migration file

checksum str

SHA-256 hash of the file content, truncated to the first 16 characters
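A truncated checksum of this kind could be computed as follows. This is a sketch under assumptions: the loader's own hashing code is not shown here, so the use of the hexadecimal digest and the helper name `short_checksum` are illustrative.

```python
import hashlib


def short_checksum(content: bytes, length: int = 16) -> str:
    """Return the first `length` hex characters of the SHA-256 digest."""
    return hashlib.sha256(content).hexdigest()[:length]


checksum = short_checksum(b"class InitialMigration: ...\n")
```

Any edit to the file changes the checksum, which is what lets a migration tool notice that an already-applied migration was modified afterwards.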

MigrationLoader

MigrationLoader(migrations_dir)

Loads migration files from a directory.

Migration files must follow the naming pattern NNNN_*.py, where NNNN is a 4-digit number (e.g., 0001_initial.py, 0002_add_company.py).

Example

loader = MigrationLoader(Path("migrations"))
migrations = loader.discover()

for loaded in migrations:
    print(f"{loaded.migration.name}: {loaded.checksum}")

Initialize loader.

Parameters:

Name Type Description Default
migrations_dir Path

Directory containing migration files

required

Source code in type_bridge/migration/loader.py
def __init__(self, migrations_dir: Path):
    """Initialize loader.

    Args:
        migrations_dir: Directory containing migration files
    """
    self.migrations_dir = migrations_dir

discover

discover()

Discover all migration files in order.

Returns:

Type Description
list[LoadedMigration]

List of loaded migrations, sorted by filename

Source code in type_bridge/migration/loader.py
def discover(self) -> list[LoadedMigration]:
    """Discover all migration files in order.

    Returns:
        List of loaded migrations, sorted by filename
    """
    if not self.migrations_dir.exists():
        logger.debug(f"Migrations directory does not exist: {self.migrations_dir}")
        return []

    files = sorted(self.migrations_dir.glob(self.MIGRATION_PATTERN))
    migrations: list[LoadedMigration] = []

    for path in files:
        try:
            loaded = self._load_migration_file(path)
            if loaded:
                migrations.append(loaded)
        except Exception as e:
            logger.error(f"Failed to load migration {path}: {e}")
            raise MigrationLoadError(f"Failed to load migration {path}: {e}") from e

    logger.debug(f"Discovered {len(migrations)} migration(s) in {self.migrations_dir}")
    return migrations
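The file selection step can be sketched with `pathlib` alone. The value of `MIGRATION_PATTERN` is not shown in this listing, so the glob used below is an assumption derived from the documented NNNN_*.py naming rule.

```python
import tempfile
from pathlib import Path

# Assumed pattern; the real MIGRATION_PATTERN constant is not shown here.
PATTERN = "[0-9][0-9][0-9][0-9]_*.py"

with tempfile.TemporaryDirectory() as tmp:
    migrations_dir = Path(tmp)
    for filename in ("0002_add_phone.py", "0001_initial.py", "__init__.py", "notes.txt"):
        (migrations_dir / filename).write_text("")

    # Sorting the paths orders migrations by their zero-padded number
    discovered = [p.name for p in sorted(migrations_dir.glob(PATTERN))]
```

Zero-padding is what makes a plain lexicographic sort match numeric order; a file named `10_x.py` next to `0002_y.py` would not sort as intended.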

get_by_name

get_by_name(name)

Get a specific migration by name.

Parameters:

Name Type Description Default
name str

Migration name (e.g., "0001_initial")

required

Returns:

Type Description
LoadedMigration | None

LoadedMigration or None if not found

Source code in type_bridge/migration/loader.py
def get_by_name(self, name: str) -> LoadedMigration | None:
    """Get a specific migration by name.

    Args:
        name: Migration name (e.g., "0001_initial")

    Returns:
        LoadedMigration or None if not found
    """
    for loaded in self.discover():
        if loaded.migration.name == name:
            return loaded
    return None

get_by_number

get_by_number(number)

Get a specific migration by number.

Parameters:

Name Type Description Default
number int

Migration number (e.g., 1 for 0001_initial)

required

Returns:

Type Description
LoadedMigration | None

LoadedMigration or None if not found

Source code in type_bridge/migration/loader.py
def get_by_number(self, number: int) -> LoadedMigration | None:
    """Get a specific migration by number.

    Args:
        number: Migration number (e.g., 1 for 0001_initial)

    Returns:
        LoadedMigration or None if not found
    """
    prefix = f"{number:04d}_"
    for loaded in self.discover():
        if loaded.migration.name.startswith(prefix):
            return loaded
    return None

get_next_number

get_next_number()

Get the next available migration number.

Returns:

Type Description
int

Next migration number (1 if no migrations exist)

Source code in type_bridge/migration/loader.py
def get_next_number(self) -> int:
    """Get the next available migration number.

    Returns:
        Next migration number (1 if no migrations exist)
    """
    migrations = self.discover()
    if not migrations:
        return 1

    # Extract numbers from existing migrations
    numbers = []
    for loaded in migrations:
        try:
            num = int(loaded.migration.name[:4])
            numbers.append(num)
        except (ValueError, IndexError):
            pass

    return max(numbers) + 1 if numbers else 1
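The numbering rule can be exercised on bare migration names. The helper `next_number` below is a hypothetical reduction of the method above that takes names instead of loaded migrations.

```python
def next_number(names):
    """Return one more than the highest 4-digit prefix, or 1 if none parse."""
    numbers = []
    for name in names:
        try:
            numbers.append(int(name[:4]))
        except ValueError:
            # Names without a numeric prefix are ignored
            pass
    return max(numbers) + 1 if numbers else 1


existing = ["0001_initial", "0003_add_phone"]
new_name = f"{next_number(existing):04d}_add_email"
```

Gaps are not reused: with `0001` and `0003` present, the next migration is numbered `0004`, not `0002`.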

validate_dependencies

validate_dependencies()

Validate that all migration dependencies are satisfied.

Returns:

Type Description
list[str]

List of error messages (empty if valid)

Source code in type_bridge/migration/loader.py
def validate_dependencies(self) -> list[str]:
    """Validate that all migration dependencies are satisfied.

    Returns:
        List of error messages (empty if valid)
    """
    migrations = self.discover()
    errors: list[str] = []

    # Build set of available migrations
    available = {(m.migration.app_label, m.migration.name) for m in migrations}

    for loaded in migrations:
        for dep_app, dep_name in loaded.migration.dependencies:
            if (dep_app, dep_name) not in available:
                errors.append(
                    f"Migration {loaded.migration.name} depends on "
                    f"{dep_app}.{dep_name} which does not exist"
                )

    return errors
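The dependency check is a membership test against the set of `(app_label, name)` pairs found on disk. A minimal sketch, where each migration is modelled as a hypothetical `(app_label, name, dependencies)` tuple instead of a `LoadedMigration`:

```python
def find_missing_dependencies(migrations):
    """Return one error message per dependency that is not among the migrations."""
    available = {(app, name) for app, name, _ in migrations}
    errors = []
    for _, name, dependencies in migrations:
        for dep_app, dep_name in dependencies:
            if (dep_app, dep_name) not in available:
                errors.append(
                    f"Migration {name} depends on {dep_app}.{dep_name} which does not exist"
                )
    return errors


migrations = [
    ("migrations", "0001_initial", []),
    ("migrations", "0002_add_phone", [("migrations", "0001_initial")]),
    ("migrations", "0003_add_email", [("migrations", "0002_add_fax")]),
]
errors = find_missing_dependencies(migrations)
```

The check only confirms that each dependency exists; it does not, in the code shown, detect cycles or verify ordering.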

MigrationLoadError

Bases: Exception

Error loading a migration file.

ModelRegistry

Registry for tracking Entity/Relation models.

Models can be registered manually or auto-discovered from Python modules. The registry is used by the migration generator to determine which models should be tracked for schema changes.

Example - Manual registration

from type_bridge.migration import ModelRegistry
from myapp.models import Person, Company

ModelRegistry.register(Person, Company)

Example - Auto-discovery

from type_bridge.migration import ModelRegistry

# Discover all Entity/Relation classes in module
models = ModelRegistry.discover("myapp.models")

Example - In models.py (recommended pattern):

    from type_bridge import Entity, String, Flag, Key, TypeFlags
    from type_bridge.migration import ModelRegistry

class Name(String):
    pass

class Person(Entity):
    flags = TypeFlags(name="person")
    name: Name = Flag(Key)

# Register at module load time
ModelRegistry.register(Person)

register classmethod

register(*models)

Register models for migration tracking.

Parameters:

Name Type Description Default
models type[Entity | Relation]

Entity or Relation classes to register

()

Source code in type_bridge/migration/registry.py
@classmethod
def register(cls, *models: type[Entity | Relation]) -> None:
    """Register models for migration tracking.

    Args:
        models: Entity or Relation classes to register
    """
    from type_bridge.models import Entity, Relation

    for model in models:
        if not isinstance(model, type):
            logger.warning(f"Skipping non-class: {model}")
            continue

        if not issubclass(model, (Entity, Relation)):
            logger.warning(f"Skipping {model.__name__}: not an Entity or Relation subclass")
            continue

        if model in (Entity, Relation):
            continue

        if model not in cls._models:
            cls._models.add(model)
            logger.debug(f"Registered model: {model.__name__}")

unregister classmethod

unregister(*models)

Unregister models from migration tracking.

Parameters:

Name Type Description Default
models type[Entity | Relation]

Entity or Relation classes to unregister

()

Source code in type_bridge/migration/registry.py
@classmethod
def unregister(cls, *models: type[Entity | Relation]) -> None:
    """Unregister models from migration tracking.

    Args:
        models: Entity or Relation classes to unregister
    """
    for model in models:
        cls._models.discard(model)
        logger.debug(f"Unregistered model: {model.__name__}")

clear classmethod

clear()

Clear all registered models.

Source code in type_bridge/migration/registry.py
@classmethod
def clear(cls) -> None:
    """Clear all registered models."""
    cls._models.clear()
    logger.debug("Cleared all registered models")

get_all classmethod

get_all()

Get all registered models.

Returns:

Type Description
list[type[Entity | Relation]]

List of registered Entity/Relation classes

Source code in type_bridge/migration/registry.py
@classmethod
def get_all(cls) -> list[type[Entity | Relation]]:
    """Get all registered models.

    Returns:
        List of registered Entity/Relation classes
    """
    return list(cls._models)

is_registered classmethod

is_registered(model)

Check if a model is registered.

Parameters:

Name Type Description Default
model type

Model class to check

required

Returns:

Type Description
bool

True if model is registered

Source code in type_bridge/migration/registry.py
@classmethod
def is_registered(cls, model: type) -> bool:
    """Check if a model is registered.

    Args:
        model: Model class to check

    Returns:
        True if model is registered
    """
    return model in cls._models

discover classmethod

discover(module_path, register=True)

Auto-discover Entity/Relation classes from a module.

Imports the module and finds all Entity/Relation subclasses defined in it.

Parameters:

Name Type Description Default
module_path str

Python module path (e.g., "myapp.models")

required
register bool

If True, also register discovered models

True

Returns:

Type Description
list[type[Entity | Relation]]

List of discovered Entity/Relation classes

Raises:

Type Description
ImportError

If module cannot be imported

Source code in type_bridge/migration/registry.py
@classmethod
def discover(cls, module_path: str, register: bool = True) -> list[type[Entity | Relation]]:
    """Auto-discover Entity/Relation classes from a module.

    Imports the module and finds all Entity/Relation subclasses defined in it.

    Args:
        module_path: Python module path (e.g., "myapp.models")
        register: If True, also register discovered models

    Returns:
        List of discovered Entity/Relation classes

    Raises:
        ImportError: If module cannot be imported
    """
    from type_bridge.models import Entity, Relation

    logger.info(f"Discovering models from: {module_path}")

    module = importlib.import_module(module_path)
    discovered: list[type[Entity | Relation]] = []

    for name in dir(module):
        # Skip private/magic attributes
        if name.startswith("_"):
            continue

        obj = getattr(module, name)

        # Must be a class
        if not isinstance(obj, type):
            continue

        # Must be defined in this module (not imported)
        if obj.__module__ != module_path:
            continue

        # Must be Entity or Relation subclass (but not the base classes)
        if issubclass(obj, (Entity, Relation)) and obj not in (Entity, Relation):
            discovered.append(obj)
            logger.debug(f"Discovered model: {obj.__name__}")

            if register:
                cls.register(obj)

    logger.info(f"Discovered {len(discovered)} models from {module_path}")
    return discovered
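The filtering rules in `discover` (public name, is a class, defined in the module itself, subclass of a base but not the base) can be tried on a synthetic module. In this sketch `Base` is a hypothetical stand-in for `Entity`/`Relation`, and the module is built in memory with `types.ModuleType` instead of being imported.

```python
import types


class Base:
    """Stand-in for the Entity/Relation base classes (hypothetical)."""


def discover_classes(module, base):
    """Return subclasses of `base` that are defined in `module` itself."""
    found = []
    for name in dir(module):
        if name.startswith("_"):
            continue  # private or magic attribute
        obj = getattr(module, name)
        if not isinstance(obj, type):
            continue  # not a class
        if obj.__module__ != module.__name__:
            continue  # imported from somewhere else
        if issubclass(obj, base) and obj is not base:
            found.append(obj)
    return found


# Build a fake module with one local model, one imported model, and noise
demo = types.ModuleType("demo_models")
demo.Person = type("Person", (Base,), {"__module__": "demo_models"})
demo.Imported = type("Imported", (Base,), {"__module__": "elsewhere"})
demo._Hidden = type("_Hidden", (Base,), {"__module__": "demo_models"})
demo.Base = Base
demo.VERSION = "1.0"

models = discover_classes(demo, Base)
```

Since `dir()` returns names in sorted order, discovery order is alphabetical by attribute name rather than definition order.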

discover_recursive classmethod

discover_recursive(package_path, register=True)

Recursively discover models from a package.

Imports all modules in the package and discovers Entity/Relation classes.

Parameters:

Name Type Description Default
package_path str

Python package path (e.g., "myapp")

required
register bool

If True, also register discovered models

True

Returns:

Type Description
list[type[Entity | Relation]]

List of discovered Entity/Relation classes

Raises:

Type Description
ImportError

If package cannot be imported

Source code in type_bridge/migration/registry.py
@classmethod
def discover_recursive(
    cls, package_path: str, register: bool = True
) -> list[type[Entity | Relation]]:
    """Recursively discover models from a package.

    Imports all modules in the package and discovers Entity/Relation classes.

    Args:
        package_path: Python package path (e.g., "myapp")
        register: If True, also register discovered models

    Returns:
        List of discovered Entity/Relation classes

    Raises:
        ImportError: If package cannot be imported
    """
    import pkgutil

    logger.info(f"Recursively discovering models from: {package_path}")

    package = importlib.import_module(package_path)
    discovered: list[type[Entity | Relation]] = []

    # First discover in the package itself
    discovered.extend(cls.discover(package_path, register=register))

    # Then discover in submodules
    if hasattr(package, "__path__"):
        for importer, modname, ispkg in pkgutil.walk_packages(
            package.__path__, prefix=f"{package_path}."
        ):
            try:
                submodule_models = cls.discover(modname, register=register)
                discovered.extend(submodule_models)
            except ImportError as e:
                logger.warning(f"Could not import {modname}: {e}")
                continue

    logger.info(f"Recursively discovered {len(discovered)} models from {package_path}")
    return discovered

SchemaManager

SchemaManager(db)

Manager for database schema operations.

Initialize schema manager.

Parameters:

Name Type Description Default
db Database

Database connection

required

Source code in type_bridge/migration/schema_manager.py
def __init__(self, db: Database):
    """Initialize schema manager.

    Args:
        db: Database connection
    """
    self.db = db
    self.registered_models = []

register

register(*models)

Register model classes for schema management.

Parameters:

Name Type Description Default
models type[Entity | Relation]

Model classes to register

()

Source code in type_bridge/migration/schema_manager.py
def register(self, *models: type[Entity | Relation]) -> None:
    """Register model classes for schema management.

    Args:
        models: Model classes to register
    """
    for model in models:
        if model not in self.registered_models:
            logger.debug(f"Registering model: {model.__name__}")
            self.registered_models.append(model)
        else:
            logger.debug(f"Model already registered: {model.__name__}")

collect_schema_info

collect_schema_info()

Collect schema information from registered models.

Returns:

Type Description
SchemaInfo

SchemaInfo with entities, relations, and attributes

Source code in type_bridge/migration/schema_manager.py
def collect_schema_info(self) -> SchemaInfo:
    """Collect schema information from registered models.

    Returns:
        SchemaInfo with entities, relations, and attributes
    """
    logger.debug(f"Collecting schema info from {len(self.registered_models)} registered models")
    schema_info = SchemaInfo()

    for model in self.registered_models:
        if issubclass(model, Entity) and model is not Entity:
            logger.debug(f"Adding entity to schema: {model.__name__}")
            schema_info.entities.append(model)
        elif issubclass(model, Relation) and model is not Relation:
            logger.debug(f"Adding relation to schema: {model.__name__}")
            schema_info.relations.append(model)

        # Collect all attribute classes owned by this model
        owned_attrs = model.get_owned_attributes()
        for field_name, attr_info in owned_attrs.items():
            logger.debug(
                f"Adding attribute class: {attr_info.typ.__name__} (owned by {model.__name__})"
            )
            schema_info.attribute_classes.add(attr_info.typ)

    logger.info(
        f"Schema info collected: {len(schema_info.entities)} entities, "
        f"{len(schema_info.relations)} relations, {len(schema_info.attribute_classes)} attributes"
    )
    return schema_info

generate_schema

generate_schema()

Generate complete TypeQL schema definition.

Returns:

Type Description
str

TypeQL schema definition string

Source code in type_bridge/migration/schema_manager.py
def generate_schema(self) -> str:
    """Generate complete TypeQL schema definition.

    Returns:
        TypeQL schema definition string
    """
    logger.debug("Generating TypeQL schema definition")
    # Collect schema information and generate TypeQL
    schema_info = self.collect_schema_info()
    typeql = schema_info.to_typeql()
    logger.debug(f"Generated TypeQL schema ({len(typeql)} chars)")
    logger.debug(f"Schema:\n{typeql}")
    return typeql

has_existing_schema

has_existing_schema()

Check if database has existing schema defined.

Returns:

Type Description
bool

True if the database exists and at least one registered entity or relation type is already defined in it

Source code in type_bridge/migration/schema_manager.py
def has_existing_schema(self) -> bool:
    """Check if database has existing schema defined.

    Returns:
        True if database exists and has custom schema beyond built-in types
    """
    logger.debug("Checking for existing schema in database")
    if not self.db.database_exists():
        logger.debug("Database does not exist, no existing schema")
        return False

    # Check if any of the registered types already exist in the schema
    # This is the most reliable way in TypeDB 3.x
    for model in self.registered_models:
        if issubclass(model, Entity) and model is not Entity:
            type_name = model.get_type_name()
            if type_exists(self.db, type_name):
                logger.debug(f"Found existing entity type: {type_name}")
                return True
        elif issubclass(model, Relation) and model is not Relation:
            type_name = model.get_type_name()
            if type_exists(self.db, type_name):
                logger.debug(f"Found existing relation type: {type_name}")
                return True

    logger.debug("No existing schema found for registered models")
    return False

introspect_current_schema_info

introspect_current_schema_info()

Introspect current database schema and build SchemaInfo.

Note: This is a best-effort attempt. It cannot perfectly reconstruct Python class hierarchies from TypeDB schema.

Returns:

Type Description
SchemaInfo | None

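SchemaInfo with the current schema, or None if the database doesn't exist. In the source shown below, the method currently returns None in both cases, because full reconstruction is not yet implemented.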

Source code in type_bridge/migration/schema_manager.py
def introspect_current_schema_info(self) -> SchemaInfo | None:
    """Introspect current database schema and build SchemaInfo.

    Note: This is a best-effort attempt. It cannot perfectly reconstruct
    Python class hierarchies from TypeDB schema.

    Returns:
        SchemaInfo with current schema, or None if database doesn't exist
    """
    if not self.db.database_exists():
        return None

    # For now, we return None and rely on has_existing_schema()
    # Full reconstruction would require complex TypeDB schema introspection
    return None

verify_compatibility

verify_compatibility(old_schema_info)

Verify that new schema is compatible with old schema.

Checks for breaking changes (removed or modified types/attributes) and raises SchemaConflictError if found.

Parameters:

Name Type Description Default
old_schema_info SchemaInfo

The previous schema to compare against

required

Raises:

Type Description
SchemaConflictError

If breaking changes are detected

Source code in type_bridge/migration/schema_manager.py
def verify_compatibility(self, old_schema_info: SchemaInfo) -> None:
    """Verify that new schema is compatible with old schema.

    Checks for breaking changes (removed or modified types/attributes)
    and raises SchemaConflictError if found.

    Args:
        old_schema_info: The previous schema to compare against

    Raises:
        SchemaConflictError: If breaking changes are detected
    """
    logger.debug("Verifying schema compatibility")
    new_schema_info = self.collect_schema_info()
    diff = old_schema_info.compare(new_schema_info)

    # Check for breaking changes
    has_breaking_changes = bool(
        diff.removed_entities
        or diff.removed_relations
        or diff.removed_attributes
        or diff.modified_entities
        or diff.modified_relations
    )

    if has_breaking_changes:
        logger.warning(f"Breaking schema changes detected: {diff}")
        raise SchemaConflictError(diff)

    logger.debug("Schema compatibility verified - no breaking changes")
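
The breaking-change rule above can be tried in isolation. The following is a minimal sketch, not the library's code: DiffSketch is a hypothetical stand-in for SchemaDiff, its field types are guesses, and added_entities is an assumed field included only to show that additions do not trip the check.

```python
from dataclasses import dataclass, field


@dataclass
class DiffSketch:
    """Hypothetical stand-in for SchemaDiff; field types are assumptions."""

    added_entities: set[str] = field(default_factory=set)  # assumed field
    removed_entities: set[str] = field(default_factory=set)
    removed_relations: set[str] = field(default_factory=set)
    removed_attributes: set[str] = field(default_factory=set)
    modified_entities: dict[str, str] = field(default_factory=dict)
    modified_relations: dict[str, str] = field(default_factory=dict)


def has_breaking_changes(diff: DiffSketch) -> bool:
    # Same rule as verify_compatibility: any removal or modification is
    # breaking; additions are not inspected at all.
    return bool(
        diff.removed_entities
        or diff.removed_relations
        or diff.removed_attributes
        or diff.modified_entities
        or diff.modified_relations
    )


additive = DiffSketch(added_entities={"company"})
destructive = DiffSketch(removed_attributes={"phone"})

print(has_breaking_changes(additive))
print(has_breaking_changes(destructive))
```

In the real method the second case would raise SchemaConflictError(diff) rather than return a boolean.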

sync_schema

sync_schema(force=False, skip_if_exists=False)

Synchronize database schema with registered models.

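Before applying anything, checks whether any registered entity or relation type already exists in the database. If one does, and neither force nor skip_if_exists is set, raises SchemaConflictError instead of redefining it. Otherwise creates the database if needed and applies the generated schema in a single schema transaction.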

Parameters:

Name Type Description Default
force bool

If True, delete the database (if it exists) and recreate it from scratch. This skips the conflict check and discards all existing data.

False
skip_if_exists bool

If True, skip conflict checks when types already exist. Use this for idempotent deployments where you want to ensure the schema is in place without recreating the database. TypeDB 3.x's define statement is idempotent for identical definitions.

False

Raises:

Type Description
SchemaConflictError

If a registered entity or relation type already exists in the database and both force and skip_if_exists are False

Source code in type_bridge/migration/schema_manager.py
def sync_schema(self, force: bool = False, skip_if_exists: bool = False) -> None:
    """Synchronize database schema with registered models.

    Automatically checks for existing schema in the database and raises
    SchemaConflictError if schema already exists and might conflict.

    Args:
        force: If True, recreate database from scratch, ignoring conflicts
        skip_if_exists: If True, skip conflict checks when types already exist.
                       Use this for idempotent deployments where you want to ensure
                       the schema is in place without recreating the database.
                       TypeDB 3.x's define statement is idempotent for identical
                       definitions.

    Raises:
        SchemaConflictError: If database has existing schema and force=False
                            and skip_if_exists=False
    """
    logger.info(f"Syncing schema (force={force}, skip_if_exists={skip_if_exists})")
    # Check for existing schema before making changes
    if not force and not skip_if_exists and self.has_existing_schema():
        logger.debug("Existing schema detected, checking for conflicts")
        # In TypeDB 3.x, schema introspection is limited without instances
        # For safety, we treat any attempt to redefine existing types as a potential conflict
        existing_types = []
        for model in self.registered_models:
            if issubclass(model, Entity) and model is not Entity:
                type_name = model.get_type_name()
                if type_exists(self.db, type_name):
                    existing_types.append(f"entity '{type_name}'")
            elif issubclass(model, Relation) and model is not Relation:
                type_name = model.get_type_name()
                if type_exists(self.db, type_name):
                    existing_types.append(f"relation '{type_name}'")

        if existing_types:
            from type_bridge.migration.diff import SchemaDiff

            types_str = ", ".join(existing_types)
            logger.error(f"Schema conflict: types already exist: {types_str}")
            raise SchemaConflictError(
                SchemaDiff(),
                message=(
                    f"Schema conflict detected! The following types already exist in the database: {types_str}\n"
                    "\n"
                    "Redefining existing types may cause:\n"
                    "  - Data loss if attributes or roles are removed\n"
                    "  - Schema conflicts if types are modified\n"
                    "  - Undefined behavior if ownership changes\n"
                    "\n"
                    "Resolution options:\n"
                    "1. Use sync_schema(force=True) to recreate database from scratch (⚠️  DATA LOSS)\n"
                    "2. Manually drop the existing database first\n"
                    "3. Use MigrationManager for incremental schema changes\n"
                    "4. Ensure no conflicting types exist before syncing\n"
                ),
            )

    if force:
        # Delete and recreate database
        logger.info("Force mode: recreating database from scratch")
        if self.db.database_exists():
            logger.debug("Deleting existing database")
            self.db.delete_database()
        self.db.create_database()

    # Ensure database exists
    if not self.db.database_exists():
        logger.debug("Creating database")
        self.db.create_database()

    # Generate and apply schema
    schema = self.generate_schema()

    logger.debug("Applying schema to database")
    with self.db.transaction("schema") as tx:
        tx.execute(schema)
        tx.commit()
    logger.info("Schema synchronized successfully")
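
The interaction of force and skip_if_exists is easier to see as a decision table. The sketch below is a hypothetical helper, plan_sync, that mirrors the branches of sync_schema and returns step labels instead of touching a database. The function and the labels are illustrative assumptions, not part of type_bridge.

```python
def plan_sync(
    force: bool,
    skip_if_exists: bool,
    db_exists: bool,
    existing_types: list[str],
) -> list[str]:
    """Return the steps sync_schema would take, as labels (hypothetical helper)."""
    # Conflict check: only runs when neither flag is set, and only fires when
    # the database exists and already contains a registered type.
    if not force and not skip_if_exists and db_exists and existing_types:
        return ["raise SchemaConflictError"]

    steps: list[str] = []
    if force:
        if db_exists:
            steps.append("delete database")
        steps.append("create database")
        db_exists = True
    if not db_exists:
        steps.append("create database")
    steps.append("apply define schema")
    return steps


existing = ["entity 'person'"]
print(plan_sync(False, False, True, existing))
print(plan_sync(True, False, True, existing))
print(plan_sync(False, True, True, existing))
print(plan_sync(False, False, False, []))
```

Only the force path deletes anything. skip_if_exists leaves existing data alone and relies on the define statement being accepted for definitions that are already present.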

drop_schema

drop_schema()

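Drop all schema definitions by deleting the database, if it exists. All data stored in the database is removed along with the schema.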

Source code in type_bridge/migration/schema_manager.py
def drop_schema(self) -> None:
    """Drop all schema definitions."""
    logger.info("Dropping schema")
    if self.db.database_exists():
        self.db.delete_database()
        logger.info("Schema dropped (database deleted)")
    else:
        logger.debug("Database does not exist, nothing to drop")

introspect_schema

introspect_schema()

Introspect current database schema.

Returns:

Type Description
dict[str, list[str]]

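Dictionary with the keys "entities", "relations", and "attributes". Result parsing is not implemented yet, so every list is currently empty.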

Source code in type_bridge/migration/schema_manager.py
def introspect_schema(self) -> dict[str, list[str]]:
    """Introspect current database schema.

    Returns:
        Dictionary of schema information
    """
    logger.debug("Introspecting database schema")
    # Query to get all types
    query = """
    match
    $x sub thing;
    fetch
    $x: label;
    """

    with self.db.transaction("read") as tx:
        results = tx.execute(query)

    schema_info: dict[str, list[str]] = {"entities": [], "relations": [], "attributes": []}

    for result in results:
        # Parse result to categorize types
        # This is a simplified implementation
        pass

    logger.debug(f"Schema introspection complete: {schema_info}")
    return schema_info

SimpleMigrationManager

SimpleMigrationManager(db)

Manager for schema migrations.

Initialize migration manager.

Parameters:

Name Type Description Default
db Database

Database connection

required
Source code in type_bridge/migration/simple_migration.py
def __init__(self, db: Database):
    """Initialize migration manager.

    Args:
        db: Database connection
    """
    self.db = db
    self.migrations: list[tuple[str, str]] = []

add_migration

add_migration(name, schema)

Add a migration.

Parameters:

Name Type Description Default
name str

Migration name

required
schema str

TypeQL schema definition

required
Source code in type_bridge/migration/simple_migration.py
def add_migration(self, name: str, schema: str) -> None:
    """Add a migration.

    Args:
        name: Migration name
        schema: TypeQL schema definition
    """
    logger.debug(f"Adding migration: {name} ({len(schema)} chars)")
    self.migrations.append((name, schema))

apply_migrations

apply_migrations()

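Apply every migration registered with add_migration, in insertion order, each in its own schema transaction. This manager does not track which migrations have already run, so calling the method again re-executes all of them.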

Source code in type_bridge/migration/simple_migration.py
def apply_migrations(self) -> None:
    """Apply all pending migrations."""
    logger.info(f"Applying {len(self.migrations)} migration(s)")
    for name, schema in self.migrations:
        logger.info(f"Applying migration: {name}")
        logger.debug(f"Migration schema:\n{schema}")

        with self.db.transaction("schema") as tx:
            tx.execute(schema)
            tx.commit()

        logger.info(f"Migration {name} applied successfully")
    logger.info("All migrations applied")
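
To illustrate the one-transaction-per-migration behaviour without a TypeDB server, here is a small sketch. RecordingDb is a hypothetical stand-in for Database that only logs what would be executed. The loop has the same shape as apply_migrations.

```python
from contextlib import contextmanager


class RecordingDb:
    """Hypothetical stand-in for Database: records calls, runs nothing."""

    def __init__(self) -> None:
        self.log: list[tuple[str, str]] = []

    @contextmanager
    def transaction(self, kind: str):
        db = self

        class Tx:
            def execute(self, query: str) -> None:
                db.log.append((kind, query))

            def commit(self) -> None:
                db.log.append((kind, "<commit>"))

        yield Tx()


migrations = [
    ("0001_name", "define\nattribute name, value string;"),
    ("0002_joined", "define\nattribute joined_at, value datetime;"),
]

db = RecordingDb()
for _name, schema in migrations:
    # One schema transaction per migration, committed before the next starts.
    with db.transaction("schema") as tx:
        tx.execute(schema)
        tx.commit()

for kind, entry in db.log:
    print(kind, "|", entry.splitlines()[-1])
```

Because each migration commits separately, a failure in a later migration leaves the earlier ones applied.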

create_attribute_migration

create_attribute_migration(attr_name, value_type)

Create a migration to add an attribute.

Parameters:

Name Type Description Default
attr_name str

Attribute name

required
value_type str

TypeQL value type, for example string or datetime

required

Returns:

Type Description
str

TypeQL migration

Source code in type_bridge/migration/simple_migration.py
def create_attribute_migration(self, attr_name: str, value_type: str) -> str:
    """Create a migration to add an attribute.

    Args:
        attr_name: Attribute name
        value_type: Value type

    Returns:
        TypeQL migration
    """
    return f"define\nattribute {attr_name}, value {value_type};"

create_entity_migration

create_entity_migration(entity_name, attributes)

Create a migration to add an entity.

Parameters:

Name Type Description Default
entity_name str

Entity name

required
attributes list[str]

List of attribute names

required

Returns:

Type Description
str

TypeQL migration

Source code in type_bridge/migration/simple_migration.py
def create_entity_migration(self, entity_name: str, attributes: list[str]) -> str:
    """Create a migration to add an entity.

    Args:
        entity_name: Entity name
        attributes: List of attribute names

    Returns:
        TypeQL migration
    """
    lines = ["define", f"entity {entity_name}"]
    for attr in attributes:
        lines.append(f"    owns {attr}")
    lines.append(";")
    return "\n".join(lines)

create_relation_migration

create_relation_migration(relation_name, roles, attributes=None)

Create a migration to add a relation.

Parameters:

Name Type Description Default
relation_name str

Relation name

required
roles list[tuple[str, str]]

List of (role_name, player_type) tuples

required
attributes list[str] | None

Optional list of attribute names

None

Returns:

Type Description
str

TypeQL migration

Source code in type_bridge/migration/simple_migration.py
def create_relation_migration(
    self, relation_name: str, roles: list[tuple[str, str]], attributes: list[str] | None = None
) -> str:
    """Create a migration to add a relation.

    Args:
        relation_name: Relation name
        roles: List of (role_name, player_type) tuples
        attributes: Optional list of attribute names

    Returns:
        TypeQL migration
    """
    lines = ["define", f"relation {relation_name}"]

    seen_roles: set[str] = set()
    for role_name, _ in roles:
        if role_name in seen_roles:
            continue
        seen_roles.add(role_name)
        lines.append(f"    relates {role_name}")

    if attributes:
        for attr in attributes:
            lines.append(f"    owns {attr}")

    lines.append(";")
    lines.append("")

    # Add role player definitions
    for role_name, player_type in roles:
        lines.append(f"{player_type} plays {relation_name}:{role_name};")

    return "\n".join(lines)
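
The role handling is the least obvious part of this method: a role that appears with several player types is declared once in the relation but produces one plays line per player. The standalone function below copies the string-building logic shown above so the output can be inspected without a Database. build_relation_define is a name chosen for this sketch, and the snippet only shows the text that is produced; it does not validate it against a TypeDB server.

```python
def build_relation_define(relation_name, roles, attributes=None):
    """Standalone copy of the string-building logic, for illustration only."""
    lines = ["define", f"relation {relation_name}"]

    seen_roles = set()
    for role_name, _ in roles:
        if role_name in seen_roles:
            continue  # declare each role once, however many players it has
        seen_roles.add(role_name)
        lines.append(f"    relates {role_name}")

    for attr in attributes or []:
        lines.append(f"    owns {attr}")

    lines.append(";")
    lines.append("")

    # Every (role, player) pair gets its own plays statement.
    for role_name, player_type in roles:
        lines.append(f"{player_type} plays {relation_name}:{role_name};")

    return "\n".join(lines)


text = build_relation_define(
    "employment",
    [("employee", "person"), ("employer", "company"), ("employee", "contractor")],
)
print(text)
```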

MigrationRecord dataclass

MigrationRecord(app_label, name, applied_at, checksum)

Record of an applied migration.

MigrationState dataclass

MigrationState(applied=list(), version='1.0')

Complete state of applied migrations.

is_applied

is_applied(app_label, name)

Check if a migration has been applied.

Parameters:

Name Type Description Default
app_label str

Application label

required
name str

Migration name

required

Returns:

Type Description
bool

True if migration has been applied

Source code in type_bridge/migration/state.py
def is_applied(self, app_label: str, name: str) -> bool:
    """Check if a migration has been applied.

    Args:
        app_label: Application label
        name: Migration name

    Returns:
        True if migration has been applied
    """
    return any(r.app_label == app_label and r.name == name for r in self.applied)

add

add(record)

Add a migration record.

Parameters:

Name Type Description Default
record MigrationRecord

Migration record to add

required
Source code in type_bridge/migration/state.py
def add(self, record: MigrationRecord) -> None:
    """Add a migration record.

    Args:
        record: Migration record to add
    """
    if not self.is_applied(record.app_label, record.name):
        self.applied.append(record)

remove

remove(app_label, name)

Remove a migration record (for rollback).

Parameters:

Name Type Description Default
app_label str

Application label

required
name str

Migration name

required
Source code in type_bridge/migration/state.py
def remove(self, app_label: str, name: str) -> None:
    """Remove a migration record (for rollback).

    Args:
        app_label: Application label
        name: Migration name
    """
    self.applied = [
        r for r in self.applied if not (r.app_label == app_label and r.name == name)
    ]

get_latest

get_latest(app_label)

Get the most recently applied migration for an app.

Parameters:

Name Type Description Default
app_label str

Application label

required

Returns:

Type Description
MigrationRecord | None

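The last record for the app in the applied list (list order, not a comparison of applied_at values), or None if the app has no applied migrations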

Source code in type_bridge/migration/state.py
def get_latest(self, app_label: str) -> MigrationRecord | None:
    """Get the most recently applied migration for an app.

    Args:
        app_label: Application label

    Returns:
        Most recent migration record, or None
    """
    app_migrations = [r for r in self.applied if r.app_label == app_label]
    return app_migrations[-1] if app_migrations else None

get_all_for_app

get_all_for_app(app_label)

Get all applied migrations for an app.

Parameters:

Name Type Description Default
app_label str

Application label

required

Returns:

Type Description
list[MigrationRecord]

List of migration records in application order

Source code in type_bridge/migration/state.py
def get_all_for_app(self, app_label: str) -> list[MigrationRecord]:
    """Get all applied migrations for an app.

    Args:
        app_label: Application label

    Returns:
        List of migration records in application order
    """
    return [r for r in self.applied if r.app_label == app_label]
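
The methods above are small enough to exercise in a standalone sketch. Record and State below are simplified stand-ins for MigrationRecord and MigrationState (the version field is omitted), written only to show two behaviours: add ignores a record whose (app_label, name) pair is already present, and get_latest follows list order.

```python
from dataclasses import dataclass, field


@dataclass
class Record:  # stand-in for MigrationRecord
    app_label: str
    name: str
    applied_at: str
    checksum: str


@dataclass
class State:  # stand-in for MigrationState, without the version field
    applied: list[Record] = field(default_factory=list)

    def is_applied(self, app_label: str, name: str) -> bool:
        return any(r.app_label == app_label and r.name == name for r in self.applied)

    def add(self, record: Record) -> None:
        if not self.is_applied(record.app_label, record.name):
            self.applied.append(record)

    def get_latest(self, app_label: str) -> "Record | None":
        matches = [r for r in self.applied if r.app_label == app_label]
        return matches[-1] if matches else None


state = State()
state.add(Record("myapp", "0001_initial", "2024-01-01T00:00:00", "abc123"))
state.add(Record("myapp", "0002_add_phone", "2024-01-02T00:00:00", "def456"))
# Same (app_label, name) as the first record: silently ignored.
state.add(Record("myapp", "0001_initial", "2024-01-03T00:00:00", "changed"))

latest = state.get_latest("myapp")
print(len(state.applied), latest.name)
```

Note that the duplicate is dropped even though its checksum differs, so add cannot be used to update an existing record.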

MigrationStateManager

MigrationStateManager(db)

Manages migration state in TypeDB.

State is stored in TypeDB as type_bridge_migration entities.

Example

manager = MigrationStateManager(db)
state = manager.load_state()

if not state.is_applied("myapp", "0001_initial"):
    # Apply migration...
    manager.record_applied("myapp", "0001_initial", "abc123")

Initialize state manager.

Parameters:

Name Type Description Default
db Database

Database connection

required
Source code in type_bridge/migration/state.py
def __init__(self, db: Database):
    """Initialize state manager.

    Args:
        db: Database connection
    """
    self.db = db
    self._state: MigrationState | None = None
    self._schema_ensured = False

ensure_schema

ensure_schema()

Ensure migration tracking schema exists in TypeDB.

Creates the type_bridge_migration entity type if it doesn't exist.

Source code in type_bridge/migration/state.py
    def ensure_schema(self) -> None:
        """Ensure migration tracking schema exists in TypeDB.

        Creates the type_bridge_migration entity type if it doesn't exist.
        """
        if self._schema_ensured:
            return

        # Check if entity type exists
        check_query = f"""
            match $t type {self.ENTITY_NAME};
            fetch {{ "exists": true }};
        """

        try:
            with self.db.transaction("read") as tx:
                results = list(tx.execute(check_query))
                if results:
                    self._schema_ensured = True
                    return
        except Exception:
            # Type doesn't exist, will create it
            pass

        # Create migration tracking schema
        # Use composite key (app_label:name) since TypeDB 3.x @key requires unique ownership
        schema = f"""define
attribute migration_id, value string;
attribute migration_app_label, value string;
attribute migration_name, value string;
attribute migration_applied_at, value datetime;
attribute migration_checksum, value string;

entity {self.ENTITY_NAME},
    owns migration_id @key,
    owns migration_app_label,
    owns migration_name,
    owns migration_applied_at,
    owns migration_checksum;
"""

        logger.info("Creating migration tracking schema")
        with self.db.transaction("schema") as tx:
            tx.execute(schema)
            tx.commit()

        self._schema_ensured = True

load_state

load_state()

Load migration state from TypeDB.

Returns:

Type Description
MigrationState

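Current migration state. If the query fails, a warning is logged and the records read up to that point (possibly none) are returned.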

Source code in type_bridge/migration/state.py
    def load_state(self) -> MigrationState:
        """Load migration state from TypeDB.

        Returns:
            Current migration state
        """
        self.ensure_schema()

        query = f"""
match
$m isa {self.ENTITY_NAME},
    has migration_app_label $app,
    has migration_name $name,
    has migration_applied_at $applied,
    has migration_checksum $checksum;
fetch {{
    "app": $app,
    "name": $name,
    "applied": $applied,
    "checksum": $checksum
}};
"""

        state = MigrationState()

        try:
            with self.db.transaction("read") as tx:
                results = tx.execute(query)
                for result in results:
                    # Handle TypeDB result format
                    app = self._extract_value(result, "app")
                    name = self._extract_value(result, "name")
                    applied = self._extract_value(result, "applied")
                    checksum = self._extract_value(result, "checksum")

                    if all([app, name, applied, checksum]):
                        state.add(
                            MigrationRecord(
                                app_label=str(app),
                                name=str(name),
                                applied_at=str(applied),
                                checksum=str(checksum),
                            )
                        )
        except Exception as e:
            logger.warning(f"Failed to load migration state from TypeDB: {e}")

        self._state = state
        return state

record_applied

record_applied(app_label, name, checksum)

Record that a migration was applied.

Parameters:

Name Type Description Default
app_label str

Application label

required
name str

Migration name

required
checksum str

Migration content hash

required
Source code in type_bridge/migration/state.py
    def record_applied(self, app_label: str, name: str, checksum: str) -> None:
        """Record that a migration was applied.

        Args:
            app_label: Application label
            name: Migration name
            checksum: Migration content hash
        """
        self.ensure_schema()

        applied_at = datetime.now(UTC)
        applied_at_str = applied_at.strftime("%Y-%m-%dT%H:%M:%S.%f")

        migration_id = f"{app_label}:{name}"
        query = f"""
insert $m isa {self.ENTITY_NAME},
    has migration_id "{migration_id}",
    has migration_app_label "{app_label}",
    has migration_name "{name}",
    has migration_applied_at {applied_at_str},
    has migration_checksum "{checksum}";
"""

        with self.db.transaction("write") as tx:
            tx.execute(query)
            tx.commit()

        logger.info(f"Recorded migration: {app_label}.{name}")

        # Update local state
        if self._state:
            self._state.add(
                MigrationRecord(
                    app_label=app_label,
                    name=name,
                    applied_at=applied_at.isoformat(),
                    checksum=checksum,
                )
            )
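
Two values in the insert query are derived rather than passed in: the composite migration_id and the datetime literal. The sketch below reproduces both with a fixed timestamp. migration_literals is a hypothetical helper, and timezone.utc is used in place of the UTC alias so the snippet also runs on Python versions older than 3.11.

```python
from datetime import datetime, timezone


def migration_literals(app_label: str, name: str, applied_at: datetime) -> tuple[str, str]:
    """Build the key and datetime literal the way record_applied does (sketch)."""
    migration_id = f"{app_label}:{name}"
    # No offset directive in the format, so the literal carries no timezone.
    applied_at_str = applied_at.strftime("%Y-%m-%dT%H:%M:%S.%f")
    return migration_id, applied_at_str


fixed = datetime(2024, 1, 2, 3, 4, 5, 678, tzinfo=timezone.utc)
migration_id, literal = migration_literals("myapp", "0001_initial", fixed)

print(migration_id)        # myapp:0001_initial
print(literal)             # 2024-01-02T03:04:05.000678
print(fixed.isoformat())   # 2024-01-02T03:04:05.000678+00:00
```

The last line shows that the in-memory record, which stores applied_at.isoformat(), keeps the +00:00 offset while the literal written to the query does not. The values are also interpolated into the query without escaping, so app_label, name, and checksum should not contain double quotes.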

record_unapplied

record_unapplied(app_label, name)

Record that a migration was rolled back.

Parameters:

Name Type Description Default
app_label str

Application label

required
name str

Migration name

required
Source code in type_bridge/migration/state.py
    def record_unapplied(self, app_label: str, name: str) -> None:
        """Record that a migration was rolled back.

        Args:
            app_label: Application label
            name: Migration name
        """
        self.ensure_schema()

        query = f"""
match
$m isa {self.ENTITY_NAME},
    has migration_app_label "{app_label}",
    has migration_name "{name}";
delete $m isa {self.ENTITY_NAME};
"""

        with self.db.transaction("write") as tx:
            tx.execute(query)
            tx.commit()

        logger.info(f"Removed migration record: {app_label}.{name}")

        # Update local state
        if self._state:
            self._state.remove(app_label, name)

type_exists

type_exists(db, type_name)

Check if a type exists in the database schema.

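Runs a match query for instances of the type. If the type exists, the query succeeds even when there are no instances; if it does not, the query raises an error, which is reported as False. Any other failure, such as a connection error, is also reported as False.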

Parameters:

Name Type Description Default
db Database

Database connection

required
type_name str

Name of the type to check (entity, relation, or attribute)

required

Returns:

Type Description
bool

True if type exists in schema, False otherwise

Example

>>> type_exists(db, "person")
True
>>> type_exists(db, "nonexistent")
False

Source code in type_bridge/migration/utils.py
def type_exists(db: "Database", type_name: str) -> bool:
    """Check if a type exists in the database schema.

    Uses a simple query to check if the type name is valid in the schema.
    If the type doesn't exist, the query will raise an error.

    Args:
        db: Database connection
        type_name: Name of the type to check (entity, relation, or attribute)

    Returns:
        True if type exists in schema, False otherwise

    Example:
        >>> type_exists(db, "person")
        True
        >>> type_exists(db, "nonexistent")
        False
    """
    query = f"""
    match $t isa {type_name};
    fetch {{ $t.* }};
    """

    try:
        with db.transaction("read") as tx:
            # If type exists, query succeeds (even with 0 results)
            # If type doesn't exist, query raises an error
            list(tx.execute(query))
            return True
    except Exception:
        return False
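
The "query succeeds means the type exists" pattern can be demonstrated without a server. In this sketch FakeDb is a hypothetical stand-in that raises for unknown type names, roughly as the docstring describes TypeDB doing. type_exists_sketch follows the same try/except shape as the function above.

```python
from contextlib import contextmanager


class FakeDb:
    """Hypothetical stand-in for Database that knows a fixed set of types."""

    def __init__(self, known_types):
        self.known_types = set(known_types)

    @contextmanager
    def transaction(self, kind):
        db = self

        class Tx:
            def execute(self, query):
                # Crude parse of "match $t isa <name>;" for the sketch.
                name = query.split("isa", 1)[1].split(";", 1)[0].strip()
                if name not in db.known_types:
                    raise RuntimeError(f"unknown type: {name}")
                return iter(())  # known type with zero instances

        yield Tx()


def type_exists_sketch(db, type_name):
    query = f"match $t isa {type_name};\nfetch {{ $t.* }};"
    try:
        with db.transaction("read") as tx:
            list(tx.execute(query))  # an empty result still counts as success
            return True
    except Exception:
        return False


db = FakeDb({"person"})
print(type_exists_sketch(db, "person"))
print(type_exists_sketch(db, "nonexistent"))
```

Because every exception maps to False, a caller cannot tell a missing type from an unreachable server with this pattern alone.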