Repository

logger = logging.getLogger(__name__) module-attribute

AbstractConfigRepository

Bases: RepositoryBase[ConfigSchemaBase, ConfigID], ABC

create(schema, foreign_keys=None) async

get(unique_id, *, include_deleted=False, fully_load=True, expected_schema_class=None, **kwargs) async

get(
    unique_id: ConfigID,
    *,
    include_deleted: bool = ...,
    fully_load: bool = ...,
    expected_schema_class: None = None,
) -> ConfigSchemaBase
get(
    unique_id: ConfigID,
    *,
    include_deleted: bool = ...,
    fully_load: bool = ...,
    expected_schema_class: type[ConfigSchemaType],
) -> ConfigSchemaType

Load a schema from the database by name and type.

PARAMETER DESCRIPTION

unique_id: The unique id of the schema to load. TYPE: ConfigID
include_deleted: Whether to include deleted schemas in the search. TYPE: bool DEFAULT: False
fully_load: Whether to fully load the schema (e.g. load all sub-schemas). TYPE: bool DEFAULT: True
expected_schema_class: The expected schema class to check against and for type hinting. TYPE: type[ConfigSchemaType] | None DEFAULT: None
kwargs: Additional keyword arguments. DEFAULT: {}

RAISES DESCRIPTION

NotFoundError: If the schema does not exist.
DeletedError(NotFoundError): If the schema is found but marked deleted.
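
The two overloads of get let a type checker narrow the return type when expected_schema_class is passed. The sketch below imitates that behaviour with a toy in-memory repository. InMemoryConfigRepository, AssistantSchema, the plain string ids, and the TypeError raised on a class mismatch are assumptions made for illustration, and the exception classes are local stand-ins rather than the library's own.

```python
import asyncio
from typing import Dict, Optional, Set, Type, TypeVar, overload


class NotFoundError(Exception):
    """Local stand-in for the library's NotFoundError."""


class DeletedError(NotFoundError):
    """Local stand-in: the schema exists but is marked deleted."""


class ConfigSchemaBase:
    """Hypothetical minimal schema; the real base class carries more data."""

    def __init__(self, unique_id: str) -> None:
        self.unique_id = unique_id


class AssistantSchema(ConfigSchemaBase):
    """Hypothetical concrete schema used only in this sketch."""


ConfigSchemaType = TypeVar("ConfigSchemaType", bound=ConfigSchemaBase)


class InMemoryConfigRepository:
    """Toy repository keyed by plain strings instead of ConfigID."""

    def __init__(self) -> None:
        self.schemas: Dict[str, ConfigSchemaBase] = {}
        self.deleted: Set[str] = set()

    @overload
    async def get(
        self,
        unique_id: str,
        *,
        include_deleted: bool = ...,
        expected_schema_class: None = None,
    ) -> ConfigSchemaBase: ...

    @overload
    async def get(
        self,
        unique_id: str,
        *,
        include_deleted: bool = ...,
        expected_schema_class: Type[ConfigSchemaType],
    ) -> ConfigSchemaType: ...

    async def get(
        self,
        unique_id: str,
        *,
        include_deleted: bool = False,
        expected_schema_class: Optional[Type[ConfigSchemaBase]] = None,
    ) -> ConfigSchemaBase:
        if unique_id not in self.schemas:
            raise NotFoundError(unique_id)
        if unique_id in self.deleted and not include_deleted:
            raise DeletedError(unique_id)
        schema = self.schemas[unique_id]
        # Assumption: a class mismatch raises TypeError; the real check may differ.
        if expected_schema_class is not None and not isinstance(
            schema, expected_schema_class
        ):
            raise TypeError(f"{unique_id} is not a {expected_schema_class.__name__}")
        return schema


async def demo() -> str:
    repo = InMemoryConfigRepository()
    repo.schemas["helper"] = AssistantSchema("helper")
    # A type checker infers AssistantSchema here because of the second overload.
    schema = await repo.get("helper", expected_schema_class=AssistantSchema)
    return schema.unique_id
```

Because DeletedError subclasses NotFoundError, a caller that only handles NotFoundError also covers the soft-deleted case.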

list_sub_assistant_uids(type_, user_id='public') async

List the unique ids of all assistant schemas of a given type (and user).

list_uids(schema_class, type_, user_id=None) async

List the unique ids of all schemas of a given type (and user).

mark_deleted(unique_id, nested=False) async

Mark a schema as deleted in the database.

This should behave like a deletion without removing the schema from the database: only a flag is set. Once the flag is set, most operations should no longer return the schema.
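
A minimal sketch of the soft-delete idea, assuming a toy in-memory store: the row keeps existing, and ordinary reads filter on the flag. StoredSchema, SoftDeleteStore, and the string ids are hypothetical names, and the nested parameter is left out because its behaviour is not described here.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class StoredSchema:
    """Hypothetical stored row: a payload plus a soft-delete flag."""

    unique_id: str
    payload: dict
    deleted: bool = False


class SoftDeleteStore:
    """Toy in-memory store; not the library's repository."""

    def __init__(self) -> None:
        self.rows: Dict[str, StoredSchema] = {}

    async def mark_deleted(self, unique_id: str) -> None:
        # Only the flag changes; the row itself stays in storage.
        self.rows[unique_id].deleted = True

    async def list_uids(self) -> List[str]:
        # Ordinary reads skip rows whose flag is set.
        return [uid for uid, row in self.rows.items() if not row.deleted]


async def demo() -> tuple:
    store = SoftDeleteStore()
    store.rows["a"] = StoredSchema("a", {"k": 1})
    store.rows["b"] = StoredSchema("b", {"k": 2})
    await store.mark_deleted("a")
    # First element: ids still visible to reads; second: ids still stored.
    return await store.list_uids(), sorted(store.rows)
```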

set(schema) async

Set a schema in the database.

If the schema already exists, it will be updated. Otherwise, it will be created.
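
One plausible way to get this update-or-create behaviour is to try update first and fall back to create when nothing is found. The sketch below assumes that approach; whether the real set works this way is not stated here. UpsertStore, the string ids, the dict payloads, and the returned status strings are hypothetical.

```python
import asyncio
from typing import Dict


class NotFoundError(Exception):
    """Local stand-in for the library's NotFoundError."""


class UpsertStore:
    """Toy store mapping a string id to a dict payload (both hypothetical)."""

    def __init__(self) -> None:
        self.rows: Dict[str, dict] = {}

    async def create(self, unique_id: str, payload: dict) -> str:
        self.rows[unique_id] = dict(payload)
        return "created"

    async def update(self, unique_id: str, payload: dict) -> str:
        if unique_id not in self.rows:
            raise NotFoundError(unique_id)
        self.rows[unique_id].update(payload)
        return "updated"

    async def set(self, unique_id: str, payload: dict) -> str:
        # Try the update path first and fall back to create when nothing exists.
        try:
            return await self.update(unique_id, payload)
        except NotFoundError:
            return await self.create(unique_id, payload)


async def demo() -> list:
    store = UpsertStore()
    first = await store.set("cfg", {"temperature": 0.2})
    second = await store.set("cfg", {"temperature": 0.7})
    return [first, second, store.rows["cfg"]["temperature"]]
```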

update(schema, foreign_keys=None, include_deleted=False, fully_load=False) async

SQLConfigRepository

Bases: SQLRepositoryMixin[ConfigID], AbstractConfigRepository

UID = ConfigID class-attribute instance-attribute

__check_type(schema) staticmethod

get_existing_model_by_meta(session, model_class, meta=None, name=None, type_=None, user_id=None, version=None) async

Find a model in the database that matches the given metadata, provided either as a single metadata value (meta) or as its individual parts (name, type_, user_id, version).


PARAMETER DESCRIPTION

session: SQLAlchemy session
model_class: The model class to find a model for
meta: The metadata to find a model for
name: The name of the model to find
type_: The type of the model to find
user_id: The user_id of the model to find
version: The version of the model to find (or latest if None)

RETURNS DESCRIPTION

The model that was found, or None if no model was found
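
The lookup rule described above (match on the metadata parts, pick the latest version when none is given, return None when nothing matches) can be sketched without a database. The Meta dataclass, the find_existing helper, integer versions, and meta taking precedence over the individual parts are all assumptions for illustration; the real method queries through the SQLAlchemy session.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class Meta:
    """Hypothetical metadata record."""

    name: str
    type_: str
    user_id: str
    version: int  # assumption: versions are comparable integers


def find_existing(
    rows: Iterable[Meta],
    meta: Optional[Meta] = None,
    name: Optional[str] = None,
    type_: Optional[str] = None,
    user_id: Optional[str] = None,
    version: Optional[int] = None,
) -> Optional[Meta]:
    # Assumption: when meta is given, it supplies all the individual parts.
    if meta is not None:
        name, type_, user_id, version = (
            meta.name,
            meta.type_,
            meta.user_id,
            meta.version,
        )
    matches = [
        r for r in rows if (r.name, r.type_, r.user_id) == (name, type_, user_id)
    ]
    if version is not None:
        matches = [r for r in matches if r.version == version]
    # With no version requested, the highest version counts as the latest.
    return max(matches, key=lambda r: r.version, default=None)
```

In a SQL-backed version the same rule would typically become a filtered query ordered by version in descending order and limited to one row.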