Task Span Metrics
Task span metrics are a powerful type of evaluation metric in Opik that can analyze the detailed execution information of your LLM tasks. Unlike traditional metrics that only evaluate input-output pairs, task span metrics have access to the complete execution context, including intermediate steps, metadata, timing information, and hierarchical structure.
Important: Only spans created with the `@track` decorator and native Opik integrations are available for task span metrics.
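For example, spans produced by functions decorated with Opik's `@track` decorator are captured automatically and become visible to task span metrics. A minimal sketch (the function and the `call_llm` helper are illustrative placeholders):

```python
from opik import track

@track  # records a span for each call, including nested tracked calls
def summarize(text: str) -> str:
    # call_llm is a placeholder for your actual LLM invocation
    return call_llm(f"Summarize: {text}")
```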
What are Task Span Metrics?
Task span metrics receive a `task_span` parameter containing a `SpanModel` object that represents the complete execution context of your task. This includes:
- Execution Details: Input, output, start/end times, and execution metadata
- Nested Operations: Hierarchical structure of sub-operations and function calls
- Performance Data: Timing, cost, usage statistics, and resource consumption
- Error Information: Detailed error context and diagnostic information
- Provider Metadata: Model information, API provider details, and configuration
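Because each `SpanModel` exposes its children through the `spans` attribute, most task span metrics walk the tree recursively, as the examples below do. A minimal illustrative helper (not part of Opik) that prints a tree outline using the fields listed above:

```python
def walk_span_tree(span: SpanModel, depth: int = 0) -> None:
    """Print a simple outline of a span tree (illustrative helper, not part of Opik)."""
    print("  " * depth + f"{span.name} [{span.type}]")
    for nested_span in span.spans:
        walk_span_tree(nested_span, depth + 1)
```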
When to Use Task Span Metrics
Task span metrics are particularly valuable for:
- Performance Analysis: Evaluating execution speed, resource usage, and efficiency
- Quality Assessment: Analyzing the quality of intermediate steps and decision-making
- Cost Optimization: Tracking and optimizing API costs and resource consumption
- Agent Evaluation: Assessing agent trajectories and decision-making patterns
- Debugging: Understanding execution flows and identifying performance bottlenecks
- Compliance: Ensuring tasks execute within expected parameters and constraints
Creating Task Span Metrics
To create a task span metric, define a class that inherits from `BaseMetric` and implements a `score` method that accepts a `task_span` parameter:
```python
from typing import Any, Dict, Optional

from opik.evaluation.metrics import BaseMetric, score_result
from opik.message_processing.emulation.models import SpanModel


class TaskExecutionQualityMetric(BaseMetric):
    def __init__(
        self,
        name: str = "task_execution_quality",
        track: bool = True,
        project_name: Optional[str] = None,
    ):
        super().__init__(name=name, track=track, project_name=project_name)

    def _check_execution_success_recursively(self, span: SpanModel) -> Dict[str, Any]:
        """Recursively check execution success across the span tree."""
        execution_stats = {
            'has_errors': False,
            'error_count': 0,
            'failed_spans': [],
            'total_spans_checked': 0
        }

        # Check current span for errors
        execution_stats['total_spans_checked'] += 1
        if span.error_info:
            execution_stats['has_errors'] = True
            execution_stats['error_count'] += 1
            execution_stats['failed_spans'].append(span.name)

        # Recursively check nested spans
        for nested_span in span.spans:
            nested_stats = self._check_execution_success_recursively(nested_span)
            execution_stats['has_errors'] = execution_stats['has_errors'] or nested_stats['has_errors']
            execution_stats['error_count'] += nested_stats['error_count']
            execution_stats['failed_spans'].extend(nested_stats['failed_spans'])
            execution_stats['total_spans_checked'] += nested_stats['total_spans_checked']

        return execution_stats

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Check execution success across the entire span tree.
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        execution_stats = self._check_execution_success_recursively(task_span)
        execution_successful = not execution_stats['has_errors']

        # Check output availability
        has_output = task_span.output is not None

        # Calculate execution time
        execution_time = None
        if task_span.start_time and task_span.end_time:
            execution_time = (task_span.end_time - task_span.start_time).total_seconds()

        # Custom scoring logic based on execution characteristics
        if not execution_successful:
            error_count = execution_stats['error_count']
            failed_spans_count = len(execution_stats['failed_spans'])
            total_spans = execution_stats['total_spans_checked']

            if error_count == 1 and total_spans > 5:
                score_value = 0.4
                reason = f"Minor execution issues: 1 error in {total_spans} spans ({execution_stats['failed_spans'][0]})"
            elif failed_spans_count <= 2:
                score_value = 0.2
                reason = f"Limited execution failures: {failed_spans_count} failed spans out of {total_spans}"
            else:
                score_value = 0.0
                reason = f"Major execution failures: {failed_spans_count} failed spans across {total_spans} operations"
        elif not has_output:
            score_value = 0.3
            reason = f"Task completed without errors across {execution_stats['total_spans_checked']} spans but produced no output"
        elif execution_time and execution_time > 30.0:
            score_value = 0.6
            reason = f"Task executed successfully across {execution_stats['total_spans_checked']} spans but took too long: {execution_time:.2f}s"
        else:
            score_value = 1.0
            span_count = execution_stats['total_spans_checked']
            reason = f"Task executed successfully across all {span_count} spans with good performance"

        return score_result.ScoreResult(
            value=score_value,
            name=self.name,
            reason=reason
        )
```
Accessing Span Properties
The `SpanModel` object provides rich information about task execution:
Basic Properties
```python
class BasicSpanAnalysisMetric(BaseMetric):
    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Basic span information
        span_id = task_span.id
        span_name = task_span.name
        span_type = task_span.type  # "general", "llm", "tool", etc.

        # Input/Output analysis
        input_data = task_span.input
        output_data = task_span.output

        # Metadata and tags
        metadata = task_span.metadata
        tags = task_span.tags

        # Your scoring logic here
        return score_result.ScoreResult(value=1.0, name=self.name)
```
Performance Metrics
```python
class PerformanceMetric(BaseMetric):
    def _find_model_and_provider_recursively(self, span: SpanModel, model_found: Optional[str] = None, provider_found: Optional[str] = None):
        """Recursively search the span tree for model and provider information."""
        # Check current span
        if not model_found and span.model:
            model_found = span.model
        if not provider_found and span.provider:
            provider_found = span.provider

        # If both found, return early
        if model_found and provider_found:
            return model_found, provider_found

        # Recursively search nested spans
        for nested_span in span.spans:
            model_found, provider_found = self._find_model_and_provider_recursively(
                nested_span, model_found, provider_found
            )
            # If both found, return early
            if model_found and provider_found:
                return model_found, provider_found

        return model_found, provider_found

    def _calculate_usage_recursively(self, span: SpanModel, usage_summary: Optional[dict] = None):
        """Recursively calculate usage statistics from the entire span tree."""
        if usage_summary is None:
            usage_summary = {
                'total_prompt_tokens': 0,
                'total_completion_tokens': 0,
                'total_tokens': 0,
                'total_spans_count': 0,
                'llm_spans_count': 0,
                'tool_spans_count': 0
            }

        # Count current span
        usage_summary['total_spans_count'] += 1

        # Count span types
        if span.type == 'llm':
            usage_summary['llm_spans_count'] += 1
        elif span.type == 'tool':
            usage_summary['tool_spans_count'] += 1

        # Add usage from current span
        if span.usage and isinstance(span.usage, dict):
            usage_summary['total_prompt_tokens'] += span.usage.get('prompt_tokens', 0)
            usage_summary['total_completion_tokens'] += span.usage.get('completion_tokens', 0)
            usage_summary['total_tokens'] += span.usage.get('total_tokens', 0)

        # Recursively process nested spans
        for nested_span in span.spans:
            self._calculate_usage_recursively(nested_span, usage_summary)

        return usage_summary

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Timing analysis.
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        start_time = task_span.start_time
        end_time = task_span.end_time
        duration = (end_time - start_time).total_seconds() if start_time and end_time else None

        # Get model and provider from anywhere in the span tree
        model_used, provider = self._find_model_and_provider_recursively(
            task_span, task_span.model, task_span.provider
        )

        # Calculate comprehensive usage statistics from the entire span tree
        usage_info = self._calculate_usage_recursively(task_span)

        # Performance-based scoring with enhanced analysis
        if duration and duration < 2.0:
            score_value = 1.0
            reason = f"Excellent performance: {duration:.2f}s"
            if model_used:
                reason += f" using {model_used}"
            if provider:
                reason += f" ({provider})"
            if usage_info['total_tokens'] > 0:
                reason += f", {usage_info['total_tokens']} total tokens across {usage_info['llm_spans_count']} LLM calls"
        elif duration and duration < 10.0:
            score_value = 0.7
            reason = f"Good performance: {duration:.2f}s"
            if usage_info['total_spans_count'] > 1:
                reason += f" with {usage_info['total_spans_count']} operations"
        else:
            score_value = 0.5
            reason = "Performance could be improved"
            if duration:
                reason += f" (took {duration:.2f}s)"
            if usage_info['llm_spans_count'] > 5:
                reason += f" - consider optimizing {usage_info['llm_spans_count']} LLM calls"

        return score_result.ScoreResult(
            value=score_value,
            name=self.name,
            reason=reason
        )
```
Error Analysis
Task span metrics can analyze execution failures and errors:
```python
class ErrorAnalysisMetric(BaseMetric):
    def _collect_errors_recursively(self, span: SpanModel, errors: Optional[list] = None) -> list:
        """Recursively collect all errors from the span tree."""
        if errors is None:
            errors = []

        # Check current span for errors
        if span.error_info:
            error_entry = {
                'span_id': span.id,
                'span_name': span.name,
                'span_type': span.type,
                'error_info': span.error_info
            }
            errors.append(error_entry)

        # Recursively check nested spans
        for nested_span in span.spans:
            self._collect_errors_recursively(nested_span, errors)

        return errors

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Collect all errors from the entire span tree
        all_errors = self._collect_errors_recursively(task_span)

        if not all_errors:
            return score_result.ScoreResult(
                value=1.0,
                name=self.name,
                reason="No errors detected in any span"
            )

        return score_result.ScoreResult(
            value=0.0,
            name=self.name,
            reason=f"Found {len(all_errors)} error(s) in the span tree"
        )
```
Using Task Span Metrics in Evaluation
Task span metrics work seamlessly with regular evaluation metrics and are automatically detected by the evaluation system:
```python
from opik import evaluate
from opik.evaluation.metrics import Equals

# Mix regular and task span metrics
equals_metric = Equals()
quality_metric = TaskExecutionQualityMetric()
performance_metric = PerformanceMetric()

evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[
        equals_metric,       # Regular metric (input/output)
        quality_metric,      # Task span metric (execution analysis)
        performance_metric,  # Task span metric (performance analysis)
    ],
    experiment_name="Comprehensive Task Analysis",
)
```
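The `dataset` and `evaluation_task` above are placeholders. A minimal sketch of what they might look like, assuming a dataset whose items have `question` and `expected_output` keys (the dataset name, item keys, and application function are illustrative):

```python
import opik
from opik import track

client = opik.Opik()
dataset = client.get_or_create_dataset(name="my-dataset")  # illustrative name

@track  # ensures spans are recorded, so task span metrics have data to analyze
def my_llm_application(question: str) -> str:
    ...  # your application logic

def evaluation_task(dataset_item: dict) -> dict:
    # Returned keys must match what your output-based metrics expect (illustrative keys)
    output = my_llm_application(dataset_item["question"])
    return {"output": output, "reference": dataset_item["expected_output"]}
```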
Best Practices
1. Handle Missing Data Gracefully
Always check for `None` values in optional span attributes:
```python
def score(self, task_span: SpanModel) -> score_result.ScoreResult:
    # Safe access to optional fields
    duration = None
    if task_span.start_time and task_span.end_time:
        duration = (task_span.end_time - task_span.start_time).total_seconds()

    cost = task_span.total_cost if task_span.total_cost else 0.0
    metadata = task_span.metadata or {}
```
2. Focus on Execution Patterns
Use task span metrics to evaluate how your application executes, not just the final output:
```python
# Good: Analyzing execution patterns
# (shown here inside an illustrative CachingEfficiencyMetric class)
class CachingEfficiencyMetric(BaseMetric):
    def _analyze_caching_efficiency_recursively(self, span: SpanModel, cache_stats: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Recursively analyze caching efficiency across the span tree."""
        if cache_stats is None:
            cache_stats = {
                'total_llm_calls': 0,
                'llm_cache_hits': 0,
                'llm_cache_misses': 0,
                'other_cache_hits': 0,
                'cached_llm_spans': [],
                'cached_other_spans': [],
                'llm_spans': []
            }

        # Track LLM calls and their caching status
        if span.type == "llm":
            cache_stats['total_llm_calls'] += 1
            cache_stats['llm_spans'].append(span.name)

            # Check for caching indicators in metadata
            metadata = span.metadata or {}
            tags = span.tags or []

            is_cached = (
                any(cache_key in metadata for cache_key in ["cache_hit", "cached", "from_cache"]) or
                any(cache_tag in tags for cache_tag in ["cache_hit", "cached"]) or
                metadata.get("cache_hit", False) or
                metadata.get("cached", False)
            )

            if is_cached:
                cache_stats['llm_cache_hits'] += 1
                cache_stats['cached_llm_spans'].append(span.name)
            else:
                cache_stats['llm_cache_misses'] += 1

        # Track non-LLM spans for caching indicators (e.g., database queries, API calls)
        else:
            metadata = span.metadata or {}
            tags = span.tags or []

            if (any(cache_key in metadata for cache_key in ["cache_hit", "cached", "from_cache"]) or
                    any(cache_tag in tags for cache_tag in ["cache_hit", "cached"])):
                cache_stats['other_cache_hits'] += 1
                cache_stats['cached_other_spans'].append(span.name)

        # Recursively check nested spans
        for nested_span in span.spans:
            self._analyze_caching_efficiency_recursively(nested_span, cache_stats)

        return cache_stats

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Analyze caching efficiency across the entire span tree.
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        cache_stats = self._analyze_caching_efficiency_recursively(task_span)

        llm_cache_hits = cache_stats['llm_cache_hits']
        total_llm_calls = cache_stats['total_llm_calls']
        other_cache_hits = cache_stats['other_cache_hits']

        # Calculate a cache hit ratio specifically for LLM calls
        llm_cache_hit_ratio = llm_cache_hits / total_llm_calls if total_llm_calls > 0 else 0.0

        # Score based on LLM caching efficiency and total call volume
        if total_llm_calls == 0:
            # Consider other cache hits for non-LLM operations
            if other_cache_hits > 0:
                return score_result.ScoreResult(
                    value=0.7,
                    name=self.name,
                    reason=f"No LLM calls, but {other_cache_hits} other operations cached"
                )
            else:
                return score_result.ScoreResult(
                    value=0.5,
                    name=self.name,
                    reason="No LLM calls detected"
                )
        elif llm_cache_hit_ratio >= 0.8:
            reason = f"Excellent LLM caching: {llm_cache_hits}/{total_llm_calls} LLM calls cached ({llm_cache_hit_ratio:.1%})"
            if other_cache_hits > 0:
                reason += f" + {other_cache_hits} other cached operations"
            return score_result.ScoreResult(
                value=1.0,
                name=self.name,
                reason=reason
            )
        elif llm_cache_hit_ratio >= 0.5:
            reason = f"Good LLM caching: {llm_cache_hits}/{total_llm_calls} LLM calls cached ({llm_cache_hit_ratio:.1%})"
            if other_cache_hits > 0:
                reason += f" + {other_cache_hits} other cached operations"
            return score_result.ScoreResult(
                value=0.9,
                name=self.name,
                reason=reason
            )
        elif llm_cache_hit_ratio > 0:
            reason = f"Some LLM caching: {llm_cache_hits}/{total_llm_calls} LLM calls cached ({llm_cache_hit_ratio:.1%})"
            if other_cache_hits > 0:
                reason += f" + {other_cache_hits} other cached operations"
            return score_result.ScoreResult(
                value=0.7,
                name=self.name,
                reason=reason
            )
        elif total_llm_calls > 5:
            return score_result.ScoreResult(
                value=0.2,
                name=self.name,
                reason=f"No caching with {total_llm_calls} LLM calls - high cost/latency risk"
            )
        elif total_llm_calls > 3:
            return score_result.ScoreResult(
                value=0.4,
                name=self.name,
                reason=f"No caching with {total_llm_calls} LLM calls - consider adding cache"
            )
        else:
            return score_result.ScoreResult(
                value=0.8,
                name=self.name,
                reason=f"Efficient execution: {total_llm_calls} LLM calls (caching not critical)"
            )
```
3. Combine with Regular Metrics
Task span metrics provide the most value when combined with traditional output-based metrics:
```python
# Comprehensive evaluation approach
scoring_metrics = [
    # Output quality metrics
    Equals(),
    Hallucination(),

    # Execution analysis metrics
    TaskExecutionQualityMetric(),
    PerformanceMetric(),

    # Cost optimization metrics
    CostEfficiencyMetric(),  # custom metric, sketched below
]
```
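`CostEfficiencyMetric` is not defined elsewhere on this page; here is one possible sketch that sums `total_cost` across the span tree. It assumes each span carries only its own cost; if parent spans already aggregate child costs in your setup, sum leaf spans only. The budget threshold and score values are arbitrary:

```python
class CostEfficiencyMetric(BaseMetric):
    """Illustrative sketch: scores a run by total cost summed across the span tree."""

    def __init__(self, max_cost: float = 0.05, name: str = "cost_efficiency"):
        super().__init__(name=name)
        self.max_cost = max_cost  # arbitrary budget in USD, adjust for your use case

    def _sum_cost_recursively(self, span: SpanModel) -> float:
        # Assumes total_cost is per-span; adapt if your spans aggregate child costs
        cost = span.total_cost or 0.0
        for nested_span in span.spans:
            cost += self._sum_cost_recursively(nested_span)
        return cost

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        total_cost = self._sum_cost_recursively(task_span)
        return score_result.ScoreResult(
            value=1.0 if total_cost <= self.max_cost else 0.3,
            name=self.name,
            reason=f"Total estimated cost: ${total_cost:.4f} (budget: ${self.max_cost:.2f})"
        )
```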
4. Security Considerations
Be mindful of sensitive data in span information:
```python
def score(self, task_span: SpanModel) -> score_result.ScoreResult:
    # Avoid logging sensitive input data
    input_size = len(str(task_span.input)) if task_span.input else 0

    # Use aggregated information instead of raw data
    return score_result.ScoreResult(
        value=1.0 if input_size < 1000 else 0.5,
        name=self.name,
        reason=f"Input size: {input_size} characters"
    )
```
Complete Example: Agent Trajectory Analysis Metric
Here’s a comprehensive example that analyzes agent decision-making:
```python
class AgentTrajectoryMetric(BaseMetric):
    def __init__(self, max_steps: int = 10, name: str = "agent_trajectory_quality"):
        super().__init__(name=name)
        self.max_steps = max_steps

    def _analyze_trajectory_recursively(self, span: SpanModel, trajectory_stats: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Recursively analyze the agent trajectory across the span tree."""
        if trajectory_stats is None:
            trajectory_stats = {
                'total_steps': 0,
                'tool_uses': 0,
                'llm_reasoning': 0,
                'other_steps': 0,
                'tool_spans': [],
                'llm_spans': [],
                'step_names': [],
                'max_depth': 0,
                'current_depth': 0
            }

        # Count current span as a step
        trajectory_stats['total_steps'] += 1
        trajectory_stats['step_names'].append(span.name)
        trajectory_stats['max_depth'] = max(trajectory_stats['max_depth'], trajectory_stats['current_depth'])

        # Categorize span types for agent decision analysis
        if span.type == "tool":
            trajectory_stats['tool_uses'] += 1
            trajectory_stats['tool_spans'].append(span.name)
        elif span.type == "llm":
            trajectory_stats['llm_reasoning'] += 1
            trajectory_stats['llm_spans'].append(span.name)
        else:
            trajectory_stats['other_steps'] += 1

        # Recursively analyze nested spans with depth tracking
        for nested_span in span.spans:
            trajectory_stats['current_depth'] += 1
            self._analyze_trajectory_recursively(nested_span, trajectory_stats)
            trajectory_stats['current_depth'] -= 1

        return trajectory_stats

    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Analyze the agent trajectory across the entire span tree
        trajectory_stats = self._analyze_trajectory_recursively(task_span)

        total_steps = trajectory_stats['total_steps']
        tool_uses = trajectory_stats['tool_uses']
        llm_reasoning = trajectory_stats['llm_reasoning']
        max_depth = trajectory_stats['max_depth']

        # Handle empty trajectories
        if total_steps == 0:
            return score_result.ScoreResult(
                value=0.0, name=self.name,
                reason="No decision steps found"
            )

        # Analyze trajectory quality with enhanced metrics.
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        if tool_uses == 0 and llm_reasoning == 0:
            score_value = 0.1
            reason = f"Poor trajectory: {total_steps} steps with no tools or reasoning"
        elif tool_uses == 0:
            score_value = 0.3
            reason = f"Agent used {llm_reasoning} reasoning steps but no tools across {total_steps} operations"
        elif llm_reasoning == 0:
            score_value = 0.4
            reason = f"Agent used {tool_uses} tools but no reasoning across {total_steps} operations"
        elif total_steps > self.max_steps:
            # Penalize excessive steps but consider the tool/reasoning balance
            efficiency_penalty = max(0.1, 1.0 - (total_steps - self.max_steps) * 0.05)
            balance_ratio = min(tool_uses, llm_reasoning) / max(tool_uses, llm_reasoning)
            score_value = min(0.6, efficiency_penalty * balance_ratio)
            reason = f"Excessive steps: {total_steps} > {self.max_steps} (depth: {max_depth}, tools: {tool_uses}, reasoning: {llm_reasoning})"
        else:
            # Calculate a comprehensive score based on multiple factors.
            # Only for illustrative purposes.
            # Please adjust for your specific use case!

            # 1. Step efficiency (fewer steps = better)
            step_efficiency = min(1.0, self.max_steps / total_steps)

            # 2. Tool-reasoning balance (closer to a 1:1 ratio = better)
            balance_ratio = min(tool_uses, llm_reasoning) / max(tool_uses, llm_reasoning) if max(tool_uses, llm_reasoning) > 0 else 0

            # 3. Depth complexity (moderate depth suggests good decomposition)
            depth_score = 1.0 if max_depth <= 3 else max(0.7, 1.0 - (max_depth - 3) * 0.1)

            # 4. Decision density (good ratio of reasoning to total steps)
            decision_density = llm_reasoning / total_steps if total_steps > 0 else 0
            density_score = 1.0 if decision_density >= 0.3 else decision_density / 0.3

            # Combine all factors
            score_value = (step_efficiency * 0.3 + balance_ratio * 0.3 + depth_score * 0.2 + density_score * 0.2)

            if score_value >= 0.8:
                reason = f"Excellent trajectory: {total_steps} steps (depth: {max_depth}), {tool_uses} tools, {llm_reasoning} reasoning - well balanced"
            elif score_value >= 0.6:
                reason = f"Good trajectory: {total_steps} steps (depth: {max_depth}), {tool_uses} tools, {llm_reasoning} reasoning"
            else:
                reason = f"Acceptable trajectory: {total_steps} steps (depth: {max_depth}), {tool_uses} tools, {llm_reasoning} reasoning - could be optimized"

        return score_result.ScoreResult(
            value=score_value,
            name=self.name,
            reason=reason
        )
```
Integration with LLM Evaluation
For a complete guide on using task span metrics in LLM evaluation workflows, see the Using task span evaluation metrics section in the LLM evaluation guide.
Related Documentation
- Custom Metrics - Creating traditional input/output evaluation metrics
- SpanModel API Reference - Complete SpanModel documentation
- Evaluation Overview - Understanding Opik’s evaluation system