
comet_ml.integration.vertex

CometVertexPipelineLogger

CometVertexPipelineLogger(
    api_key: Optional[str] = None,
    workspace: Optional[str] = None,
    project_name: Optional[str] = None,
    packages_to_install: Optional[List[str]] = None,
    base_image: Optional[str] = None,
    custom_experiment: Optional[comet_ml.CometExperiment] = None,
    share_api_key_to_workers: bool = False,
)

Creates a local experiment that tracks the Vertex pipeline run and provides an API to track individual Vertex tasks with their own Comet experiments.

Parameters:

  • api_key (Optional[str], default: None ) –

    Your Comet API Key. If not provided, the value set in the configuration system will be used.

  • project_name (Optional[str], default: None ) –

    The project name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.

  • workspace (Optional[str], default: None ) –

    The workspace name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.

  • packages_to_install (Optional[List[str]], default: None ) –

    Which packages to install, passed directly to kfp.components.create_component_from_func. Defaults to ["google-cloud-aiplatform", "comet_ml"].

  • base_image (Optional[str], default: None ) –

    Which docker image to use. If not provided, the default Kubeflow base image will be used.

  • custom_experiment (Optional[CometExperiment], default: None ) –

    A Comet Experiment with a custom configuration to use instead of the Experiment that would otherwise be created implicitly with default options.

  • share_api_key_to_workers (bool, default: False ) –

    If True, the Comet API Key will be shared with workers by setting the COMET_API_KEY environment variable. This is insecure; we recommend using a more secure way to set up your API Key in your cluster.

Example
from kfp import components, dsl

@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    logger = comet_ml.integration.vertex.CometVertexPipelineLogger()

    data_preprocessing_op = components.load_component_from_file("data_preprocessing.yaml")
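
If credentials are not configured through the Comet configuration system, they can be passed to the constructor explicitly. A minimal sketch, where all credential values are placeholders:

import comet_ml.integration.vertex

# All values below are placeholders; in practice, prefer the Comet
# configuration system (environment variables or a config file) over
# hard-coding credentials.
logger = comet_ml.integration.vertex.CometVertexPipelineLogger(
    api_key="YOUR-COMET-API-KEY",
    workspace="your-workspace",
    project_name="your-project",
    # Shares COMET_API_KEY with worker tasks; as noted above, this is
    # insecure and a dedicated secret mechanism is preferable.
    share_api_key_to_workers=True,
)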

track_task

track_task(
    task: Any, additional_environment: Optional[Dict[str, str]] = None
) -> Any

Injects all of the information required to track the given Vertex task with Comet. You still need to create an experiment inside that task.

Parameters:

  • task (Any) –

    The Vertex task to be tracked with Comet.

  • additional_environment (Optional[Dict[str, str]], default: None ) –

    A dictionary of additional environment variables to be set up in the tracked task.

Example
import kfp
from kfp import dsl

def data_preprocessing(input: str) -> str:
    import math
    import random
    import time

    from comet_ml import Experiment

    # The `track_task` method automatically injects the workspace name and project name.
    # If `share_api_key_to_workers` is set to True, the Comet API Key can also be injected
    # by `CometVertexPipelineLogger`.
    # All Vertex information is automatically logged to the Experiment when the task is
    # wrapped with the `track_task` method.
    experiment = Experiment()

    for i in range(60):
        experiment.log_metric("accuracy", math.log(i + random.random()))
        time.sleep(0.1)
    experiment.end()

    return input

@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    logger = comet_ml.integration.vertex.CometVertexPipelineLogger()

    data_preprocessing_op = kfp.components.create_component_from_func(
        func=data_preprocessing, packages_to_install=["comet_ml"]
    )

    task_1 = logger.track_task(data_preprocessing_op("test"))
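
The additional_environment parameter can be used to set extra environment variables inside the tracked task. A minimal sketch, continuing the pipeline above, where the variable name and value are hypothetical:

# Hypothetical environment variable, shown only to illustrate the parameter
task_1 = logger.track_task(
    data_preprocessing_op("test"),
    additional_environment={"MY_CUSTOM_VAR": "some-value"},
)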

comet_logger_component

comet_logger_component(
    api_key: Optional[str] = None,
    project_name: Optional[str] = None,
    workspace: Optional[str] = None,
    packages_to_install: Optional[List[str]] = None,
    base_image: Optional[str] = None,
    custom_experiment: Optional[CometExperiment] = None,
) -> Any

Injects the Comet Logger component, which continuously tracks and reports the current pipeline status to Comet.ml.

Deprecated: Use comet_ml.integration.vertex.CometVertexPipelineLogger instead.

Parameters:

  • api_key (Optional[str], default: None ) –

    Your Comet API Key. If not provided, the value set in the configuration system will be used.

  • project_name (Optional[str], default: None ) –

    The project name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.

  • workspace (Optional[str], default: None ) –

    The workspace name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.

  • packages_to_install (Optional[List[str]], default: None ) –

    Which packages to install, passed directly to kfp.components.create_component_from_func. Defaults to ["google-cloud-aiplatform", "comet_ml"].

  • base_image (Optional[str], default: None ) –

    Which docker image to use. If not provided, the default Kubeflow base image will be used.

  • custom_experiment (Optional[CometExperiment], default: None ) –

    A Comet Experiment with a custom configuration to use instead of the Experiment that would otherwise be created implicitly with default options.

Example

from kfp import dsl

@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    comet_ml.integration.vertex.comet_logger_component()

initialize_comet_logger

initialize_comet_logger(
    experiment: CometExperiment,
    pipeline_job_name: str,
    pipeline_task_name: str,
    pipeline_task_uuid: str,
) -> CometExperiment

Logs the Vertex task identifiers needed to track your pipeline status in Comet.ml. You need to call this function from every component you want to track.

Deprecated: If you are using comet_ml.integration.vertex.CometVertexPipelineLogger.track_task, you don't need to use this function anymore.

Parameters:

  • experiment (CometExperiment) –

    An already created Experiment object.

  • pipeline_job_name (str) –

    The Vertex pipeline job name; see the example below for how to get it automatically from Vertex.

  • pipeline_task_name (str) –

    The Vertex task name; see the example below for how to get it automatically from Vertex.

  • pipeline_task_uuid (str) –

    The Vertex task unique id; see the example below for how to get it automatically from Vertex.

Example
import comet_ml
import comet_ml.integration.vertex

experiment = comet_ml.Experiment()

# Vertex resolves these placeholders at runtime
pipeline_run_name = "{{$.pipeline_job_name}}"
pipeline_task_name = "{{$.pipeline_task_name}}"
pipeline_task_id = "{{$.pipeline_task_uuid}}"

comet_ml.integration.vertex.initialize_comet_logger(
    experiment, pipeline_run_name, pipeline_task_name, pipeline_task_id
)