Skip to content

Commit eaf3471

Browse files
Fix Cloud Worflows system test (#33386)
* Fix Cloud Worflows system test --------- Co-authored-by: Maksim Moiseenkov <maksim_moiseenkov@epam.com>
1 parent 5ca8987 commit eaf3471

File tree

4 files changed

+24
-29
lines changed

4 files changed

+24
-29
lines changed

β€Žairflow/providers/google/cloud/links/workflows.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from airflow.models import BaseOperator
2727
from airflow.utils.context import Context
2828

29-
WORKFLOWS_BASE_LINK = "workflows"
29+
WORKFLOWS_BASE_LINK = "/workflows"
3030
WORKFLOW_LINK = WORKFLOWS_BASE_LINK + "/workflow/{location_id}/{workflow_id}/executions?project={project_id}"
3131
WORKFLOWS_LINK = WORKFLOWS_BASE_LINK + "?project={project_id}"
3232
EXECUTION_LINK = (

β€Žairflow/providers/google/cloud/operators/workflows.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class WorkflowsCreateWorkflowOperator(GoogleCloudBaseOperator):
5252
Creates a new workflow.
5353
5454
If a workflow with the specified name already exists in the specified
55-
project and location, the long running operation will return
55+
project and location, the long-running operation will return
5656
[ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error.
5757
5858
.. seealso::
@@ -606,7 +606,8 @@ class WorkflowsListExecutionsOperator(GoogleCloudBaseOperator):
606606
607607
:param workflow_id: Required. The ID of the workflow to be created.
608608
:param start_date_filter: If passed only executions older that this date will be returned.
609-
By default operators return executions from last 60 minutes
609+
By default, operators return executions from last 60 minutes.
610+
Note that datetime object must specify a time zone, e.g. ``datetime.timezone.utc``.
610611
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
611612
:param location: Required. The GCP region in which to handle the request.
612613
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
@@ -670,7 +671,7 @@ def execute(self, context: Context):
670671
return [
671672
Execution.to_dict(e)
672673
for e in execution_iter
673-
if e.start_time.ToDatetime(tzinfo=datetime.timezone.utc) > self.start_date_filter
674+
if e.start_time > self.start_date_filter # type: ignore
674675
]
675676

676677

β€Žtests/providers/google/cloud/operators/test_workflows.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -334,12 +334,9 @@ class TestWorkflowExecutionsListExecutionsOperator:
334334
@mock.patch(BASE_PATH.format("Execution"))
335335
@mock.patch(BASE_PATH.format("WorkflowsHook"))
336336
def test_execute(self, mock_hook, mock_object):
337-
timestamp = Timestamp()
338-
timestamp.FromDatetime(
339-
datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
340-
)
337+
start_date_filter = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
341338
execution_mock = mock.MagicMock()
342-
execution_mock.start_time = timestamp
339+
execution_mock.start_time = start_date_filter
343340
mock_hook.return_value.list_executions.return_value = [execution_mock]
344341

345342
op = WorkflowsListExecutionsOperator(

β€Žtests/system/providers/google/cloud/workflows/example_workflows.py

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,31 +38,29 @@
3838
from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
3939
from airflow.utils.trigger_rule import TriggerRule
4040

41-
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
42-
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
41+
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
42+
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
4343

44-
DAG_ID = "cloud_workflows"
44+
DAG_ID = "example_cloud_workflows"
4545

4646
LOCATION = "us-central1"
47-
WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}"
47+
WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}".replace("_", "-")
4848

4949
# [START how_to_define_workflow]
5050
WORKFLOW_CONTENT = """
51-
- getCurrentTime:
52-
call: http.get
53-
args:
54-
url: https://us-central1-workflowsample.cloudfunctions.net/datetime
55-
result: currentTime
51+
- getLanguage:
52+
assign:
53+
- inputLanguage: "English"
5654
- readWikipedia:
5755
call: http.get
5856
args:
59-
url: https://en.wikipedia.org/w/api.php
57+
url: https://www.wikipedia.org/
6058
query:
6159
action: opensearch
62-
search: ${currentTime.body.dayOfTheWeek}
60+
search: ${inputLanguage}
6361
result: wikiResult
6462
- returnResult:
65-
return: ${wikiResult.body[1]}
63+
return: ${wikiResult}
6664
"""
6765

6866
WORKFLOW = {
@@ -74,7 +72,7 @@
7472

7573
EXECUTION = {"argument": ""}
7674

77-
SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}"
75+
SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}".replace("_", "-")
7876
SLEEP_WORKFLOW_CONTENT = """
7977
- someSleep:
8078
call: sys.sleep
@@ -94,6 +92,7 @@
9492
schedule="@once",
9593
start_date=datetime(2021, 1, 1),
9694
catchup=False,
95+
tags=["example", "workflows"],
9796
) as dag:
9897
# [START how_to_create_workflow]
9998
create_workflow = WorkflowsCreateWorkflowOperator(
@@ -131,10 +130,13 @@
131130

132131
# [START how_to_delete_workflow]
133132
delete_workflow = WorkflowsDeleteWorkflowOperator(
134-
task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
133+
task_id="delete_workflow",
134+
location=LOCATION,
135+
project_id=PROJECT_ID,
136+
workflow_id=WORKFLOW_ID,
137+
trigger_rule=TriggerRule.ALL_DONE,
135138
)
136139
# [END how_to_delete_workflow]
137-
delete_workflow.trigger_rule = TriggerRule.ALL_DONE
138140

139141
# [START how_to_create_execution]
140142
create_execution = WorkflowsCreateExecutionOperator(
@@ -223,11 +225,6 @@
223225

224226
[cancel_execution, list_executions] >> delete_workflow
225227

226-
# Task dependencies created via `XComArgs`:
227-
# create_execution >> wait_for_execution
228-
# create_execution >> get_execution
229-
# create_execution >> cancel_execution
230-
231228
# ### Everything below this line is not part of example ###
232229
# ### Just for system tests purpose ###
233230
from tests.system.utils.watcher import watcher

0 commit comments

Comments
 (0)