Repository
logger = logging.getLogger(__name__)
module-attribute
AbstractConfigRepository
Bases: RepositoryBase[ConfigSchemaBase, ConfigID], ABC
create(schema, foreign_keys=None)
async
get(unique_id, *, include_deleted=False, fully_load=True, expected_schema_class=None, **kwargs)
async
get(
    unique_id: ConfigID,
    *,
    include_deleted: bool = ...,
    fully_load: bool = ...,
    expected_schema_class: None = None,
) -> ConfigSchemaBase
get(
    unique_id: ConfigID,
    *,
    include_deleted: bool = ...,
    fully_load: bool = ...,
    expected_schema_class: type[ConfigSchemaType],
) -> ConfigSchemaType
Load a schema from the database by its unique id.
| PARAMETER | DESCRIPTION |
|---|---|
| unique_id | The unique id of the schema to load. TYPE: ConfigID |
| include_deleted | Whether to include deleted schemas in the search. TYPE: bool, DEFAULT: False |
| fully_load | Whether to fully load the schema (e.g. load all sub-schemas). TYPE: bool, DEFAULT: True |
| expected_schema_class | The expected schema class to check against and for type hinting. TYPE: type[ConfigSchemaType] or None, DEFAULT: None |
| **kwargs | Additional keyword arguments |
| RAISES | DESCRIPTION |
|---|---|
| NotFoundError | If the schema does not exist |
| DeletedError(NotFoundError) | If the schema is found but marked deleted |
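The two overloads of get and its error contract can be illustrated with a small in-memory stand-in. This is only a sketch under stated assumptions: InMemoryConfigRepository, AssistantSchema, the string ids, and the TypeError raised on a class mismatch are hypothetical and not taken from the library; only the parameter names and the NotFoundError / DeletedError hierarchy follow the description above.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional, Type, TypeVar, overload


class NotFoundError(Exception):
    """Stand-in: no schema is stored under the given id."""


class DeletedError(NotFoundError):
    """Stand-in: the schema exists but is marked deleted."""


@dataclass
class ConfigSchemaBase:
    unique_id: str  # assumption: a plain string replaces ConfigID here
    deleted: bool = False


@dataclass
class AssistantSchema(ConfigSchemaBase):  # hypothetical subclass
    prompt: str = ""


ConfigSchemaType = TypeVar("ConfigSchemaType", bound=ConfigSchemaBase)


class InMemoryConfigRepository:
    """Toy repository backed by a dict instead of a database."""

    def __init__(self) -> None:
        self._store: Dict[str, ConfigSchemaBase] = {}

    def add(self, schema: ConfigSchemaBase) -> None:
        self._store[schema.unique_id] = schema

    # Without expected_schema_class, callers only know they get the base type.
    @overload
    async def get(
        self, unique_id: str, *, include_deleted: bool = ...,
        expected_schema_class: None = None,
    ) -> ConfigSchemaBase: ...

    # With expected_schema_class, type checkers narrow the return type.
    @overload
    async def get(
        self, unique_id: str, *, include_deleted: bool = ...,
        expected_schema_class: Type[ConfigSchemaType],
    ) -> ConfigSchemaType: ...

    async def get(self, unique_id, *, include_deleted=False,
                  expected_schema_class: Optional[type] = None):
        schema = self._store.get(unique_id)
        if schema is None:
            raise NotFoundError(unique_id)
        if schema.deleted and not include_deleted:
            raise DeletedError(unique_id)
        # Assumption: a class mismatch raises TypeError in this sketch.
        if expected_schema_class is not None and not isinstance(
            schema, expected_schema_class
        ):
            raise TypeError(f"{unique_id} is not a {expected_schema_class.__name__}")
        return schema
```

Because DeletedError subclasses NotFoundError, a caller that only handles NotFoundError also covers the soft-deleted case, while a caller that needs to tell the two apart can catch DeletedError first.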
list_sub_assistant_uids(type_, user_id='public')
async
List the unique ids of all assistant schemas of a given type (and user).
list_uids(schema_class, type_, user_id=None)
async
List the unique ids of all schemas of a given type (and user).
mark_deleted(unique_id, nested=False)
async
Mark a schema as deleted in the database.
This acts as a soft delete: the schema is not removed from the database; a flag is set on it instead. Most operations should then no longer return the schema.
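A minimal sketch of the flag-based deletion described above, using an in-memory dict. SoftDeleteStore, Record, and the include_deleted argument on list_uids are hypothetical names for illustration; the real repository's storage and the meaning of its nested argument are not shown here.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Record:
    unique_id: str
    deleted: bool = False  # the soft-delete flag


class SoftDeleteStore:
    """Toy store: rows are flagged as deleted, never removed."""

    def __init__(self) -> None:
        self._rows: Dict[str, Record] = {}

    async def add(self, record: Record) -> None:
        self._rows[record.unique_id] = record

    async def mark_deleted(self, unique_id: str) -> None:
        # Only the flag changes; the row itself stays in the store.
        self._rows[unique_id].deleted = True

    async def list_uids(self, include_deleted: bool = False) -> List[str]:
        # Flagged rows are hidden unless the caller asks for them.
        return [
            uid for uid, row in self._rows.items()
            if include_deleted or not row.deleted
        ]


async def demo():
    store = SoftDeleteStore()
    await store.add(Record("a"))
    await store.add(Record("b"))
    await store.mark_deleted("a")
    visible = await store.list_uids()
    everything = await store.list_uids(include_deleted=True)
    return visible, everything
```

Keeping the row makes the deletion reversible and preserves references from other rows, at the cost of every read path having to respect the flag.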
set(schema)
async
Set a schema in the database.
If the schema already exists, it will be updated. Otherwise, it will be created.
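The create-or-update behaviour can be sketched as a thin wrapper over create and update. Whether the actual implementation delegates this way is an assumption; UpsertStore, Schema, and the exceptions raised by create and update below are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict


@dataclass
class Schema:
    unique_id: str
    value: int


class UpsertStore:
    """Toy store showing set() as create-or-update."""

    def __init__(self) -> None:
        self._rows: Dict[str, Schema] = {}

    async def create(self, schema: Schema) -> None:
        if schema.unique_id in self._rows:
            raise ValueError(f"{schema.unique_id} already exists")
        self._rows[schema.unique_id] = schema

    async def update(self, schema: Schema) -> None:
        if schema.unique_id not in self._rows:
            raise KeyError(schema.unique_id)
        self._rows[schema.unique_id] = schema

    async def set(self, schema: Schema) -> None:
        # Existing id: update in place. Unknown id: create a new row.
        if schema.unique_id in self._rows:
            await self.update(schema)
        else:
            await self.create(schema)


async def upsert_demo() -> int:
    store = UpsertStore()
    await store.set(Schema("cfg", 1))  # no row yet, so this creates
    await store.set(Schema("cfg", 2))  # row exists, so this updates
    return store._rows["cfg"].value
```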
update(schema, foreign_keys=None, include_deleted=False, fully_load=False)
async
SQLConfigRepository
Bases: SQLRepositoryMixin[ConfigID], AbstractConfigRepository
UID = ConfigID
class-attribute
instance-attribute
__check_type(schema)
staticmethod
get_existing_model_by_meta(session, model_class, meta=None, name=None, type_=None, user_id=None, version=None)
async
Find a model in the database that matches the given metadata, provided either as a metadata object (meta) or as its individual parts (name, type_, user_id, version).
| PARAMETER | DESCRIPTION |
|---|---|
| session | SQLAlchemy session |
| model_class | The model class to find a model for |
| meta | The metadata to find a model for |
| name | The name of the model to find |
| type_ | The type of the model to find |
| user_id | The user_id of the model to find |
| version | The version of the model to find (or latest if None) |

Returns the model that was found, or None if no model was found.
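The lookup rules above (match on metadata or its parts, fall back to the latest version when version is None) can be sketched over a plain list instead of a SQLAlchemy session. Meta, Model, and find_by_meta are hypothetical names; letting meta take precedence over the individual parts and treating versions as comparable integers are both assumptions of this sketch.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Meta:
    name: str
    type_: str
    user_id: str
    version: Optional[int] = None


@dataclass
class Model:
    name: str
    type_: str
    user_id: str
    version: int


def find_by_meta(
    rows: Sequence[Model],
    meta: Optional[Meta] = None,
    *,
    name: Optional[str] = None,
    type_: Optional[str] = None,
    user_id: Optional[str] = None,
    version: Optional[int] = None,
) -> Optional[Model]:
    # Assumption: when a Meta object is given, it overrides the parts.
    if meta is not None:
        name, type_, user_id, version = (
            meta.name, meta.type_, meta.user_id, meta.version
        )
    candidates = [
        row for row in rows
        if row.name == name and row.type_ == type_ and row.user_id == user_id
    ]
    if version is not None:
        candidates = [row for row in candidates if row.version == version]
    # With no version requested, the highest version counts as latest.
    return max(candidates, key=lambda row: row.version, default=None)


rows = [
    Model("bot", "assistant", "public", 1),
    Model("bot", "assistant", "public", 2),
]
latest = find_by_meta(rows, name="bot", type_="assistant", user_id="public")
pinned = find_by_meta(rows, Meta("bot", "assistant", "public", version=1))
missing = find_by_meta(rows, name="other", type_="assistant", user_id="public")
```

In a database-backed version the same filtering would typically be expressed as a query ordered by version in descending order and limited to one row.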