Observability for LangGraph with Opik

Opik integrates with LangGraph so that you can log and trace your LangGraph-based applications. The OpikTracer callback automatically captures detailed information about each graph execution, in both development and production.

Account Setup

Comet provides a hosted version of the Opik platform. To use it, create an account and grab your API key.

You can also run the Opik platform locally. See the installation guide for more information.

Getting Started

Installation

To use the OpikTracer with LangGraph, you'll need the opik and langgraph packages installed. The examples on this page also use langchain. You can install all three using pip:

$ pip install opik langgraph langchain

Configuring Opik

Configure the Opik Python SDK for your deployment type. See the Python SDK Configuration guide for detailed instructions on:

  • CLI configuration: opik configure
  • Code configuration: opik.configure()
  • Self-hosted vs Cloud vs Enterprise setup
  • Configuration files and environment variables
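
As a rough illustration of the environment-variable route, the sketch below builds the variables for one deployment type and applies them. It assumes the SDK reads OPIK_API_KEY, OPIK_WORKSPACE and OPIK_URL_OVERRIDE, and that a default local install serves its API at http://localhost:5173/api. The helper opik_env is hypothetical and not part of the SDK, so check the configuration guide for your version.

```python
import os
from typing import Dict, Optional

# Assumed default API URL of a local Opik install; adjust to your deployment.
LOCAL_OPIK_URL = "http://localhost:5173/api"


def opik_env(use_local: bool,
             api_key: Optional[str] = None,
             workspace: Optional[str] = None) -> Dict[str, str]:
    """Hypothetical helper: return the environment variables for one deployment type."""
    if use_local:
        # Assumption: a local install is addressed by URL only, without an API key.
        return {"OPIK_URL_OVERRIDE": LOCAL_OPIK_URL}
    if not api_key:
        raise ValueError("the hosted platform requires an API key")
    env = {"OPIK_API_KEY": api_key}
    if workspace:
        env["OPIK_WORKSPACE"] = workspace
    return env


# Apply the settings before creating any tracer so the SDK can pick them up.
os.environ.update(opik_env(use_local=True))
```

Setting the variables in the process environment keeps credentials out of the source code; the same names can be exported from a shell or a deployment manifest instead.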

Using the OpikTracer

You can use the OpikTracer callback with any LangGraph graph by passing it in the callbacks list of the config argument to the stream or invoke functions:

from typing import Annotated
from pydantic import BaseModel
from opik.integrations.langchain import OpikTracer
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# create your LangGraph graph
class State(BaseModel):
    messages: Annotated[list, add_messages]

def chatbot(state):
    # Typically your LLM calls would be done here
    return {"messages": "Hello, how can I help you today?"}

graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()

# Create the OpikTracer
opik_tracer = OpikTracer(graph=app.get_graph(xray=True))

# Pass the OpikTracer callback to the Graph.stream function
for s in app.stream({"messages": [HumanMessage(content="How to use LangGraph ?")]},
                    config={"callbacks": [opik_tracer]}):
    print(s)

# Pass the OpikTracer callback to the Graph.invoke function
result = app.invoke({"messages": [HumanMessage(content="How to use LangGraph ?")]},
                    config={"callbacks": [opik_tracer]})

Once the OpikTracer is configured, you will start to see the traces in the Opik UI.

Practical Example: Classification Workflow

Let’s walk through a small example of using LangGraph with Opik for a classification workflow. This example demonstrates how to create a graph with conditional routing and track its execution.

Setting up the Environment

First, let’s set up our environment with the necessary dependencies:

import opik

# Configure Opik
opik.configure(use_local=False)

Creating the LangGraph Workflow

We’ll create a LangGraph workflow with 3 nodes that demonstrates conditional routing:

from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional

# Define the graph state
# (TypedDict fields cannot carry default values, so none are assigned here)
class GraphState(TypedDict):
    question: Optional[str]
    classification: Optional[str]
    response: Optional[str]

# Create the node functions
def classify(question: str) -> str:
    return "greeting" if question.startswith("Hello") else "search"

def classify_input_node(state):
    question = state.get("question", "").strip()
    classification = classify(question)
    return {"classification": classification}

def handle_greeting_node(state):
    return {"response": "Hello! How can I help you today?"}

def handle_search_node(state):
    question = state.get("question", "").strip()
    search_result = f"Search result for '{question}'"
    return {"response": search_result}

# Create the workflow
workflow = StateGraph(GraphState)
workflow.add_node("classify_input", classify_input_node)
workflow.add_node("handle_greeting", handle_greeting_node)
workflow.add_node("handle_search", handle_search_node)

# Add conditional routing
def decide_next_node(state):
    return (
        "handle_greeting"
        if state.get("classification") == "greeting"
        else "handle_search"
    )

workflow.add_conditional_edges(
    "classify_input",
    decide_next_node,
    {"handle_greeting": "handle_greeting", "handle_search": "handle_search"},
)

workflow.set_entry_point("classify_input")
workflow.add_edge("handle_greeting", END)
workflow.add_edge("handle_search", END)

app = workflow.compile()

Executing with Opik Tracing

Now let’s execute the workflow with Opik tracing enabled:

from opik.integrations.langchain import OpikTracer

# Create the OpikTracer with graph visualization
tracer = OpikTracer(graph=app.get_graph(xray=True))

# Execute the workflow
inputs = {"question": "Hello, how are you?"}
result = app.invoke(inputs, config={"callbacks": [tracer]})
print(result)

The graph execution is now logged on the Opik platform and can be viewed in the UI. The trace will show the complete execution path through the graph, including the classification decision and the chosen response path.
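
Because the classification and routing functions are plain Python, the path a question will take can be checked without compiling the graph or sending a trace. The sketch below repeats the two functions from the workflow and adds a hypothetical helper, route, that is not part of LangGraph or Opik:

```python
def classify(question: str) -> str:
    # Same rule as in the workflow: a case-sensitive prefix match
    return "greeting" if question.startswith("Hello") else "search"


def decide_next_node(state: dict) -> str:
    return (
        "handle_greeting"
        if state.get("classification") == "greeting"
        else "handle_search"
    )


def route(question: str) -> str:
    """Hypothetical helper: name of the node that would run after classification."""
    state = {"question": question}
    state["classification"] = classify(state["question"].strip())
    return decide_next_node(state)


for q in ["Hello, how are you?", "What is LangGraph?"]:
    print(f"{q!r} -> {route(q)}")
```

The first question is routed to handle_greeting and the second to handle_search. Since the prefix match is case-sensitive, an input such as "hello there" also goes to handle_search, which is the kind of path worth confirming in the trace view.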

Logging threads

When you run multi-turn conversations using LangGraph persistence, Opik uses LangGraph’s thread_id as the Opik thread_id. For example:

# Requires the langgraph-checkpoint-sqlite and langchain-openai packages
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
from typing import Annotated
from pydantic import BaseModel
from opik.integrations.langchain import OpikTracer
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain.chat_models import init_chat_model

llm = init_chat_model("openai:gpt-4.1")


# create your LangGraph graph
class State(BaseModel):
    messages: Annotated[list, add_messages]


def chatbot(state):
    # Call the LLM with the conversation so far
    return {"messages": [llm.invoke(state.messages)]}


graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)

# Create a new SqliteSaver instance
# Note: check_same_thread=False is OK as the implementation uses a lock
# to ensure thread safety.
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)

app = graph.compile(checkpointer=memory)

# Create the OpikTracer
opik_tracer = OpikTracer(graph=app.get_graph(xray=True))

thread_id = "e424a45e-7763-443a-94ae-434b39b67b72"
config = {"callbacks": [opik_tracer], "configurable": {"thread_id": thread_id}}

# Initialize the state from the checkpoint, or start empty on the first turn
values = app.get_state(config).values
state = State(**values) if values else State(messages=[])
print("STATE", state)

# Add the user message
state.messages.append(HumanMessage(content="Hello, my name is Bob, how are you doing ?"))
# state.messages.append(HumanMessage(content="What is my name ?"))

result = app.invoke(state, config=config)

print("Result", result)
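
Since the thread_id is what ties the turns of a conversation together, each conversation needs its own ID, reused on every turn. One possible way to manage that is sketched below with the standard uuid module; thread_config is a hypothetical helper, and the empty callbacks list stands in for [opik_tracer]:

```python
import uuid
from typing import Any, Dict, List, Optional


def thread_config(callbacks: List[Any], thread_id: Optional[str] = None) -> Dict[str, Any]:
    """Hypothetical helper: build a LangGraph config for one conversation.

    A fresh UUID is generated when no thread_id is given; pass a stored
    thread_id to resume an earlier conversation.
    """
    return {
        "callbacks": callbacks,
        "configurable": {"thread_id": thread_id or str(uuid.uuid4())},
    }


callbacks: List[Any] = []  # in practice: [opik_tracer]

# One config per conversation; reuse it for every turn of that conversation.
first_conversation = thread_config(callbacks)
second_conversation = thread_config(callbacks)

# Resuming: rebuild the config from a thread_id that was stored earlier.
stored_id = first_conversation["configurable"]["thread_id"]
resumed = thread_config(callbacks, thread_id=stored_id)
print(resumed["configurable"]["thread_id"] == stored_id)
```

Each config would then be passed as config=... to app.invoke or app.stream, exactly as in the example above.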

Updating logged traces

You can use the OpikTracer.created_traces method to access the traces collected by the OpikTracer callback and read their IDs:

from opik.integrations.langchain import OpikTracer

opik_tracer = OpikTracer()

# Calling LangGraph stream or invoke functions

traces = opik_tracer.created_traces()
print([trace.id for trace in traces])

These can then be used with the Opik.log_traces_feedback_scores method to update the logged traces.
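
A minimal sketch of preparing that update is shown below. It assumes each score entry is a dictionary with id, name and value keys plus an optional reason; the helper build_feedback_scores, the score name user_rating and the trace IDs are made up for illustration. The call against a real client is left as a comment so the sketch runs without a backend.

```python
from typing import Any, Dict, Iterable, List, Optional


def build_feedback_scores(trace_ids: Iterable[str], name: str, value: float,
                          reason: Optional[str] = None) -> List[Dict[str, Any]]:
    """Hypothetical helper: build one score entry per trace ID."""
    scores: List[Dict[str, Any]] = []
    for trace_id in trace_ids:
        score: Dict[str, Any] = {"id": trace_id, "name": name, "value": value}
        if reason is not None:
            score["reason"] = reason
        scores.append(score)
    return scores


# Placeholder IDs; in practice use [trace.id for trace in opik_tracer.created_traces()]
scores = build_feedback_scores(["trace-1", "trace-2"], name="user_rating", value=1.0)
print(scores)

# With a configured client (not executed here; entry format is an assumption):
#   import opik
#   client = opik.Opik()
#   client.log_traces_feedback_scores(scores=scores)
```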

Advanced usage

The OpikTracer object has a flush method that you can call to make sure all traces are logged to the Opik platform before a script exits. The method returns once all traces have been logged or the timeout is reached, whichever comes first.

from opik.integrations.langchain import OpikTracer

opik_tracer = OpikTracer()
opik_tracer.flush()
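
In a short-lived script it can help to flush even when the graph run fails, so that the trace of the failing run is not lost. The sketch below shows one way to do that with try/finally. The helper run_then_flush is hypothetical, and FakeTracer is only a stand-in for OpikTracer so that the example runs without a backend.

```python
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def run_then_flush(tracer: Any, work: Callable[[], T]) -> T:
    """Hypothetical helper: run `work`, then flush the tracer even if `work` raises."""
    try:
        return work()
    finally:
        tracer.flush()


class FakeTracer:
    """Stand-in for OpikTracer that only records whether flush() was called."""

    def __init__(self) -> None:
        self.flushed = False

    def flush(self) -> None:
        self.flushed = True


tracer = FakeTracer()
# In practice `work` would be something like:
#   lambda: app.invoke(inputs, config={"callbacks": [opik_tracer]})
result = run_then_flush(tracer, lambda: "done")
print(result, tracer.flushed)
```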