Medical diagnosis workflow
A full agent workflow using @workflow, @task, and @tool with multiple specialist agents.
Copy
from respan_tracing import RespanTelemetry, workflow, task, tool
from openai import OpenAI

# Initialize the tracing SDK once per process (installs the tracer provider).
telemetry = RespanTelemetry(api_key="your-api-key")
# Shared OpenAI client used by the task functions below.
client = OpenAI()
@tool(name="load_medical_report")
def load_medical_report(path: str) -> str:
    """Read a medical report from disk and return its full text.

    Args:
        path: Filesystem path to the report file.

    Returns:
        The report contents as a single string.
    """
    # Explicit encoding avoids platform-dependent defaults when reports
    # contain non-ASCII characters.
    with open(path, "r", encoding="utf-8") as f:
        return f.read()
@task(name="specialist_analysis")
def specialist_analysis(role: str, report: str):
    """Ask gpt-4o to analyze *report* from the perspective of *role*."""
    conversation = [
        {"role": "system", "content": f"You are a {role}. Analyze this report."},
        {"role": "user", "content": report},
    ]
    completion = client.chat.completions.create(model="gpt-4o", messages=conversation)
    return completion.choices[0].message.content
@task(name="final_diagnosis")
def final_diagnosis(analyses: dict):
    """Merge the per-specialist analyses into one synthesized diagnosis."""
    summary_lines = [f"{k}: {v}" for k, v in analyses.items()]
    combined = "\n".join(summary_lines)
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Synthesize these specialist analyses into a final diagnosis."},
            {"role": "user", "content": combined},
        ],
    )
    return completion.choices[0].message.content
@workflow(name="medical_diagnosis_workflow")
def run_diagnosis(report_path: str):
    """Load the report, collect each specialist's analysis, and synthesize.

    NOTE(review): the analyses run one after another here; running them in
    worker threads would need explicit OTel context propagation — confirm
    before parallelizing.
    """
    report = load_medical_report(report_path)
    specialists = ("Cardiologist", "Pulmonologist")
    analyses = {name: specialist_analysis(name.lower(), report) for name in specialists}
    return final_diagnosis(analyses)
Update span with customer metadata
Attach user identifiers and custom metadata to spans during execution.
Copy
from respan_tracing import RespanTelemetry, get_client, workflow
from openai import OpenAI

# Initialize tracing before any decorated function runs.
telemetry = RespanTelemetry(api_key="your-api-key")
@workflow(name="user_query")
def user_query(user_id: str, prompt: str):
    """Run one LLM call with customer metadata attached to the workflow span."""
    tracing = get_client()
    # Tag the active span with the caller's identity and custom metadata so
    # the trace can be filtered per customer later.
    tracing.update_current_span(
        respan_params={
            "customer_identifier": user_id,
            "metadata": {"source": "api", "tier": "premium"},
        }
    )
    completion = OpenAI().chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return completion.choices[0].message.content
# Example invocation — the span carries customer_identifier="user-123".
user_query("user-123", "Explain tracing")
Events and exception handling
Add structured events and record caught exceptions for debugging.
Copy
from respan_tracing import RespanTelemetry, get_client, workflow, task
from opentelemetry.trace import StatusCode

# Initialize tracing; StatusCode marks span success/failure explicitly.
telemetry = RespanTelemetry(api_key="your-api-key")
@task(name="validate_data")
def validate_data(data):
    """Validate *data*, recording the outcome (and any exception) on the span.

    Args:
        data: Arbitrary payload; its string form must be at least 3 chars.

    Returns:
        The string "validated_<data>" on success.

    Raises:
        ValueError: If data is falsy or its string form is too short.
    """
    client = get_client()
    try:
        if not data or len(str(data)) < 3:
            raise ValueError("Data is too short")
    except Exception as e:
        # Record the exception on the current span, tag the failure, then
        # re-raise so callers still observe the original error.
        client.record_exception(e)
        client.update_current_span(attributes={"validation.result": "failed"})
        raise
    # Success path kept outside the try so an error raised while updating the
    # span is not misreported as a validation failure.
    client.update_current_span(
        status=StatusCode.OK,
        attributes={"validation.result": "success"},
    )
    return f"validated_{data}"
@workflow(name="data_processing")
def data_processing(data):
    """Validate *data* inside a workflow, emitting start/finish events."""
    tracing = get_client()
    payload_size = len(str(data))
    tracing.add_event("processing_started", {"data_size": payload_size})
    validated = validate_data(data)
    tracing.add_event("processing_completed")
    return validated
