Deep Dive

Managed - declarative ML workflows that scale out from self-hosted patterns to fully managed orchestration with experiment tracking, integration, and event-triggered execution.

TL;DR

What it is: Managed runtime for . A pipeline is a of containerized components, typically Python functions decorated with @dsl.component. Inputs and outputs flow as typed Artifacts between steps.

Native integrations: Experiments tracks parameters / metrics / artifacts. versions output models. Cloud Scheduler triggers pipeline runs on cron. Pub/Sub triggers event-driven runs. These all compose without glue code.

Direct alternatives: Cloud Composer (managed Airflow), GitHub Actions (CI/CD-native), custom Dagster deployments, Prefect Cloud. wins for ML-specific workflows where Experiments + Registry + Endpoints integration matters.

The Component Model

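from kfp import dsl
from kfp.dsl import Dataset, Input, Metrics, Model, Output

# Lightweight Python component: the listed packages are installed in the step's container at run time.
@dsl.component(packages_to_install=["scikit-learn", "pandas"])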
def train_model(
    train_data: Input[Dataset],
    test_data: Input[Dataset],
    n_estimators: int,
    model_output: Output[Model],
    metrics_output: Output[Metrics],
):
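    # Imports live inside the function body because the function source is what gets packaged for the step.
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    # Input artifacts expose a local file path for the step to read from.
    df_train = pd.read_csv(train_data.path)
    df_test = pd.read_csv(test_data.path)

    # Fit on the training split; "y" is the label column.
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(df_train.drop("y", axis=1), df_train["y"])

    # Score on the held-out split and record the result on the Metrics artifact.
    accuracy = model.score(df_test.drop("y", axis=1), df_test["y"])
    metrics_output.log_metric("accuracy", accuracy)
    # ... save model to model_output.path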

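# load_data, split_data, drift_detection, register_model, and canary_deploy are
# components defined elsewhere (not shown).
@dsl.pipeline(name="nightly-fraud-retrain")
def fraud_pipeline(n_estimators: int = 100):
    data = load_data()
    # A component call returns a single task object, so read each named output from it
    # rather than tuple-unpacking the call.
    split = split_data(data=data.outputs["data"])
    drift_check = drift_detection(new_data=split.outputs["train"])
    # The explicit comparison builds the condition that is evaluated at run time.
    with dsl.If(drift_check.outputs["drift_detected"] == True):
        trained = train_model(
            train_data=split.outputs["train"],
            test_data=split.outputs["test"],
            n_estimators=n_estimators,
        )
        register = register_model(model=trained.outputs["model_output"])
        deploy = canary_deploy(model_version=register.outputs["version"])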

Each @dsl.component runs as its own container step, and the @dsl.pipeline function compiles to a Kubeflow pipeline YAML spec; the compiled pipeline runs on managed compute when submitted. Conditionals, loops, and parallel branches are all first-class.

Production Use Cases

Use Case 1: Nightly model retraining with drift gate

Cloud Scheduler triggers the pipeline at 2am. Steps: extract last-24h data → drift detection vs baseline → if drift detected, train new model on v5e → evaluate against held-out test → register in → 5% canary deploy to Endpoints → monitor for 24h via Cloud Monitoring → if metrics hold, promote to 100%. The whole loop is declarative, with no Airflow to maintain.
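
The drift gate in this use case can be sketched in plain Python. This is a minimal illustration rather than pipeline SDK code: the function names, the z-score style test, and the threshold of 3.0 are all assumptions chosen for the example, and a production drift check would likely use a richer statistic.

```python
from statistics import mean, stdev


def drift_detected(baseline, recent, threshold=3.0):
    """Return True when the recent mean is far from the baseline mean.

    Hypothetical check: distance is measured in standard errors of the
    recent sample, using the baseline's standard deviation.
    """
    base_mean = mean(baseline)
    base_std = stdev(baseline)
    if base_std == 0:
        # Constant baseline: any change in the mean counts as drift.
        return mean(recent) != base_mean
    std_err = base_std / len(recent) ** 0.5
    return abs(mean(recent) - base_mean) / std_err > threshold


def plan_nightly_run(baseline, recent):
    """List the steps a run would execute, skipping training when there is no drift."""
    steps = ["extract", "drift_check"]
    if drift_detected(baseline, recent):
        steps += ["train", "evaluate", "register", "canary_5_percent"]
    return steps


baseline = [10, 11, 9, 10, 12, 8, 10, 11, 9, 10]
print(plan_nightly_run(baseline, [10, 11, 9, 10]))   # stable sample
print(plan_nightly_run(baseline, [15, 16, 14, 15]))  # shifted sample
```

Keeping the gate as a cheap, separate step means the expensive training step is only scheduled on nights when the data has actually moved.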

Use Case 2: Batch ingestion at scale

50M documents need nightly re- for the enterprise index. Pipeline steps: list_new_docs (Cloud Storage diff) → parallel (sharded across 20 workers) → parallel via API → upsert to . Parallelism is declared in the - Pipelines handles worker allocation. This is cost-effective because compute scales to zero between runs.
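
The "sharded across 20 workers" step can be sketched locally as follows. This is a plain-Python stand-in, not pipeline code: `shard`, `process_shard`, and `run_sharded` are hypothetical names, and a thread pool substitutes for the managed fan-out the text describes.

```python
from concurrent.futures import ThreadPoolExecutor


def shard(items, num_shards):
    """Split items into num_shards contiguous slices whose sizes differ by at most one."""
    if num_shards < 1:
        raise ValueError("num_shards must be at least 1")
    base, extra = divmod(len(items), num_shards)
    shards, start = [], 0
    for i in range(num_shards):
        size = base + (1 if i < extra else 0)
        shards.append(items[start:start + size])
        start += size
    return shards


def process_shard(doc_ids):
    """Hypothetical per-worker step; here it only counts the documents it was given."""
    return len(doc_ids)


def run_sharded(doc_ids, num_workers=20):
    """Process each shard on its own thread, a local stand-in for pipeline fan-out."""
    shards = shard(doc_ids, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(process_shard, shards))


docs = [f"doc-{i}" for i in range(45)]
counts = run_sharded(docs)
print(len(counts), sum(counts))
```

Contiguous, near-equal shards keep per-worker run time roughly balanced, which matters when the slowest worker determines when the downstream step can start.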

Use Case 3: workflow that spans hours

The AI Factory's 30-hour autonomous run, generating 1,600+ AI-authored solutions across 3 languages, maps to a Pipeline. Each generation is a component; parallel branches per language; as a downstream step; self-correction loop via dsl.While; final artifact registry. The same code that runs on local for dev scales to Pipelines for production.
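
The self-correction loop mentioned above can be illustrated as a bounded retry in plain Python. This is a sketch under stated assumptions, not the AI Factory's implementation: `self_correct`, `toy_generate`, and `toy_validate` are hypothetical, and the toy generator stands in for whatever produces a candidate solution.

```python
def self_correct(generate, validate, max_attempts=3):
    """Call generate until validate accepts the result or attempts run out.

    generate(feedback) returns a candidate; validate(candidate) returns
    (ok, feedback). Both are hypothetical callables supplied by the caller.
    """
    feedback = None
    for attempt in range(1, max_attempts + 1):
        candidate = generate(feedback)
        ok, feedback = validate(candidate)
        if ok:
            return candidate, attempt
    raise RuntimeError(f"no valid candidate after {max_attempts} attempts")


def toy_generate(feedback):
    # The first call has no feedback and returns a flawed draft; the retry fixes it.
    if feedback is None:
        return "def add(a, b): return a - b"
    return "def add(a, b): return a + b"


def toy_validate(candidate):
    # Execute the candidate in an isolated namespace and check one known case.
    namespace = {}
    exec(candidate, namespace)
    ok = namespace["add"](2, 3) == 5
    return ok, None if ok else "add(2, 3) should be 5"


solution, attempts = self_correct(toy_generate, toy_validate)
print(attempts)
```

Capping the number of attempts keeps a long autonomous run from stalling on one item that never validates.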

Use Case 4: Event-driven incident response pipeline

Cloud Pub/Sub event from PagerDuty triggers the pipeline. Steps: parse incident → retrieve similar past incidents from Vector Search → fetch current system metrics from BigQuery → ask for diagnostic hypothesis → if confidence high, auto-execute remediation runbook → else page on-call with the analysis. The run is auditable: every step is logged to Cloud Logging.
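
The first and last steps of this flow can be sketched in plain Python. Pub/Sub push deliveries carry the payload base64-encoded under `message.data`; everything else here is an assumption for illustration, including the incident fields, the function names, and the 0.9 confidence threshold.

```python
import base64
import json


def parse_push_envelope(envelope):
    """Decode the base64 JSON payload carried in a Pub/Sub push message."""
    payload = base64.b64decode(envelope["message"]["data"])
    return json.loads(payload.decode("utf-8"))


def route(confidence, threshold=0.9):
    """Pick the next step: auto-remediate only when confidence clears the threshold."""
    return "auto_remediate" if confidence >= threshold else "page_on_call"


# Build a sample envelope; the incident fields are hypothetical.
incident = {"incident_id": "INC-1", "service": "checkout"}
envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(incident).encode("utf-8")).decode("ascii"),
        "messageId": "1",
    },
    "subscription": "projects/example/subscriptions/incidents",
}

print(parse_push_envelope(envelope))
print(route(0.95), route(0.40))
```

Keeping the routing decision in one small function makes the threshold easy to review and to log alongside the analysis that produced the confidence score.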

Migrating from / Airflow

From : Each thread-pool task becomes a @dsl.component. Replace shared in-memory state with typed Artifacts passed between components. The hard part isn't the code; it's defining the right artifact contracts so components can run independently. Plan a 2-3 week port for a complex flow.
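
An artifact contract of the kind described above can be sketched with a dataclass written to disk. This is a plain-Python illustration under assumptions: `SplitArtifact`, its fields, and the two step functions are hypothetical, and a JSON file stands in for a typed pipeline Artifact.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass(frozen=True)
class SplitArtifact:
    """Hypothetical contract for what a split step hands to a training step."""
    train_path: str
    test_path: str
    row_count: int


def write_artifact(artifact, path):
    Path(path).write_text(json.dumps(asdict(artifact)))


def read_artifact(path):
    return SplitArtifact(**json.loads(Path(path).read_text()))


def split_step(out_path):
    # Producer: writes the contract and shares no in-memory objects with the consumer.
    write_artifact(SplitArtifact("train.csv", "test.csv", 1000), out_path)


def train_step(in_path):
    # Consumer: relies only on the fields the contract declares.
    artifact = read_artifact(in_path)
    return f"training on {artifact.train_path} ({artifact.row_count} rows)"


with tempfile.TemporaryDirectory() as tmp:
    handoff = Path(tmp) / "split.json"
    split_step(handoff)
    message = train_step(handoff)
print(message)
```

Because the two steps communicate only through the file, either one could run in a separate process or container without code changes, which is the property the migration is trying to establish.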

From Airflow: @task functions and PythonOperator callables become @dsl.component functions. XCom is replaced by typed Artifacts. Sensors map to Cloud Pub/Sub triggers. The conceptual model is similar; the syntax is cleaner because of the typed Artifact contracts.

When NOT to migrate: if your workflow is non-ML and you already have a healthy Airflow / Dagster / Prefect deployment, the migration cost rarely justifies the move. shines specifically for ML/ workloads where Experiments + Registry + Endpoints integration matters.

Glossary

Related Reading