My client

Replacing the langgraph sdk client with my own.

2025-03-16 -- Don't want to use the langgraph client any more, as it causes too many headaches with debugging, its dependency management, logging, etc.

GRAPH_MAPPING = {'basic_agent_graph': basic_agent_graph, 'debug_graph': debug_graph, 'example_graph': example_graph, 'example_interrupt_graph': example_interrupt_graph(), 'example_command_jump_graph': example_command_jump_graph()} module-attribute

logger = get_logger(__name__) module-attribute

CheckpointerProviderProtocol

Bases: Protocol

__call__()

LGClientProtocol

Bases: Protocol

runs property

threads property

LGRunsProtocol

Bases: Protocol

stream(thread_id, assistant_id, *, input=None, command=None, stream_mode='values', stream_subgraphs=False, metadata=None, config=None, checkpoint=None, checkpoint_id=None, interrupt_before=None, interrupt_after=None, feedback_keys=None, on_disconnect=None, on_completion=None, webhook=None, multitask_strategy=None, if_not_exists=None, after_seconds=None)

LGThreadsProtocol

Bases: Protocol

create(*, metadata=None, thread_id=None, if_exists=None) async

get(thread_id) async

get_state(thread_id, checkpoint=None, checkpoint_id=None, *, subgraphs=False) async
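The protocol classes above describe interfaces structurally: any object exposing the right attributes and methods satisfies them, without inheriting from anything. The following is a minimal sketch of that idea, not the module's actual definitions. `ThreadsLike`, `ClientLike`, `FakeThreads` and `FakeClient` are hypothetical names, only `create` is modelled, and the returned dict shape is an assumption.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ThreadsLike(Protocol):
    """Hypothetical stand-in for LGThreadsProtocol (only `create` is shown)."""

    async def create(self, *, metadata=None, thread_id=None, if_exists=None) -> Any: ...


@runtime_checkable
class ClientLike(Protocol):
    """Hypothetical stand-in for LGClientProtocol (only `threads` is shown)."""

    @property
    def threads(self) -> ThreadsLike: ...


class FakeThreads:
    """Test double: no inheritance from the protocol is needed."""

    async def create(self, *, metadata=None, thread_id=None, if_exists=None):
        # The returned shape is an assumption made for this sketch.
        return {"thread_id": thread_id or "generated-id", "metadata": metadata or {}}


class FakeClient:
    def __init__(self) -> None:
        self.threads = FakeThreads()


client = FakeClient()
# runtime_checkable only verifies that the attributes exist, not their signatures.
is_client = isinstance(client, ClientLike)
created = asyncio.run(client.threads.create(thread_id="t1"))
```

Typing code against the protocol rather than a concrete client is what makes it possible to swap one client implementation for another, or for a test double like the one here.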

MyLgClient

checkpointer_context = checkpointer_func or checkpointer_context instance-attribute

runs = MyRunsClient(self.checkpointer_context, self.thread_storage) instance-attribute

thread_storage = thread_storage or ThreadStorage(self.checkpointer_context) instance-attribute

threads = MyThreadsClient(self.checkpointer_context, self.thread_storage) instance-attribute

__init__(checkpointer_func=None, thread_storage=None)
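The attribute listing above suggests how the constructor wires things together: both optional arguments fall back to a default via `or`, and the two sub-clients receive the same checkpointer context and the same storage. A rough sketch of that wiring follows, using hypothetical stand-in classes (`StorageSketch`, `SubClientSketch`, `ClientSketch`) and a placeholder `default_checkpointer_context`; it is not the real `MyLgClient`.

```python
def default_checkpointer_context():
    """Placeholder for the module-level checkpointer_context function."""
    return "default-checkpointer"


class StorageSketch:
    """Hypothetical stand-in for ThreadStorage."""

    def __init__(self, checkpoint_provider):
        self.checkpoint_provider = checkpoint_provider


class SubClientSketch:
    """Hypothetical stand-in for MyRunsClient / MyThreadsClient."""

    def __init__(self, checkpointer_func, thread_storage):
        self.checkpointer_context = checkpointer_func
        self.thread_storage = thread_storage


class ClientSketch:
    def __init__(self, checkpointer_func=None, thread_storage=None):
        # Fall back to the module default when no provider is passed in.
        self.checkpointer_context = checkpointer_func or default_checkpointer_context
        # Storage is built from the chosen provider unless supplied explicitly.
        self.thread_storage = thread_storage or StorageSketch(self.checkpointer_context)
        # Both sub-clients share the same provider and the same storage.
        self.runs = SubClientSketch(self.checkpointer_context, self.thread_storage)
        self.threads = SubClientSketch(self.checkpointer_context, self.thread_storage)


def custom_provider():
    return "custom-checkpointer"


default_client = ClientSketch()
custom_client = ClientSketch(checkpointer_func=custom_provider)
```

Sharing one storage object between `runs` and `threads` means a thread created through one sub-client is visible to the other, and passing a custom provider or storage is a convenient seam for tests.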

MyRunsClient

checkpointer_context = checkpointer_func instance-attribute

thread_storage = thread_storage instance-attribute

__init__(checkpointer_func, thread_storage)

stream(thread_id, assistant_id, *, input=None, command=None, stream_mode='values', stream_subgraphs=False, metadata=None, config=None, checkpoint=None, checkpoint_id=None, interrupt_before=None, interrupt_after=None, feedback_keys=None, on_disconnect=None, on_completion=None, webhook=None, multitask_strategy=None, if_not_exists=None, after_seconds=None)

Create a run and stream the results.

PARAMETER DESCRIPTION
thread_id

The thread ID to assign to the thread. If None, a stateless run is created.

TYPE: str | None

assistant_id

The assistant ID or graph name to stream from. If using graph name, will default to first assistant created from that graph.

TYPE: str

input

The input to the graph.

TYPE: dict | None DEFAULT: None

command

A command to execute. Cannot be combined with input.

TYPE: Command | None DEFAULT: None

stream_mode

The stream mode(s) to use.

TYPE: StreamMode | Sequence[StreamMode] DEFAULT: 'values'

stream_subgraphs

Whether to stream output from subgraphs.

TYPE: bool DEFAULT: False

metadata

Metadata to assign to the run.

TYPE: dict | None DEFAULT: None

config

The configuration for the assistant.

TYPE: Config | None DEFAULT: None

checkpoint

The checkpoint to resume from.

TYPE: Checkpoint | None DEFAULT: None

interrupt_before

Nodes to interrupt immediately before they get executed.

TYPE: All | Sequence[str] | None DEFAULT: None

interrupt_after

Nodes to interrupt immediately after they get executed.

TYPE: All | Sequence[str] | None DEFAULT: None

feedback_keys

Feedback keys to assign to run.

TYPE: Sequence[str] | None DEFAULT: None

on_disconnect

The disconnect mode to use. Must be one of 'cancel' or 'continue'.

TYPE: DisconnectMode | None DEFAULT: None

on_completion

Whether to delete or keep the thread created for a stateless run. Must be one of 'delete' or 'keep'.

TYPE: OnCompletionBehavior | None DEFAULT: None

webhook

Webhook to call after LangGraph API call is done.

TYPE: str | None DEFAULT: None

multitask_strategy

Multitask strategy to use. Must be one of 'reject', 'interrupt', 'rollback', or 'enqueue'.

TYPE: MultitaskStrategy | None DEFAULT: None

if_not_exists

How to handle missing thread. Defaults to 'reject'. Must be either 'reject' (raise error if missing), or 'create' (create new thread).

TYPE: IfNotExists | None DEFAULT: None

after_seconds

The number of seconds to wait before starting the run. Use to schedule future runs.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
AsyncIterator[StreamPart]

Asynchronous iterator of stream results.

Example Usage:

async for chunk in client.runs.stream(
    thread_id=None,
    assistant_id="agent",
    input={"messages": [{"role": "user", "content": "how are you?"}]},
    stream_mode=["values","debug"],
    metadata={"name":"my_run"},
    config={"configurable": {"model_name": "anthropic"}},
    interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
    interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
    feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
    webhook="https://my.fake.webhook.com",
    multitask_strategy="interrupt"
):
    print(chunk)

Output:

StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
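Since `stream` returns an asynchronous iterator, callers typically consume it with `async for` and branch on each part's `event`. The sketch below illustrates that pattern together with the documented rule that `command` cannot be combined with `input`. `StreamPartSketch` and `fake_stream` are hypothetical stand-ins, and the `ValueError` and the emitted events are assumptions rather than the behaviour of the real method.

```python
import asyncio
from typing import Any, AsyncIterator, NamedTuple


class StreamPartSketch(NamedTuple):
    """Hypothetical stand-in for StreamPart: an event name plus its payload."""

    event: str
    data: Any


async def fake_stream(
    thread_id, assistant_id, *, input=None, command=None
) -> AsyncIterator[StreamPartSketch]:
    """Hypothetical stream: one metadata part, then one values part."""
    # Assumption: combining input and command is rejected with ValueError.
    if input is not None and command is not None:
        raise ValueError("input and command cannot be combined")
    yield StreamPartSketch("metadata", {"run_id": "hypothetical-run-id"})
    yield StreamPartSketch("values", {"messages": (input or {}).get("messages", [])})


async def collect_values(stream: AsyncIterator[StreamPartSketch]) -> list:
    """Keep only the payloads of 'values' events, skipping metadata."""
    return [part.data async for part in stream if part.event == "values"]


messages = [{"role": "user", "content": "how are you?"}]
values = asyncio.run(
    collect_values(fake_stream(None, "agent", input={"messages": messages}))
)
```

Because the stand-in is an async generator, its validation only runs once iteration starts, so a caller would see the error on the first `async for` step rather than at call time.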

MyThreadsClient

checkpointer_context = checkpointer_func instance-attribute

thread_storage = thread_storage instance-attribute

__init__(checkpointer_func, thread_storage)

create(*, metadata=None, thread_id=None, if_exists=None) async

get(thread_id, subgraphs=False) async

get_state(thread_id, checkpoint=None, checkpoint_id=None, *, subgraphs=False) async

NotInitializedError

Bases: BackendError

Raised when trying to access a graph that has not been initialized.
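As an illustration of how such an exception might be raised and handled, here is a small sketch. `BackendError` is redefined as a stand-in (assumed to derive from `Exception`), and `require_graph` and `describe` are hypothetical helpers that do not appear in the module.

```python
class BackendError(Exception):
    """Stand-in for the project's BackendError (assumed to derive from Exception)."""


class NotInitializedError(BackendError):
    """Raised when trying to access a graph that has not been initialized."""


def require_graph(graphs: dict, name: str):
    """Hypothetical helper: return the graph or raise NotInitializedError."""
    graph = graphs.get(name)
    if graph is None:
        raise NotInitializedError(f"graph {name!r} has not been initialized")
    return graph


def describe(graphs: dict, name: str) -> str:
    """Catching the base class also catches the more specific subclass."""
    try:
        return f"found {require_graph(graphs, name)}"
    except BackendError as exc:
        return f"backend error: {exc}"
```

Deriving from a shared base class lets callers choose between handling this specific failure and handling all backend failures in one place.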

ThreadInfo dataclass

graph instance-attribute

thread instance-attribute

thread_id instance-attribute

__init__(thread_id, thread, graph)

ThreadStorage

__init__(checkpoint_provider)

get_graph(thread_id)

get_thread(thread_id)

graph_context(thread_id) async

set_graph(thread_id, graph)

store(thread_id, thread, graph=None)
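The method names above suggest a registry keyed by `thread_id` that keeps a thread and its graph together. One way such a store could look is sketched below under several assumptions: `ThreadRecord` mirrors the three fields of `ThreadInfo`, `InMemoryThreadStorage` and `fake_checkpointer` are hypothetical, `LookupError` stands in for the module's own error type, and `graph_context` is assumed to yield the graph alongside an opened checkpointer.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any


@dataclass
class ThreadRecord:
    """Hypothetical record mirroring ThreadInfo's three fields."""

    thread_id: str
    thread: Any
    graph: Any


class InMemoryThreadStorage:
    """Hypothetical sketch of a thread registry; not the real ThreadStorage."""

    def __init__(self, checkpoint_provider):
        self._checkpoint_provider = checkpoint_provider
        self._records: dict[str, ThreadRecord] = {}

    def store(self, thread_id, thread, graph=None):
        self._records[thread_id] = ThreadRecord(thread_id, thread, graph)

    def get_thread(self, thread_id):
        return self._records[thread_id].thread

    def set_graph(self, thread_id, graph):
        self._records[thread_id].graph = graph

    def get_graph(self, thread_id):
        graph = self._records[thread_id].graph
        if graph is None:
            # LookupError is a stand-in for the module's own error type.
            raise LookupError(f"no graph set for thread {thread_id!r}")
        return graph

    @asynccontextmanager
    async def graph_context(self, thread_id):
        # Assumption: the graph is only usable while the checkpointer is open.
        async with self._checkpoint_provider() as checkpointer:
            yield self.get_graph(thread_id), checkpointer


@asynccontextmanager
async def fake_checkpointer():
    """Hypothetical provider that yields a placeholder checkpointer."""
    yield {"kind": "fake-checkpointer"}


async def use_graph(storage: InMemoryThreadStorage, thread_id: str):
    async with storage.graph_context(thread_id) as (graph, checkpointer):
        return graph, checkpointer
```

A usage sequence would be `store` first, then `set_graph`, then `graph_context` whenever the graph needs to run with a live checkpointer.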

checkpointer_context(conn_string=settings.GRAPH_SQLITE_DB_PATH) async

Return a checkpointer within a context that allows it to operate.

A context is required, e.g. for an async connection to the database.
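The body of this function is not shown on this page. As a rough illustration of the pattern it describes, a resource that is only valid inside an `async with` block, the sketch below uses `contextlib.asynccontextmanager` with the standard-library `sqlite3` module. `sqlite_context`, `demo` and the `checkpoints` table are hypothetical, and `sqlite3` is synchronous, so it merely stands in for whatever async connection the real function opens.

```python
import asyncio
import sqlite3
from contextlib import asynccontextmanager


@asynccontextmanager
async def sqlite_context(conn_string: str = ":memory:"):
    """Hypothetical stand-in: open a connection, yield it, always close it."""
    conn = sqlite3.connect(conn_string)
    try:
        yield conn
    finally:
        # Runs on normal exit and on exceptions raised inside the block.
        conn.close()


async def demo() -> int:
    """Use the connection only while the context is open."""
    async with sqlite_context() as conn:
        conn.execute("CREATE TABLE checkpoints (thread_id TEXT, state TEXT)")
        conn.execute("INSERT INTO checkpoints VALUES (?, ?)", ("t1", "{}"))
        return conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0]


row_count = asyncio.run(demo())
```

Tying the connection's lifetime to the context means callers cannot forget to close it, which is the main reason to hand out a context manager rather than a bare object.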