Skip to content

Domain

General domain level logic that might later be split into separate modules/sub-packages.

logger = logging.getLogger(__name__) module-attribute

AbstractStreamHandler

Bases: ABC

Protocol for handling a StreamPart.

handle_stream_part(update) abstractmethod async

Handle a StreamPart update.

EventStreamHandler

Bases: AbstractStreamHandler

Handle "events" events from langgraph updates.

These are equivalent to the .astream_events for a normal graph, including things like "on_chat_model_stream".

updatable = updatable instance-attribute

__init__(updatable)

handle_stream_part(update) async

Handle events from a LangGraph update.

https://api.python.langchain.com/en/latest/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.astream_events

InvalidGraphInputsError

Bases: BackendError

UpdatesHandler

Bases: AbstractStreamHandler

Handle "updates" events from langgraph updates.

These occur at the end of each node and contain only the return values from that node.

latest_values = {} instance-attribute

__init__()

handle_stream_part(update) async

Handle updates from a LangGraph update.

ValuesHandler

Bases: AbstractStreamHandler

Handle "values" events from langgraph updates.

These are at the end of each super-step and contain the full state_graph values

latest_values = {} instance-attribute

updatable = updatable instance-attribute

__init__(updatable)

handle_stream_part(update) async

Handle values from a LangGraph update.

convert_known_errors(lg_update)

Raise a known error based on the lg_update.

stream_graph_directly(client, updatable, thread_id, graph_type, inputs, config, interrupt_before=None, interrupt_after=None, resuming=False) async

Effectively a wrapper around invoking a graph that roughly matches the langgraph sdk.

2025-03-18 -- Using this while transitioning from using langgraph sdk to running langgraph locally.

stream_lg_graph_with_handling(client, updatable, thread_id, graph_type, inputs, config, interrupt_before=None, interrupt_after=None, resuming=False) async

Run a graph and return the run_id and latest values of the graph_state.

Note: Return as a dict because this could be any graph state