Task Span Metrics

Task span metrics are a powerful type of evaluation metric in Opik that can analyze the detailed execution information of your LLM tasks. Unlike traditional metrics that only evaluate input-output pairs, task span metrics have access to the complete execution context, including intermediate steps, metadata, timing information, and hierarchical structure.

Important: only spans created with the @track decorator and native Opik integrations are available to task span metrics.
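For example, any function decorated with @track produces spans that task span metrics can inspect. A minimal sketch, with illustrative function names and return values:

from opik import track

@track
def retrieve_context(question: str) -> str:
    # Illustrative placeholder: a real application might query a vector store here
    return f"Retrieved context for: {question}"

@track
def answer_question(question: str) -> str:
    # Each @track-decorated call becomes a span nested under the task span,
    # so a task span metric sees the full execution tree of this call.
    context = retrieve_context(question)
    return f"Answer based on: {context}"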

What are Task Span Metrics?

Task span metrics receive a task_span parameter containing a SpanModel object that represents the complete execution context of your task. This includes:

  • Execution Details: Input, output, start/end times, and execution metadata
  • Nested Operations: Hierarchical structure of sub-operations and function calls
  • Performance Data: Timing, cost, usage statistics, and resource consumption
  • Error Information: Detailed error context and diagnostic information
  • Provider Metadata: Model information, API provider details, and configuration
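As a minimal sketch of the shape such a metric takes (the class name and the single-field check are illustrative; a full walkthrough follows below), the score method simply reads these fields from the task_span it receives:

from opik.evaluation.metrics import BaseMetric, score_result
from opik.message_processing.emulation.models import SpanModel

class HasOutputMetric(BaseMetric):
    """Illustrative metric that only checks whether the root span produced output."""

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # task_span exposes input, output, metadata, timing, and nested spans
        has_output = task_span.output is not None
        return score_result.ScoreResult(
            value=1.0 if has_output else 0.0,
            name=self.name,
            reason="Output present" if has_output else "No output produced",
        )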

When to Use Task Span Metrics

Task span metrics are particularly valuable for:

  • Performance Analysis: Evaluating execution speed, resource usage, and efficiency
  • Quality Assessment: Analyzing the quality of intermediate steps and decision-making
  • Cost Optimization: Tracking and optimizing API costs and resource consumption
  • Agent Evaluation: Assessing agent trajectories and decision-making patterns
  • Debugging: Understanding execution flows and identifying performance bottlenecks
  • Compliance: Ensuring tasks execute within expected parameters and constraints

Creating Task Span Metrics

To create a task span metric, define a class that inherits from BaseMetric and implements a score method that accepts a task_span parameter:

from typing import Any, Dict, Optional
from opik.evaluation.metrics import BaseMetric, score_result
from opik.message_processing.emulation.models import SpanModel


class TaskExecutionQualityMetric(BaseMetric):
    def __init__(
        self,
        name: str = "task_execution_quality",
        track: bool = True,
        project_name: Optional[str] = None,
    ):
        super().__init__(name=name, track=track, project_name=project_name)

    def _check_execution_success_recursively(self, span: SpanModel) -> Dict[str, Any]:
        """Recursively check execution success across the span tree."""
        execution_stats = {
            'has_errors': False,
            'error_count': 0,
            'failed_spans': [],
            'total_spans_checked': 0
        }

        # Check current span for errors
        execution_stats['total_spans_checked'] += 1
        if span.error_info:
            execution_stats['has_errors'] = True
            execution_stats['error_count'] += 1
            execution_stats['failed_spans'].append(span.name)

        # Recursively check nested spans
        for nested_span in span.spans:
            nested_stats = self._check_execution_success_recursively(nested_span)
            execution_stats['has_errors'] = execution_stats['has_errors'] or nested_stats['has_errors']
            execution_stats['error_count'] += nested_stats['error_count']
            execution_stats['failed_spans'].extend(nested_stats['failed_spans'])
            execution_stats['total_spans_checked'] += nested_stats['total_spans_checked']

        return execution_stats

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Check execution success across the entire span tree.
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        execution_stats = self._check_execution_success_recursively(task_span)
        execution_successful = not execution_stats['has_errors']

        # Check output availability
        has_output = task_span.output is not None

        # Calculate execution time
        execution_time = None
        if task_span.start_time and task_span.end_time:
            execution_time = (task_span.end_time - task_span.start_time).total_seconds()

        # Custom scoring logic based on execution characteristics
        if not execution_successful:
            error_count = execution_stats['error_count']
            failed_spans_count = len(execution_stats['failed_spans'])
            total_spans = execution_stats['total_spans_checked']

            if error_count == 1 and total_spans > 5:
                score_value = 0.4
                reason = f"Minor execution issues: 1 error in {total_spans} spans ({execution_stats['failed_spans'][0]})"
            elif failed_spans_count <= 2:
                score_value = 0.2
                reason = f"Limited execution failures: {failed_spans_count} failed spans out of {total_spans}"
            else:
                score_value = 0.0
                reason = f"Major execution failures: {failed_spans_count} failed spans across {total_spans} operations"
        elif not has_output:
            score_value = 0.3
            reason = f"Task completed without errors across {execution_stats['total_spans_checked']} spans but produced no output"
        elif execution_time and execution_time > 30.0:
            score_value = 0.6
            reason = f"Task executed successfully across {execution_stats['total_spans_checked']} spans but took too long: {execution_time:.2f}s"
        else:
            score_value = 1.0
            span_count = execution_stats['total_spans_checked']
            reason = f"Task executed successfully across all {span_count} spans with good performance"

        return score_result.ScoreResult(
            value=score_value,
            name=self.name,
            reason=reason
        )

Accessing Span Properties

The SpanModel object provides rich information about task execution:

Basic Properties

class BasicSpanAnalysisMetric(BaseMetric):
    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Basic span information
        span_id = task_span.id
        span_name = task_span.name
        span_type = task_span.type  # "general", "llm", "tool", etc.

        # Input/Output analysis
        input_data = task_span.input
        output_data = task_span.output

        # Metadata and tags
        metadata = task_span.metadata
        tags = task_span.tags

        # Your scoring logic here
        return score_result.ScoreResult(value=1.0, name=self.name)

Performance Metrics

class PerformanceMetric(BaseMetric):
    def _find_model_and_provider_recursively(self, span: SpanModel, model_found: str = None, provider_found: str = None):
        """Recursively search through the span tree to find model and provider information."""
        # Check current span
        if not model_found and span.model:
            model_found = span.model
        if not provider_found and span.provider:
            provider_found = span.provider

        # If both found, return early
        if model_found and provider_found:
            return model_found, provider_found

        # Recursively search nested spans
        for nested_span in span.spans:
            model_found, provider_found = self._find_model_and_provider_recursively(
                nested_span, model_found, provider_found
            )
            # If both found, return early
            if model_found and provider_found:
                return model_found, provider_found

        return model_found, provider_found

    def _calculate_usage_recursively(self, span: SpanModel, usage_summary: dict = None):
        """Recursively calculate usage statistics from the entire span tree."""
        if usage_summary is None:
            usage_summary = {
                'total_prompt_tokens': 0,
                'total_completion_tokens': 0,
                'total_tokens': 0,
                'total_spans_count': 0,
                'llm_spans_count': 0,
                'tool_spans_count': 0
            }

        # Count current span
        usage_summary['total_spans_count'] += 1

        # Count span types
        if span.type == 'llm':
            usage_summary['llm_spans_count'] += 1
        elif span.type == 'tool':
            usage_summary['tool_spans_count'] += 1

        # Add usage from current span
        if span.usage and isinstance(span.usage, dict):
            usage_summary['total_prompt_tokens'] += span.usage.get('prompt_tokens', 0)
            usage_summary['total_completion_tokens'] += span.usage.get('completion_tokens', 0)
            usage_summary['total_tokens'] += span.usage.get('total_tokens', 0)

        # Recursively process nested spans
        for nested_span in span.spans:
            self._calculate_usage_recursively(nested_span, usage_summary)

        return usage_summary

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Timing analysis.
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        start_time = task_span.start_time
        end_time = task_span.end_time
        duration = (end_time - start_time).total_seconds() if start_time and end_time else None

        # Get model and provider from anywhere in the span tree
        model_used, provider = self._find_model_and_provider_recursively(
            task_span, task_span.model, task_span.provider
        )

        # Calculate comprehensive usage statistics from the entire span tree
        usage_info = self._calculate_usage_recursively(task_span)

        # Performance-based scoring with enhanced analysis
        if duration and duration < 2.0:
            score_value = 1.0
            reason = f"Excellent performance: {duration:.2f}s"
            if model_used:
                reason += f" using {model_used}"
            if provider:
                reason += f" ({provider})"
            if usage_info['total_tokens'] > 0:
                reason += f", {usage_info['total_tokens']} total tokens across {usage_info['llm_spans_count']} LLM calls"
        elif duration and duration < 10.0:
            score_value = 0.7
            reason = f"Good performance: {duration:.2f}s"
            if usage_info['total_spans_count'] > 1:
                reason += f" with {usage_info['total_spans_count']} operations"
        else:
            score_value = 0.5
            reason = "Performance could be improved"
            if duration:
                reason += f" (took {duration:.2f}s)"
            if usage_info['llm_spans_count'] > 5:
                reason += f" - consider optimizing {usage_info['llm_spans_count']} LLM calls"

        return score_result.ScoreResult(
            value=score_value,
            name=self.name,
            reason=reason
        )

Error Analysis

Task span metrics can analyze execution failures and errors:

class ErrorAnalysisMetric(BaseMetric):
    def _collect_errors_recursively(self, span: SpanModel, errors: list = None):
        """Recursively collect all errors from the span tree."""
        if errors is None:
            errors = []

        # Check current span for errors
        if span.error_info:
            error_entry = {
                'span_id': span.id,
                'span_name': span.name,
                'span_type': span.type,
                'error_info': span.error_info
            }
            errors.append(error_entry)

        # Recursively check nested spans
        for nested_span in span.spans:
            self._collect_errors_recursively(nested_span, errors)

        return errors

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Collect all errors from the entire span tree
        all_errors = self._collect_errors_recursively(task_span)

        if not all_errors:
            return score_result.ScoreResult(
                value=1.0,
                name=self.name,
                reason="No errors detected in any span"
            )

        reason = f"Found {len(all_errors)} error(s) in the span tree"
        return score_result.ScoreResult(
            value=0.0,
            name=self.name,
            reason=reason
        )

Using Task Span Metrics in Evaluation

Task span metrics work seamlessly with regular evaluation metrics and are automatically detected by the evaluation system:

from opik import evaluate
from opik.evaluation.metrics import Equals

# Mix regular and task span metrics
equals_metric = Equals()
quality_metric = TaskExecutionQualityMetric()
performance_metric = PerformanceMetric()

evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[
        equals_metric,       # Regular metric (input/output)
        quality_metric,      # Task span metric (execution analysis)
        performance_metric,  # Task span metric (performance analysis)
    ],
    experiment_name="Comprehensive Task Analysis"
)
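The dataset and evaluation_task referenced above are assumed to exist already. A minimal sketch of what they might look like (the dataset name, item fields, and answer_question helper are illustrative assumptions):

import opik
from opik import track

client = opik.Opik()
dataset = client.get_or_create_dataset(name="my-eval-dataset")

@track
def answer_question(question: str) -> str:
    # Illustrative application code; each call produces the spans analyzed by task span metrics
    return f"Answer to: {question}"

def evaluation_task(dataset_item: dict) -> dict:
    # The task receives one dataset item and returns the fields the output-based metrics expect
    output = answer_question(dataset_item["question"])
    return {"output": output, "reference": dataset_item.get("expected_output")}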

Best Practices

1. Handle Missing Data Gracefully

Always check for None values in optional span attributes:

def score(self, task_span: SpanModel) -> score_result.ScoreResult:
    # Safe access to optional fields
    duration = None
    if task_span.start_time and task_span.end_time:
        duration = (task_span.end_time - task_span.start_time).total_seconds()

    cost = task_span.total_cost if task_span.total_cost else 0.0
    metadata = task_span.metadata or {}

2. Focus on Execution Patterns

Use task span metrics to evaluate how your application executes, not just the final output:

# Good: Analyzing execution patterns
class CachingEfficiencyMetric(BaseMetric):  # illustrative class name for this excerpt
    def _analyze_caching_efficiency_recursively(self, span: SpanModel, cache_stats: Dict[str, Any] = None) -> Dict[str, Any]:
        """Recursively analyze caching efficiency across the span tree."""
        if cache_stats is None:
            cache_stats = {
                'total_llm_calls': 0,
                'llm_cache_hits': 0,
                'llm_cache_misses': 0,
                'other_cache_hits': 0,
                'cached_llm_spans': [],
                'cached_other_spans': [],
                'llm_spans': []
            }

        # Track LLM calls and their caching status
        if span.type == "llm":
            cache_stats['total_llm_calls'] += 1
            cache_stats['llm_spans'].append(span.name)

            # Check for caching indicators in metadata
            metadata = span.metadata or {}
            tags = span.tags or []

            is_cached = (
                any(cache_key in metadata for cache_key in ["cache_hit", "cached", "from_cache"]) or
                any(cache_tag in tags for cache_tag in ["cache_hit", "cached"]) or
                metadata.get("cache_hit", False) or
                metadata.get("cached", False)
            )

            if is_cached:
                cache_stats['llm_cache_hits'] += 1
                cache_stats['cached_llm_spans'].append(span.name)
            else:
                cache_stats['llm_cache_misses'] += 1

        # Track non-LLM spans for caching indicators (e.g., database queries, API calls)
        else:
            metadata = span.metadata or {}
            tags = span.tags or []

            if (any(cache_key in metadata for cache_key in ["cache_hit", "cached", "from_cache"]) or
                    any(cache_tag in tags for cache_tag in ["cache_hit", "cached"])):
                cache_stats['other_cache_hits'] += 1
                cache_stats['cached_other_spans'].append(span.name)

        # Recursively check nested spans
        for nested_span in span.spans:
            self._analyze_caching_efficiency_recursively(nested_span, cache_stats)

        return cache_stats

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Analyze caching efficiency across the entire span tree.
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        cache_stats = self._analyze_caching_efficiency_recursively(task_span)

        llm_cache_hits = cache_stats['llm_cache_hits']
        total_llm_calls = cache_stats['total_llm_calls']
        other_cache_hits = cache_stats['other_cache_hits']

        # Calculate a cache hit ratio specifically for LLM calls
        llm_cache_hit_ratio = llm_cache_hits / total_llm_calls if total_llm_calls > 0 else 0

        # Score based on LLM caching efficiency and total call volume
        if total_llm_calls == 0:
            # Consider other cache hits for non-LLM operations
            if other_cache_hits > 0:
                return score_result.ScoreResult(
                    value=0.7,
                    name=self.name,
                    reason=f"No LLM calls, but {other_cache_hits} other operations cached"
                )
            else:
                return score_result.ScoreResult(
                    value=0.5,
                    name=self.name,
                    reason="No LLM calls detected"
                )
        elif llm_cache_hit_ratio >= 0.8:
            reason = f"Excellent LLM caching: {llm_cache_hits}/{total_llm_calls} LLM calls cached ({llm_cache_hit_ratio:.1%})"
            if other_cache_hits > 0:
                reason += f" + {other_cache_hits} other cached operations"
            return score_result.ScoreResult(
                value=1.0,
                name=self.name,
                reason=reason
            )
        elif llm_cache_hit_ratio >= 0.5:
            reason = f"Good LLM caching: {llm_cache_hits}/{total_llm_calls} LLM calls cached ({llm_cache_hit_ratio:.1%})"
            if other_cache_hits > 0:
                reason += f" + {other_cache_hits} other cached operations"
            return score_result.ScoreResult(
                value=0.9,
                name=self.name,
                reason=reason
            )
        elif llm_cache_hit_ratio > 0:
            reason = f"Some LLM caching: {llm_cache_hits}/{total_llm_calls} LLM calls cached ({llm_cache_hit_ratio:.1%})"
            if other_cache_hits > 0:
                reason += f" + {other_cache_hits} other cached operations"
            return score_result.ScoreResult(
                value=0.7,
                name=self.name,
                reason=reason
            )
        elif total_llm_calls > 5:
            return score_result.ScoreResult(
                value=0.2,
                name=self.name,
                reason=f"No caching with {total_llm_calls} LLM calls - high cost/latency risk"
            )
        elif total_llm_calls > 3:
            return score_result.ScoreResult(
                value=0.4,
                name=self.name,
                reason=f"No caching with {total_llm_calls} LLM calls - consider adding cache"
            )
        else:
            return score_result.ScoreResult(
                value=0.8,
                name=self.name,
                reason=f"Efficient execution: {total_llm_calls} LLM calls (caching not critical)"
            )

3. Combine with Regular Metrics

Task span metrics provide the most value when combined with traditional output-based metrics:

# Comprehensive evaluation approach
scoring_metrics = [
    # Output quality metrics
    Equals(),
    Hallucination(),

    # Execution analysis metrics
    TaskExecutionQualityMetric(),
    PerformanceMetric(),

    # Cost optimization metrics
    CostEfficiencyMetric(),
]
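CostEfficiencyMetric is not defined elsewhere on this page; a minimal sketch of what such a metric could look like, using the total_cost field shown in the best practices above (the budget value and the assumption that each span reports only its own cost are illustrative):

from opik.evaluation.metrics import BaseMetric, score_result
from opik.message_processing.emulation.models import SpanModel

class CostEfficiencyMetric(BaseMetric):
    """Illustrative metric that scores a task by the total cost of its span tree."""

    def __init__(self, cost_budget: float = 0.05, name: str = "cost_efficiency"):
        super().__init__(name=name)
        self.cost_budget = cost_budget  # assumed budget in USD; adjust for your use case

    def _total_cost_recursively(self, span: SpanModel) -> float:
        # Sum total_cost across the span tree, treating missing values as 0.0.
        # Assumes each span reports only its own cost (no parent-level aggregation).
        cost = span.total_cost or 0.0
        for nested_span in span.spans:
            cost += self._total_cost_recursively(nested_span)
        return cost

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        total_cost = self._total_cost_recursively(task_span)
        if total_cost <= self.cost_budget:
            value = 1.0
        else:
            # Degrade linearly as cost exceeds the budget
            value = max(0.0, 1.0 - (total_cost - self.cost_budget) / self.cost_budget)
        return score_result.ScoreResult(
            value=value,
            name=self.name,
            reason=f"Total span cost ${total_cost:.4f} against budget ${self.cost_budget:.4f}",
        )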

4. Security Considerations

Be mindful of sensitive data in span information:

def score(self, task_span: SpanModel) -> score_result.ScoreResult:
    # Avoid logging sensitive input data
    input_size = len(str(task_span.input)) if task_span.input else 0

    # Use aggregated information instead of raw data
    return score_result.ScoreResult(
        value=1.0 if input_size < 1000 else 0.5,
        name=self.name,
        reason=f"Input size: {input_size} characters"
    )

Complete Example: Agent Trajectory Analysis Metric

Here’s a comprehensive example that analyzes agent decision-making:

from typing import Any, Dict

from opik.evaluation.metrics import BaseMetric, score_result
from opik.message_processing.emulation.models import SpanModel


class AgentTrajectoryMetric(BaseMetric):
    def __init__(self, max_steps: int = 10, name: str = "agent_trajectory_quality"):
        super().__init__(name=name)
        self.max_steps = max_steps

    def _analyze_trajectory_recursively(self, span: SpanModel, trajectory_stats: Dict[str, Any] = None) -> Dict[str, Any]:
        """Recursively analyze the agent trajectory across the span tree."""
        if trajectory_stats is None:
            trajectory_stats = {
                'total_steps': 0,
                'tool_uses': 0,
                'llm_reasoning': 0,
                'other_steps': 0,
                'tool_spans': [],
                'llm_spans': [],
                'step_names': [],
                'max_depth': 0,
                'current_depth': 0
            }

        # Count current span as a step
        trajectory_stats['total_steps'] += 1
        trajectory_stats['step_names'].append(span.name)
        trajectory_stats['max_depth'] = max(trajectory_stats['max_depth'], trajectory_stats['current_depth'])

        # Categorize span types for agent decision analysis
        if span.type == "tool":
            trajectory_stats['tool_uses'] += 1
            trajectory_stats['tool_spans'].append(span.name)
        elif span.type == "llm":
            trajectory_stats['llm_reasoning'] += 1
            trajectory_stats['llm_spans'].append(span.name)
        else:
            trajectory_stats['other_steps'] += 1

        # Recursively analyze nested spans with depth tracking
        for nested_span in span.spans:
            trajectory_stats['current_depth'] += 1
            self._analyze_trajectory_recursively(nested_span, trajectory_stats)
            trajectory_stats['current_depth'] -= 1

        return trajectory_stats

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Analyze the agent trajectory across the entire span tree
        trajectory_stats = self._analyze_trajectory_recursively(task_span)

        total_steps = trajectory_stats['total_steps']
        tool_uses = trajectory_stats['tool_uses']
        llm_reasoning = trajectory_stats['llm_reasoning']
        max_depth = trajectory_stats['max_depth']

        # Check for an efficient path
        if total_steps == 0:
            return score_result.ScoreResult(
                value=0.0, name=self.name,
                reason="No decision steps found"
            )

        # Analyze trajectory quality with enhanced metrics.
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        if tool_uses == 0 and llm_reasoning == 0:
            score = 0.1
            reason = f"Poor trajectory: {total_steps} steps with no tools or reasoning"
        elif tool_uses == 0:
            score = 0.3
            reason = f"Agent used {llm_reasoning} reasoning steps but no tools across {total_steps} operations"
        elif llm_reasoning == 0:
            score = 0.4
            reason = f"Agent used {tool_uses} tools but no reasoning across {total_steps} operations"
        elif total_steps > self.max_steps:
            # Penalize excessive steps but consider tool/reasoning balance
            efficiency_penalty = max(0.1, 1.0 - (total_steps - self.max_steps) * 0.05)
            balance_ratio = min(tool_uses, llm_reasoning) / max(tool_uses, llm_reasoning)
            score = min(0.6, efficiency_penalty * balance_ratio)
            reason = f"Excessive steps: {total_steps} > {self.max_steps} (depth: {max_depth}, tools: {tool_uses}, reasoning: {llm_reasoning})"
        else:
            # Calculate a comprehensive score based on multiple factors.
            # Only for illustrative purposes.
            # Please adjust for your specific use case!

            # 1. Step efficiency (fewer steps = better)
            step_efficiency = min(1.0, self.max_steps / total_steps)

            # 2. Tool-reasoning balance (closer to a 1:1 ratio = better)
            balance_ratio = min(tool_uses, llm_reasoning) / max(tool_uses, llm_reasoning) if max(tool_uses, llm_reasoning) > 0 else 0

            # 3. Depth complexity (moderate depth suggests good decomposition)
            depth_score = 1.0 if max_depth <= 3 else max(0.7, 1.0 - (max_depth - 3) * 0.1)

            # 4. Decision density (good ratio of reasoning to total steps)
            decision_density = llm_reasoning / total_steps if total_steps > 0 else 0
            density_score = 1.0 if decision_density >= 0.3 else decision_density / 0.3

            # Combine all factors
            score = (step_efficiency * 0.3 + balance_ratio * 0.3 + depth_score * 0.2 + density_score * 0.2)

            if score >= 0.8:
                reason = f"Excellent trajectory: {total_steps} steps (depth: {max_depth}), {tool_uses} tools, {llm_reasoning} reasoning - well balanced"
            elif score >= 0.6:
                reason = f"Good trajectory: {total_steps} steps (depth: {max_depth}), {tool_uses} tools, {llm_reasoning} reasoning"
            else:
                reason = f"Acceptable trajectory: {total_steps} steps (depth: {max_depth}), {tool_uses} tools, {llm_reasoning} reasoning - could be optimized"

        return score_result.ScoreResult(
            value=score,
            name=self.name,
            reason=reason
        )
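To run it, add the metric to scoring_metrics like any other metric (a sketch, reusing the dataset and evaluation_task shown earlier; max_steps=8 is an arbitrary choice):

evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[AgentTrajectoryMetric(max_steps=8)],
    experiment_name="Agent Trajectory Analysis",
)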

Integration with LLM Evaluation

For a complete guide on using task span metrics in LLM evaluation workflows, see the Using task span evaluation metrics section in the LLM evaluation guide.