Pipecat (tracing)

Pipecat is an open-source framework for building real-time, multimodal AI applications. It provides a pipeline architecture for voice agents, video processing, and other real-time experiences with support for speech-to-text, LLMs, and text-to-speech. Respan gives you full observability over every Pipecat turn, LLM call, STT step, TTS step, and tool execution — and gateway routing through the OpenAI-compatible Respan endpoint.

Create an account at platform.respan.ai and grab an API key.

Run npx @respan/cli setup to set up with your coding agent.

See Pipecat gateway setup to route this integration through the Respan gateway.

Setup

1

Install packages

$pip install respan-ai respan-instrumentation-pipecat
2

Set environment variables

$export OPENAI_API_KEY="YOUR_OPENAI_API_KEY"
$export RESPAN_API_KEY="YOUR_RESPAN_API_KEY"

OPENAI_API_KEY is used by Pipecat’s OpenAI service. RESPAN_API_KEY exports traces to Respan. If you use different Pipecat services, set their provider keys as usual.

3

Initialize and run

1import asyncio
2import os
3
4from dotenv import load_dotenv
5from pipecat.frames.frames import EndFrame, LLMContextFrame, LLMFullResponseEndFrame, LLMTextFrame
6from pipecat.pipeline.pipeline import Pipeline
7from pipecat.pipeline.runner import PipelineRunner
8from pipecat.pipeline.task import PipelineTask
9from pipecat.processors.aggregators.llm_context import LLMContext
10from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
11from pipecat.services.openai.llm import OpenAILLMService
12from respan import Respan
13from respan_instrumentation_pipecat import PipecatInstrumentor
14
15load_dotenv()
16
17respan = Respan(instrumentations=[PipecatInstrumentor()])
18
19class TextCollector(FrameProcessor):
20 def __init__(self):
21 super().__init__(name="text_collector", enable_direct_mode=True)
22 self.text = []
23 self.done = asyncio.Event()
24
25 async def process_frame(self, frame, direction: FrameDirection):
26 await super().process_frame(frame, direction)
27 if isinstance(frame, LLMTextFrame):
28 self.text.append(frame.text)
29 elif isinstance(frame, LLMFullResponseEndFrame):
30 self.done.set()
31 await self.push_frame(frame, direction)
32
33async def main():
34 collector = TextCollector()
35 llm = OpenAILLMService(
36 api_key=os.environ["OPENAI_API_KEY"],
37 settings=OpenAILLMService.Settings(model=os.getenv("OPENAI_MODEL", "gpt-4.1-nano")),
38 )
39
40 pipeline = Pipeline([llm, collector])
41 task = PipelineTask(pipeline, cancel_on_idle_timeout=False, enable_rtvi=False)
42
43 async def push_frames():
44 await asyncio.sleep(0.05)
45 context = LLMContext(
46 messages=[{"role": "user", "content": "Reply in one short sentence about Pipecat tracing."}]
47 )
48 await task.queue_frame(LLMContextFrame(context))
49 await asyncio.wait_for(collector.done.wait(), timeout=30)
50 await asyncio.sleep(0.2)
51 await task.queue_frame(EndFrame())
52
53 await asyncio.gather(PipelineRunner(handle_sigint=False).run(task), push_frames())
54 print("".join(collector.text))
55
56 await asyncio.sleep(1)
57 respan.flush()
58
59asyncio.run(main())
4

View your trace

Open the Traces page to see your Pipecat turn with child LLM, STT, TTS, and tool spans.

Configuration

ParameterTypeDefaultDescription
api_keystr | NoneNoneFalls back to RESPAN_API_KEY env var.
base_urlstr | NoneNoneFalls back to RESPAN_BASE_URL env var.
instrumentationslist[]Plugin instrumentations to activate (e.g. PipecatInstrumentor()).
customer_identifierstr | NoneNoneDefault customer identifier for all spans.
thread_identifierstr | NoneNoneDefault thread identifier for all spans.
metadatadict | NoneNoneDefault metadata attached to all spans.
environmentstr | NoneNoneEnvironment tag (e.g. "production").

PipecatInstrumentor forwards keyword arguments to the upstream OpenInference Pipecat instrumentor, including options such as debug_log_filename and config.

Attributes

In Respan()

Set defaults at initialization — these apply to all spans.

1from respan import Respan
2from respan_instrumentation_pipecat import PipecatInstrumentor
3
4respan = Respan(
5 instrumentations=[PipecatInstrumentor()],
6 customer_identifier="user_123",
7 thread_identifier="session_abc_123",
8 metadata={"service": "voice-agent", "version": "1.0.0"},
9)

With propagate_attributes

Override per-request using a context scope.

1from respan import Respan, propagate_attributes
2from respan_instrumentation_pipecat import PipecatInstrumentor
3
4respan = Respan(instrumentations=[PipecatInstrumentor()])
5
6async def handle_session(user_id: str):
7 with propagate_attributes(
8 customer_identifier=user_id,
9 thread_identifier="session_abc_123",
10 metadata={"plan": "pro"},
11 ):
12 await runner.run(task)
AttributeTypeDescription
customer_identifierstrIdentifies the end user in Respan analytics.
thread_identifierstrGroups related messages into a conversation.
metadatadictCustom key-value pairs. Merged with default metadata.

Decorators (optional)

Decorators are not required. Pipecat turns, LLM calls, STT, TTS, and tools are auto-traced by the instrumentor. Use @workflow and @task to add application-level structure around a Pipecat session.

1from respan import Respan, workflow, task
2from respan_instrumentation_pipecat import PipecatInstrumentor
3
4respan = Respan(instrumentations=[PipecatInstrumentor()])
5
6@task(name="run_pipecat_task")
7async def run_pipecat_task(task):
8 await runner.run(task)
9
10@workflow(name="voice_session")
11async def voice_session(task):
12 await run_pipecat_task(task)
13 respan.flush()