
integration.vertex

initialize_comet_logger

```python
comet_ml.integration.vertex.initialize_comet_logger(
    experiment: BaseExperiment, pipeline_job_name: str,
    pipeline_task_name: str, pipeline_task_uuid: str) -> BaseExperiment
```

Logs the Vertex task identifiers needed to track your pipeline status in Comet.ml. You need to call this function from every component you want to track with Comet.ml.

Deprecated: If you are using CometVertexPipelineLogger.track_task, you don't need to use this function anymore.

Args:

  • experiment: An already created Experiment object.
  • pipeline_job_name: string. The Vertex pipeline job name; see below how to get it automatically from Vertex.
  • pipeline_task_name: string. The Vertex task name; see below how to get it automatically from Vertex.
  • pipeline_task_uuid: string. The Vertex task unique id; see below how to get it automatically from Vertex.

For example:

```python
@kfp.dsl.v2.component
def my_component() -> None:
    import comet_ml.integration.vertex

    experiment = comet_ml.Experiment()
    pipeline_run_name = "{{$.pipeline_job_name}}"
    pipeline_task_name = "{{$.pipeline_task_name}}"
    pipeline_task_id = "{{$.pipeline_task_uuid}}"

    comet_ml.integration.vertex.initialize_comet_logger(
        experiment, pipeline_run_name, pipeline_task_name, pipeline_task_id
    )
```
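
Since this function is deprecated, the same tracking can be achieved with CometVertexPipelineLogger.track_task, documented below. A minimal sketch of the replacement, mirroring the track_task example later on this page (the component body and input are illustrative):

```python
def my_component() -> None:
    from comet_ml import Experiment

    # `track_task` injects the Vertex job and task identifiers automatically,
    # so no explicit call to `initialize_comet_logger` is needed here.
    experiment = Experiment()
    experiment.end()

@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    logger = comet_ml.integration.vertex.CometVertexPipelineLogger()

    my_component_op = kfp.components.create_component_from_func(
        func=my_component, packages_to_install=["comet_ml"]
    )
    logger.track_task(my_component_op())
```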

comet_logger_component

```python
comet_ml.integration.vertex.comet_logger_component(
    api_key: Optional[str] = None, project_name: Optional[str] = None,
    workspace: Optional[str] = None,
    packages_to_install: Optional[List[str]] = None,
    base_image: Optional[str] = None,
    custom_experiment: Optional[BaseExperiment] = None) -> Any
```

Injects the Comet Logger component, which continuously tracks and reports the current pipeline status to Comet.ml.

Deprecated: Use CometVertexPipelineLogger instead.

Args:

  • api_key: string, optional. Your Comet API Key, if not provided, the value set in the configuration system will be used.

  • project_name: string, optional. The project name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.

  • workspace: string, optional. The workspace name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.

  • packages_to_install: List of string, optional. Which packages to install, given directly to kfp.components.create_component_from_func. Default is ["google-cloud-aiplatform", "comet_ml"].

  • base_image: string, optional. Which docker image to use. If not provided, the default Kubeflow base image will be used.

  • custom_experiment: Experiment, optional. A Comet Experiment with a custom configuration to use instead of the Experiment that would otherwise be created implicitly with default options.

Example:

```python
@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    comet_ml.integration.vertex.comet_logger_component()
```
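
All arguments are optional; a sketch that sets the logging destination explicitly (the workspace and project names below are hypothetical placeholders):

```python
@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    comet_ml.integration.vertex.comet_logger_component(
        workspace="my-workspace",    # hypothetical workspace name
        project_name="my-project",   # hypothetical project name
        packages_to_install=["google-cloud-aiplatform", "comet_ml"],  # the documented default
    )
```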

CometVertexPipelineLogger

```python
class comet_ml.integration.vertex.CometVertexPipelineLogger(self,
    api_key: Optional[str] = None, workspace: Optional[str] = None,
    project_name: Optional[str] = None,
    packages_to_install: Optional[List[str]] = None,
    base_image: Optional[str] = None,
    custom_experiment: Optional["comet_ml.BaseExperiment"] = None,
    share_api_key_to_workers: bool = False)
```

Creates a local experiment for tracking vertex pipeline work and provides an API to track vertex tasks with their own Comet experiments.

comet_ml.integration.vertex.CometVertexPipelineLogger.__init__

```python
__init__(api_key: Optional[str] = None, workspace: Optional[str] = None,
    project_name: Optional[str] = None,
    packages_to_install: Optional[List[str]] = None,
    base_image: Optional[str] = None,
    custom_experiment: Optional["comet_ml.BaseExperiment"] = None,
    share_api_key_to_workers: bool = False) -> None
```

Args:

  • api_key: str (optional). Your Comet API Key, if not provided, the value set in the configuration system will be used.

  • project_name: str (optional). The project name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.

  • workspace: str (optional). The workspace name where all pipeline tasks are logged. If not provided, the value set in the configuration system will be used.

  • packages_to_install: List[str] (optional). Which packages to install, given directly to kfp.components.create_component_from_func. Default is ["google-cloud-aiplatform", "comet_ml"].

  • base_image: str (optional). Which docker image to use. If not provided, the default Kubeflow base image will be used.

  • custom_experiment: Experiment (optional). A Comet Experiment with a custom configuration to use instead of the Experiment that would otherwise be created implicitly with default options.

  • share_api_key_to_workers: bool (optional). If True, the Comet API key will be shared with workers by setting the COMET_API_KEY environment variable. This is insecure; we recommend using a more secure way to set up your API key in your cluster.

Example:

```python
@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    logger = comet_ml.integration.vertex.CometVertexPipelineLogger()

    data_preprocessing_op = kfp.components.load_component_from_file("data_preprocessing.yaml")
```
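
The loaded component can then be wrapped with track_task (documented below) so that Comet receives the Vertex identifiers; a sketch continuing the example above, with a hypothetical input value:

```python
    # Continuing inside ml_training_pipeline; "test" is a hypothetical input.
    task_1 = logger.track_task(data_preprocessing_op("test"))
```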

comet_ml.integration.vertex.CometVertexPipelineLogger.track_task

```python
track_task(task: Any,
    additional_environment: Optional[Dict[str, str]] = None) -> Any
```

Injects all required information to track the given Vertex task with Comet. You still need to create an experiment inside that task.

Args:

  • task: (required) The Vertex task to be tracked with Comet.

  • additional_environment: Dict[str, str] (optional) A dictionary of additional environment variables to be set up in the tracked task.

Example:

```python
def data_preprocessing(input: str) -> str:
    import math
    import random
    import time

    from comet_ml import Experiment

    # The `track_task` method automatically injects the workspace name and project name.
    # If `share_api_key_to_workers` is set to True, the Comet API Key can also be injected
    # by `CometVertexPipelineLogger`.
    # All Vertex information is automatically logged to the Experiment when the task is
    # wrapped with the `track_task` method.
    experiment = Experiment()

    for i in range(60):
        experiment.log_metric("accuracy", math.log(i + random.random()))
        time.sleep(0.1)
    experiment.end()

    return input

@dsl.pipeline(name='ML training pipeline')
def ml_training_pipeline():
    import comet_ml.integration.vertex

    logger = comet_ml.integration.vertex.CometVertexPipelineLogger()

    data_preprocessing_op = kfp.components.create_component_from_func(
        func=data_preprocessing, packages_to_install=["comet_ml"]
    )

    task_1 = logger.track_task(data_preprocessing_op("test"))
```
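
The additional_environment argument can inject extra environment variables into the tracked task; a sketch continuing the example above, with a hypothetical variable name:

```python
    # Continuing inside ml_training_pipeline; MY_CUSTOM_VAR is hypothetical.
    task_2 = logger.track_task(
        data_preprocessing_op("test"),
        additional_environment={"MY_CUSTOM_VAR": "value"},
    )
```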