Integrate with Ray

Ray is an open-source project that makes it simple to scale any compute-intensive Python workload — from deep learning to production model serving.

Instrument your runs with Comet to start managing experiments, create dataset versions and track hyperparameters for faster and easier reproducibility and collaboration.

Ray Train

Ray Train scales model training for popular ML frameworks such as Torch, XGBoost, TensorFlow, and more. It seamlessly integrates with other Ray libraries such as Tune and Predictors.

Open In Colab

With the Ray Train integration, Comet lets you monitor the resource usage of every worker, so you can check that your expensive GPUs are fully used and that your CPUs are not the bottleneck in your training.

System Metrics tab

Connect Comet to your existing code by adding the following lines of code to your script or notebook:

import comet_ml.integration.ray

from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig, RunConfig


def train_func(config):
    from comet_ml.integration.ray import comet_worker_logger

    with comet_worker_logger(config) as experiment:
        # Your distributed training code here
        ...
        # `accuracy` stands for a value computed by your training code above
        experiment.log_metric("accuracy", accuracy)


config = {"epochs": 42}
use_gpu = False  # set to True to train on GPUs

callback = comet_ml.integration.ray.CometTrainLoggerCallback(
    config, api_key="<Your API Key>", project_name="<Your Project Name>"
)

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config=config,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu),
    run_config=RunConfig(callbacks=[callback]),
)

results = trainer.fit()

Note

There are other ways to configure Comet. See more here.

Log automatically

The Ray Train integration requires both the CometTrainLoggerCallback in the driver script and comet_worker_logger in each worker node.

On the driver script

On the driver script, the callback logs all the environment details that Comet normally logs, except for the system metrics.

Note

If you are using the Ray Tune Comet callback, CometLoggerCallback, you need to replace it with the new Ray Train Comet callback, comet_ml.integration.ray.CometTrainLoggerCallback. It is fully compatible except for the following two limitations:

  • It can only be used with Ray Train, not Ray Tune.
  • It won't create Offline experiments, only Online experiments.

You can easily turn the automatic logging on and off for any or all items. See Configure Comet for Ray Train for more details.

On each worker node

On each worker node, the system metrics are logged automatically and appear in the UI with the worker rank as a prefix. The Ray Train integration also hooks into Ray's reporting mechanism, so you can continue logging data through it. The comet_worker_logger context manager returns an Experiment object, which you can use to log additional data manually.

You can easily turn the automatic logging on and off for any or all items. See Configure Comet for Ray Train for more details.

Note

Don't see what you need to log here? We have your back. You can manually log any kind of data to Comet using the Experiment object. For example, use experiment.log_image to log images, or experiment.log_audio to log audio.
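
As a rough sketch of manual logging, the helper below takes the Experiment object returned by comet_worker_logger and logs one metric and, optionally, one image. The helper name log_extras and the RecordingExperiment class are hypothetical: the class is only a stand-in that imitates experiment.log_metric and experiment.log_image so the snippet runs without a Comet account. In a worker you would pass the real experiment instead.

```python
class RecordingExperiment:
    """Hypothetical stand-in that records calls instead of sending them to Comet."""

    def __init__(self):
        self.calls = []

    def log_metric(self, name, value, step=None):
        self.calls.append(("metric", name, value, step))

    def log_image(self, image_data, name=None):
        self.calls.append(("image", name))


def log_extras(experiment, accuracy, step, image_path=None):
    """Log one metric and, optionally, one image through an Experiment-like object."""
    experiment.log_metric("accuracy", accuracy, step=step)
    if image_path is not None:
        experiment.log_image(image_path, name="sample-prediction")


# Stand-in used here; inside `with comet_worker_logger(config) as experiment:`
# you would call log_extras(experiment, ...) with the real object.
experiment = RecordingExperiment()
log_extras(experiment, accuracy=0.91, step=10, image_path="prediction.png")
print(experiment.calls)
```

Keeping the logging calls in a small function that receives the experiment makes them easy to exercise outside a Ray worker.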

End-to-end example

The following is a basic example of using Comet with Ray Train.

If you can't wait, check out the results of this example Ray Train experiment for a preview of what's to come.

Install dependencies

python -m pip install comet_ml "ray[air]>=2.1.0" tensorflow

Run the example

# This example showcases how to use TensorFlow with Ray Train.
# Original code:
# https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
import argparse
import json
import os

import comet_ml
import comet_ml.integration.ray

import numpy as np
import ray
from ray.air.config import RunConfig, ScalingConfig
from ray.air.result import Result
from ray.train.tensorflow import TensorflowTrainer

import tensorflow as tf

comet_ml.init(project_name="comet-example-ray-train-keras")


def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(batch_size)
    )
    return train_dataset


def build_and_compile_cnn_model(config):
    learning_rate = config.get("lr", 0.001)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
        metrics=["accuracy"],
    )
    return model


def train_func(config: dict):
    from comet_ml.integration.ray import comet_worker_logger
    from ray.air import session

    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    with comet_worker_logger(config) as experiment:
        tf_config = json.loads(os.environ["TF_CONFIG"])
        num_workers = len(tf_config["cluster"]["worker"])

        strategy = tf.distribute.MultiWorkerMirroredStrategy()

        global_batch_size = per_worker_batch_size * num_workers
        multi_worker_dataset = mnist_dataset(global_batch_size)

        with strategy.scope():
            # Model building/compiling need to be within `strategy.scope()`.
            multi_worker_model = build_and_compile_cnn_model(config)

        callbacks = []
        if session.get_world_rank() == 0:
            callbacks.append(experiment.get_callback("tf-keras"))

        history = multi_worker_model.fit(
            multi_worker_dataset,
            epochs=epochs,
            steps_per_epoch=steps_per_epoch,
            callbacks=callbacks,
        )
        results = history.history

    return results


def train_tensorflow_mnist(
    num_workers: int = 2, use_gpu: bool = False, epochs: int = 4
) -> Result:
    config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}

    callback = comet_ml.integration.ray.CometTrainLoggerCallback(config)

    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        run_config=RunConfig(callbacks=[callback]),
    )
    results = trainer.fit()
    return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address", required=False, type=str, help="the address to use for Ray"
    )
    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--use-gpu", action="store_true", default=False, help="Enables GPU training"
    )
    parser.add_argument(
        "--epochs", type=int, default=10, help="Number of epochs to train for."
    )
    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )

    args, _ = parser.parse_known_args()

    if args.smoke_test:
        ray.init(num_cpus=4)
        train_tensorflow_mnist()
    else:
        ray.init(address=args.address)
        train_tensorflow_mnist(
            num_workers=args.num_workers, use_gpu=args.use_gpu, epochs=args.epochs
        )
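
The worker count and batch-size arithmetic inside train_func can be checked in isolation. The sketch below parses a hand-written TF_CONFIG value and derives the global batch size the same way the example does. The host names and the helper name global_batch_size are made up for illustration; in a real run the example reads TF_CONFIG from the environment of each worker.

```python
import json

# Hypothetical TF_CONFIG for a two-worker cluster; the host names are made up.
tf_config_json = json.dumps(
    {
        "cluster": {"worker": ["host-a:12345", "host-b:12345"]},
        "task": {"type": "worker", "index": 0},
    }
)


def global_batch_size(tf_config_json, per_worker_batch_size):
    """Return (num_workers, global batch size) for a TF_CONFIG JSON string."""
    tf_config = json.loads(tf_config_json)
    num_workers = len(tf_config["cluster"]["worker"])
    return num_workers, per_worker_batch_size * num_workers


num_workers, batch_size = global_batch_size(tf_config_json, per_worker_batch_size=64)
# steps_per_epoch defaults to 70 in the example above
examples_per_epoch = batch_size * 70
print(num_workers, batch_size, examples_per_epoch)
```

With the example defaults (two workers, 64 examples per worker, 70 steps), each epoch covers 8,960 of the 60,000 MNIST training images.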

Try it out!

Don't just take our word for it. Try it out for yourself.

Configure Comet for Ray Train

You can control which items are logged automatically.

On the driver script, you can pass additional Experiment arguments to the callback. For example, here is how to use a different project and disable logging the command-line arguments as hyperparameters:

callback = comet_ml.integration.ray.CometTrainLoggerCallback(
    config, project_name="scoring-model", parse_args=False
)

On each worker node, you can also pass Experiment parameters to the context manager. For example, here is how to change the reporting frequency for metrics:

with comet_worker_logger(config, auto_metric_step_rate=1) as experiment:
    # Your code here
    ...

For more information about configuring Comet, see Configure Comet.
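
One possible pattern is to keep credentials out of the script and assemble the callback arguments from the environment. The sketch below is only an illustration: build_callback_kwargs is a hypothetical helper, and it assumes the COMET_API_KEY and COMET_PROJECT_NAME environment variables from Comet's general configuration. The project_name and parse_args arguments are the ones shown in the driver example above.

```python
import os


def build_callback_kwargs(environ=None, default_project="scoring-model"):
    """Collect keyword arguments for CometTrainLoggerCallback from the environment."""
    environ = os.environ if environ is None else environ
    kwargs = {
        "project_name": environ.get("COMET_PROJECT_NAME", default_project),
        "parse_args": False,  # do not log command-line arguments as hyperparameters
    }
    api_key = environ.get("COMET_API_KEY")
    if api_key:
        # Pass the key explicitly only when it is set; otherwise leave it to
        # Comet's other configuration sources.
        kwargs["api_key"] = api_key
    return kwargs


# A plain dict is passed here instead of os.environ to keep the demo reproducible.
kwargs = build_callback_kwargs({"COMET_API_KEY": "<Your API Key>"})
print(kwargs)

# Usage in the driver script (requires comet_ml and ray):
# callback = comet_ml.integration.ray.CometTrainLoggerCallback(config, **kwargs)
```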

Ray Tune

Ray Tune is a Python library for experiment execution and hyperparameter tuning at any scale.

import comet_ml

from ray import tune
from ray.air.integrations.comet import CometLoggerCallback

# Your code here...
tune.run(
    ...
    callbacks=[
        CometLoggerCallback()
    ])

Log automatically

Following integration, Comet automatically logs the following items from Ray:

  • Hyperparameters
  • Metrics

When using the CometLoggerCallback with tune.run, Comet automatically logs parameters from the Ray config and metrics reported to Ray through tune.report().

When using RLlib, the callback also logs episode-level metrics to Comet as curves.

End-to-end example

Install dependencies

python -m pip install -U "ray[tune]" comet_ml

Run the example

import comet_ml
import numpy as np
from ray import tune
from ray.air.integrations.comet import CometLoggerCallback


def train_function(config, checkpoint_dir=None):
    for i in range(30):
        loss = config["mean"] + config["sd"] * np.random.randn()
        tune.report(loss=loss)


def tune_function():
    analysis = tune.run(
        train_function,
        name="comet-ray-demo",
        metric="loss",
        mode="min",
        callbacks=[CometLoggerCallback(tags=["my-trial"])],
        config={"mean": tune.grid_search([1, 2, 3]), "sd": tune.uniform(0.2, 0.8)},
    )
    return analysis.best_config


best_config = tune_function()
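
To see what this search space expands to without starting Ray, the following pure-Python sketch mimics it: grid_search over three mean values yields three trials, and each trial draws its own sd from the uniform range. It is an approximation for illustration, not Ray Tune's implementation. simulate_trials is a hypothetical helper, and picking the trial with the lowest final loss is an assumption about how the best configuration is selected.

```python
import random


def train_function(config, rng):
    """Return the 30 noisy losses that one trial would report."""
    return [config["mean"] + config["sd"] * rng.gauss(0, 1) for _ in range(30)]


def simulate_trials(seed=0):
    """Build one trial per grid value and run the toy training function."""
    rng = random.Random(seed)
    trials = []
    for mean in [1, 2, 3]:  # one trial per grid_search value
        config = {"mean": mean, "sd": rng.uniform(0.2, 0.8)}
        trials.append((config, train_function(config, rng)))
    return trials


trials = simulate_trials()
# Assumption: the best trial is the one whose last reported loss is lowest.
best_config, _ = min(trials, key=lambda trial: trial[1][-1])
print(len(trials), best_config)
```

In the real run, each of these trials would show up in Comet with its mean and sd as hyperparameters and its loss values as a metric curve.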

Try it out!

Here's an example Colab Notebook for using Comet with Ray.

Open In Colab

Configure Comet for Ray Tune

You can configure the CometLoggerCallback by passing the same configuration arguments as you would to the Experiment object. Learn more about CometLoggerCallback in the Ray documentation.

Apr. 18, 2024