OpenAI Agents SDK

The OpenAI Agents SDK (openai-agents) is a lightweight framework for building multi-agent workflows with tools, handoffs, and guardrails. Respan gives you full observability over every agent run, LLM generation, tool call, and handoff — and gateway routing to 250+ models.

Create an account at platform.respan.ai and grab an API key. To use the gateway, also add credits or a provider key.

Run `npx @respan/cli setup` to configure Respan with your coding agent.

Setup

1. Install packages

```shell
pip install respan-ai respan-instrumentation-openai-agents
```
2. Set environment variables

```shell
export OPENAI_API_KEY="YOUR_OPENAI_API_KEY"
export RESPAN_API_KEY="YOUR_RESPAN_API_KEY"
```
3. Initialize and run

```python
import asyncio

from dotenv import load_dotenv

load_dotenv()

from respan import Respan
from respan_instrumentation_openai_agents import OpenAIAgentsInstrumentor
from agents import Agent, Runner, trace

respan = Respan(instrumentations=[OpenAIAgentsInstrumentor()])

agent = Agent(
    name="Assistant",
    instructions="You only respond in haikus.",
)

async def main():
    with trace("Hello world"):
        result = await Runner.run(agent, "Tell me about recursion.")
        print(result.final_output)
    respan.flush()

asyncio.run(main())
```
4. View your trace

Open the Traces page to see your workflow with agent spans, LLM generations, tool calls, and handoffs.

Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `api_key` | `str \| None` | `None` | Falls back to the `RESPAN_API_KEY` env var. |
| `base_url` | `str \| None` | `None` | Falls back to the `RESPAN_BASE_URL` env var. |
| `instrumentations` | `list` | `[]` | Plugin instrumentations to activate (e.g. `OpenAIAgentsInstrumentor()`). |
| `customer_identifier` | `str \| None` | `None` | Default customer identifier for all spans. |
| `metadata` | `dict \| None` | `None` | Default metadata attached to all spans. |
| `environment` | `str \| None` | `None` | Environment tag (e.g. `"production"`). |
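Putting the table together, a minimal initialization might look like the sketch below. The parameter values here are placeholders, not recommendations; every parameter is optional, and `api_key`/`base_url` fall back to their environment variables when omitted.

```python
from respan import Respan
from respan_instrumentation_openai_agents import OpenAIAgentsInstrumentor

# All parameters are optional. api_key falls back to RESPAN_API_KEY,
# and base_url to RESPAN_BASE_URL, when not passed explicitly.
respan = Respan(
    api_key="YOUR_RESPAN_API_KEY",
    instrumentations=[OpenAIAgentsInstrumentor()],
    environment="production",
    metadata={"service": "agent-api"},
)
```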

Attributes

In Respan()

Set defaults at initialization — these apply to all spans.

```python
from respan import Respan
from respan_instrumentation_openai_agents import OpenAIAgentsInstrumentor

respan = Respan(
    instrumentations=[OpenAIAgentsInstrumentor()],
    customer_identifier="user_123",
    metadata={"service": "agent-api", "version": "1.0.0"},
)
```

With propagate_attributes

Override per-request using a context scope.

```python
from respan import Respan, propagate_attributes
from respan_instrumentation_openai_agents import OpenAIAgentsInstrumentor
from agents import Agent, Runner, trace

respan = Respan(instrumentations=[OpenAIAgentsInstrumentor()])

agent = Agent(name="Assistant", instructions="You are a helpful assistant.")

async def handle_request(user_id: str, message: str):
    with trace("User request"):
        with propagate_attributes(
            customer_identifier=user_id,
            thread_identifier="conv_abc_123",
            metadata={"plan": "pro"},
        ):
            result = await Runner.run(agent, message)
            print(result.final_output)
```

| Attribute | Type | Description |
| --- | --- | --- |
| `customer_identifier` | `str` | Identifies the end user in Respan analytics. |
| `thread_identifier` | `str` | Groups related messages into a conversation. |
| `metadata` | `dict` | Custom key-value pairs. Merged with default metadata. |

Decorators (optional)

Decorators are not required. All agent runs, LLM calls, tool calls, and handoffs are auto-traced by the instrumentor. Use `@workflow` and `@task` to add structure when you want to group agent runs into named workflows with nested tasks.

```python
import asyncio

from respan import Respan, workflow, task
from respan_instrumentation_openai_agents import OpenAIAgentsInstrumentor
from agents import Agent, Runner, function_tool

respan = Respan(instrumentations=[OpenAIAgentsInstrumentor()])

@function_tool
def search_docs(query: str) -> str:
    """Search the documentation."""
    return f"Results for: {query}"

researcher = Agent(
    name="Researcher",
    instructions="You research topics using the search tool.",
    tools=[search_docs],
)

writer = Agent(
    name="Writer",
    instructions="You write concise summaries.",
)

@task(name="research")
async def research(topic: str) -> str:
    result = await Runner.run(researcher, f"Research: {topic}")
    return result.final_output

@workflow(name="research_and_write")
async def pipeline(topic: str):
    findings = await research(topic)
    result = await Runner.run(writer, f"Summarize: {findings}")
    print(result.final_output)

asyncio.run(pipeline("API gateways"))
respan.flush()
```

Examples

Tool calls

Tool calls are automatically captured as spans with inputs, outputs, and timing.

```python
import asyncio

from respan import Respan
from respan_instrumentation_openai_agents import OpenAIAgentsInstrumentor
from agents import Agent, Runner, function_tool, trace

respan = Respan(instrumentations=[OpenAIAgentsInstrumentor()])

@function_tool
def get_weather(city: str) -> str:
    """Get the weather for a city."""
    return f"The weather in {city} is sunny, 72F"

agent = Agent(
    name="Weather Agent",
    instructions="Help users check the weather.",
    tools=[get_weather],
)

async def main():
    with trace("Weather check"):
        result = await Runner.run(agent, "What's the weather in San Francisco?")
        print(result.final_output)
    respan.flush()

asyncio.run(main())
```

Handoffs

Agent-to-agent handoffs are traced with full context.

```python
import asyncio

from agents import Agent, Runner, trace

billing_agent = Agent(
    name="Billing Agent",
    instructions="Handle billing questions.",
)

support_agent = Agent(
    name="Support Agent",
    instructions="Route billing questions to the billing agent.",
    handoffs=[billing_agent],
)

async def main():
    with trace("Support handoff"):
        result = await Runner.run(support_agent, "I have a billing question")
        print(result.final_output)

asyncio.run(main())
```

Streaming

Stream agent responses with real-time text deltas.

```python
import asyncio

from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner

agent = Agent(name="Joker", instructions="You tell jokes.")

async def main():
    result = Runner.run_streamed(agent, input="Tell me 3 jokes.")
    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(
            event.data, ResponseTextDeltaEvent
        ):
            print(event.data.delta, end="", flush=True)

asyncio.run(main())
```
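
Streamed runs are instrumented like any other run. If you also want the streamed run grouped under a named trace and exported before the process exits, one way to do that is the sketch below; it assumes the `respan` instance created in the Setup step is in scope, and the trace name `"Joke stream"` is an arbitrary example.

```python
import asyncio

from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner, trace

agent = Agent(name="Joker", instructions="You tell jokes.")

async def main():
    # Group the streamed run under a named trace, as in the earlier examples.
    with trace("Joke stream"):
        result = Runner.run_streamed(agent, input="Tell me 3 jokes.")
        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                print(event.data.delta, end="", flush=True)
    # Assumes the Respan instance from the Setup step; ensures spans
    # are exported before the process exits.
    respan.flush()

asyncio.run(main())
```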