Message Processing Emulation Models

This module provides data models used for message processing emulation in Opik. These models represent the core data structures for traces, spans, and feedback scores that are used internally by the Opik SDK during evaluation.

Overview

The message processing emulation models are primarily used in evaluation contexts, particularly for task span evaluation where custom metrics need access to detailed execution information. These models provide a structured representation of:

  • Traces: Complete execution paths of requests or operations

  • Spans: Individual steps or operations within a trace

  • Feedback Scores: Evaluation results attached to traces and spans

Key Classes

Class Hierarchy

The models form a hierarchical relationship:

TraceModel
├── spans: List[SpanModel]
│   ├── spans: List[SpanModel]  (nested spans)
│   └── feedback_scores: List[FeedbackScoreModel]
└── feedback_scores: List[FeedbackScoreModel]
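
The nesting above can be sketched with plain dataclasses. The three classes below are local stand-ins that mirror only a handful of the documented fields so the example runs without Opik installed; the real classes live in opik.message_processing.emulation.models and carry more attributes. All ids and names used here are made up for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


# Stand-ins (assumption): trimmed copies of the documented models.
@dataclass
class FeedbackScoreModel:
    id: str
    name: str
    value: float


@dataclass
class SpanModel:
    id: str
    start_time: datetime
    name: Optional[str] = None
    spans: List["SpanModel"] = field(default_factory=list)
    feedback_scores: List[FeedbackScoreModel] = field(default_factory=list)


@dataclass
class TraceModel:
    id: str
    start_time: datetime
    name: Optional[str]
    project_name: str
    spans: List[SpanModel] = field(default_factory=list)
    feedback_scores: List[FeedbackScoreModel] = field(default_factory=list)


now = datetime.now(timezone.utc)

# A nested span sits inside its parent's `spans` list.
llm_call = SpanModel(id="span-2", start_time=now, name="llm_call")
task = SpanModel(
    id="span-1",
    start_time=now,
    name="task",
    spans=[llm_call],
    feedback_scores=[FeedbackScoreModel(id="fs-1", name="accuracy", value=0.9)],
)

# The trace holds the top-level spans; its own score list defaults to empty.
trace = TraceModel(
    id="trace-1",
    start_time=now,
    name="evaluation_task",
    project_name="Default Project",
    spans=[task],
)
```

Both list fields use a default factory, which is why a freshly built span or trace starts with its own empty lists rather than sharing one.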

Quick Start

Import the models:

from opik.message_processing.emulation.models import (
    TraceModel,
    SpanModel,
    FeedbackScoreModel
)

Common Usage Patterns

Task Span Evaluation

The primary use case for these models is in task span evaluation, where custom metrics analyze span data:

from opik.evaluation.metrics import BaseMetric, score_result
from opik.message_processing.emulation.models import SpanModel

class CustomSpanMetric(BaseMetric):
    def score(self, task_span: SpanModel) -> score_result.ScoreResult:
        # Access span properties
        span_name = task_span.name
        input_data = task_span.input
        output_data = task_span.output

        # Placeholder evaluation logic: replace with your own rule.
        # Here the span passes if it recorded both an input and an output.
        score_value = 1.0 if input_data and output_data else 0.0

        return score_result.ScoreResult(
            value=score_value,
            name=self.name,
            reason=f"Evaluated span: {span_name}"
        )

Analyzing Trace Structure

You can traverse and analyze the hierarchical structure of traces:

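from opik.message_processing.emulation.models import TraceModel

def analyze_trace_structure(trace: TraceModel) -> None:
    print(f"Trace: {trace.name}")
    # len(trace.spans) counts only the spans attached directly to the trace
    print(f"Top-level spans: {len(trace.spans)}")

    for span in trace.spans:
        print(f"  Span: {span.name} (type: {span.type})")

        # Show one level of nested spans
        for nested_span in span.spans:
            print(f"    Nested: {nested_span.name}")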
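
Because spans can nest to any depth, a recursive generator is one way to visit the whole tree. The sketch below is an illustration rather than part of the Opik API: walk_spans is a hypothetical helper, and the SimpleNamespace objects are stand-ins that expose only the name and spans attributes documented for SpanModel.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, Tuple


def walk_spans(spans: Iterable[Any], depth: int = 0) -> Iterator[Tuple[int, Any]]:
    """Yield (depth, span) pairs depth-first, parents before children."""
    for span in spans:
        yield depth, span
        yield from walk_spans(span.spans, depth + 1)


# Stand-in objects (assumption): only .name and .spans are modelled.
leaf = SimpleNamespace(name="tokenize", spans=[])
child = SimpleNamespace(name="llm_call", spans=[leaf])
root = SimpleNamespace(name="task", spans=[child])
trace = SimpleNamespace(name="evaluation_task", spans=[root])

# Indent each span name by its depth in the tree.
for depth, span in walk_spans(trace.spans):
    print("  " * depth + str(span.name))
```

The same generator should work on real SpanModel objects, since it relies only on the spans attribute.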

Working with Feedback Scores

Both traces and spans can contain feedback scores from evaluations:

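from typing import List

from opik.message_processing.emulation.models import (
    FeedbackScoreModel,
    SpanModel,
    TraceModel,
)

def collect_span_scores(span: SpanModel) -> List[FeedbackScoreModel]:
    # Collect this span's scores, then recurse into nested spans at any depth
    scores = list(span.feedback_scores)
    for nested_span in span.spans:
        scores.extend(collect_span_scores(nested_span))
    return scores

def collect_all_scores(trace: TraceModel) -> List[FeedbackScoreModel]:
    # Start with trace-level scores
    all_scores = list(trace.feedback_scores)

    # Add scores from every span in the tree
    for span in trace.spans:
        all_scores.extend(collect_span_scores(span))

    return all_scores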
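
Once scores are gathered into a flat list, a common next step is to summarize them. The following sketch averages score values per score name. average_by_name is a hypothetical helper, and the SimpleNamespace objects stand in for FeedbackScoreModel instances, using only the documented name and value attributes.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Dict, Iterable


def average_by_name(scores: Iterable[Any]) -> Dict[str, float]:
    """Return the mean value for each distinct score name."""
    totals: Dict[str, float] = defaultdict(float)
    counts: Dict[str, int] = defaultdict(int)
    for score in scores:
        totals[score.name] += score.value
        counts[score.name] += 1
    return {name: totals[name] / counts[name] for name in totals}


# Stand-in scores (assumption): made-up names and values.
scores = [
    SimpleNamespace(name="accuracy", value=1.0),
    SimpleNamespace(name="accuracy", value=0.5),
    SimpleNamespace(name="relevance", value=0.25),
]
averages = average_by_name(scores)
```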

Integration with Evaluation System

These models are automatically populated and used by the Opik evaluation system:

  1. Trace Creation: When you run opik.evaluate(), traces are automatically created

  2. Span Population: Individual function calls become spans within the trace

  3. Task Span Evaluation: Metrics with task_span parameters receive SpanModel objects

  4. Score Attachment: Feedback scores are automatically attached to the appropriate traces and spans

You typically don’t need to create these models manually; they are generated automatically during evaluation. However, understanding their structure is essential for writing effective task span evaluation metrics.
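
This page does not describe how the evaluation system decides which metrics receive a span. As an illustration only, and not a description of Opik's internals, one plausible mechanism is to inspect each metric's score signature for a task_span parameter. The wants_task_span helper and both metric classes below are hypothetical.

```python
import inspect
from typing import Any


def wants_task_span(metric: Any) -> bool:
    """Return True if metric.score declares a parameter named task_span."""
    return "task_span" in inspect.signature(metric.score).parameters


# Hypothetical metrics: one asks for the task span, one does not.
class SpanAwareMetric:
    def score(self, task_span: Any, **ignored_kwargs: Any) -> float:
        return 1.0


class OutputOnlyMetric:
    def score(self, output: str, **ignored_kwargs: Any) -> float:
        return 0.0


span_aware = wants_task_span(SpanAwareMetric())
output_only = wants_task_span(OutputOnlyMetric())
```

Under this assumption, only metrics of the first kind would be handed a SpanModel.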

Use Cases

These models are commonly used for:

  • Custom Evaluation Metrics: Analyzing detailed execution data in custom metrics

  • Performance Analysis: Understanding execution patterns and performance characteristics

  • Debugging: Investigating issues in complex operations

  • Cost Tracking: Aggregating usage and cost information across operations

  • Quality Assessment: Evaluating the quality of individual steps and overall operations
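
As an example of the cost tracking use case, the sketch below sums the documented total_cost field over a span tree. It assumes that a parent span's total_cost does not already include its children's costs; if it does, summing the whole tree would double count. sum_total_cost is a hypothetical helper, and the SimpleNamespace objects are stand-ins for SpanModel.

```python
from types import SimpleNamespace
from typing import Any, Iterable


def sum_total_cost(spans: Iterable[Any]) -> float:
    """Sum total_cost over spans and all nested spans, treating None as 0."""
    total = 0.0
    for span in spans:
        total += span.total_cost or 0.0
        total += sum_total_cost(span.spans)
    return total


# Stand-in spans (assumption): made-up costs; the parent has no cost of its own.
first_call = SimpleNamespace(total_cost=0.25, spans=[])
second_call = SimpleNamespace(total_cost=0.5, spans=[])
parent = SimpleNamespace(total_cost=None, spans=[first_call, second_call])

cost = sum_total_cost([parent])
```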

Module Reference

FeedbackScoreModel

class opik.message_processing.emulation.models.FeedbackScoreModel(id: str, name: str, value: float, category_name: str | None = None, reason: str | None = None)

Bases: object

Represents a model for a feedback score used to evaluate specific spans or traces.

This class stores and manages feedback scores linked to defined criteria, including identifiers, names, values, categories, and explanations for each score.

id

Unique identifier for the feedback score.

Type:

str

name

Name associated with the feedback score.

Type:

str

value

The numerical value of the feedback score.

Type:

float

category_name

Category to which the feedback score belongs, if any.

Type:

str | None

reason

Reason or explanation for the feedback score, if available.

Type:

str | None

id: str
name: str
value: float
category_name: str | None = None
reason: str | None = None
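
Since category_name and reason default to None, code that reads a score should handle their absence. A minimal sketch, using a local stand-in dataclass that copies the documented signature so it runs without Opik installed; describe is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


# Stand-in (assumption): same fields and defaults as the documented class.
@dataclass
class FeedbackScoreModel:
    id: str
    name: str
    value: float
    category_name: Optional[str] = None
    reason: Optional[str] = None


def describe(score: FeedbackScoreModel) -> str:
    """Format a score for a report, skipping optional fields that are unset."""
    parts = [f"{score.name}={score.value:.2f}"]
    if score.category_name is not None:
        parts.append(f"category={score.category_name}")
    if score.reason is not None:
        parts.append(f"reason={score.reason}")
    return ", ".join(parts)


bare = describe(FeedbackScoreModel(id="fs-1", name="accuracy", value=0.9))
explained = describe(
    FeedbackScoreModel(id="fs-2", name="accuracy", value=0.9, reason="exact match")
)
```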

SpanModel

class opik.message_processing.emulation.models.SpanModel(id: str, start_time: datetime.datetime, name: str | None = None, input: Dict[str, Any] | None = None, output: Dict[str, Any] | None = None, tags: List[str] | None = None, metadata: Dict[str, Any] | None = None, type: str = 'general', usage: Dict[str, Any] | None = None, end_time: datetime.datetime | None = None, project_name: str = 'Default Project', spans: List[SpanModel] = <factory>, feedback_scores: List[FeedbackScoreModel] = <factory>, model: str | None = None, provider: str | None = None, error_info: opik.types.ErrorInfoDict | None = None, total_cost: float | None = None, last_updated_at: datetime.datetime | None = None)

Bases: object

Represents a span model used to describe specific points in a process, their metadata, and associated data.

This class is used to store and manipulate structured data for events or spans, including metadata, time markers, associated input/output, tags, and additional properties. It serves as a representative structure for recording and organizing event-specific information, often used in applications like logging, distributed tracing, or data processing pipelines.

id

Unique identifier for the span.

Type:

str

start_time

Start time of the span.

Type:

datetime.datetime

name

Name of the span, if provided.

Type:

str | None

input

Input data associated with the span, if any.

Type:

Dict[str, Any] | None

output

Output data associated with the span, if any.

Type:

Dict[str, Any] | None

tags

List of tags linked to the span.

Type:

List[str] | None

metadata

Additional metadata for the span.

Type:

Dict[str, Any] | None

type

Type of the span, defaulting to “general”.

Type:

str

usage

Usage-related information for the span.

Type:

Dict[str, Any] | None

end_time

End time of the span, if available.

Type:

datetime.datetime | None

project_name

Name of the project the span is associated with, defaulting to a predefined project name.

Type:

str

spans

List of nested spans related to this span.

Type:

List[opik.message_processing.emulation.models.SpanModel]

feedback_scores

List of feedback scores associated with the span.

Type:

List[opik.message_processing.emulation.models.FeedbackScoreModel]

model

Identifier of the model used, if applicable.

Type:

str | None

provider

Provider of the span or associated services, if any.

Type:

str | None

error_info

Error information or diagnostics for the span, if applicable.

Type:

opik.types.ErrorInfoDict | None

total_cost

Total cost associated with this span, if relevant.

Type:

float | None

last_updated_at

Timestamp of when the span was last updated, if available.

Type:

datetime.datetime | None

id: str
start_time: datetime
name: str | None = None
input: Dict[str, Any] | None = None
output: Dict[str, Any] | None = None
tags: List[str] | None = None
metadata: Dict[str, Any] | None = None
type: str = 'general'
usage: Dict[str, Any] | None = None
end_time: datetime | None = None
project_name: str = 'Default Project'
spans: List[SpanModel]
feedback_scores: List[FeedbackScoreModel]
model: str | None = None
provider: str | None = None
error_info: ErrorInfoDict | None = None
total_cost: float | None = None
last_updated_at: datetime | None = None
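
Because end_time is optional, a duration can only be computed for spans that have finished. The sketch below shows one way to handle that. span_duration_seconds is a hypothetical helper, and the SimpleNamespace objects are stand-ins exposing only start_time and end_time.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Any, Optional


def span_duration_seconds(span: Any) -> Optional[float]:
    """Return the span's duration in seconds, or None if it has no end_time."""
    if span.end_time is None:
        return None
    return (span.end_time - span.start_time).total_seconds()


# Stand-in spans (assumption): made-up timestamps.
start = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
finished = SimpleNamespace(start_time=start, end_time=start + timedelta(seconds=1.5))
unfinished = SimpleNamespace(start_time=start, end_time=None)

finished_duration = span_duration_seconds(finished)
unfinished_duration = span_duration_seconds(unfinished)
```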

TraceModel

class opik.message_processing.emulation.models.TraceModel(id: str, start_time: datetime.datetime, name: str | None, project_name: str, input: Dict[str, Any] | None = None, output: Dict[str, Any] | None = None, tags: List[str] | None = None, metadata: Dict[str, Any] | None = None, end_time: datetime.datetime | None = None, spans: List[SpanModel] = <factory>, feedback_scores: List[FeedbackScoreModel] = <factory>, error_info: opik.types.ErrorInfoDict | None = None, thread_id: str | None = None, last_updated_at: datetime.datetime | None = None)

Bases: object

Represents a trace model that encapsulates data about a trace, its related metadata, and associated spans. It is used for tracking and analyzing data during execution or processing tasks.

This class provides a structure to represent trace information, including the start and end times, associated project details, input/output data, feedback scores, error information, and thread association. It is designed to handle optional fields for flexible use across various scenarios.

id

Unique identifier for the trace.

Type:

str

start_time

Timestamp representing the start of the trace.

Type:

datetime.datetime

name

Optional name for the trace, which can provide a descriptive label.

Type:

str | None

project_name

Name of the project associated with the trace.

Type:

str

input

Optional dictionary containing the input data associated with the trace.

Type:

Dict[str, Any] | None

output

Optional dictionary containing the output data generated by the trace.

Type:

Dict[str, Any] | None

tags

Optional list of tags associated with the trace for classification or filtering purposes.

Type:

List[str] | None

metadata

Optional metadata providing additional information about the trace.

Type:

Dict[str, Any] | None

end_time

Timestamp representing the end of the trace.

Type:

datetime.datetime | None

spans

List of spans associated with the trace, representing individual processing parts or segments within the trace.

Type:

List[opik.message_processing.emulation.models.SpanModel]

feedback_scores

List of feedback scores associated with the trace.

Type:

List[opik.message_processing.emulation.models.FeedbackScoreModel]

error_info

Optional dictionary containing information about errors encountered during the trace.

Type:

opik.types.ErrorInfoDict | None

thread_id

Optional identifier of the thread associated with the trace.

Type:

str | None

last_updated_at

Timestamp for when the trace was last updated.

Type:

datetime.datetime | None

id: str
start_time: datetime
name: str | None
project_name: str
input: Dict[str, Any] | None = None
output: Dict[str, Any] | None = None
tags: List[str] | None = None
metadata: Dict[str, Any] | None = None
end_time: datetime | None = None
spans: List[SpanModel]
feedback_scores: List[FeedbackScoreModel]
error_info: ErrorInfoDict | None = None
thread_id: str | None = None
last_updated_at: datetime | None = None
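
For debugging, the error_info field can be used to locate the spans in a trace that recorded an error. In the sketch below, find_failed_spans is a hypothetical helper, the SimpleNamespace objects are stand-ins for TraceModel and SpanModel, and the error_info payload is a placeholder: the actual keys are defined by opik.types.ErrorInfoDict and are not inspected here.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def find_failed_spans(spans: Iterable[Any]) -> List[Any]:
    """Return every span, at any depth, whose error_info is set."""
    failed = []
    for span in spans:
        if span.error_info is not None:
            failed.append(span)
        failed.extend(find_failed_spans(span.spans))
    return failed


# Stand-in objects (assumption): placeholder error payload, made-up names.
broken = SimpleNamespace(name="llm_call", error_info={"message": "timeout"}, spans=[])
healthy = SimpleNamespace(name="postprocess", error_info=None, spans=[])
task = SimpleNamespace(name="task", error_info=None, spans=[broken, healthy])
trace = SimpleNamespace(name="evaluation_task", error_info=None, spans=[task])

failed_names = [span.name for span in find_failed_spans(trace.spans)]
```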