The OpenAI SDK is the official client for OpenAI’s APIs, available for both Python and TypeScript/JavaScript. It supports Chat Completions and the Responses API. Respan can auto-instrument all OpenAI calls for tracing, route them through the Respan gateway for model switching and prompt management, or both.
Decorators are not required. All OpenAI calls are auto-traced by the instrumentor. Use @workflow and @task (Python) or withWorkflow and withTask (TypeScript) to add structure when you want to group related calls into a named workflow with nested tasks.
Copy
from openai import OpenAI

from respan import Respan, workflow, task
from respan_instrumentation_openai import OpenAIInstrumentor

# Auto-instrument every OpenAI call so each request becomes a traced span.
respan = Respan(instrumentations=[OpenAIInstrumentor()])

# Fix: the original example used `client` without ever creating it.
# OpenAI() reads OPENAI_API_KEY from the environment by default.
client = OpenAI()


@task(name="generate_outline")
def outline(topic: str) -> str:
    """Generate a brief outline for *topic*; traced as a task span."""
    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "Create a brief outline."},
            {"role": "user", "content": topic},
        ],
    )
    return response.choices[0].message.content


@workflow(name="content_pipeline")
def pipeline(topic: str):
    """Outline then expand: both LLM calls are grouped under one workflow trace."""
    plan = outline(topic)
    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "Write content from this outline."},
            {"role": "user", "content": plan},
        ],
    )
    print(response.choices[0].message.content)


pipeline("Benefits of API gateways")
# Ensure buffered spans are exported before the process exits.
respan.flush()
The Batch API lets you submit large batches of requests for asynchronous processing at 50% of the standard cost. Use respan.log_batch_results() to log each batch result as an individual traced span in Respan. Respan also provides a Batch API endpoint for batch processing with tracking parameters.
The Batch API requires a direct OPENAI_API_KEY. It does not go through the Respan gateway.
Copy
import os
import json
import time

from openai import OpenAI

from respan import Respan, workflow, task
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])

# Batch API requires direct OpenAI access (it does not go through the gateway).
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Single source of truth for the batch input file, shared by
# create_batch_file() and download_results().
BATCH_INPUT_PATH = "/tmp/batch_input.jsonl"


@task(name="create_batch_file")
def create_batch_file() -> str:
    """Write one JSONL request line per topic and return the file path."""
    tasks = []
    for i, topic in enumerate(["quantum computing", "blockchain", "edge computing"]):
        tasks.append({
            "custom_id": f"topic-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4.1-nano",
                "messages": [
                    {"role": "system", "content": "Explain in one sentence."},
                    {"role": "user", "content": f"What is {topic}?"},
                ],
            },
        })
    with open(BATCH_INPUT_PATH, "w") as f:
        for t in tasks:
            f.write(json.dumps(t) + "\n")
    return BATCH_INPUT_PATH


@task(name="upload_and_submit")
def upload_and_submit(file_path: str) -> str:
    """Upload the JSONL file and submit the batch; return the batch id."""
    # Fix: open the file in a `with` block so the handle is always closed
    # (the original leaked it via open(...) inline in the call).
    with open(file_path, "rb") as f:
        batch_file = client.files.create(file=f, purpose="batch")
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return batch.id


@task(name="poll_batch")
def poll_batch(batch_id: str) -> str:
    """Poll until the batch finishes; return the output file id.

    Raises:
        RuntimeError: if the batch ends as failed, expired, or cancelled.
    """
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status == "completed":
            return batch.output_file_id
        elif batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch.status}")
        time.sleep(5)


@task(name="download_results")
def download_results(output_file_id: str):
    """Download results, log each as a traced span, and print the answers."""
    content = client.files.content(output_file_id).content
    results = [json.loads(line) for line in content.decode().strip().split("\n")]
    with open(BATCH_INPUT_PATH) as f:
        requests = [json.loads(line) for line in f]
    # Log each batch result as an individual traced span
    respan.log_batch_results(requests, results)
    for r in results:
        print(f"{r['custom_id']}: {r['response']['body']['choices'][0]['message']['content']}")


@workflow(name="batch_pipeline")
def run():
    """End-to-end batch flow: build file -> submit -> poll -> log results."""
    file_path = create_batch_file()
    batch_id = upload_and_submit(file_path)
    output_file_id = poll_batch(batch_id)
    download_results(output_file_id)


run()
respan.flush()
For long-running batches where submission and retrieval happen in separate processes, save the trace_id and pass it to log_batch_results() later to link results back to the original trace.
Copy
import os
import json
import time

from openai import OpenAI

from respan import Respan, workflow, task, get_client
from respan_instrumentation_openai import OpenAIInstrumentor

respan = Respan(instrumentations=[OpenAIInstrumentor()])
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


# Phase 1: Submit batch and save trace context
@task(name="create_and_submit")
def create_and_submit(requests: list) -> dict:
    """Write requests to JSONL, submit the batch, and capture the trace id.

    Returns a dict with batch_id, trace_id, and the input file path —
    everything a later process needs to link results back to this trace.
    """
    file_path = "/tmp/batch_async.jsonl"
    with open(file_path, "w") as f:
        for r in requests:
            f.write(json.dumps(r) + "\n")
    # Fix: use a `with` block so the upload handle is always closed
    # (the original leaked it via open(...) inline in the call).
    with open(file_path, "rb") as f:
        batch_file = client.files.create(file=f, purpose="batch")
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    # Save trace ID — in production, persist this to a database
    rc = get_client()
    trace_id = rc.get_current_trace_id()
    return {"batch_id": batch.id, "trace_id": trace_id, "input_file": file_path}


@workflow(name="batch_submit")
def submit(requests):
    """Named workflow wrapper so submission gets its own trace."""
    return create_and_submit(requests)


# Phase 2: Retrieve results (separate process/job)
def retrieve_and_log(saved: dict):
    """Poll the saved batch to completion and log results into the ORIGINAL trace.

    Raises:
        RuntimeError: if the batch ends as failed, expired, or cancelled.
    """
    batch_id = saved["batch_id"]
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status == "completed":
            break
        elif batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch.status}")
        time.sleep(5)
    content = client.files.content(batch.output_file_id).content
    results = [json.loads(line) for line in content.decode().strip().split("\n")]
    with open(saved["input_file"]) as f:
        requests = [json.loads(line) for line in f]
    # Log results into the ORIGINAL trace
    respan.log_batch_results(requests, results, trace_id=saved["trace_id"])


# Run
requests = [
    {
        "custom_id": f"topic-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4.1-nano",
            "messages": [
                {"role": "system", "content": "Explain in one sentence."},
                {"role": "user", "content": f"What is {topic}?"},
            ],
        },
    }
    for i, topic in enumerate(["quantum computing", "blockchain", "edge computing"])
]
saved = submit(requests)
respan.flush()

# Later (could be a different process)...
retrieve_and_log(saved)
respan.flush()