Google Cloud Run Operators

Cloud Run is used to build and deploy scalable containerized apps written in any language (including Go, Python, Java, Node.js, .NET, and Ruby) on a fully managed platform.

For more information about the service, visit the Google Cloud Run documentation.

Prerequisite Tasks

To use these operators, you must do a few things:

Create a job

Before you create a job in Cloud Run, you need to define it. For more information about the Job object fields, visit the Google Cloud Run Job description.

A simple job configuration can be created with a Job object:

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

from google.cloud.run_v2 import Job
from google.cloud.run_v2.types import k8s_min


def _create_job_instance() -> Job:
    """
    Create a Cloud Run job configuration with google.cloud.run_v2.Job object.

    As a minimum the configuration must contain a container image name in its template.
    The rest of the configuration parameters are optional and will be populated with default values if not set.
    """
    job = Job()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
    container.resources.limits = {"cpu": "2", "memory": "1Gi"}
    job.template.template.containers.append(container)
    return job


or with a Python dictionary:

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

def _create_job_dict() -> dict:
    """
    Create a Cloud Run job configuration with a Python dict.

    As a minimum the configuration must contain a container image name in its template.
    """
    return {
        "template": {
            "template": {
                "containers": [
                    {
                        "image": "us-docker.pkg.dev/cloudrun/container/job:latest",
                        "resources": {
                            "limits": {"cpu": "1", "memory": "512Mi"},
                            "cpu_idle": False,
                            "startup_cpu_boost": False,
                        },
                        "name": "",
                        "command": [],
                        "args": [],
                        "env": [],
                        "ports": [],
                        "volume_mounts": [],
                        "working_dir": "",
                        "depends_on": [],
                    }
                ],
                "volumes": [],
                "execution_environment": 0,
                "encryption_key": "",
            },
            "labels": {},
            "annotations": {},
            "parallelism": 0,
            "task_count": 0,
        },
        "name": "",
        "uid": "",
        "generation": "0",
        "labels": {},
        "annotations": {},
        "creator": "",
        "last_modifier": "",
        "client": "",
        "client_version": "",
        "launch_stage": 0,
        "observed_generation": "0",
        "conditions": [],
        "execution_count": 0,
        "reconciling": False,
        "satisfies_pzs": False,
        "etag": "",
    }
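Most fields in the dictionary above are empty or zero-valued. Since the docstring says only the container image is required, a shorter definition should be possible. The following is a minimal sketch under that assumption; `minimal_job_dict` is a hypothetical helper, not part of the provider, and it relies on Cloud Run filling in defaults for every omitted field.

```python
from typing import Any


def minimal_job_dict(image: str, cpu: str = "1", memory: str = "512Mi") -> dict[str, Any]:
    """Return a job definition holding only the image and resource limits.

    Hypothetical helper: all omitted fields are left for Cloud Run to default.
    """
    if not image:
        raise ValueError("a container image name is required")
    return {
        "template": {
            "template": {
                "containers": [
                    {
                        "image": image,
                        "resources": {"limits": {"cpu": cpu, "memory": memory}},
                    }
                ]
            }
        }
    }


job = minimal_job_dict("us-docker.pkg.dev/cloudrun/container/job:latest")
```

The nesting mirrors the full dictionary: the outer `template` is the execution template and the inner `template` is the task template that holds the containers.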


You can create a Cloud Run job from either of these configurations with CloudRunCreateJobOperator:

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

create1 = CloudRunCreateJobOperator(
    task_id=create1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_instance(),
    dag=dag,
)

Note that this operator only creates the job without executing it. The Job's dictionary representation is pushed to XCom.
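A downstream task can read that dictionary from XCom, for example with `ti.xcom_pull(task_ids=...)`. The sketch below shows one way to pull the container image names out of it. It assumes the pushed dictionary has the same shape as the dictionary configuration shown earlier; `container_images` is a hypothetical helper, and the `pushed` value is a stand-in for real XCom data.

```python
from typing import Any


def container_images(job_dict: dict[str, Any]) -> list[str]:
    """Collect container image names from a job's dictionary representation."""
    # The containers live two levels down: execution template -> task template.
    task_template = job_dict.get("template", {}).get("template", {})
    return [c["image"] for c in task_template.get("containers", []) if c.get("image")]


# Stand-in for the value a downstream task would pull from XCom.
pushed = {
    "name": "",
    "template": {
        "template": {
            "containers": [
                {"image": "us-docker.pkg.dev/cloudrun/container/job:latest"}
            ]
        }
    },
}

images = container_images(pushed)
```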

Create a service

Before you create a service in Cloud Run, you need to define it. For more information about the Service object fields, visit the Google Cloud Run Service description.

A simple service configuration can look as follows:

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[source]

from google.cloud.run_v2 import Service
from google.cloud.run_v2.types import k8s_min


def _create_service():
    service = Service()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/placeholder:latest"
    service.template.containers.append(container)
    return service
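By analogy with the job dictionary shown earlier, the same service could be described as a plain dict. This is a sketch only: it assumes the operator's `service` argument accepts a dictionary as well as a Service object, which this section does not state, and `minimal_service_dict` is a hypothetical helper.

```python
from typing import Any


def minimal_service_dict(image: str) -> dict[str, Any]:
    """Return a service definition containing a single container image.

    Hypothetical helper: unlike a job, a service has one template level,
    matching `service.template.containers` in the object-based version.
    """
    if not image:
        raise ValueError("a container image name is required")
    return {"template": {"containers": [{"image": image}]}}


service = minimal_service_dict("us-docker.pkg.dev/cloudrun/container/placeholder:latest")
```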


With this configuration, you can create the service using CloudRunCreateServiceOperator:

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[source]

create_cloud_run_service = CloudRunCreateServiceOperator(
    task_id="create-cloud-run-service",
    project_id=PROJECT_ID,
    region="us-central1",
    service=_create_service(),
    service_name="cloudrun-system-test-service",
)

Note that this operator only creates the service without executing it. The Service's dictionary representation is pushed to XCom.

Delete a service

To delete a service, you can use CloudRunDeleteServiceOperator:

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[source]

delete_cloud_run_service = CloudRunDeleteServiceOperator(
    task_id="delete-cloud-run-service",
    project_id=PROJECT_ID,
    region="us-central1",
    service_name="cloudrun-system-test-service",
    dag=dag,
)

Note that this operator waits for the service to be deleted, and the deleted Service's dictionary representation is pushed to XCom.

Execute a job

To execute a job, you can use:

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

execute1 = CloudRunExecuteJobOperator(
    task_id=execute1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    deferrable=False,
)

or you can define the same operator in deferrable mode:

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

execute2 = CloudRunExecuteJobOperator(
    task_id=execute2_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job2_name,
    dag=dag,
    deferrable=True,
)

You can also specify overrides that apply to a single execution, such as new container arguments, environment variables, a task count, and a timeout:

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

overrides = {
    "container_overrides": [
        {
            "name": "job",
            "args": ["python", "main.py"],
            "env": [{"name": "ENV_VAR", "value": "value"}],
            "clear_args": False,
        }
    ],
    "task_count": 1,
    "timeout": "60s",
}

execute3 = CloudRunExecuteJobOperator(
    task_id=execute3_task_name,
    project_id=PROJECT_ID,
    region=region,
    overrides=overrides,
    job_name=job3_name,
    dag=dag,
    deferrable=False,
)
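When several executions need different arguments or environment variables, it can be convenient to generate the overrides dictionary instead of writing it by hand. The sketch below builds the same structure as the example above; `build_overrides` is a hypothetical helper, and the field names are taken from that example only.

```python
from typing import Any, Optional


def build_overrides(
    container_name: str,
    args: list[str],
    env: Optional[dict[str, str]] = None,
    task_count: int = 1,
    timeout: str = "60s",
) -> dict[str, Any]:
    """Build an overrides dictionary for one container of a job execution."""
    return {
        "container_overrides": [
            {
                "name": container_name,
                "args": list(args),
                # Expand a plain mapping into the list-of-pairs form used above.
                "env": [{"name": k, "value": v} for k, v in (env or {}).items()],
                "clear_args": False,
            }
        ],
        "task_count": task_count,
        "timeout": timeout,
    }


overrides = build_overrides("job", ["python", "main.py"], {"ENV_VAR": "value"})
```

The result can then be passed as `overrides=overrides` in the same way as the hand-written dictionary.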

Update a job

To update a job, you can use:

CloudRunUpdateJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

update_job1 = CloudRunUpdateJobOperator(
    task_id=update_job1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_instance_with_label(),
    dag=dag,
)

The Job's dictionary representation is pushed to XCom.
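The helper `_create_job_instance_with_label()` used above is not shown in this section. Judging by its name, it returns the job definition with a label added. A dict-based sketch of that idea follows; `with_labels` and the label values are hypothetical, and it assumes the update operator accepts a dictionary in the same way the create operator does.

```python
import copy
from typing import Any


def with_labels(job_dict: dict[str, Any], labels: dict[str, str]) -> dict[str, Any]:
    """Return a copy of a job definition with the given labels merged in."""
    # Deep-copy so the original definition stays reusable for other tasks.
    updated = copy.deepcopy(job_dict)
    updated.setdefault("labels", {}).update(labels)
    return updated


base_job = {
    "template": {
        "template": {
            "containers": [
                {"image": "us-docker.pkg.dev/cloudrun/container/job:latest"}
            ]
        }
    }
}

labelled_job = with_labels(base_job, {"team": "data"})
```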

List jobs

To list the jobs, you can use:

CloudRunListJobsOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

list_jobs = CloudRunListJobsOperator(
    task_id=list_jobs_task_name, project_id=PROJECT_ID, region=region, dag=dag
)

The operator takes two optional parameters: "limit" to limit the number of jobs returned, and "show_deleted" to include deleted jobs in the result.
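The example above does not set either optional parameter. The sketch below assembles keyword arguments that include them; `list_jobs_kwargs` is a hypothetical helper, the project and region values are placeholders, and the check for a negative limit is a defensive assumption of this sketch.

```python
from typing import Any, Optional


def list_jobs_kwargs(
    project_id: str,
    region: str,
    limit: Optional[int] = None,
    show_deleted: bool = False,
) -> dict[str, Any]:
    """Assemble keyword arguments for a list-jobs task."""
    if limit is not None and limit < 0:
        raise ValueError("limit must be zero or greater")
    kwargs: dict[str, Any] = {
        "project_id": project_id,
        "region": region,
        "show_deleted": show_deleted,
    }
    # Leave "limit" out entirely when the caller wants every job.
    if limit is not None:
        kwargs["limit"] = limit
    return kwargs


kwargs = list_jobs_kwargs("my-project", "us-central1", limit=10, show_deleted=True)

# In a DAG file these would be passed on to the operator, for example:
# CloudRunListJobsOperator(task_id="list-jobs", dag=dag, **kwargs)
```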

Delete a job

To delete a job, you can use:

CloudRunDeleteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

delete_job1 = CloudRunDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

Note that this operator waits for the job to be deleted, and the deleted Job's dictionary representation is pushed to XCom.
