Skip to content

type_bridge.session

session

Session and transaction management for TypeDB.

Database

Database(address='localhost:1729', database='typedb', username=None, password=None, driver=None)

Main database connection and session manager.

Initialize database connection.

Parameters:

Name Type Description Default
address str

TypeDB server address

'localhost:1729'
database str

Database name

'typedb'
username str | None

Optional username for authentication

None
password str | None

Optional password for authentication

None
driver Driver | None

Optional pre-existing Driver instance to use. If provided, the Database will use this driver instead of creating a new one. The caller retains ownership and is responsible for closing it.

None
Source code in type_bridge/session.py
def __init__(
    self,
    address: str = "localhost:1729",
    database: str = "typedb",
    username: str | None = None,
    password: str | None = None,
    driver: Driver | None = None,
):
    """Record connection settings and an optional pre-built driver.

    The constructor only stores its arguments on the instance; it does not
    create a driver itself.

    Args:
        address: Address of the TypeDB server.
        database: Name of the database to work with.
        username: Username for authentication, if any.
        password: Password for authentication, if any.
        driver: An existing Driver to reuse instead of creating a new one.
            An injected driver remains owned by the caller, who is
            responsible for closing it.
    """
    # Ownership follows from injection: only a driver that was NOT handed in
    # is considered owned by this object.
    self._owns_driver: bool = driver is None
    self._driver: Driver | None = driver

    self.username, self.password = username, password
    self.database_name = database
    self.address = address

driver property

driver

Get the TypeDB driver, connecting if necessary.

connect

connect()

Connect to TypeDB server.

If a driver was injected via `__init__`, this method does nothing (the driver is already connected). Otherwise, it creates a new driver.

Source code in type_bridge/session.py
def connect(self) -> None:
    """Create a driver connection unless one is already present.

    When a driver is already held (for example one injected via __init__),
    nothing happens. Otherwise a new driver is created for ``self.address``
    and this object is marked as its owner.

    Raises:
        Exception: Any error raised while creating the driver; it is logged
            and then re-raised unchanged.
    """
    if self._driver is not None:
        # A driver is already available; there is nothing to set up.
        return

    logger.debug(f"Connecting to TypeDB at {self.address} (database: {self.database_name})")

    # Explicit credentials are built only when both username and password are set.
    credentials = None
    if self.username and self.password:
        credentials = Credentials(self.username, self.password)

    # TLS is enabled only for addresses that start with "https://".
    is_tls_enabled = self.address.startswith("https://")
    driver_options = DriverOptions(is_tls_enabled=is_tls_enabled)
    logger.debug(f"TLS enabled: {is_tls_enabled}")

    try:
        if credentials:
            logger.debug("Using provided credentials for authentication")
        else:
            # No usable credentials were supplied: fall back to the
            # hard-coded admin/password pair.
            logger.debug("Using default credentials for local connection")
            credentials = Credentials("admin", "password")
        self._driver = TypeDB.driver(self.address, credentials, driver_options)
        self._owns_driver = True
        logger.info(f"Connected to TypeDB at {self.address}")
    except Exception as exc:
        logger.error(f"Failed to connect to TypeDB at {self.address}: {exc}")
        raise

close

close()

Close connection to TypeDB server.

If the driver was injected via `__init__`, this method only clears the reference without closing the driver (the caller retains ownership). If the driver was created internally, it will be closed.

Source code in type_bridge/session.py
def close(self) -> None:
    """Release the driver held by this object, if any.

    A driver created internally is closed. A driver injected via __init__
    belongs to the caller, so it is left open and only the reference to it
    is dropped.
    """
    if not self._driver:
        # No driver held: nothing to release.
        return

    if not self._owns_driver:
        logger.debug("Clearing driver reference (external driver, not closing)")
    else:
        logger.debug(f"Closing connection to TypeDB at {self.address}")
        self._driver.close()
        logger.info(f"Disconnected from TypeDB at {self.address}")

    # In both cases this object stops referring to the driver.
    self._driver = None

__enter__

__enter__()

Context manager entry.

