Message Processing Emulation Models
This module provides data models used for message processing emulation in Opik. These models represent the core data structures for traces, spans, and feedback scores that are used internally by the Opik SDK during evaluation.
Overview
The message processing emulation models are primarily used in evaluation contexts, particularly for task span evaluation where custom metrics need access to detailed execution information. These models provide a structured representation of:
Traces: Complete execution paths of requests or operations
Spans: Individual steps or operations within a trace
Feedback Scores: Evaluation results attached to traces and spans
Key Classes
Class Hierarchy
The models form a hierarchical relationship:
TraceModel
├── spans: List[SpanModel]
│   ├── spans: List[SpanModel] (nested spans)
│   └── feedback_scores: List[FeedbackScoreModel]
└── feedback_scores: List[FeedbackScoreModel]
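To make the hierarchy concrete, the sketch below builds a small trace by hand. It uses stand-in dataclasses (`Score`, `Span`, `Trace`, all hypothetical names) that mirror only a few of the documented fields, so it runs without Opik installed. With the SDK available you would use the real `FeedbackScoreModel`, `SpanModel`, and `TraceModel` instead. All ids and names are illustrative.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


# Stand-ins (assumption): simplified mirrors of FeedbackScoreModel,
# SpanModel, and TraceModel with only a subset of the documented fields.
@dataclass
class Score:
    id: str
    name: str
    value: float


@dataclass
class Span:
    id: str
    start_time: datetime
    name: Optional[str] = None
    spans: List["Span"] = field(default_factory=list)
    feedback_scores: List[Score] = field(default_factory=list)


@dataclass
class Trace:
    id: str
    start_time: datetime
    name: Optional[str]
    project_name: str
    spans: List[Span] = field(default_factory=list)
    feedback_scores: List[Score] = field(default_factory=list)


now = datetime.now(timezone.utc)

# A nested span carrying its own feedback score
retrieval = Span(id="span-2", start_time=now, name="retrieve_docs")
retrieval.feedback_scores.append(Score(id="fs-1", name="relevance", value=0.8))

# A top-level span that owns the nested span
pipeline = Span(id="span-1", start_time=now, name="rag_pipeline", spans=[retrieval])

# The trace owns top-level spans and trace-level scores
trace = Trace(
    id="trace-1",
    start_time=now,
    name="answer_question",
    project_name="Default Project",
    spans=[pipeline],
    feedback_scores=[Score(id="fs-2", name="overall_quality", value=0.9)],
)

# Walk from the trace down to the nested span's score
print(trace.spans[0].spans[0].feedback_scores[0].name)
```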
Quick Start
Import the models:
from opik.message_processing.emulation.models import (
    TraceModel,
    SpanModel,
    FeedbackScoreModel,
)
Common Usage Patterns
Task Span Evaluation
The primary use case for these models is in task span evaluation, where custom metrics analyze span data:
from opik.evaluation.metrics import BaseMetric, score_result
from opik.message_processing.emulation.models import SpanModel


class CustomSpanMetric(BaseMetric):
    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Access span properties
        span_name = task_span.name
        input_data = task_span.input
        output_data = task_span.output
        # Perform evaluation logic; evaluate_span is a placeholder for a
        # scoring method you define on this class
        score_value = self.evaluate_span(span_name, input_data, output_data)
        return score_result.ScoreResult(
            value=score_value,
            name=self.name,
            reason=f"Evaluated span: {span_name}",
        )
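The `evaluate_span` call in the metric is left to the metric author. As one possible shape for that logic, the sketch below scores a span 1.0 when its output dictionary contains at least one non-empty value and 0.0 otherwise. The rule itself is hypothetical, the function is written standalone, and the spans are `SimpleNamespace` stand-ins (an assumption made so the snippet runs without Opik). In a real metric this would be a method on the `BaseMetric` subclass receiving a `SpanModel`.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional


def evaluate_span(
    span_name: Optional[str],
    input_data: Optional[Dict[str, Any]],
    output_data: Optional[Dict[str, Any]],
) -> float:
    """Hypothetical scoring rule: 1.0 if the span produced any non-empty output."""
    if not output_data:
        # Covers both None and an empty dict
        return 0.0
    has_content = any(value not in (None, "", [], {}) for value in output_data.values())
    return 1.0 if has_content else 0.0


# Stand-in objects exposing the same attribute names as SpanModel
good_span = SimpleNamespace(name="llm_call", input={"q": "hi"}, output={"answer": "hello"})
empty_span = SimpleNamespace(name="llm_call", input={"q": "hi"}, output=None)

good_score = evaluate_span(good_span.name, good_span.input, good_span.output)
empty_score = evaluate_span(empty_span.name, empty_span.input, empty_span.output)
print(good_score, empty_score)
```

Because `input` and `output` are optional on `SpanModel`, any scoring logic of this kind should handle `None` before indexing into the dictionaries.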
Analyzing Trace Structure
You can traverse and analyze the hierarchical structure of traces:
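from opik.message_processing.emulation.models import TraceModel


def analyze_trace_structure(trace: TraceModel) -> None:
    print(f"Trace: {trace.name}")
    # len(trace.spans) counts only the spans directly under the trace
    print(f"Top-level spans: {len(trace.spans)}")
    for span in trace.spans:
        print(f"  Span: {span.name} (type: {span.type})")
        # Analyze nested spans (one level down)
        for nested_span in span.spans:
            print(f"    Nested: {nested_span.name}")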
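Spans can nest to any depth, so a fixed pair of loops only sees the first two levels. A minimal sketch of a depth-first walk follows. `walk_spans` is a hypothetical helper, not part of Opik, and the tree is built from `SimpleNamespace` stand-ins that expose the same `name`, `type`, and `spans` attributes as `SpanModel`. The span type strings are illustrative.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, Tuple


def walk_spans(spans: Iterable[Any], depth: int = 0) -> Iterator[Tuple[int, Any]]:
    """Yield (depth, span) pairs in depth-first order."""
    for span in spans:
        yield depth, span
        # Recurse into this span's children before moving to its siblings
        yield from walk_spans(span.spans, depth + 1)


# Stand-in tree: one span with a nested child, plus a sibling span
leaf = SimpleNamespace(name="embed_query", type="tool", spans=[])
mid = SimpleNamespace(name="retrieve_docs", type="general", spans=[leaf])
llm = SimpleNamespace(name="generate_answer", type="llm", spans=[])
trace = SimpleNamespace(name="answer_question", spans=[mid, llm])

visited = list(walk_spans(trace.spans))
for depth, span in visited:
    print("  " * depth + f"{span.name} (type: {span.type})")
print(f"Total spans at all depths: {len(visited)}")
```

The length of `visited` gives the count of spans at every level, unlike `len(trace.spans)`, which counts only top-level spans.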
Working with Feedback Scores
Both traces and spans can contain feedback scores from evaluations:
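from typing import List

from opik.message_processing.emulation.models import FeedbackScoreModel, SpanModel, TraceModel


def collect_span_scores(span: SpanModel) -> List[FeedbackScoreModel]:
    # Collect this span's scores, then recurse into nested spans at any depth
    scores = list(span.feedback_scores)
    for nested_span in span.spans:
        scores.extend(collect_span_scores(nested_span))
    return scores


def collect_all_scores(trace: TraceModel) -> List[FeedbackScoreModel]:
    # Collect trace-level scores
    all_scores = list(trace.feedback_scores)
    # Collect span-level scores from every span in the tree
    for span in trace.spans:
        all_scores.extend(collect_span_scores(span))
    return all_scores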
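Once scores are gathered into a flat list, a common next step is to summarize them per metric. The sketch below averages values by score name. `average_scores_by_name` is a hypothetical helper, and the scores are `SimpleNamespace` stand-ins with the `name` and `value` attributes documented for `FeedbackScoreModel`.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def average_scores_by_name(scores: Iterable[Any]) -> Dict[str, float]:
    """Group scores by their name and return the mean value per name."""
    grouped: Dict[str, List[float]] = defaultdict(list)
    for score in scores:
        grouped[score.name].append(score.value)
    return {name: sum(values) / len(values) for name, values in grouped.items()}


# Stand-in scores; names and values are illustrative
scores = [
    SimpleNamespace(name="relevance", value=0.5),
    SimpleNamespace(name="relevance", value=1.0),
    SimpleNamespace(name="fluency", value=0.75),
]
averages = average_scores_by_name(scores)
print(averages)
```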
Integration with Evaluation System
These models are automatically populated and used by the Opik evaluation system:
Trace Creation: When you run opik.evaluate(), traces are automatically created
Span Population: Individual function calls become spans within the trace
Task Span Evaluation: Metrics with task_span parameters receive SpanModel objects
Score Attachment: Feedback scores are automatically attached to the appropriate traces and spans
You typically don’t need to create these models manually; they’re generated automatically during evaluation. However, understanding their structure is essential for writing effective task span evaluation metrics.
Use Cases
These models are commonly used for:
Custom Evaluation Metrics: Analyzing detailed execution data in custom metrics
Performance Analysis: Understanding execution patterns and performance characteristics
Debugging: Investigating issues in complex operations
Cost Tracking: Aggregating usage and cost information across operations
Quality Assessment: Evaluating the quality of individual steps and overall operations
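As an illustration of the cost tracking use case, the sketch below sums `total_cost` over a span and everything nested under it. It assumes each span's `total_cost` covers only that span; if a parent's value already includes its children, this approach would double count. `total_cost_of` is a hypothetical helper and the spans are `SimpleNamespace` stand-ins with illustrative values.

```python
from types import SimpleNamespace
from typing import Any


def total_cost_of(span: Any) -> float:
    """Sum total_cost over a span and all of its nested spans, treating None as 0."""
    own_cost = span.total_cost or 0.0
    return own_cost + sum(total_cost_of(child) for child in span.spans)


# Stand-in spans: total_cost is optional, so one of them leaves it unset
llm_call = SimpleNamespace(name="generate_answer", total_cost=0.25, spans=[])
lookup = SimpleNamespace(name="retrieve_docs", total_cost=None, spans=[])
root = SimpleNamespace(name="rag_pipeline", total_cost=0.5, spans=[llm_call, lookup])

cost = total_cost_of(root)
print(cost)
```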
Module Reference
FeedbackScoreModel
- class opik.message_processing.emulation.models.FeedbackScoreModel(id: str, name: str, value: float, category_name: str | None = None, reason: str | None = None)
Bases: object
Represents a model for a feedback score used to evaluate specific spans or traces.
This class stores and manages feedback scores linked to defined criteria, including identifiers, names, values, categories, and explanations for each score.
- id
Unique identifier for the feedback score.
Type: str
- name
Name associated with the feedback score.
Type: str
- value
The numerical value of the feedback score.
Type: float
- category_name
Category to which the feedback score belongs, if any.
Type: str | None
- reason
Reason or explanation for the feedback score, if available.
Type: str | None
- id: str
- name: str
- value: float
- category_name: str | None = None
- reason: str | None = None
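Since `category_name` and `reason` default to `None`, code that displays scores should treat them as optional. The sketch below shows one way to do that. `Score` is a stand-in dataclass with the same fields and defaults as the documented signature (an assumption so the snippet runs without Opik), and `describe` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Score:  # stand-in mirroring the documented FeedbackScoreModel fields
    id: str
    name: str
    value: float
    category_name: Optional[str] = None
    reason: Optional[str] = None


def describe(score: Score) -> str:
    """Render a score as one line, including optional fields only when set."""
    parts = [f"{score.name}={score.value:.2f}"]
    if score.category_name is not None:
        parts.append(f"category={score.category_name}")
    if score.reason is not None:
        parts.append(f"reason={score.reason}")
    return ", ".join(parts)


minimal = Score(id="fs-1", name="relevance", value=0.8)
detailed = Score(
    id="fs-2", name="tone", value=1.0, category_name="polite", reason="Courteous reply"
)
print(describe(minimal))
print(describe(detailed))
```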
SpanModel
- class opik.message_processing.emulation.models.SpanModel(id: str, start_time: datetime.datetime, name: str | None = None, input: Dict[str, Any] | None = None, output: Dict[str, Any] | None = None, tags: List[str] | None = None, metadata: Dict[str, Any] | None = None, type: str = 'general', usage: Dict[str, Any] | None = None, end_time: datetime.datetime | None = None, project_name: str = 'Default Project', spans: List[SpanModel] = <factory>, feedback_scores: List[FeedbackScoreModel] = <factory>, model: str | None = None, provider: str | None = None, error_info: opik.types.ErrorInfoDict | None = None, total_cost: float | None = None, last_updated_at: datetime.datetime | None = None)
Bases: object
Represents a span model used to describe specific points in a process, their metadata, and associated data.
This class is used to store and manipulate structured data for events or spans, including metadata, time markers, associated input/output, tags, and additional properties. It serves as a representative structure for recording and organizing event-specific information, often used in applications like logging, distributed tracing, or data processing pipelines.
- id
Unique identifier for the span.
Type: str
- start_time
Start time of the span.
Type: datetime.datetime
- name
Name of the span, if provided.
Type: str | None
- input
Input data associated with the span, if any.
Type: Dict[str, Any] | None
- output
Output data associated with the span, if any.
Type: Dict[str, Any] | None
- tags
List of tags linked to the span.
Type: List[str] | None
- metadata
Additional metadata for the span.
Type: Dict[str, Any] | None
- type
Type of the span, defaulting to “general”.
Type: str
- usage
Usage-related information for the span.
Type: Dict[str, Any] | None
- end_time
End time of the span, if available.
Type: datetime.datetime | None
- project_name
Name of the project the span is associated with, defaulting to “Default Project”.
Type: str
- spans
List of nested spans related to this span. Defaults to an empty list.
Type: List[SpanModel]
- feedback_scores
List of feedback scores associated with the span. Defaults to an empty list.
Type: List[FeedbackScoreModel]
- model
Identifier of the model used, if applicable.
Type: str | None
- provider
Provider of the span or associated services, if any.
Type: str | None
- error_info
Error information or diagnostics for the span, if applicable.
Type: opik.types.ErrorInfoDict | None
- total_cost
Total cost associated with this span, if relevant.
Type: float | None
- last_updated_at
Timestamp of when the span was last updated, if available.
Type: datetime.datetime | None
- id: str
- start_time: datetime
- name: str | None = None
- input: Dict[str, Any] | None = None
- output: Dict[str, Any] | None = None
- tags: List[str] | None = None
- metadata: Dict[str, Any] | None = None
- type: str = 'general'
- usage: Dict[str, Any] | None = None
- end_time: datetime | None = None
- project_name: str = 'Default Project'
- feedback_scores: List[FeedbackScoreModel]
- model: str | None = None
- provider: str | None = None
- error_info: ErrorInfoDict | None = None
- total_cost: float | None = None
- last_updated_at: datetime | None = None
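Because `end_time` is optional, a span's duration can only be computed once the span has finished. The sketch below shows one way to derive it from the two timestamps. `span_duration_seconds` is a hypothetical helper, and the spans are `SimpleNamespace` stand-ins carrying just the `start_time` and `end_time` attributes.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Any, Optional


def span_duration_seconds(span: Any) -> Optional[float]:
    """Return elapsed seconds, or None when the span has no end_time yet."""
    if span.end_time is None:
        return None
    return (span.end_time - span.start_time).total_seconds()


start = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# One finished span (1.5 seconds long) and one that never recorded an end
finished = SimpleNamespace(start_time=start, end_time=start + timedelta(milliseconds=1500))
running = SimpleNamespace(start_time=start, end_time=None)

print(span_duration_seconds(finished))
print(span_duration_seconds(running))
```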
TraceModel
- class opik.message_processing.emulation.models.TraceModel(id: str, start_time: datetime.datetime, name: str | None, project_name: str, input: Dict[str, Any] | None = None, output: Dict[str, Any] | None = None, tags: List[str] | None = None, metadata: Dict[str, Any] | None = None, end_time: datetime.datetime | None = None, spans: List[SpanModel] = <factory>, feedback_scores: List[FeedbackScoreModel] = <factory>, error_info: opik.types.ErrorInfoDict | None = None, thread_id: str | None = None, last_updated_at: datetime.datetime | None = None)
Bases: object
Represents a trace model that encapsulates data about a trace, its related metadata, and associated spans. It is used for tracking and analyzing data during execution or processing tasks.
This class provides a structure to represent trace information, including the start and end times, associated project details, input/output data, feedback scores, error information, and thread association. It is designed to handle optional fields for flexible use across various scenarios.
- id
Unique identifier for the trace.
Type: str
- start_time
Timestamp representing the start of the trace.
Type: datetime.datetime
- name
Name for the trace, which can provide a descriptive label. May be None, but has no default and must be passed explicitly.
Type: str | None
- project_name
Name of the project associated with the trace.
Type: str
- input
Optional dictionary containing the input data associated with the trace.
Type: Dict[str, Any] | None
- output
Optional dictionary containing the output data generated by the trace.
Type: Dict[str, Any] | None
- tags
Optional list of tags associated with the trace for classification or filtering purposes.
Type: List[str] | None
- metadata
Optional metadata providing additional information about the trace.
Type: Dict[str, Any] | None
- end_time
Timestamp representing the end of the trace, if available.
Type: datetime.datetime | None
- spans
List of spans associated with the trace, representing individual processing parts or segments within the trace. Defaults to an empty list.
Type: List[SpanModel]
- feedback_scores
List of feedback scores associated with the trace. Defaults to an empty list.
Type: List[FeedbackScoreModel]
- error_info
Optional dictionary containing information about errors encountered during the trace.
Type: opik.types.ErrorInfoDict | None
- thread_id
Optional identifier of the thread associated with the trace.
Type: str | None
- last_updated_at
Timestamp for when the trace was last updated, if available.
Type: datetime.datetime | None
- id: str
- start_time: datetime
- name: str | None
- project_name: str
- input: Dict[str, Any] | None = None
- output: Dict[str, Any] | None = None
- tags: List[str] | None = None
- metadata: Dict[str, Any] | None = None
- end_time: datetime | None = None
- feedback_scores: List[FeedbackScoreModel]
- error_info: ErrorInfoDict | None = None
- thread_id: str | None = None
- last_updated_at: datetime | None = None
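For debugging, the `error_info` field on spans can be used to locate the steps that failed within a trace. The sketch below collects the names of all spans, at any depth, whose `error_info` is set. `failed_span_names` is a hypothetical helper, the objects are `SimpleNamespace` stand-ins, and the key used inside the example error dictionary is an assumption rather than a statement about the `ErrorInfoDict` schema.

```python
from types import SimpleNamespace
from typing import Any, List


def failed_span_names(trace: Any) -> List[str]:
    """Return sorted names of all spans, at any depth, whose error_info is set."""
    failed: List[str] = []
    stack = list(trace.spans)
    while stack:
        span = stack.pop()
        if span.error_info is not None:
            failed.append(span.name)
        # Queue nested spans so every level of the tree is inspected
        stack.extend(span.spans)
    return sorted(failed)


# Stand-in trace: the failing span sits one level below a healthy parent
ok = SimpleNamespace(name="parse_input", error_info=None, spans=[])
bad_child = SimpleNamespace(
    name="call_api",
    error_info={"exception_type": "TimeoutError"},  # illustrative key, an assumption
    spans=[],
)
parent = SimpleNamespace(name="fetch_data", error_info=None, spans=[bad_child])
trace = SimpleNamespace(name="job", spans=[ok, parent])

failed = failed_span_names(trace)
print(failed)
```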