My client
Replacing the LangGraph SDK client with my own.
2025-03-16 -- I don't want to use the langgraph client any more: it causes too many headaches with debugging, its dependency management, logging, etc.
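The goal is a drop-in, in-process replacement: call sites keep the `client.threads` / `client.runs.stream(...)` surface from `langgraph_sdk` and only change how the client is constructed. A minimal sketch of the intended swap, assuming the module is importable as `my_client` (placeholder name) and that `threads.create()` returns an SDK-style dict with a `thread_id` key:

```python
import asyncio

# Previously: client = get_client(url=...) from langgraph_sdk.
# Now: construct the in-process client documented below.
from my_client import MyLgClient  # placeholder import path


async def main() -> None:
    client = MyLgClient()  # defaults: module-level checkpointer_context + ThreadStorage

    thread = await client.threads.create(metadata={"name": "demo"})
    async for chunk in client.runs.stream(
        thread_id=thread["thread_id"],     # assumed SDK-style thread dict
        assistant_id="basic_agent_graph",  # a key from GRAPH_MAPPING
        input={"messages": [{"role": "user", "content": "hello"}]},
        stream_mode="values",
    ):
        print(chunk)


asyncio.run(main())
```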
GRAPH_MAPPING = {'basic_agent_graph': basic_agent_graph, 'debug_graph': debug_graph, 'example_graph': example_graph, 'example_interrupt_graph': example_interrupt_graph(), 'example_command_jump_graph': example_command_jump_graph()}
module-attribute
logger = get_logger(__name__)
module-attribute
CheckpointerProviderProtocol
Bases: Protocol
__call__()
LGClientProtocol
Bases: Protocol
runs
property
threads
property
LGRunsProtocol
Bases: Protocol
stream(thread_id, assistant_id, *, input=None, command=None, stream_mode='values', stream_subgraphs=False, metadata=None, config=None, checkpoint=None, checkpoint_id=None, interrupt_before=None, interrupt_after=None, feedback_keys=None, on_disconnect=None, on_completion=None, webhook=None, multitask_strategy=None, if_not_exists=None, after_seconds=None)
LGThreadsProtocol
Bases: Protocol
create(*, metadata=None, thread_id=None, if_exists=None)
async
get(thread_id)
async
get_state(thread_id, checkpoint=None, checkpoint_id=None, *, subgraphs=False)
async
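These protocols pin down the slice of the SDK client surface the rest of the code depends on, so either the real `langgraph_sdk` client or `MyLgClient` satisfies them. A condensed sketch of their likely shape (return types are left as `Any` because the reference does not record them, and the long `stream` signature is abbreviated with `**kwargs`):

```python
from typing import Any, AsyncIterator, Protocol


class LGThreadsProtocol(Protocol):
    async def create(self, *, metadata: dict | None = None,
                     thread_id: str | None = None,
                     if_exists: str | None = None) -> Any: ...
    async def get(self, thread_id: str) -> Any: ...
    async def get_state(self, thread_id: str, checkpoint: Any = None,
                        checkpoint_id: str | None = None, *,
                        subgraphs: bool = False) -> Any: ...


class LGRunsProtocol(Protocol):
    def stream(self, thread_id: str | None, assistant_id: str, *,
               input: dict | None = None, command: Any | None = None,
               stream_mode: Any = "values", **kwargs: Any) -> AsyncIterator[Any]: ...


class LGClientProtocol(Protocol):
    @property
    def runs(self) -> LGRunsProtocol: ...
    @property
    def threads(self) -> LGThreadsProtocol: ...


class CheckpointerProviderProtocol(Protocol):
    # Called with no arguments; returns something that must be entered as a
    # context before the checkpointer can be used (see checkpointer_context below).
    def __call__(self) -> Any: ...
```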
MyLgClient
checkpointer_context = checkpointer_func or checkpointer_context
instance-attribute
runs = MyRunsClient(self.checkpointer_context, self.thread_storage)
instance-attribute
thread_storage = thread_storage or ThreadStorage(self.checkpointer_context)
instance-attribute
threads = MyThreadsClient(self.checkpointer_context, self.thread_storage)
instance-attribute
__init__(checkpointer_func=None, thread_storage=None)
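Based on the attributes above, the constructor just wires defaults: if no provider or storage is injected it falls back to the module-level `checkpointer_context` and a fresh `ThreadStorage`, then builds the `runs` and `threads` sub-clients from them. A short sketch of both construction styles (assuming `MyLgClient`, `MyRunsClient`, `MyThreadsClient`, `ThreadStorage`, and `checkpointer_context` are imported from this module):

```python
# Default wiring: module-level checkpointer_context and a new ThreadStorage.
client = MyLgClient()

# Explicit wiring, e.g. to point at a different checkpointer provider in tests.
storage = ThreadStorage(checkpointer_context)
client = MyLgClient(checkpointer_func=checkpointer_context, thread_storage=storage)

# Both sub-clients share the same provider and storage.
assert isinstance(client.runs, MyRunsClient)
assert isinstance(client.threads, MyThreadsClient)
```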
MyRunsClient
checkpointer_context = checkpointer_func
instance-attribute
thread_storage = thread_storage
instance-attribute
__init__(checkpointer_func, thread_storage)
stream(thread_id, assistant_id, *, input=None, command=None, stream_mode='values', stream_subgraphs=False, metadata=None, config=None, checkpoint=None, checkpoint_id=None, interrupt_before=None, interrupt_after=None, feedback_keys=None, on_disconnect=None, on_completion=None, webhook=None, multitask_strategy=None, if_not_exists=None, after_seconds=None)
Create a run and stream the results.
| PARAMETER | DESCRIPTION |
|---|---|
| `thread_id` | The thread ID to assign to the thread. If `None`, a stateless run is created. |
| `assistant_id` | The assistant ID or graph name to stream from. If using a graph name, defaults to the first assistant created from that graph. |
| `input` | The input to the graph. |
| `command` | A command to execute. Cannot be combined with `input`. |
| `stream_mode` | The stream mode(s) to use. |
| `stream_subgraphs` | Whether to stream output from subgraphs. |
| `metadata` | Metadata to assign to the run. |
| `config` | The configuration for the assistant. |
| `checkpoint` | The checkpoint to resume from. |
| `interrupt_before` | Nodes to interrupt immediately before they get executed. |
| `interrupt_after` | Nodes to interrupt immediately after they get executed. |
| `feedback_keys` | Feedback keys to assign to the run. |
| `on_disconnect` | The disconnect mode to use. Must be one of `'cancel'` or `'continue'`. |
| `on_completion` | Whether to delete or keep the thread created for a stateless run. Must be one of `'delete'` or `'keep'`. |
| `webhook` | Webhook to call after the LangGraph API call is done. |
| `multitask_strategy` | Multitask strategy to use. Must be one of `'reject'`, `'interrupt'`, `'rollback'`, or `'enqueue'`. |
| `if_not_exists` | How to handle a missing thread. Defaults to `'reject'`. Must be either `'reject'` (raise an error if missing) or `'create'` (create a new thread). |
| `after_seconds` | The number of seconds to wait before starting the run. Use to schedule future runs. |
| RETURNS | DESCRIPTION |
|---|---|
| `AsyncIterator[StreamPart]` | Asynchronous iterator of stream results. |
Example Usage:

```python
async for chunk in client.runs.stream(
    thread_id=None,
    assistant_id="agent",
    input={"messages": [{"role": "user", "content": "how are you?"}]},
    stream_mode=["values", "debug"],
    metadata={"name": "my_run"},
    config={"configurable": {"model_name": "anthropic"}},
    interrupt_before=["node_to_stop_before_1", "node_to_stop_before_2"],
    interrupt_after=["node_to_stop_after_1", "node_to_stop_after_2"],
    feedback_keys=["my_feedback_key_1", "my_feedback_key_2"],
    webhook="https://my.fake.webhook.com",
    multitask_strategy="interrupt",
):
    print(chunk)
```
Example output:

```
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
```
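The `command` parameter is the counterpart to `interrupt_before`/`interrupt_after`: once a run has paused, a follow-up `stream` call on the same thread passes a command instead of input to resume it. A hedged sketch, assuming the command is accepted in the SDK's dict form (`{"resume": ...}`) and using the `example_interrupt_graph` key from `GRAPH_MAPPING`; `client` and `thread_id` come from an earlier `threads.create()` call:

```python
# First run pauses at an interrupt inside the graph.
async for chunk in client.runs.stream(
    thread_id=thread_id,
    assistant_id="example_interrupt_graph",
    input={"messages": [{"role": "user", "content": "please do the thing"}]},
    stream_mode="values",
):
    print(chunk)

# Resume the same thread with a command instead of input
# (the dict form mirrors the SDK's Command shape; the exact shape accepted here is an assumption).
async for chunk in client.runs.stream(
    thread_id=thread_id,
    assistant_id="example_interrupt_graph",
    command={"resume": "user approved"},
    stream_mode="values",
):
    print(chunk)
```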
MyThreadsClient
checkpointer_context = checkpointer_func
instance-attribute
thread_storage = thread_storage
instance-attribute
__init__(checkpointer_func, thread_storage)
create(*, metadata=None, thread_id=None, if_exists=None)
async
get(thread_id, subgraphs=False)
async
get_state(thread_id, checkpoint=None, checkpoint_id=None, *, subgraphs=False)
async
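A usage sketch for the threads client, assuming the return shapes mirror the SDK (a thread dict with a `thread_id` key and a state snapshot from `get_state`):

```python
async def inspect_thread(client: MyLgClient) -> None:
    # Create a thread, optionally tagging it with metadata.
    thread = await client.threads.create(metadata={"name": "demo"})
    thread_id = thread["thread_id"]  # assumed SDK-compatible dict shape

    # Fetch the stored thread record and its latest checkpointed state.
    record = await client.threads.get(thread_id)
    state = await client.threads.get_state(thread_id, subgraphs=False)
    print(record, state)
```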
NotInitializedError
Bases: BackendError
Raised when trying to access a graph that has not been initialized.
ThreadInfo
dataclass
graph
instance-attribute
thread
instance-attribute
thread_id
instance-attribute
__init__(thread_id, thread, graph)
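`ThreadInfo` is the record `ThreadStorage` keeps per thread: the thread ID, the thread object handed back to callers, and the graph the thread runs on. A sketch of its shape (field names and order match `__init__` above; the types are assumptions):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ThreadInfo:
    thread_id: str
    thread: Any  # the thread record returned to callers
    graph: Any   # graph bound to this thread (may be unset until set_graph is called)
```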
ThreadStorage
__init__(checkpoint_provider)
get_graph(thread_id)
get_thread(thread_id)
graph_context(thread_id)
async
set_graph(thread_id, graph)
store(thread_id, thread, graph=None)
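A usage sketch for `ThreadStorage`, assuming `graph_context` is an async context manager that yields the thread's graph wired to an open checkpointer (the graph value comes from `GRAPH_MAPPING`):

```python
storage = ThreadStorage(checkpointer_context)

# Register a thread; the graph can be attached now or later via set_graph().
storage.store(thread_id="thread-123", thread={"thread_id": "thread-123"})
storage.set_graph("thread-123", GRAPH_MAPPING["basic_agent_graph"])

# Plain lookups.
graph = storage.get_graph("thread-123")
thread = storage.get_thread("thread-123")

# graph_context (async) is assumed to yield a graph ready to run against the checkpointer.
async def resume(thread_id: str):
    async with storage.graph_context(thread_id) as graph:
        ...  # invoke or stream the graph here
```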
checkpointer_context(conn_string=settings.GRAPH_SQLITE_DB_PATH)
async
Return a checkpointer within a context that allows it to operate.
The context is required e.g. for an async connection to the database.
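A minimal sketch of what such a provider can look like, assuming the async SQLite saver from `langgraph-checkpoint-sqlite` (the real default path is `settings.GRAPH_SQLITE_DB_PATH`; `graph.sqlite` below is a placeholder):

```python
from contextlib import asynccontextmanager

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver


@asynccontextmanager
async def checkpointer_context(conn_string: str = "graph.sqlite"):
    # AsyncSqliteSaver.from_conn_string is itself an async context manager;
    # holding it open gives compiled graphs a live async connection for reading
    # and writing checkpoints for the duration of the caller's block.
    async with AsyncSqliteSaver.from_conn_string(conn_string) as checkpointer:
        yield checkpointer
```

This is also why `CheckpointerProviderProtocol.__call__()` returns something that has to be entered as a context before the checkpointer can be used.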