Source code in type_bridge/session.py
def __enter__(self) -> "Database":
    """Connect when entering a ``with`` block and hand back this instance."""
    self.connect()
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in type_bridge/session.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Close the connection when leaving a ``with`` block.

    The exception details are not inspected. Because None is returned, an
    exception raised inside the block continues to propagate.
    """
    del exc_type, exc_val, exc_tb  # exception details are intentionally unused
    self.close()

__del__

__del__()

Destructor that warns if driver was not properly closed.

Source code in type_bridge/session.py
def __del__(self) -> None:
    """Warn about, then close, an owned driver that was never closed.

    Only a driver owned by this object is touched; an injected driver is
    left alone. The close is attempted even when emitting the warning itself
    fails (for example when ResourceWarning is turned into an error), so the
    driver is not leaked in that case.
    """
    # getattr keeps the destructor safe on a partially initialised instance,
    # where these attributes may never have been assigned.
    driver = getattr(self, "_driver", None)
    if driver is None or not getattr(self, "_owns_driver", False):
        return

    try:
        import warnings

        address = getattr(self, "address", "<unknown>")
        warnings.warn(
            f"Database connection to {address} was not closed. "
            "Use 'with Database(...) as db:' or call db.close() explicitly.",
            ResourceWarning,
            stacklevel=2,
        )
    finally:
        # Runs even if the warning above raised, so the driver still gets
        # closed. The reference is dropped first to avoid closing twice.
        self._driver = None
        try:
            driver.close()
        except Exception:
            pass  # Best-effort cleanup: errors while closing are ignored

create_database

create_database()

Create the database if it doesn't exist.

Source code in type_bridge/session.py
def create_database(self) -> None:
    """Create the configured database unless the server already has it."""
    name = self.database_name
    if self.driver.databases.contains(name):
        logger.debug(f"Database already exists: {name}")
        return

    logger.debug(f"Creating database: {name}")
    self.driver.databases.create(name)
    logger.info(f"Database created: {name}")

delete_database

delete_database()

Delete the database.

Source code in type_bridge/session.py
def delete_database(self) -> None:
    """Delete the configured database; only log when it does not exist."""
    name = self.database_name
    if not self.driver.databases.contains(name):
        logger.debug(f"Database does not exist, skipping delete: {name}")
        return

    logger.debug(f"Deleting database: {name}")
    self.driver.databases.get(name).delete()
    logger.info(f"Database deleted: {name}")

database_exists

database_exists()

Check if database exists.

Source code in type_bridge/session.py
def database_exists(self) -> bool:
    """Report whether the configured database is present on the server."""
    name = self.database_name
    found = self.driver.databases.contains(name)
    logger.debug(f"Database exists check for '{name}': {found}")
    return found

transaction

transaction(transaction_type: TransactionType) -> TransactionContext
transaction(transaction_type: str = 'read') -> TransactionContext
transaction(transaction_type='read')

Create a transaction context.

Parameters:

Name Type Description Default
transaction_type TransactionType | str

TransactionType or string ("read", "write", "schema")

'read'

Returns:

Type Description
TransactionContext

TransactionContext for use as a context manager

Source code in type_bridge/session.py
def transaction(self, transaction_type: TransactionType | str = "read") -> "TransactionContext":
    """Create a transaction context.

    Args:
        transaction_type: TransactionType or string ("read", "write", "schema").
            A string that is not one of these three names falls back to a
            read transaction; the fallback is logged as a warning so that a
            typo does not go unnoticed.

    Returns:
        TransactionContext for use as a context manager
    """
    if isinstance(transaction_type, str):
        tx_type_map: dict[str, TransactionType] = {
            "read": TransactionType.READ,
            "write": TransactionType.WRITE,
            "schema": TransactionType.SCHEMA,
        }
        if transaction_type in tx_type_map:
            tx_type = tx_type_map[transaction_type]
        else:
            # Keep the historical fallback to READ for compatibility, but
            # make it visible instead of silently downgrading the request.
            logger.warning(
                f"Unknown transaction type {transaction_type!r}; "
                "falling back to a read transaction"
            )
            tx_type = TransactionType.READ
    else:
        tx_type = transaction_type

    logger.debug(
        f"Creating {_tx_type_name(tx_type)} transaction for database: {self.database_name}"
    )
    return TransactionContext(self, tx_type)

execute_query

execute_query(query, transaction_type='read')

Execute a query and return results.

Parameters:

Name Type Description Default
query str

TypeQL query string

required
transaction_type str

Type of transaction ("read", "write", or "schema")

'read'

Returns:

Type Description
list[dict[str, Any]]

List of result dictionaries

Source code in type_bridge/session.py
def execute_query(
    self, query: str, transaction_type: TransactionType | str = "read"
) -> list[dict[str, Any]]:
    """Execute a query in its own transaction and return the results.

    Args:
        query: TypeQL query string
        transaction_type: Type of transaction, either a string ("read",
            "write", or "schema") or the corresponding TransactionType.

    Returns:
        List of result dictionaries
    """
    logger.debug(f"Executing query (type={transaction_type}, {len(query)} chars)")
    logger.debug(f"Query: {query}")
    with self.transaction(transaction_type) as tx:
        results = tx.execute(query)

        # Write and schema transactions are committed here; the set of
        # committing values depends on whether a string or an enum was given.
        if isinstance(transaction_type, str):
            committing_types = ("write", "schema")
        else:
            committing_types = (TransactionType.WRITE, TransactionType.SCHEMA)
        if transaction_type in committing_types:
            tx.commit()

        logger.debug(f"Query returned {len(results)} results")
        return results

get_schema

get_schema()

Get the schema definition for this database.

Source code in type_bridge/session.py
def get_schema(self) -> str:
    """Return the schema definition of the configured database."""
    name = self.database_name
    logger.debug(f"Fetching schema for database: {name}")
    schema = self.driver.databases.get(name).schema()
    logger.debug(f"Schema fetched ({len(schema)} chars)")
    return schema

Transaction

Transaction(tx)

Wrapper around TypeDB transaction.

Initialize transaction wrapper.

Parameters:

Name Type Description Default
tx Transaction

TypeDB transaction

required
Source code in type_bridge/session.py
def __init__(self, tx: TypeDBTransaction):
    """Wrap an existing TypeDB transaction.

    Args:
        tx: The TypeDB transaction that this wrapper delegates to.
    """
    self._tx = tx

is_open property

is_open

Check if transaction is open.

execute

execute(query)

Execute a query.

Parameters:

Name Type Description Default
query str

TypeQL query string

required

Returns:

Type Description
list[dict[str, Any]]

List of result dictionaries

Source code in type_bridge/session.py
def execute(self, query: str) -> list[dict[str, Any]]:
    """Run a query on the wrapped transaction and collect its results.

    Args:
        query: TypeQL query string

    Returns:
        One dictionary per item of the answer; an empty list when the
        answer is not iterable.
    """
    logger.debug(f"Transaction.execute: query ({len(query)} chars)")
    logger.debug(f"Query: {query}")

    # The query call yields a promise; resolving it gives the answer.
    answer = self._tx.query(query).resolve()

    rows = []
    if hasattr(answer, "__iter__"):
        for entry in answer:
            # The checks run in this fixed order; the first match wins.
            if hasattr(entry, "as_dict"):
                # Row exposing as_dict: pull the values out of its concepts
                converted = _extract_values_from_dict(dict(entry.as_dict()))
            elif hasattr(entry, "as_json"):
                # Document exposing as_json
                converted = entry.as_json()
            elif hasattr(entry, "column_names") and hasattr(entry, "get"):
                # Row exposing column_names/get
                converted = _extract_concept_row(entry)
            elif hasattr(entry, "__iter__"):
                # Last resort for iterables: let dict() interpret the item
                converted = dict(entry)
            else:
                converted = {"result": str(entry)}
            rows.append(converted)

    logger.debug(f"Query executed, {len(rows)} results returned")
    return rows

commit

commit()

Commit the transaction.

Source code in type_bridge/session.py
def commit(self) -> None:
    """Commit the wrapped transaction, logging before and after."""
    logger.debug("Committing transaction")
    self._tx.commit()
    logger.info("Transaction committed")

rollback

rollback()

Rollback the transaction.

Source code in type_bridge/session.py
def rollback(self) -> None:
    """Roll back the wrapped transaction, logging before and after."""
    logger.debug("Rolling back transaction")
    self._tx.rollback()
    logger.info("Transaction rolled back")

close

close()

Close the transaction if open.

Source code in type_bridge/session.py
def close(self) -> None:
    """Close the wrapped transaction; do nothing when it is not open."""
    if not self._tx.is_open():
        return
    logger.debug("Closing transaction")
    self._tx.close()

TransactionContext

TransactionContext(db, tx_type)

Context manager for sharing a TypeDB transaction across operations.

Source code in type_bridge/session.py
def __init__(self, db: Database, tx_type: TransactionType):
    """Remember the database and transaction type for this context.

    Args:
        db: Database this context was created from.
        tx_type: Type of transaction this context represents.
    """
    self._tx: Transaction | None = None  # no transaction wrapper held yet
    self.tx_type = tx_type
    self.db = db

transaction property

transaction

Underlying transaction wrapper.

database property

database

Database backing this transaction.

execute

execute(query)

Execute a query within the active transaction.

Source code in type_bridge/session.py
def execute(self, query: str) -> list[dict[str, Any]]:
    """Run ``query`` on the active transaction and return its results."""
    active = self.transaction
    return active.execute(query)

commit

commit()

Commit the active transaction.

Source code in type_bridge/session.py
def commit(self) -> None:
    """Commit the transaction that is currently active in this context."""
    active = self.transaction
    active.commit()

rollback

rollback()

Rollback the active transaction.

Source code in type_bridge/session.py
def rollback(self) -> None:
    """Roll back the transaction that is currently active in this context."""
    active = self.transaction
    active.rollback()

manager

manager(model_cls)

Get a TypeDBManager bound to this transaction.

Source code in type_bridge/session.py
def manager(self, model_cls: Any):
    """Get a TypeDBManager bound to this transaction.

    Args:
        model_cls: The Entity or Relation subclass to manage.

    Returns:
        A TypeDBManager built from this context's transaction and model_cls.

    Raises:
        TypeError: If model_cls is not a class, or is a class that does not
            derive from Entity or Relation.
    """
    # issubclass() raises its own, unrelated TypeError when given a non-class
    # (e.g. a model instance). Reject those first so the caller always sees
    # the message below.
    if not isinstance(model_cls, type):
        raise TypeError("manager() expects an Entity or Relation subclass")

    # Imported inside the function, as in the original implementation.
    from type_bridge.crud import TypeDBManager
    from type_bridge.models import Entity, Relation

    if not issubclass(model_cls, (Entity, Relation)):
        raise TypeError("manager() expects an Entity or Relation subclass")

    return TypeDBManager(self.transaction, model_cls)

ConnectionExecutor

ConnectionExecutor(connection)

Delegate that handles query execution across connection types.

This class encapsulates the logic for executing queries against different connection types (Database, Transaction, TransactionContext, or proxy equivalents), providing a unified interface for CRUD operations.

Initialize the executor with a connection.

Parameters:

Name Type Description Default
connection Connection

Database, Transaction, TransactionContext, or proxy equivalent

required
Source code in type_bridge/session.py
def __init__(self, connection: Connection):
    """Classify the connection and store it as a transaction or a database.

    Args:
        connection: Database, Transaction, TransactionContext, or proxy equivalent.
            A transaction context contributes its ``transaction``; a
            transaction is stored as-is; anything else is stored as the
            database.
    """
    if isinstance(connection, (TransactionContext, ProxyTransactionContext)):
        logger.debug("ConnectionExecutor initialized with TransactionContext")
        active_tx, backing_db = connection.transaction, None
    elif isinstance(connection, (Transaction, ProxyTransaction)):
        logger.debug("ConnectionExecutor initialized with Transaction")
        active_tx, backing_db = connection, None
    else:
        logger.debug("ConnectionExecutor initialized with Database")
        active_tx, backing_db = None, connection

    # Exactly one of the two attributes ends up non-None.
    self._transaction: Transaction | ProxyTransaction | None = active_tx
    self._database: Database | ProxyDatabase | None = backing_db

has_transaction property

has_transaction

Check if using an existing transaction.

database property

database

Get database if available (for creating new transactions).

transaction property

transaction

Get transaction if available.

execute

execute(query, tx_type)

Execute query, using existing transaction or creating a new one.

Parameters:

Name Type Description Default
query str

TypeQL query string

required
tx_type TransactionType

Transaction type (used only when creating new transaction)

required

Returns:

Type Description
list[dict[str, Any]]

List of result dictionaries

Source code in type_bridge/session.py
def execute(self, query: str, tx_type: TransactionType) -> list[dict[str, Any]]:
    """Run a query on the stored transaction, or on a freshly opened one.

    Args:
        query: TypeQL query string
        tx_type: Transaction type; only consulted when no transaction is
            stored and a new one has to be opened from the database.

    Returns:
        List of result dictionaries
    """
    existing = self._transaction
    if not existing:
        database = self._database
        assert database is not None
        logger.debug(f"ConnectionExecutor: creating new {_tx_type_name(tx_type)} transaction")
        with database.transaction(tx_type) as tx:
            return tx.execute(query)

    logger.debug("ConnectionExecutor: using existing transaction")
    return existing.execute(query)