Basierend auf dem ersten Beitrag zu Vertex AI Pipelines wird in diesem Beitrag die Erstellung einer kompletten Machine Learning Pipeline mit Vertex AI Pipelines erklärt. Am Ende haben wir eine Pipeline, die:
- Daten lädt (Iris-Dataset)
- Feature Engineering durchführt
- Ein ML-Modell trainiert
- Das Modell in der Vertex AI Model Registry speichert
- Einen Endpoint deployt
Das Projekt wird zur Vorbereitung genau so konfiguriert wie hier beschrieben.
1. Pipeline-Komponente: Daten laden
Wir verwenden das Iris-Dataset direkt aus sklearn und laden es in einer Pipelinekomponente.
Folgende Anmerkungen hierzu: dataset.path wird automatisch von Kubeflow Pipelines (KFP) erzeugt, indem der Parameter mit dataset: Output[Dataset] annotiert wird. KFP erzeugt dann zur Laufzeit einen physischen Pfad (lokal im Container + angebunden an GCS), und stellt ihn dir über dataset.path zur Verfügung.
# c_load_data.py
from kfp import dsl
from kfp.dsl import Output, Dataset
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["scikit-learn==1.5.0", "pandas==2.2.3"]
)
def load_data(dataset: Output[Dataset]):
import pandas as pd
from sklearn.datasets import load_iris
iris = load_iris(as_frame=True)
df = iris.frame
df.to_csv(dataset.path, index=False)
print("Saved dataset to:", dataset.path)
print(df.head())2. Pipeline-Komponente: Feature Engineering
Ähnlich in der zweiten Komponente. Hierbei wird das Feature Engineering vorgenommen. Insbesondere wird Trainings- und Testdatensätze erzeugt und jeweils als csv Dateien gespeichert.
# c_feature_engineering.py
from kfp import dsl
from kfp.dsl import Input, Output, Dataset
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas==2.2.3"]
)
def feature_engineering(
dataset: Input[Dataset],
x_train: Output[Dataset],
y_train: Output[Dataset],
x_test: Output[Dataset],
y_test: Output[Dataset],
):
import pandas as pd
df = pd.read_csv(dataset.path)
train = df.sample(frac=0.8, random_state=42)
test = df.drop(train.index)
X_train = train.drop(columns=["target"])
y_train_df = train[["target"]]
X_test = test.drop(columns=["target"])
y_test_df = test[["target"]]
X_train.to_csv(x_train.path, index=False)
y_train_df.to_csv(y_train.path, index=False)
X_test.to_csv(x_test.path, index=False)
y_test_df.to_csv(y_test.path, index=False)
print("Wrote:", x_train.path, y_train.path, x_test.path, y_test.path)
3. Pipeline-Komponente: Modell trainieren
In der dritten Komponente wird das Modell anschließend trainiert und als joblib gespeichert.
# c_train_model.py
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["scikit-learn==1.5.0", "joblib==1.4.2", "pandas==2.2.3"]
)
def train_model(
x_train: Input[Dataset],
y_train: Input[Dataset],
model: Output[Model],
):
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
import os
X = pd.read_csv(x_train.path)
y = pd.read_csv(y_train.path)
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X, y.values.ravel())
os.makedirs(model.path, exist_ok=True)
joblib.dump(clf, f"{model.path}/model.joblib")
print("Model written to:", model.path)
4. Pipeline-Komponente: Modell hochladen
In der vierten Komponente wird das Modell anschließend in die Vertex AI Model Registry hochgeladen.
from kfp import dsl
from kfp.dsl import Input, Model
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["google-cloud-aiplatform==1.38.1"]
)
def upload_model(
model: Input[Model], # <-- Artifact Input
project: str,
region: str
) -> str:
from google.cloud import aiplatform
import datetime
aiplatform.init(project=project, location=region)
uploaded = aiplatform.Model.upload(
display_name=f"iris-rf-model-{datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%d-%H%M%S')}",
artifact_uri=model.uri, # <-- Ordner in GCS / managed storage
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-5:latest",
sync=True,
)
return uploaded.resource_name
5. Pipeline-Komponente: Modell deployen
In der letzten Komponente wird das Modell anschließend als Endpoint in Vertex AI deployed. Dieser Schritt kann ca. 20 bis 25 Minuten dauern.
from kfp import dsl
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["google-cloud-aiplatform==1.38.1"]
)
def deploy_model(
model_resource_name: str,
project: str,
region: str
):
from google.cloud import aiplatform
aiplatform.init(project=project, location=region)
model = aiplatform.Model(model_resource_name)
endpoint = aiplatform.Endpoint.create(display_name="iris-endpoint")
model.deploy(endpoint=endpoint, machine_type="n1-standard-2")Pipeline-Definition
from c_load_data import load_data
from c_feature_engineering import feature_engineering
from c_train_model import train_model
from c_deploy_model import deploy_model
from c_upload_model import upload_model
from kfp import dsl
import yaml
with open("config.yaml") as f:
cfg = yaml.safe_load(f)
PROJECT_ID = cfg["project_id"]
@dsl.pipeline(
name="iris-ml-pipeline",
description="End-to-end ML Pipeline mit Iris Daten inkl. Upload + Deploy"
)
def iris_pipeline(
project: str = PROJECT_ID,
region: str = "us-central1",
):
# 1) Daten laden
load_task = load_data()
# 2) Feature Engineering
fe_task = feature_engineering(dataset=load_task.outputs["dataset"])
# 3) Training
train_task = train_model(
x_train=fe_task.outputs["x_train"],
y_train=fe_task.outputs["y_train"],
)
# 4) Upload in Model Registry
upload_task = upload_model(
model=train_task.output,
project=project,
region=region
)
# 5) Deploy zu Endpoint
deploy_task = deploy_model(
model_resource_name=upload_task.output,
project=project,
region=region
)
Pipeline kompilieren
from kfp import compiler
from define_pipeline import iris_pipeline
compiler.Compiler().compile(
pipeline_func=iris_pipeline,
package_path="iris_pipeline.json"
)Pipeline ausführen
from google.cloud import aiplatform
import yaml
with open("config.yaml") as f:
cfg = yaml.safe_load(f)
PROJECT_ID = cfg["project_id"]
REGION = cfg["region"]
BUCKET = cfg["bucket"]
SERVICE_ACCOUNT = cfg["service_account"]
aiplatform.init(
project=PROJECT_ID,
location=REGION,
staging_bucket=BUCKET,
)
job = aiplatform.PipelineJob(
display_name="iris-ml-pipeline-run",
template_path="iris_pipeline.json",
pipeline_root=f"{BUCKET}/pipeline_root",
)
job.submit(
service_account=SERVICE_ACCOUNT
)Beispielabfrage
Für eine Beispielanfrage in der GCP Console kann folgende JSON verwendet werden:
{
"instances": [
[5.1, 3.5, 1.4, 0.2]
]
}