Multiple exporters with processor routing
Route different spans to different destinations using named processors.
Copy
from respan_tracing import RespanTelemetry, workflow, task
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
class FileExporter(SpanExporter):
    """SpanExporter that appends one "name: status" line per span to a file."""

    def __init__(self, filename: str):
        # Destination path; the file is opened per export batch, so no handle
        # is held between batches.
        self.filename = filename

    def export(self, spans):
        """Append each finished span as a text line and report success."""
        # Build the batch in memory and write it in one call; explicit
        # encoding avoids platform-dependent defaults.
        lines = [f"{span.name}: {span.status}\n" for span in spans]
        with open(self.filename, "a", encoding="utf-8") as f:
            f.writelines(lines)
        return SpanExportResult.SUCCESS
# Default Respan exporter + named "debug" file exporter
telemetry = RespanTelemetry(api_key="your-api-key")
# Register a second, named processor; tasks opt into it via the
# `processors` argument on their decorator.
telemetry.add_processor(
    exporter=FileExporter("debug_traces.log"),
    name="debug",
)
@task(name="debug_only_task", processors="debug")
def debug_only_task():
    """Emit a span routed exclusively to the named "debug" processor."""
    message = "only goes to file exporter"
    return message
@task(name="everywhere_task")
def everywhere_task():
    """Emit a span routed to every registered processor (no opt-in needed)."""
    message = "goes to both Respan and file exporter"
    return message
@workflow(name="routing_example")
def routing_example():
    """Run one debug-routed task and one default-routed task in sequence."""
    for step in (debug_only_task, everywhere_task):
        step()
Span buffering and batch export
Collect spans into a buffer for manual inspection and deferred export.
Copy
from respan_tracing import RespanTelemetry, get_client

telemetry = RespanTelemetry(api_key="your-api-key")
client = get_client()

# Spans created inside the buffer are held in memory rather than exported
# immediately; the identifier groups them under one trace.
with client.get_span_buffer("batch-trace-001") as buffer:
    buffer.create_span("step_1", {"status": "completed"})
    buffer.create_span("step_2", {"status": "completed"})
    buffer.create_span("step_3", {"status": "failed"})
    print(f"Buffered spans: {buffer.get_span_count()}")  # 3
    collected = buffer.get_all_spans()

# Export all at once
# NOTE(review): exporting after the buffer context exits — confirm the
# buffer does not auto-export on __exit__.
client.process_spans(collected)
Manual span creation
Use the OpenTelemetry tracer directly for fine-grained span control.
Copy
from respan_tracing import RespanTelemetry, get_client, workflow

# Initialize tracing so get_client() returns a configured client.
telemetry = RespanTelemetry(api_key="your-api-key")
@workflow(name="manual_spans")
def manual_spans():
    """Create two hand-managed spans via the OpenTelemetry tracer."""
    tracer = get_client().get_tracer()
    with tracer.start_as_current_span("data_fetch") as fetch_span:
        fetch_span.set_attribute("source", "database")
        data = [1, 2, 3]
    with tracer.start_as_current_span("data_transform") as transform_span:
        transform_span.set_attribute("transform", "double")
        doubled = [2 * value for value in data]
    return doubled
# Run the workflow; both spans nest under the "manual_spans" workflow span.
manual_spans()
Context-based attributes
Apply Respan attributes to all spans within a context block.
Copy
from respan_tracing import RespanTelemetry, workflow, task, respan_span_attributes

# Initialize tracing before any decorated function runs.
telemetry = RespanTelemetry(api_key="your-api-key")
@task(name="inner_task")
def inner_task():
    """Trivial task; its span picks up attributes from the enclosing context."""
    result = "processed"
    return result
@workflow(name="context_workflow")
def context_workflow(user_id: str):
    """Run inner_task with customer/trace-group attributes applied via context."""
    shared_attributes = {
        "customer_identifier": user_id,
        "trace_group_identifier": "experiment-v2",
        "metadata": {"team": "ml"},
    }
    # Every span started inside this block — including inner_task's —
    # inherits the attributes above.
    with respan_span_attributes(shared_attributes):
        return inner_task()
# The inner_task span is tagged with customer_identifier="user-789".
context_workflow("user-789")
Multi-LLM provider workflow
Trace calls across multiple LLM providers in a single workflow. Use block_instruments to suppress noisy HTTP-level spans.
Copy
import asyncio
from openai import OpenAI
from anthropic import Anthropic
from respan_tracing import RespanTelemetry, get_client
from respan_tracing.decorators import workflow, task
from respan_tracing.instruments import Instruments

# block_instruments suppresses HTTP-level spans (requests/urllib3) that
# would otherwise appear under every LLM call.
telemetry = RespanTelemetry(
    app_name="multi-llm",
    api_key="your-api-key",
    block_instruments={Instruments.REQUESTS, Instruments.URLLIB3},
)
# One client per provider, shared by the tasks below.
openai_client = OpenAI()
anthropic_client = Anthropic()
@task(name="openai_call")
async def openai_call(prompt: str) -> str:
    """Ask gpt-4o-mini for a short completion and return its text.

    The OpenAI client used here is synchronous, so run the call in a worker
    thread to avoid blocking the event loop; asyncio.to_thread copies the
    current context, so span parenting is preserved.
    """
    resp = await asyncio.to_thread(
        openai_client.chat.completions.create,
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=50,
    )
    return resp.choices[0].message.content
@task(name="anthropic_call")
async def anthropic_call(prompt: str) -> str:
    """Ask Claude for a short completion and return its text.

    The Anthropic client is synchronous; run it in a worker thread so the
    event loop stays responsive (asyncio.to_thread carries the current
    context — and thus the span parent — into the thread).
    """
    resp = await asyncio.to_thread(
        anthropic_client.messages.create,
        model="claude-sonnet-4-5-20250929",
        max_tokens=50,
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.content[0].text
@workflow(name="multi_llm_workflow")
async def multi_llm_workflow(prompt: str):
    """Query both providers with the same prompt and return both answers.

    Args:
        prompt: User prompt sent verbatim to each provider.

    Returns:
        Dict with "openai" and "anthropic" result strings.
    """
    client = get_client()
    client.update_current_span(
        respan_params={
            "customer_identifier": "multi_llm_user",
            "metadata": {"prompt": prompt},
        }
    )
    # The two provider calls are independent, so run them concurrently
    # instead of awaiting one after the other.
    openai_result, anthropic_result = await asyncio.gather(
        openai_call(prompt), anthropic_call(prompt)
    )
    return {"openai": openai_result, "anthropic": anthropic_result}
# Drive the async workflow to completion, then flush any buffered spans
# before the process exits.
asyncio.run(multi_llm_workflow("Say hello briefly."))
telemetry.flush()
Cost and pricing updates
Set custom cost and unit pricing on spans for accurate cost tracking.
Copy
from respan_tracing import RespanTelemetry, get_client, workflow, task

# Initialize tracing before any decorated function runs.
telemetry = RespanTelemetry(api_key="your-api-key")
@task(name="tracked_generation")
def tracked_generation():
    """Record token usage, unit prices, and computed cost on the current span."""
    tracing = get_client()
    # After an LLM call, update cost and pricing
    prompt_tokens = 120
    completion_tokens = 80
    prompt_unit_price = 0.0000025  # per token
    completion_unit_price = 0.000010  # per token
    total_cost = (
        prompt_tokens * prompt_unit_price
        + completion_tokens * completion_unit_price
    )
    usage_attributes = {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_request_tokens": prompt_tokens + completion_tokens,
        "cost": total_cost,
        "prompt_unit_price": prompt_unit_price,
        "completion_unit_price": completion_unit_price,
    }
    tracing.update_current_span(attributes=usage_attributes)
Customer details and TTFT
Attach customer email/name and track time-to-first-token (TTFT) via metadata.
Copy
import time
from respan_tracing import RespanTelemetry, get_client, workflow, task

# Initialize tracing before any decorated function runs.
telemetry = RespanTelemetry(api_key="your-api-key")
@task(name="customer_aware_task")
def customer_aware_task():
    """Attach customer identity and a TTFT measurement to the current span."""
    tracing = get_client()
    # Set customer details
    tracing.update_current_span(
        respan_params={
            "customer_email": "user@example.com",
            "customer_name": "Demo User",
        }
    )
    # Track TTFT
    started_at = time.perf_counter()
    # ... wait for first token from LLM ...
    elapsed = time.perf_counter() - started_at
    ttft = round(elapsed, 3)
    tracing.update_current_span(
        respan_params={"metadata": {"time_to_first_token": ttft}}
    )
Debug logging
Enable verbose SDK logging to troubleshoot tracing issues.
Copy
from respan_tracing import RespanTelemetry

# log_level="DEBUG" makes the SDK emit verbose internal logs from the
# tracer, processors, and exporters — useful when spans don't show up.
telemetry = RespanTelemetry(
    api_key="your-api-key",
    app_name="debug_app",
    log_level="DEBUG",
)
Set log_level="DEBUG" to see detailed output from the tracer, processors, and exporters.