Skip to content

Updates

logger = logging.getLogger(__name__) module-attribute

AssistantRunUpdate dataclass

Bases: Update

GraphStepUpdate dataclass

Bases: AssistantRunUpdate

Full graph state after each successful super-step.

graph_state instance-attribute

__init__(graph_state)

RunFinishedUpdate dataclass

Bases: Update

assistant_run instance-attribute

conv_id instance-attribute

__init__(assistant_run, conv_id)

RunInitializedUpdate dataclass

Bases: AssistantRunUpdate

assistant_run instance-attribute

user_input instance-attribute

__init__(assistant_run, user_input)

RunPartialUpdate dataclass

Bases: Update

assistant_run instance-attribute

__init__(assistant_run)

RunUpdateHandler

Bases: UpdatableProtocol

Wrap the updatable to intercept langgraph updates and convert to something more frontend-friendly.

streaming_messages = {} instance-attribute

updatable = updatable instance-attribute

__call__(updates) async

Intercept and add custom updates here.

  • ChatModelStreamUpdate -> StreamUpdate -- Where message is cumulative
  • GraphValuesUpdate -> GraphStepUpdate -- Graph state after a step completes

__init__(updatable)

StreamUpdate dataclass

Bases: AssistantRunUpdate

Cumulative message of what is currently streaming.

message instance-attribute

__init__(message)

ToolCallStreamUpdate dataclass

Bases: AssistantRunUpdate

Stream update while tool is being called.

tool_name instance-attribute

__init__(tool_name)

convert_langgraph_update(update, streaming_messages)

Convert graph update to stream update.