Domain
General domain level logic that might later be split into separate modules/sub-packages.
logger = logging.getLogger(__name__)
module-attribute
AbstractStreamHandler
Bases: ABC
Protocol for handling a StreamPart.
handle_stream_part(update)
abstractmethod
async
Handle a StreamPart update.
EventStreamHandler
Bases: AbstractStreamHandler
Handle "events" events from langgraph updates.
These are equivalent to the .astream_events for a normal graph, including things like "on_chat_model_stream".
updatable = updatable
instance-attribute
__init__(updatable)
handle_stream_part(update)
async
Handle events from a LangGraph update.
https://api.python.langchain.com/en/latest/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.astream_events
InvalidGraphInputsError
Bases: BackendError
UpdatesHandler
Bases: AbstractStreamHandler
Handle "updates" events from langgraph updates.
These occur at the end of each node and contain only the return values from that node.
latest_values = {}
instance-attribute
__init__()
handle_stream_part(update)
async
Handle updates from a LangGraph update.
ValuesHandler
Bases: AbstractStreamHandler
Handle "values" events from langgraph updates.
These are at the end of each super-step and contain the full state_graph values
latest_values = {}
instance-attribute
updatable = updatable
instance-attribute
__init__(updatable)
handle_stream_part(update)
async
Handle values from a LangGraph update.
convert_known_errors(lg_update)
Raise a known error based on the lg_update.
stream_graph_directly(client, updatable, thread_id, graph_type, inputs, config, interrupt_before=None, interrupt_after=None, resuming=False)
async
Effectively a wrapper around invoking a graph that roughly matches the langgraph sdk.
2025-03-18 -- Using this while transitioning from using langgraph sdk to running langgraph locally.
stream_lg_graph_with_handling(client, updatable, thread_id, graph_type, inputs, config, interrupt_before=None, interrupt_after=None, resuming=False)
async
Run a graph and return the run_id and latest values of the graph_state.
Note: Return as a dict because this could be any graph state