@workflow

Overview

The @workflow decorator creates a root trace span. All nested @task, @agent, and @tool spans are captured as children under this workflow.

from respan import workflow
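To make the parent/child relationship concrete, here is a minimal standard-library sketch of how a decorator can open a root span and let nested decorated calls attach to it. This is not Respan's implementation: `sketch_workflow`, `sketch_task`, `finished_spans`, and the dict-based span record are hypothetical names used only for illustration.

```python
import contextvars
import functools

# Hypothetical stand-ins for illustration only; not Respan's internals.
_current_span = contextvars.ContextVar("current_span", default=None)
finished_spans = []  # each span is appended here when it ends


def _span_decorator(kind):
    """Build a decorator factory that records spans of the given kind."""
    def decorator(name=None):
        def wrap(func):
            span_name = name or func.__name__  # fall back to the function name

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                parent = _current_span.get()
                span = {
                    "name": span_name,
                    "kind": kind,
                    "parent": parent["name"] if parent else None,
                }
                token = _current_span.set(span)  # nested calls see this span
                try:
                    return func(*args, **kwargs)
                finally:
                    _current_span.reset(token)  # restore the previous span
                    finished_spans.append(span)
            return wrapper
        return wrap
    return decorator


sketch_workflow = _span_decorator("workflow")
sketch_task = _span_decorator("task")


@sketch_task(name="load")
def load():
    return [1, 2, 3]


@sketch_workflow(name="root_workflow")
def root_workflow():
    return load()


result = root_workflow()
```

After the call, `finished_spans` holds the `load` span with `parent` set to `"root_workflow"`, followed by the root span itself with no parent.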

Parameters

Parameter      Type                    Default        Description
name           str | None              Function name  Display name for the workflow span.
version        int | None              None           Version number for the workflow.
method_name    str | None              None           Required when decorating a class. Specifies which method to use as the entry point.
processors     str | List[str] | None  None           Route this span to specific named processors. See add_processor.
export_filter  FilterParamDict | None  None           Filter dict to control which spans are exported. Uses AND logic. See Export filtering.

Function usage

from respan import Respan, workflow, task

respan = Respan(api_key="your-api-key")

@task(name="extract")
def extract():
    return {"records": [1, 2, 3]}

@task(name="transform")
def transform(data):
    return [x * 2 for x in data["records"]]

@workflow(name="data_pipeline")
def data_pipeline():
    data = extract()
    return transform(data)

print(data_pipeline())  # [2, 4, 6]

Class usage

When decorating a class, use method_name to specify the entry point. Calling that method creates the workflow span.

from respan import Respan, workflow, task
from openai import OpenAI

respan = Respan(api_key="your-api-key")
client = OpenAI()

@workflow(name="analysis_workflow", method_name="run")
class Analyzer:
    @task(name="analyze")
    def analyze(self, nums):
        return sum(nums)

    def run(self):
        return self.analyze([1, 2, 3])

print(Analyzer().run())  # 6
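The role of method_name can be pictured with a small standard-library sketch: a decorator that, when given a class, wraps only the named entry-point method. This is an illustration under assumptions, not Respan's code. `sketch_workflow` and `opened_spans` are hypothetical, and raising `ValueError` when method_name is missing is an assumed behavior.

```python
import functools

opened_spans = []  # records the name of each workflow span that would be opened


def sketch_workflow(name=None, method_name=None):
    """Hypothetical decorator that accepts either a function or a class."""
    def wrap(target):
        if isinstance(target, type):
            # Decorating a class: only the entry-point method is wrapped.
            if method_name is None:
                # Assumed behavior; the real library may react differently.
                raise ValueError("method_name is required when decorating a class")
            original = getattr(target, method_name)

            @functools.wraps(original)
            def entry(self, *args, **kwargs):
                opened_spans.append(name or target.__name__)
                return original(self, *args, **kwargs)

            setattr(target, method_name, entry)
            return target

        # Decorating a plain function: wrap it directly.
        @functools.wraps(target)
        def wrapper(*args, **kwargs):
            opened_spans.append(name or target.__name__)
            return target(*args, **kwargs)
        return wrapper
    return wrap


@sketch_workflow(name="analysis_workflow", method_name="run")
class SketchAnalyzer:
    def analyze(self, nums):
        return sum(nums)

    def run(self):
        return self.analyze([1, 2, 3])


analyzer = SketchAnalyzer()
total = analyzer.run()      # entry point: one workflow span is recorded
analyzer.analyze([4, 5])    # not the entry point: nothing is recorded
```

In this sketch only calls to `run` are recorded, which mirrors the statement above that calling the entry-point method is what creates the workflow span.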

Processor routing

Use processors to send workflow spans to specific exporters only.

from respan import workflow

@workflow(name="debug_workflow", processors="debug")
def debug_workflow():
    return "only exported to the 'debug' processor"

@workflow(name="multi_export", processors=["debug", "analytics"])
def multi_export():
    return "exported to both 'debug' and 'analytics' processors"
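Because processors accepts either a single name or a list of names, routing can be thought of as normalizing the argument and then matching it against the registered processor names. The sketch below illustrates that idea only. `select_processors` is a hypothetical helper, and two behaviors in it are assumptions rather than documented facts: spans with no processors value go to every registered processor, and unknown names are skipped silently.

```python
from typing import Iterable, List, Optional, Union


def select_processors(
    processors: Union[str, List[str], None],
    registered: Iterable[str],
) -> List[str]:
    """Return the registered processor names a span would be routed to."""
    registered_names = set(registered)
    if processors is None:
        # Assumption: unrouted spans go to every registered processor.
        return sorted(registered_names)
    # Accept a single name or a list of names.
    wanted = [processors] if isinstance(processors, str) else list(processors)
    # Assumption: names that were never registered are ignored.
    return [name for name in wanted if name in registered_names]


registered = ["debug", "analytics", "audit"]

single = select_processors("debug", registered)
multiple = select_processors(["debug", "analytics"], registered)
unknown = select_processors(["debug", "missing"], registered)
```

Here `single` is `["debug"]`, `multiple` is `["debug", "analytics"]`, and `unknown` keeps only `"debug"`.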

Export filtering

Use export_filter to conditionally export spans based on their attributes. Conditions are combined with AND logic, so a span is exported only if it matches every condition in the filter.

from respan import workflow

# Only export workflow spans that ended with an error
@workflow(
    name="monitored_workflow",
    export_filter={"status_code": {"operator": "", "value": "ERROR"}}
)
def monitored_workflow():
    return "only exported if the span has ERROR status"

Supported operators: "" (equals), "not", "gt", "gte", "lt", "lte", "contains", "icontains", "startswith", "endswith", "regex", "in", "not_in", "empty", "not_empty".
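The AND semantics can be sketched as a small evaluator over a span's attributes. This is not Respan's filter engine: `matches`, `OPERATORS`, and the `duration_ms` attribute are hypothetical, and the meaning given to each operator below is an assumption inferred from its name.

```python
import re


def _is_empty(value):
    """Treat None and empty strings/lists/dicts as empty (assumed semantics)."""
    return value is None or value == "" or value == [] or value == {}


# Assumed meaning of each operator; a = actual attribute, e = expected value.
OPERATORS = {
    "": lambda a, e: a == e,
    "not": lambda a, e: a != e,
    "gt": lambda a, e: a > e,
    "gte": lambda a, e: a >= e,
    "lt": lambda a, e: a < e,
    "lte": lambda a, e: a <= e,
    "contains": lambda a, e: e in a,
    "icontains": lambda a, e: str(e).lower() in str(a).lower(),
    "startswith": lambda a, e: str(a).startswith(e),
    "endswith": lambda a, e: str(a).endswith(e),
    "regex": lambda a, e: re.search(e, str(a)) is not None,
    "in": lambda a, e: a in e,
    "not_in": lambda a, e: a not in e,
    "empty": lambda a, e: _is_empty(a),
    "not_empty": lambda a, e: not _is_empty(a),
}


def matches(attributes, export_filter):
    """Return True only if every condition in the filter holds (AND logic)."""
    return all(
        OPERATORS[cond.get("operator", "")](attributes.get(field), cond.get("value"))
        for field, cond in export_filter.items()
    )


error_only = {"status_code": {"operator": "", "value": "ERROR"}}
slow_errors = {
    "status_code": {"operator": "", "value": "ERROR"},
    "duration_ms": {"operator": "gt", "value": 1000},  # hypothetical attribute
}

failed_span = {"status_code": "ERROR", "duration_ms": 250}
ok_span = {"status_code": "OK", "duration_ms": 5000}
```

With these inputs, `matches(failed_span, error_only)` is true, while `matches(failed_span, slow_errors)` is false because the duration condition fails even though the status condition passes.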

Best practices

  • Use descriptive workflow names for easy navigation in the Traces dashboard
  • Keep workflows coarse-grained — use @task for internal steps
  • One workflow per user request or pipeline run