Updates
logger = logging.getLogger(__name__)
module-attribute
AssistantRunUpdate
dataclass
Bases: Update
GraphStepUpdate
dataclass
Bases: AssistantRunUpdate
Full graph state after each successful super-step.
graph_state
instance-attribute
__init__(graph_state)
RunFinishedUpdate
dataclass
Bases: Update
assistant_run
instance-attribute
conv_id
instance-attribute
__init__(assistant_run, conv_id)
RunInitializedUpdate
dataclass
Bases: AssistantRunUpdate
assistant_run
instance-attribute
user_input
instance-attribute
__init__(assistant_run, user_input)
RunPartialUpdate
dataclass
Bases: Update
assistant_run
instance-attribute
__init__(assistant_run)
RunUpdateHandler
Bases: UpdatableProtocol
Wrap the updatable to intercept langgraph updates and convert to something more frontend-friendly.
streaming_messages = {}
instance-attribute
updatable = updatable
instance-attribute
__call__(updates)
async
Intercept and add custom updates here.
- ChatModelStreamUpdate -> StreamUpdate -- Where message is cumulative
- GraphValuesUpdate -> GraphStepUpdate -- Graph state after a step completes
__init__(updatable)
StreamUpdate
dataclass
Bases: AssistantRunUpdate
Cumulative message of what is currently streaming.
message
instance-attribute
__init__(message)
ToolCallStreamUpdate
dataclass
Bases: AssistantRunUpdate
Stream update while tool is being called.
tool_name
instance-attribute
__init__(tool_name)
convert_langgraph_update(update, streaming_messages)
Convert graph update to stream update.