
Commit b733667

openlineage, gcs: add openlineage methods for GcsToGcsOperator (#31350)
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
1 parent 5aa62de

5 files changed: 120 additions & 13 deletions

β€Žairflow/providers/google/cloud/transfers/gcs_to_gcs.py

Lines changed: 29 additions & 0 deletions
@@ -233,6 +233,8 @@ def __init__(
         self.source_object_required = source_object_required
         self.exact_match = exact_match
         self.match_glob = match_glob
+        self.resolved_source_objects: set[str] = set()
+        self.resolved_target_objects: set[str] = set()

     def execute(self, context: Context):

@@ -540,7 +542,34 @@ def _copy_single_object(self, hook, source_object, destination_object):
             destination_object,
         )

+        self.resolved_source_objects.add(source_object)
+        if not destination_object:
+            self.resolved_target_objects.add(source_object)
+        else:
+            self.resolved_target_objects.add(destination_object)
+
         hook.rewrite(self.source_bucket, source_object, self.destination_bucket, destination_object)

         if self.move_object:
             hook.delete(self.source_bucket, source_object)
+
+    def get_openlineage_facets_on_complete(self, task_instance):
+        """
+        Implementing _on_complete because execute method does preprocessing on internals.
+        This means we won't have to normalize self.source_object and self.source_objects,
+        destination bucket and so on.
+        """
+        from openlineage.client.run import Dataset
+
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        return OperatorLineage(
+            inputs=[
+                Dataset(namespace=f"gs://{self.source_bucket}", name=source)
+                for source in sorted(self.resolved_source_objects)
+            ],
+            outputs=[
+                Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
+                for target in sorted(self.resolved_target_objects)
+            ],
+        )
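
The new method's docstring explains the reasoning: execute() already normalizes source_object/source_objects and wildcards into the resolved_* sets, so lineage can simply be read off those sets afterwards. A hedged usage sketch of that flow (not part of the commit; the bucket and object names below are made up, the GCS hook is mocked so nothing touches real GCS, and both the google and openlineage providers are assumed to be installed):

# Hedged sketch: hypothetical bucket/object names, GCSHook mocked out.
from unittest import mock

from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

with mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook"):
    op = GCSToGCSOperator(
        task_id="copy_report",
        source_bucket="raw-bucket",           # hypothetical
        source_object="reports/2023-05.csv",  # hypothetical
        destination_bucket="curated-bucket",  # hypothetical
    )
    op.execute(None)

lineage = op.get_openlineage_facets_on_complete(None)
# Expect roughly:
#   inputs  -> [Dataset(namespace="gs://raw-bucket", name="reports/2023-05.csv")]
#   outputs -> [Dataset(namespace="gs://curated-bucket", name="reports/2023-05.csv")]
print(lineage.inputs, lineage.outputs)

With no destination_object set, the copied object keeps its path, so the output dataset mirrors the input under the destination bucket; this is the same behavior the simple-copy test further down asserts.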

β€Žairflow/providers/openlineage/extractors/base.py

Lines changed: 10 additions & 8 deletions
@@ -83,6 +83,7 @@ def get_operator_classnames(cls) -> list[str]:
         return []

     def extract(self) -> OperatorLineage | None:
+        # OpenLineage methods are optional - if there's no method, return None
         try:
             return self._get_openlineage_facets(self.operator.get_openlineage_facets_on_start)  # type: ignore
         except AttributeError:

@@ -100,19 +101,20 @@ def extract_on_complete(self, task_instance) -> OperatorLineage | None:

     def _get_openlineage_facets(self, get_facets_method, *args) -> OperatorLineage | None:
         try:
-            facets = get_facets_method(*args)
+            facets: OperatorLineage = get_facets_method(*args)
+            # "rewrite" OperatorLineage to safeguard against different version of the same class
+            # that was existing in openlineage-airflow package outside of Airflow repo
+            return OperatorLineage(
+                inputs=facets.inputs,
+                outputs=facets.outputs,
+                run_facets=facets.run_facets,
+                job_facets=facets.job_facets,
+            )
         except ImportError:
             self.log.exception(
                 "OpenLineage provider method failed to import OpenLineage integration. "
                 "This should not happen."
             )
         except Exception:
             self.log.exception("OpenLineage provider method failed to extract data from provider. ")
-        else:
-            return OperatorLineage(
-                inputs=facets.inputs,
-                outputs=facets.outputs,
-                run_facets=facets.run_facets,
-                job_facets=facets.job_facets,
-            )
         return None
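
The inline comment about "rewriting" the returned OperatorLineage is the key idea of this hunk: an operator may hand back an OperatorLineage built from the standalone openlineage-airflow package rather than from this provider, and copying the fields into a locally constructed instance keeps the extractor working with the class it expects. A rough, self-contained illustration of why re-instantiating helps (the two classes below are hypothetical stand-ins, not the real ones):

# Hypothetical stand-ins for two structurally identical classes from different packages.
from dataclasses import dataclass, field

@dataclass
class ProviderOperatorLineage:   # stand-in for the provider's own OperatorLineage
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)

@dataclass
class ExternalOperatorLineage:   # stand-in for an equivalent class shipped in another package
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)

external = ExternalOperatorLineage(inputs=["gs://raw-bucket/reports/2023-05.csv"])
assert not isinstance(external, ProviderOperatorLineage)  # same shape, different class

# "Rewriting" copies the fields into the class the caller expects.
rebuilt = ProviderOperatorLineage(inputs=external.inputs, outputs=external.outputs)
assert isinstance(rebuilt, ProviderOperatorLineage)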

β€Ždev/breeze/tests/test_selective_checks.py

Lines changed: 7 additions & 5 deletions
@@ -539,7 +539,7 @@ def test_expected_output_full_tests_needed(
             {
                 "affected-providers-list-as-string": "amazon apache.beam apache.cassandra cncf.kubernetes "
                 "common.sql facebook google hashicorp microsoft.azure microsoft.mssql "
-                "mysql oracle postgres presto salesforce sftp ssh trino",
+                "mysql openlineage oracle postgres presto salesforce sftp ssh trino",
                 "all-python-versions": "['3.8']",
                 "all-python-versions-list-as-string": "3.8",
                 "needs-helm-tests": "false",

@@ -564,8 +564,8 @@ def test_expected_output_full_tests_needed(
             {
                 "affected-providers-list-as-string": "amazon apache.beam apache.cassandra "
                 "cncf.kubernetes common.sql facebook google "
-                "hashicorp microsoft.azure microsoft.mssql mysql oracle postgres presto "
-                "salesforce sftp ssh trino",
+                "hashicorp microsoft.azure microsoft.mssql mysql openlineage oracle postgres "
+                "presto salesforce sftp ssh trino",
                 "all-python-versions": "['3.8']",
                 "all-python-versions-list-as-string": "3.8",
                 "image-build": "true",

@@ -666,7 +666,7 @@ def test_expected_output_pull_request_v2_3(
                 "affected-providers-list-as-string": "amazon apache.beam apache.cassandra "
                 "cncf.kubernetes common.sql "
                 "facebook google hashicorp microsoft.azure microsoft.mssql mysql "
-                "oracle postgres presto salesforce sftp ssh trino",
+                "openlineage oracle postgres presto salesforce sftp ssh trino",
                 "all-python-versions": "['3.8']",
                 "all-python-versions-list-as-string": "3.8",
                 "image-build": "true",

@@ -685,6 +685,7 @@ def test_expected_output_pull_request_v2_3(
                 "--package-filter apache-airflow-providers-microsoft-azure "
                 "--package-filter apache-airflow-providers-microsoft-mssql "
                 "--package-filter apache-airflow-providers-mysql "
+                "--package-filter apache-airflow-providers-openlineage "
                 "--package-filter apache-airflow-providers-oracle "
                 "--package-filter apache-airflow-providers-postgres "
                 "--package-filter apache-airflow-providers-presto "

@@ -697,7 +698,7 @@ def test_expected_output_pull_request_v2_3(
                 "skip-provider-tests": "false",
                 "parallel-test-types-list-as-string": "Providers[amazon] Always CLI "
                 "Providers[apache.beam,apache.cassandra,cncf.kubernetes,common.sql,facebook,"
-                "hashicorp,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,"
+                "hashicorp,microsoft.azure,microsoft.mssql,mysql,openlineage,oracle,postgres,presto,"
                 "salesforce,sftp,ssh,trino] Providers[google]",
             },
             id="CLI tests and Google-related provider tests should run if cli/chart files changed",

@@ -965,6 +966,7 @@ def test_upgrade_to_newer_dependencies(files: tuple[str, ...], expected_outputs:
             "--package-filter apache-airflow-providers-microsoft-azure "
             "--package-filter apache-airflow-providers-microsoft-mssql "
             "--package-filter apache-airflow-providers-mysql "
+            "--package-filter apache-airflow-providers-openlineage "
             "--package-filter apache-airflow-providers-oracle "
             "--package-filter apache-airflow-providers-postgres "
             "--package-filter apache-airflow-providers-presto "

β€Žgenerated/provider_dependencies.json

Lines changed: 1 addition & 0 deletions
@@ -467,6 +467,7 @@
       "microsoft.azure",
       "microsoft.mssql",
       "mysql",
+      "openlineage",
       "oracle",
       "postgres",
       "presto",

β€Žtests/providers/google/cloud/transfers/test_gcs_to_gcs.py

Lines changed: 73 additions & 0 deletions
@@ -21,6 +21,7 @@
 from unittest import mock

 import pytest
+from openlineage.client.run import Dataset

 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import WILDCARD, GCSToGCSOperator

@@ -827,3 +828,75 @@ def test_copy_files_into_a_folder(
             for src, dst in zip(expected_source_objects, expected_destination_objects)
         ]
         mock_hook.return_value.rewrite.assert_has_calls(mock_calls)
+
+    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
+    def test_execute_simple_reports_openlineage(self, mock_hook):
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECTS_SINGLE_FILE[0],
+            destination_bucket=DESTINATION_BUCKET,
+        )
+
+        operator.execute(None)
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.inputs) == 1
+        assert len(lineage.outputs) == 1
+        assert lineage.inputs[0] == Dataset(
+            namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]
+        )
+        assert lineage.outputs[0] == Dataset(
+            namespace=f"gs://{DESTINATION_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]
+        )
+
+    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
+    def test_execute_multiple_reports_openlineage(self, mock_hook):
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_objects=SOURCE_OBJECTS_LIST,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=DESTINATION_OBJECT,
+        )
+
+        operator.execute(None)
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.inputs) == 3
+        assert len(lineage.outputs) == 1
+        assert lineage.inputs == [
+            Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
+            Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
+            Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
+        ]
+        assert lineage.outputs[0] == Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=DESTINATION_OBJECT)
+
+    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
+    def test_execute_wildcard_reports_openlineage(self, mock_hook):
+        mock_hook.return_value.list.return_value = [
+            "test_object1.txt",
+            "test_object2.txt",
+        ]
+
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=DESTINATION_OBJECT,
+        )
+
+        operator.execute(None)
+
+        lineage = operator.get_openlineage_facets_on_complete(None)
+        assert len(lineage.inputs) == 2
+        assert len(lineage.outputs) == 2
+        assert lineage.inputs == [
+            Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object1.txt"),
+            Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object2.txt"),
+        ]
+        assert lineage.outputs == [
+            Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="foo/bar/1.txt"),
+            Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="foo/bar/2.txt"),
+        ]
