Source code for pyseekdb.client.admin_client

"""
Admin client interface and implementation for database management

Also includes ClientProxy for strict separation of Collection vs Database operations
"""

from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

from .database import Database

if TYPE_CHECKING:
    from .client_base import (
        BaseClient,
        ConfigurationParam,
        EmbeddingFunctionParam,
    )
    from .collection import Collection

# Delay import to avoid circular import
# We'll import these lazily in the functions that need them
# For now, create a placeholder that we can detect and replace
_PLACEHOLDER = object()  # Unique placeholder object


def _get_not_provided():
    """Get the real _NOT_PROVIDED from client_base"""
    from .client_base import _NOT_PROVIDED

    return _NOT_PROVIDED


# Use placeholder for default parameter - will be replaced in function
_NOT_PROVIDED = _PLACEHOLDER
ConfigurationParam = Any  # Type hint placeholder
EmbeddingFunctionParam = Any  # Type hint placeholder

DEFAULT_TENANT = "test"


[docs] class AdminAPI(ABC): """ Abstract admin API interface for database management. Defines the contract for database operations. """
[docs] @abstractmethod def create_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None: """ Create database Args: name: database name tenant: tenant name (for OceanBase) """ pass
[docs] @abstractmethod def get_database(self, name: str, tenant: str = DEFAULT_TENANT) -> Database: """ Get database object Args: name: database name tenant: tenant name (for OceanBase) Returns: Database object """ pass
[docs] @abstractmethod def delete_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None: """ Delete database Args: name: database name tenant: tenant name (for OceanBase) """ pass
[docs] @abstractmethod def list_databases( self, limit: int | None = None, offset: int | None = None, tenant: str = DEFAULT_TENANT, ) -> Sequence[Database]: """ List all databases Args: limit: maximum number of results to return offset: number of results to skip tenant: tenant name (for OceanBase) Returns: Sequence of Database objects """ pass
class _AdminClientProxy(AdminAPI): """ A lightweight facade that delegates all operations to the underlying ServerAPI (BaseClient). The actual logic is in the specific client implementations (Embedded/Server/OceanBase). Note: This is an internal class. Users should use the AdminClient() factory function. """ _server: "BaseClient" def __init__(self, server: "BaseClient") -> None: """ Initialize admin client with a server implementation Args: server: The underlying client that implements the actual logic """ self._server = server def create_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None: """Proxy to server implementation""" return self._server.create_database(name=name, tenant=tenant) def get_database(self, name: str, tenant: str = DEFAULT_TENANT) -> Database: """Proxy to server implementation""" return self._server.get_database(name=name, tenant=tenant) def delete_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None: """Proxy to server implementation""" return self._server.delete_database(name=name, tenant=tenant) def list_databases( self, limit: int | None = None, offset: int | None = None, tenant: str = DEFAULT_TENANT, ) -> Sequence[Database]: """Proxy to server implementation""" return self._server.list_databases(limit=limit, offset=offset, tenant=tenant) def __repr__(self): return f"<AdminClient server={self._server}>" def __enter__(self): """Context manager support - delegate to server""" self._server.__enter__() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager support - delegate to server""" return self._server.__exit__(exc_type, exc_val, exc_tb) class _ClientProxy: """ Internal client proxy for collection operations only. Strictly separates collection management from database management. Note: This is an internal class. Users should use the Client() factory function. """ _server: "BaseClient" def __init__(self, server: "BaseClient") -> None: """ Initialize client with a server implementation Args: server: The underlying client that implements the actual logic """ self._server = server def create_collection( self, name: str, configuration: ConfigurationParam = _PLACEHOLDER, embedding_function: EmbeddingFunctionParam = _PLACEHOLDER, **kwargs, ) -> "Collection": """Proxy to server implementation - collection operations only""" # Replace placeholder with real _NOT_PROVIDED if needed real_not_provided = _get_not_provided() if configuration is _PLACEHOLDER: configuration = real_not_provided if embedding_function is _PLACEHOLDER: embedding_function = real_not_provided return self._server.create_collection( name=name, configuration=configuration, embedding_function=embedding_function, **kwargs, ) def get_collection(self, name: str, embedding_function: EmbeddingFunctionParam = _PLACEHOLDER) -> "Collection": """Proxy to server implementation - collection operations only""" # Replace placeholder with real _NOT_PROVIDED if needed real_not_provided = _get_not_provided() if embedding_function is _PLACEHOLDER: embedding_function = real_not_provided return self._server.get_collection(name=name, embedding_function=embedding_function) def delete_collection(self, name: str) -> None: """Proxy to server implementation - collection operations only""" return self._server.delete_collection(name=name) def list_collections(self) -> list["Collection"]: """Proxy to server implementation - collection operations only""" return self._server.list_collections() def has_collection(self, name: str) -> bool: """Proxy to server implementation - collection operations only""" return self._server.has_collection(name=name) def get_or_create_collection( self, name: str, configuration: ConfigurationParam = _PLACEHOLDER, embedding_function: EmbeddingFunctionParam = _PLACEHOLDER, **kwargs, ) -> "Collection": """Proxy to server implementation - collection operations only""" # Replace placeholder with real _NOT_PROVIDED if needed real_not_provided = _get_not_provided() if configuration is _PLACEHOLDER: configuration = real_not_provided if embedding_function is _PLACEHOLDER: embedding_function = real_not_provided return self._server.get_or_create_collection( name=name, configuration=configuration, embedding_function=embedding_function, **kwargs, ) def count_collection(self) -> int: """Proxy to server implementation - collection operations only""" return self._server.count_collection() def __repr__(self): return f"<Client server={self._server}>" def __enter__(self): """Context manager support - delegate to server""" self._server.__enter__() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager support - delegate to server""" return self._server.__exit__(exc_type, exc_val, exc_tb)