April 14, 2026

Machine Learning Pipeline mit Vertex AI Pipelines

Basierend auf dem ersten Beitrag zu Vertex AI Pipelines wird in diesem Beitrag die Erstellung einer kompletten Machine Learning Pipeline mit Vertex AI Pipelines erklärt. Am Ende haben wir eine Pipeline, die:

  • Daten lädt (Iris-Dataset)
  • Feature Engineering durchführt
  • Ein ML-Modell trainiert
  • Das Modell in der Vertex AI Model Registry speichert
  • Einen Endpoint deployt

Das Projekt wird zur Vorbereitung genau so konfiguriert wie hier beschrieben.

1. Pipeline-Komponente: Daten laden

Wir verwenden das Iris-Dataset direkt aus sklearn und laden es in einer Pipelinekomponente.

Folgende Anmerkungen hierzu: dataset.path wird automatisch von Kubeflow Pipelines (KFP) erzeugt, indem der Parameter mit dataset: Output[Dataset] annotiert wird. KFP erzeugt dann zur Laufzeit einen physischen Pfad (lokal im Container + angebunden an GCS), und stellt ihn dir über dataset.path zur Verfügung.

# c_load_data.py
from kfp import dsl
from kfp.dsl import Output, Dataset

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["scikit-learn==1.5.0", "pandas==2.2.3"]
)
def load_data(dataset: Output[Dataset]):
    import pandas as pd
    from sklearn.datasets import load_iris

    iris = load_iris(as_frame=True)
    df = iris.frame  

    df.to_csv(dataset.path, index=False)

    print("Saved dataset to:", dataset.path)
    print(df.head())

2. Pipeline-Komponente: Feature Engineering

Ähnlich in der zweiten Komponente. Hierbei wird das Feature Engineering vorgenommen. Insbesondere wird Trainings- und Testdatensätze erzeugt und jeweils als csv Dateien gespeichert.

# c_feature_engineering.py
from kfp import dsl
from kfp.dsl import Input, Output, Dataset

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.3"]
)
def feature_engineering(
    dataset: Input[Dataset],
    x_train: Output[Dataset],
    y_train: Output[Dataset],
    x_test: Output[Dataset],
    y_test: Output[Dataset],
):
    import pandas as pd

    df = pd.read_csv(dataset.path)

    train = df.sample(frac=0.8, random_state=42)
    test = df.drop(train.index)

    X_train = train.drop(columns=["target"])
    y_train_df = train[["target"]]     
    X_test = test.drop(columns=["target"])
    y_test_df = test[["target"]]

    X_train.to_csv(x_train.path, index=False)
    y_train_df.to_csv(y_train.path, index=False)
    X_test.to_csv(x_test.path, index=False)
    y_test_df.to_csv(y_test.path, index=False)

    print("Wrote:", x_train.path, y_train.path, x_test.path, y_test.path)

3. Pipeline-Komponente: Modell trainieren

In der dritten Komponente wird das Modell anschließend trainiert und als joblib gespeichert.

# c_train_model.py
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["scikit-learn==1.5.0", "joblib==1.4.2", "pandas==2.2.3"]
)
def train_model(
    x_train: Input[Dataset],
    y_train: Input[Dataset],
    model: Output[Model],
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib
    import os

    X = pd.read_csv(x_train.path)
    y = pd.read_csv(y_train.path)

    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X, y.values.ravel())

    os.makedirs(model.path, exist_ok=True)
    joblib.dump(clf, f"{model.path}/model.joblib")

    print("Model written to:", model.path)

4. Pipeline-Komponente: Modell hochladen

In der vierten Komponente wird das Modell anschließend in die Vertex AI Model Registry hochgeladen.

from kfp import dsl
from kfp.dsl import Input, Model


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["google-cloud-aiplatform==1.38.1"]
)
def upload_model(
    model: Input[Model],     # <-- Artifact Input
    project: str,
    region: str
) -> str:
    from google.cloud import aiplatform
    import datetime
    aiplatform.init(project=project, location=region)

    uploaded = aiplatform.Model.upload(
        display_name=f"iris-rf-model-{datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%d-%H%M%S')}",
        artifact_uri=model.uri,  # <-- Ordner in GCS / managed storage
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-5:latest",
        sync=True,
    )
    return uploaded.resource_name

5. Pipeline-Komponente: Modell deployen

In der letzten Komponente wird das Modell anschließend als Endpoint in Vertex AI deployed. Dieser Schritt kann ca. 20 bis 25 Minuten dauern.

from kfp import dsl

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["google-cloud-aiplatform==1.38.1"]
)
def deploy_model(
    model_resource_name: str,
    project: str,
    region: str
):
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    model = aiplatform.Model(model_resource_name)
    endpoint = aiplatform.Endpoint.create(display_name="iris-endpoint")

    model.deploy(endpoint=endpoint, machine_type="n1-standard-2")

Pipeline-Definition

from c_load_data import load_data
from c_feature_engineering import feature_engineering
from c_train_model import train_model
from c_deploy_model import deploy_model
from c_upload_model import upload_model
from kfp import dsl
import yaml

with open("config.yaml") as f:
    cfg = yaml.safe_load(f)

PROJECT_ID = cfg["project_id"]

@dsl.pipeline(
    name="iris-ml-pipeline",
    description="End-to-end ML Pipeline mit Iris Daten inkl. Upload + Deploy"
)
def iris_pipeline(
    project: str = PROJECT_ID,
    region: str = "us-central1",
):
    # 1) Daten laden
    load_task = load_data()

    # 2) Feature Engineering
    fe_task = feature_engineering(dataset=load_task.outputs["dataset"])

    # 3) Training
    train_task = train_model(
        x_train=fe_task.outputs["x_train"],
        y_train=fe_task.outputs["y_train"],
    )

    # 4) Upload in Model Registry
    upload_task = upload_model(
        model=train_task.output,
        project=project,
        region=region
    )

    # 5) Deploy zu Endpoint
    deploy_task = deploy_model(
        model_resource_name=upload_task.output,
        project=project,
        region=region
    )

Pipeline kompilieren

from kfp import compiler
from define_pipeline import iris_pipeline

compiler.Compiler().compile(
    pipeline_func=iris_pipeline,
    package_path="iris_pipeline.json"
)

Pipeline ausführen

from google.cloud import aiplatform
import yaml

with open("config.yaml") as f:
    cfg = yaml.safe_load(f)

PROJECT_ID = cfg["project_id"]
REGION = cfg["region"]
BUCKET = cfg["bucket"]
SERVICE_ACCOUNT = cfg["service_account"]

aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET,
)

job = aiplatform.PipelineJob(
    display_name="iris-ml-pipeline-run",
    template_path="iris_pipeline.json",
    pipeline_root=f"{BUCKET}/pipeline_root",
)

job.submit(
    service_account=SERVICE_ACCOUNT
)

Beispielabfrage

Für eine Beispielanfrage in der GCP Console kann folgende JSON verwendet werden:

{
  "instances": [
    [5.1, 3.5, 1.4, 0.2]
  ]
}

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert