What Kubeflow Pipelines gives you
Kubeflow Pipelines (KFP) turns Python functions into containerized, reproducible, schedulable workflows. Each component runs in its own container, and a pipeline chains components into a directed acyclic graph (DAG). Every run is recorded with its parameters, artifacts, and metrics, uploaded pipelines are versioned, and both are visible in the KFP UI.
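On Red Hat OpenShift AI (RHOAI, formerly Red Hat OpenShift Data Science), KFP is available as a managed component called data science pipelines. You don't install KFP yourself, but you do need to configure a pipeline server in your data science project before you can upload and run pipelines.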
Install the KFP SDK
```bash
pip install kfp==2.7.0
```
Define a component
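A lightweight Python component is a plain Python function decorated with `@dsl.component`. Imports go inside the function body, because each component runs in its own container with only the packages listed in `packages_to_install`: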
```python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image="python:3.11-slim",
    # s3fs lets pandas read s3:// URLs; credentials are picked up from the
    # standard AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
    packages_to_install=["scikit-learn", "pandas", "s3fs"],
)
def preprocess_data(
    raw_data_path: str,
    processed_data: Output[Dataset],
) -> None:
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(raw_data_path)
    # Standardize the feature columns to zero mean and unit variance.
    scaler = StandardScaler()
    df[["f1", "f2"]] = scaler.fit_transform(df[["f1", "f2"]])
    # KFP supplies a local path; whatever is written there becomes the artifact.
    df.to_csv(processed_data.path, index=False)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["scikit-learn", "pandas"],
)
def train_model(
    processed_data: Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
) -> None:
    import pickle

    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(processed_data.path)
    X = df.drop("label", axis=1)
    y = df["label"]
    # Fixed random_state values keep the split and the forest reproducible.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    clf = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    clf.fit(X_train, y_train)

    # Hold-out accuracy is logged to the Metrics artifact and shown in the KFP UI.
    accuracy = clf.score(X_test, y_test)
    metrics.log_metric("accuracy", accuracy)

    with open(model_output.path, "wb") as f:
        pickle.dump(clf, f)
```
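An `Output[Dataset]` parameter is essentially a handle whose `.path` points at a local file: KFP stores whatever the component writes there in the pipeline's object storage, and a downstream component that declares an `Input[Dataset]` parameter receives a local copy at its own `.path`. The standard-library sketch below imitates that handoff with a hypothetical `LocalArtifact` class and two temporary directories, and standardizes `f1`/`f2` the way `StandardScaler` does (population standard deviation, constant columns left unscaled). It illustrates the data flow only; it is not how KFP's launcher is implemented.

```python
import csv
import shutil
import statistics
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class LocalArtifact:
    """Hypothetical stand-in for kfp.dsl.Dataset: just a local file path."""
    path: str


def preprocess(rows: list[dict], out: LocalArtifact) -> None:
    """Standardize f1 and f2 like StandardScaler, then write CSV to out.path."""
    for col in ("f1", "f2"):
        values = [float(r[col]) for r in rows]
        mean = statistics.fmean(values)
        # StandardScaler uses the population std and treats a zero std as 1.
        std = statistics.pstdev(values) or 1.0
        for r, v in zip(rows, values):
            r[col] = (v - mean) / std
    with open(out.path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)


def read_dataset(inp: LocalArtifact) -> list[dict]:
    """Consumer side: read the artifact from its own local path."""
    with open(inp.path, newline="") as f:
        return list(csv.DictReader(f))


def run_handoff(rows: list[dict]) -> list[dict]:
    """Producer writes to its path; copying the file to the consumer's path
    stands in for KFP's upload to and download from object storage."""
    with tempfile.TemporaryDirectory() as producer_dir, \
            tempfile.TemporaryDirectory() as consumer_dir:
        produced = LocalArtifact(str(Path(producer_dir) / "processed_data"))
        preprocess(rows, produced)
        consumed = LocalArtifact(str(Path(consumer_dir) / "processed_data"))
        shutil.copyfile(produced.path, consumed.path)
        return read_dataset(consumed)
```

Calling `run_handoff(rows)` with a list of dict rows (values as strings, as `csv` would read them) returns the standardized rows exactly as the consuming component would see them.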
Compose the pipeline
```python
@dsl.pipeline(
    name="fraud-detection-training",
    description="Preprocess data and train a fraud detection model",
)
def fraud_pipeline(
    raw_data_path: str = "s3://my-bucket/data/raw.csv",
    n_estimators: int = 100,
):
    preprocess_task = preprocess_data(raw_data_path=raw_data_path)
    # Consuming preprocess_task's output creates the DAG edge, so KFP already
    # runs train_model after preprocess_data; an explicit .after() is not needed.
    train_model(
        processed_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
    )
```
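Passing `preprocess_task.outputs["processed_data"]` into `train_model` is what makes `train_model` depend on `preprocess_data`; tasks with no path between them in the DAG are free to run in parallel. A rough model of that ordering, using the standard library's `graphlib`, is sketched below. The two real tasks are joined by two hypothetical ones (`validate_schema`, `evaluate_model`) to show a parallel wave; this is a conceptual sketch, not how the KFP backend actually schedules pods.

```python
from graphlib import TopologicalSorter

# task -> set of tasks whose outputs it consumes.
# validate_schema and evaluate_model are hypothetical extra steps.
DEPENDENCIES = {
    "preprocess_data": set(),
    "validate_schema": set(),
    "train_model": {"preprocess_data", "validate_schema"},
    "evaluate_model": {"train_model"},
}


def execution_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; tasks within a wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(DEPENDENCIES))
# → [['preprocess_data', 'validate_schema'], ['train_model'], ['evaluate_model']]
```

The cycle check mirrors a real constraint: a pipeline is a DAG, so a task can never consume, directly or indirectly, its own output.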
Compile and upload
```python
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=fraud_pipeline,
    package_path="fraud_pipeline.yaml",
)
```
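Because the compiled YAML is the file you version in git and upload, one option is to derive a pipeline version name from its contents, so that each uploaded version maps to an exact package. The helper below is hypothetical; its result could be passed as `pipeline_version_name` to the client's `upload_pipeline_version` method when adding a new version to an already uploaded pipeline. It assumes compiling unchanged source produces byte-identical YAML.

```python
import hashlib
from pathlib import Path


def version_name_for(package_path: str, prefix: str = "fraud-detection-training") -> str:
    """Hypothetical helper: name a pipeline version after a short SHA-256
    of the compiled package, e.g. 'fraud-detection-training-<12 hex chars>'."""
    digest = hashlib.sha256(Path(package_path).read_bytes()).hexdigest()
    return f"{prefix}-{digest[:12]}"
```

For example, `version_name_for("fraud_pipeline.yaml")` returns the same name for the same compiled file and a different name after any change to it.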
Upload to the KFP UI or use the client:
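```python
import kfp

# The pipeline server route is protected by OpenShift OAuth, so pass a bearer
# token, for example the output of `oc whoami --show-token`.
client = kfp.Client(
    host="https://ds-pipeline-dspa.apps.cluster.example.com",
    existing_token="<your-openshift-token>",
)

# Register the compiled pipeline so it appears in the UI and can be scheduled.
pipeline = client.upload_pipeline(
    pipeline_package_path="fraud_pipeline.yaml",
    pipeline_name="fraud-detection-training",
)

# Trigger a one-off run directly from the compiled package; parameters not
# listed in `arguments` keep their pipeline defaults.
run = client.create_run_from_pipeline_package(
    pipeline_file="fraud_pipeline.yaml",
    arguments={"n_estimators": 200},
    run_name="training-run-v1",
)
```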
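OpenShift tokens expire, so pasting one into a notebook tends to break later. A small helper can read the token from an environment variable and fall back to the `oc` CLI. The variable name `KFP_BEARER_TOKEN` is a hypothetical choice, and the fallback assumes you have already run `oc login` against the cluster.

```python
import os
import subprocess


def openshift_token(env_var: str = "KFP_BEARER_TOKEN") -> str:
    """Return a bearer token for the pipeline server route.

    Prefers the (hypothetical) environment variable; otherwise asks the oc CLI
    for the token of the currently logged-in user.
    """
    token = os.environ.get(env_var, "").strip()
    if token:
        return token
    result = subprocess.run(
        ["oc", "whoami", "--show-token"],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if oc is not logged in
    )
    return result.stdout.strip()
```

The result can then be passed as `existing_token=openshift_token()` when constructing `kfp.Client`.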
Schedule recurring runs
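```python
# create_recurring_run expects an experiment ID rather than a name;
# create_experiment returns the existing experiment if the name is taken.
experiment = client.create_experiment(name="fraud-detection")

client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="weekly-retraining",
    pipeline_id=pipeline.pipeline_id,
    # KFP cron expressions have six fields, seconds first:
    # sec min hour day-of-month month day-of-week -> every Monday at 02:00.
    cron_expression="0 0 2 * * 1",
    max_concurrency=1,
)
```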
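The KFP client documents `cron_expression` as six space-separated fields with seconds first, so a standard five-field crontab entry such as `0 2 * * 1` needs a `0` prepended. A small, hypothetical helper that performs that conversion and rejects expressions with the wrong number of fields:

```python
def to_kfp_cron(expr: str) -> str:
    """Convert a 5-field crontab expression to KFP's 6-field form (seconds first).

    Six-field input is returned with whitespace normalized; any other field
    count raises ValueError. The contents of each field are not validated.
    """
    fields = expr.split()
    if len(fields) == 5:
        fields.insert(0, "0")  # fire at second 0 of the matching minute
    elif len(fields) != 6:
        raise ValueError(f"expected 5 or 6 cron fields, got {len(fields)}: {expr!r}")
    return " ".join(fields)


print(to_kfp_cron("0 2 * * 1"))  # → 0 0 2 * * 1
```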
Monitor runs
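```bash
# Data science pipelines 2.0 runs on Argo Workflows: each pipeline run is a Workflow
oc get workflows -n my-project

# Stream logs from the main container of every pod in one run
oc logs -l workflows.argoproj.io/workflow=<workflow-name> -c main -n my-project -f
```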
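To block a notebook or CI job until a run finishes, the KFP client provides `wait_for_run_completion(run_id, timeout)`. If you want to control the polling yourself (for example, to log progress), the loop looks roughly like the sketch below. The state lookup is injected so the loop can be tested without a cluster, and the set of terminal states is an assumption based on the KFP v2 run states.

```python
import time
from typing import Callable

# Assumed terminal states of a KFP v2 run.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "SKIPPED", "CANCELED"}


def wait_for_terminal_state(
    get_state: Callable[[], str],
    timeout_s: float = 3600,
    interval_s: float = 10,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_state() until it returns a terminal state or timeout_s elapses."""
    deadline = clock() + timeout_s
    while True:
        state = str(get_state())
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"run still {state} after {timeout_s}s")
        sleep(interval_s)
```

Against a live server this could be called as `wait_for_terminal_state(lambda: client.get_run(run.run_id).state)`, where `run` is the result of `create_run_from_pipeline_package`.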
Key takeaways
- Each `@dsl.component` becomes its own container; pin `base_image` for reproducibility.
- Use KFP artifact types (`Dataset`, `Model`, `Metrics`) instead of raw paths for lineage tracking.
- Compile to YAML, version it in git, and upload with the Python client or the UI.
- Scheduled recurring runs handle retraining cadence without external cron jobs.