Skip to content

Commit 51aff27

Browse files
authored
Optionally raise an error if source file does not exist in GCSToGCSOperator (#21391)
1 parent a2abf66 commit 51aff27

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

β€Žairflow/providers/google/cloud/transfers/gcs_to_gcs.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ class GCSToGCSOperator(BaseOperator):
9090
If set as a sequence, the identities from the list must grant
9191
Service Account Token Creator IAM role to the directly preceding identity, with first
9292
account from the list granting this role to the originating account (templated).
93+
:param source_object_required: Whether you want to raise an exception when the source object
94+
doesn't exist. It doesn't have any effect when the source objects are folders or patterns.
9395
9496
:Example:
9597
@@ -190,6 +192,7 @@ def __init__(
190192
maximum_modified_time=None,
191193
is_older_than=None,
192194
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
195+
source_object_required=False,
193196
**kwargs,
194197
):
195198
super().__init__(**kwargs)
@@ -216,6 +219,7 @@ def __init__(
216219
self.maximum_modified_time = maximum_modified_time
217220
self.is_older_than = is_older_than
218221
self.impersonation_chain = impersonation_chain
222+
self.source_object_required = source_object_required
219223

220224
def execute(self, context: 'Context'):
221225

@@ -313,6 +317,11 @@ def _copy_source_without_wildcard(self, hook, prefix):
313317
self._copy_single_object(
314318
hook=hook, source_object=prefix, destination_object=self.destination_object
315319
)
320+
elif self.source_object_required:
321+
msg = f"{prefix} does not exist in bucket {self.source_bucket}"
322+
self.log.warning(msg)
323+
raise AirflowException(msg)
324+
316325
for source_obj in objects:
317326
if self.destination_object is None:
318327
destination_object = source_obj

β€Žtests/providers/google/cloud/transfers/test_gcs_to_gcs.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,3 +546,20 @@ def test_execute_wildcard_with_replace_flag_false_with_destination_object(self,
546546
mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter=""),
547547
]
548548
mock_hook.return_value.list.assert_has_calls(mock_calls)
549+
550+
@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
551+
def test_execute_source_object_required_flag_true(self, mock_hook):
552+
mock_hook.return_value.exists.return_value = False
553+
operator = GCSToGCSOperator(
554+
task_id=TASK_ID,
555+
source_bucket=TEST_BUCKET,
556+
source_objects=SOURCE_OBJECTS_SINGLE_FILE,
557+
destination_bucket=DESTINATION_BUCKET,
558+
destination_object=DESTINATION_OBJECT_PREFIX,
559+
source_object_required=True,
560+
)
561+
562+
with pytest.raises(
563+
AirflowException, match=f"{SOURCE_OBJECTS_SINGLE_FILE} does not exist in bucket {TEST_BUCKET}"
564+
):
565+
operator.execute(None)

0 commit comments

Comments
 (0)