Source code for pyseekdb.client.admin_client

"""
Admin client interface and implementation for database management

Also includes ClientProxy for strict separation of Collection vs Database operations
"""

from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

from .database import Database
from .types import _NOT_PROVIDED

if TYPE_CHECKING:
    from .client_base import (
        BaseClient,
        ConfigurationParam,
        EmbeddingFunctionParam,
    )
    from .collection import Collection

SchemaParam = Any  # Type hint placeholder
ConfigurationParam = Any  # Type hint placeholder
EmbeddingFunctionParam = Any  # Type hint placeholder

DEFAULT_TENANT = "test"


[docs] class AdminAPI(ABC): """ Abstract admin API interface for database management. Defines the contract for database operations. """
[docs] @abstractmethod def create_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None: """ Create database Args: name: database name tenant: tenant name (for OceanBase) """ pass
[docs] @abstractmethod def get_database(self, name: str, tenant: str = DEFAULT_TENANT) -> Database: """ Get database object Args: name: database name tenant: tenant name (for OceanBase) Returns: Database object """ pass
[docs] @abstractmethod def delete_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None: """ Delete database Args: name: database name tenant: tenant name (for OceanBase) """ pass
[docs] @abstractmethod def list_databases( self, limit: int | None = None, offset: int | None = None, tenant: str = DEFAULT_TENANT, ) -> Sequence[Database]: """ List all databases Args: limit: maximum number of results to return offset: number of results to skip tenant: tenant name (for OceanBase) Returns: Sequence of Database objects """ pass
[docs] @abstractmethod def fork_database(self, source_name: str, destination_name: str, tenant: str = DEFAULT_TENANT) -> Database: """ Fork (duplicate) a database to create a new independent copy. The destination database is logically equivalent to the source database at the fork snapshot moment, containing all user tables and their data. It can be used as an independent database for subsequent read/write operations. Args: source_name: source database name destination_name: destination database name (must not already exist) tenant: tenant name (for OceanBase) Returns: Database object for the newly created destination database Raises: ValueError: If fork is not supported (requires seekdb >= 1.2.0), or if the destination database already exists. """ pass
class _AdminClientProxy(AdminAPI): """ A lightweight facade that delegates all operations to the underlying ServerAPI (BaseClient). The actual logic is in the specific client implementations (Embedded/Server/OceanBase). Note: This is an internal class. Users should use the AdminClient() factory function. """ _server: "BaseClient" def __init__(self, server: "BaseClient") -> None: """ Initialize admin client with a server implementation Args: server: The underlying client that implements the actual logic """ self._server = server def create_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None: """Proxy to server implementation""" return self._server.create_database(name=name, tenant=tenant) def get_database(self, name: str, tenant: str = DEFAULT_TENANT) -> Database: """Proxy to server implementation""" return self._server.get_database(name=name, tenant=tenant) def delete_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None: """Proxy to server implementation""" return self._server.delete_database(name=name, tenant=tenant) def list_databases( self, limit: int | None = None, offset: int | None = None, tenant: str = DEFAULT_TENANT, ) -> Sequence[Database]: """Proxy to server implementation""" return self._server.list_databases(limit=limit, offset=offset, tenant=tenant) def fork_database(self, source_name: str, destination_name: str, tenant: str = DEFAULT_TENANT) -> Database: """Proxy to server implementation""" return self._server.fork_database(source_name=source_name, destination_name=destination_name, tenant=tenant) def __repr__(self): return f"<AdminClient server={self._server}>" def __enter__(self): """Context manager support - delegate to server""" self._server.__enter__() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager support - delegate to server""" return self._server.__exit__(exc_type, exc_val, exc_tb) class _ClientProxy: """ Internal client proxy for collection operations only. Strictly separates collection management from database management. Note: This is an internal class. Users should use the Client() factory function. """ _server: "BaseClient" def __init__(self, server: "BaseClient") -> None: """ Initialize client with a server implementation Args: server: The underlying client that implements the actual logic """ self._server = server def create_collection( self, name: str, schema: SchemaParam = None, configuration: ConfigurationParam = _NOT_PROVIDED, embedding_function: EmbeddingFunctionParam = _NOT_PROVIDED, **kwargs, ) -> "Collection": """Proxy to server implementation - collection operations only""" return self._server.create_collection( name=name, schema=schema, configuration=configuration, embedding_function=embedding_function, **kwargs, ) def get_collection(self, name: str, embedding_function: EmbeddingFunctionParam = _NOT_PROVIDED) -> "Collection": """Proxy to server implementation - collection operations only""" return self._server.get_collection(name=name, embedding_function=embedding_function) def delete_collection(self, name: str) -> None: """Proxy to server implementation - collection operations only""" return self._server.delete_collection(name=name) def list_collections(self) -> list["Collection"]: """Proxy to server implementation - collection operations only""" return self._server.list_collections() def has_collection(self, name: str) -> bool: """Proxy to server implementation - collection operations only""" return self._server.has_collection(name=name) def get_or_create_collection( self, name: str, schema: SchemaParam = None, configuration: ConfigurationParam = _NOT_PROVIDED, embedding_function: EmbeddingFunctionParam = _NOT_PROVIDED, **kwargs, ) -> "Collection": """Proxy to server implementation - collection operations only""" return self._server.get_or_create_collection( name=name, schema=schema, configuration=configuration, embedding_function=embedding_function, **kwargs, ) def count_collection(self) -> int: """Proxy to server implementation - collection operations only""" return self._server.count_collection() def __repr__(self): return f"<Client server={self._server}>" def __enter__(self): """Context manager support - delegate to server""" self._server.__enter__() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager support - delegate to server""" return self._server.__exit__(exc_type, exc_val, exc_tb)