Skip to content

Integrate with PySpark

Comet integrates with Apache PySpark.

PySpark is the Python API for Apache Spark, an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming clusters with implicit data parallelism and fault tolerance.

When integrated with Spark, Comet tracks machine learning training runs.

End-to-end example

# comet_ml must be imported before any ML framework so it can hook into them.
from comet_ml import Experiment

from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

sqlContext = SQLContext(sc)

def run_logistic_regression(training_data, test_data):
    """Train a logistic-regression model and log the run to Comet.

    Args:
        training_data: Spark DataFrame with "features" (Vector) and "label"
            columns used to fit the model.
        test_data: Spark DataFrame with the same schema, used for evaluation.

    Side effects:
        Creates a Comet ``Experiment`` and logs the model hyperparameters
        plus train/test AUC and accuracy metrics to it.
    """
    experiment = Experiment(project_name='pyspark-example')

    # Model: plain logistic regression with default hyperparameters.
    lr = LogisticRegression()

    model = lr.fit(training_data)
    training_summary = model.summary

    predictions = model.transform(test_data)
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

    metrics = {
        'train_auc_score': training_summary.areaUnderROC,
        'train_accuracy': training_summary.accuracy,
        # BinaryClassificationEvaluator defaults to areaUnderROC; the second
        # call overrides metricName to get area under the precision-recall curve.
        'test_auc_roc_score': evaluator.evaluate(predictions),
        'test_auc_pr_score': evaluator.evaluate(
            predictions, {evaluator.metricName: "areaUnderPR"}),
    }

    # NOTE(review): _input_kwargs is a private PySpark attribute holding the
    # keyword args passed to the estimator — convenient, but not a stable API.
    experiment.log_parameters(lr._input_kwargs)  # logging hyperparams to Comet
    experiment.log_metrics(metrics)  # logging metrics to Comet

def main():
    """Load the breast-cancer CSV, build train/test sets, and run training.

    Assumes the module-level ``sqlContext`` and ``sc`` have been created, and
    that ``./data/breast_cancer.csv`` has feature columns followed by a final
    label column.
    """
    df = sqlContext.read.format('com.databricks.spark.csv').options(
        header='true', inferschema='true').load('./data/breast_cancer.csv')

    # Splitting into train and test set. Beware: it sorts the dataset.
    (train_df, test_df) = df.randomSplit([0.7, 0.3])

    # Convert each row to (features_vector, label): all columns but the last
    # become the feature vector, the last column is the label.
    training_data = train_df.rdd.map(lambda x: (
        Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])
    test_data = test_df.rdd.map(lambda x: (
        Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])

    run_logistic_regression(training_data, test_data)

# Page last modified: Nov. 29, 2022
if __name__ == '__main__':
    main()