  1. Sign up — Create an account at platform.respan.ai
  2. Create an API key — Generate one on the API keys page
  3. Add credits or a provider key — Add credits on the Credits page or connect your own provider key on the Integrations page
Add the Docs MCP to your AI coding tool to get help building with Respan. No API key needed.
{
  "mcpServers": {
    "respan-docs": {
      "url": "https://docs.respan.ai/mcp"
    }
  }
}

What is the OpenAI SDK?

The OpenAI SDK is the official client for OpenAI’s APIs, available for both Python and TypeScript/JavaScript. It supports Chat Completions and the Responses API. Respan can auto-instrument all OpenAI calls for tracing, route them through the Respan gateway for model switching and prompt management, or both.

Setup

1. Install packages

pip install respan-ai respan-instrumentation-openai openai python-dotenv
2. Set environment variables

export RESPAN_API_KEY="YOUR_RESPAN_API_KEY"
export RESPAN_BASE_URL="https://api.respan.ai/api"  # optional, this is the default
The Respan API key authenticates both LLM inference (gateway) and telemetry export (tracing).
3. Initialize and run

import os
from dotenv import load_dotenv

load_dotenv()

from openai import OpenAI
from respan import Respan
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

response = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "Say hello in three languages."}],
)
print(response.choices[0].message.content)
respan.flush()
4. View your trace

Open the Traces page to see your auto-instrumented LLM spans.
Always call respan.flush() (Python) or await respan.flush() (TypeScript) before your process exits. Without it, pending spans may be lost.

Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str \| None | None | Falls back to the RESPAN_API_KEY env var. |
| base_url | str \| None | None | Falls back to the RESPAN_BASE_URL env var. |
| instrumentations | list | [] | Plugin instrumentations to activate (e.g. OpenAIInstrumentor()). |
| is_auto_instrument | bool \| None | False | Auto-discover and activate all installed instrumentors via OpenTelemetry entry points. |
| customer_identifier | str \| None | None | Default customer identifier for all spans. |
| metadata | dict \| None | None | Default metadata attached to all spans. |
| environment | str \| None | None | Environment tag (e.g. "production"). |

Attributes

Attach customer identifiers, thread IDs, and metadata to spans.

In Respan()

Set defaults at initialization — these apply to all spans.
from respan import Respan
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(
    instrumentations=[OpenAIInstrumentor()],
    customer_identifier="user_123",
    metadata={"service": "chat-api", "version": "1.0.0"},
)

With propagate_attributes

Override per-request using a context scope.
import os

from openai import OpenAI
from respan import Respan, workflow, propagate_attributes
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(
    instrumentations=[OpenAIInstrumentor()],
    metadata={"service": "chat-api", "version": "1.0.0"},
)

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

@workflow(name="handle_request")
def handle_request(user_id: str, question: str):
    with propagate_attributes(
        customer_identifier=user_id,
        thread_identifier="conv_001",
        metadata={"plan": "pro"},  # merged with default metadata
    ):
        response = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": question}],
        )
        print(response.choices[0].message.content)
| Attribute | Type | Description |
| --- | --- | --- |
| customer_identifier | str | Identifies the end user in Respan analytics. |
| thread_identifier | str | Groups related messages into a conversation. |
| metadata | dict | Custom key-value pairs. Merged with default metadata. |

Decorators (optional)

Decorators are not required. All OpenAI calls are auto-traced by the instrumentor. Use @workflow and @task (Python) or withWorkflow and withTask (TypeScript) to add structure when you want to group related calls into a named workflow with nested tasks.
import os

from openai import OpenAI
from respan import Respan, workflow, task
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

@task(name="generate_outline")
def outline(topic: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "Create a brief outline."},
            {"role": "user", "content": topic},
        ],
    )
    return response.choices[0].message.content

@workflow(name="content_pipeline")
def pipeline(topic: str):
    plan = outline(topic)
    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "Write content from this outline."},
            {"role": "user", "content": plan},
        ],
    )
    print(response.choices[0].message.content)

pipeline("Benefits of API gateways")
respan.flush()

Streaming

Streaming responses are auto-traced like regular completions.
import os

from openai import OpenAI
from respan import Respan
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

stream = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "Write a haiku about Python."}],
    stream=True,
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
print()

respan.flush()

Tool calls

Function calling is auto-traced. Wrap the assistant loop in @workflow and each tool function in @task for a structured trace tree.
import os
import json
from openai import OpenAI
from respan import Respan, workflow, task
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }
]

@task(name="get_weather")
def get_weather(city: str) -> str:
    return f"Sunny, 72F in {city}"

@workflow(name="weather_assistant")
def run(question: str):
    messages = [{"role": "user", "content": question}]

    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=messages,
        tools=tools,
    )
    message = response.choices[0].message

    if message.tool_calls:
        messages.append(message)
        for tc in message.tool_calls:
            args = json.loads(tc.function.arguments)
            result = get_weather(**args)
            messages.append(
                {"role": "tool", "tool_call_id": tc.id, "content": result}
            )

        final = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
            tools=tools,
        )
        print(f"Answer: {final.choices[0].message.content}")

run("What's the weather in Paris?")
respan.flush()

Multi-turn conversations

Multi-turn conversations are auto-traced. Each create() call becomes its own span. Use @workflow to group them into a single trace.
import os
from openai import OpenAI
from respan import Respan, workflow
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

@workflow(name="conversation")
def chat():
    messages = [
        {"role": "system", "content": "You are a concise cooking assistant."}
    ]
    questions = [
        "What can I make with eggs and cheese?",
        "How long does the omelette take?",
        "Any tips to make it fluffy?",
    ]

    for question in questions:
        messages.append({"role": "user", "content": question})
        response = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
        )
        answer = response.choices[0].message.content
        messages.append({"role": "assistant", "content": answer})
        print(f"User: {question}")
        print(f"Bot:  {answer}\n")

chat()
respan.flush()

Structured output

JSON mode with Pydantic models is auto-traced.
import os
from pydantic import BaseModel
from openai import OpenAI
from respan import Respan, workflow
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

client = OpenAI(
    api_key=os.getenv("RESPAN_API_KEY"),
    base_url=os.getenv("RESPAN_BASE_URL", "https://api.respan.ai/api"),
)

class MovieReview(BaseModel):
    title: str
    rating: int
    summary: str
    pros: list[str]
    cons: list[str]

@workflow(name="movie_review")
def review(movie: str) -> MovieReview:
    response = client.beta.chat.completions.parse(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "You are a film critic. Rate movies 1-10."},
            {"role": "user", "content": f"Review: {movie}"},
        ],
        response_format=MovieReview,
    )
    return response.choices[0].message.parsed

result = review("The Matrix")
print(f"{result.title}: {result.rating}/10")
print(f"Summary: {result.summary}")
respan.flush()

Batch API

The Batch API lets you submit large batches of requests for asynchronous processing at a 50% discount. Use respan.log_batch_results() to log each batch result as an individual traced span in Respan. Respan also provides a Batch API endpoint for batch processing with tracking parameters.
The Batch API requires a direct OPENAI_API_KEY. It does not go through the Respan gateway.
import os
import json
import time
from openai import OpenAI
from respan import Respan, workflow, task
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

# Batch API requires direct OpenAI access
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

@task(name="create_batch_file")
def create_batch_file() -> str:
    tasks = []
    for i, topic in enumerate(["quantum computing", "blockchain", "edge computing"]):
        tasks.append({
            "custom_id": f"topic-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4.1-nano",
                "messages": [
                    {"role": "system", "content": "Explain in one sentence."},
                    {"role": "user", "content": f"What is {topic}?"},
                ],
            },
        })

    file_path = "/tmp/batch_input.jsonl"
    with open(file_path, "w") as f:
        for t in tasks:
            f.write(json.dumps(t) + "\n")
    return file_path

@task(name="upload_and_submit")
def upload_and_submit(file_path: str) -> str:
    batch_file = client.files.create(file=open(file_path, "rb"), purpose="batch")
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return batch.id

@task(name="poll_batch")
def poll_batch(batch_id: str) -> str:
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status == "completed":
            return batch.output_file_id
        elif batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch.status}")
        time.sleep(5)

@task(name="download_results")
def download_results(output_file_id: str):
    content = client.files.content(output_file_id).content
    results = [json.loads(line) for line in content.decode().strip().split("\n")]

    with open("/tmp/batch_input.jsonl") as f:
        requests = [json.loads(line) for line in f]

    # Log each batch result as an individual traced span
    respan.log_batch_results(requests, results)

    for r in results:
        print(f"{r['custom_id']}: {r['response']['body']['choices'][0]['message']['content']}")

@workflow(name="batch_pipeline")
def run():
    file_path = create_batch_file()
    batch_id = upload_and_submit(file_path)
    output_file_id = poll_batch(batch_id)
    download_results(output_file_id)

run()
respan.flush()

Async batch (cross-process)

For long-running batches where submission and retrieval happen in separate processes, save the trace_id and pass it to log_batch_results() later to link results back to the original trace.
import os
import json
import time
from openai import OpenAI
from respan import Respan, workflow, task, get_client
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Phase 1: Submit batch and save trace context
@task(name="create_and_submit")
def create_and_submit(requests: list) -> dict:
    file_path = "/tmp/batch_async.jsonl"
    with open(file_path, "w") as f:
        for r in requests:
            f.write(json.dumps(r) + "\n")

    batch_file = client.files.create(file=open(file_path, "rb"), purpose="batch")
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )

    # Save trace ID — in production, persist this to a database
    rc = get_client()
    trace_id = rc.get_current_trace_id()

    return {"batch_id": batch.id, "trace_id": trace_id, "input_file": file_path}

@workflow(name="batch_submit")
def submit(requests):
    return create_and_submit(requests)

# Phase 2: Retrieve results (separate process/job)
def retrieve_and_log(saved: dict):
    batch_id = saved["batch_id"]

    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status == "completed":
            break
        elif batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch.status}")
        time.sleep(5)

    content = client.files.content(batch.output_file_id).content
    results = [json.loads(line) for line in content.decode().strip().split("\n")]

    with open(saved["input_file"]) as f:
        requests = [json.loads(line) for line in f]

    # Log results into the ORIGINAL trace
    respan.log_batch_results(requests, results, trace_id=saved["trace_id"])

# Run
requests = [
    {
        "custom_id": f"topic-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4.1-nano",
            "messages": [
                {"role": "system", "content": "Explain in one sentence."},
                {"role": "user", "content": f"What is {topic}?"},
            ],
        },
    }
    for i, topic in enumerate(["quantum computing", "blockchain", "edge computing"])
]

saved = submit(requests)
respan.flush()

# Later (could be a different process)...
retrieve_and_log(saved)
respan.flush()

Gateway features

The features below require the Gateway or Both setup from Step 3.

Switch models

Change the model parameter to use 250+ models from different providers through the same gateway.
# OpenAI
response = client.chat.completions.create(model="gpt-4o", messages=messages)

# Anthropic
response = client.chat.completions.create(model="claude-sonnet-4-5-20250929", messages=messages)

# Google
response = client.chat.completions.create(model="gemini-2.5-flash", messages=messages)
See the full model list.

Prompt management

Use Respan prompt management to serve prompt templates from the platform. Use schema_version: 2 for all new integrations.

Chat Completions

Prompt messages are the base layer (system/context). Body messages are appended as runtime user turns.
PROMPT_ID = "YOUR_PROMPT_ID"

response = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[
        {"role": "user", "content": "Add dark mode support to the dashboard"},
    ],
    extra_body={
        "prompt": {
            "prompt_id": PROMPT_ID,
            "schema_version": 2,
            "variables": {
                "feature_request": "Add dark mode support",
            },
        }
    },
)
print(response.choices[0].message.content)

Responses API

For the Responses API, pass prompt config under respan_params. The prompt template becomes instructions and body input is preserved.
PROMPT_ID = "YOUR_PROMPT_ID"

response = client.responses.create(
    model="gpt-4.1-nano",
    input=[
        {"role": "user", "content": "Add dark mode support to the dashboard"},
    ],
    extra_body={
        "respan_params": {
            "prompt": {
                "prompt_id": PROMPT_ID,
                "schema_version": 2,
                "variables": {
                    "feature_request": "Add dark mode support",
                },
            }
        }
    },
)
print(response.output_text)

Prompt options

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| prompt_id | str | | Prompt identifier (required). |
| schema_version | int | 1 | Set to 2 for v2 merge semantics (recommended). |
| version | int \| "latest" | deployed version | Pin a specific version or use "latest" for the most recent draft. |
| variables | dict | {} | Key-value pairs for template rendering. |
| patch | dict | | Runtime overrides for prompt config (v2 only). Cannot include messages or input. |
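Pinning a version and supplying a runtime patch can be combined in one payload. A sketch of the resulting extra_body (field names follow the table above; the patched temperature value is purely illustrative):

```python
PROMPT_ID = "YOUR_PROMPT_ID"

# Pin version 3 of the prompt and override sampling config at request time
# (patch is v2-only, so schema_version must be 2).
prompt_config = {
    "prompt_id": PROMPT_ID,
    "schema_version": 2,
    "version": 3,
    "variables": {"feature_request": "Add dark mode support"},
    # Runtime override; must not contain "messages" or "input".
    "patch": {"temperature": 0.2},
}

extra_body = {"prompt": prompt_config}
```

Pass this extra_body to client.chat.completions.create() the same way as in the Chat Completions example above.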

Respan parameters

Pass additional Respan parameters via extra_body for gateway features.
response = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "Hello"}],
    extra_body={
        "customer_identifier": "user_123",
        "fallback_models": ["gpt-3.5-turbo"],
        "metadata": {"session_id": "abc123"},
        "thread_identifier": "conversation_456",
    },
)
See Respan parameters for the full list.