Commit 2571367

Moves AzureBlobStorageToGCSOperator from Azure to Google provider (#32306)
* moved AzureBlobStorageToGCSOperator to google provider
1 parent 566bc1b commit 2571367

File tree

10 files changed: +211 -168 lines changed

Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import tempfile
from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class AzureBlobStorageToGCSOperator(BaseOperator):
    """
    Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AzureBlobStorageToGCSOperator`

    :param wasb_conn_id: Reference to the wasb connection.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param blob_name: Name of the blob
    :param container_name: Name of the container
    :param bucket_name: The bucket to upload to
    :param object_name: The object name to set when uploading the file
    :param filename: The local file path to the file to be uploaded
    :param gzip: Option to compress local file or file data for upload
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account.
    """

    def __init__(
        self,
        *,
        wasb_conn_id="wasb_default",
        gcp_conn_id: str = "google_cloud_default",
        blob_name: str,
        container_name: str,
        bucket_name: str,
        object_name: str,
        filename: str,
        gzip: bool,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.wasb_conn_id = wasb_conn_id
        self.gcp_conn_id = gcp_conn_id
        self.blob_name = blob_name
        self.container_name = container_name
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.filename = filename
        self.gzip = gzip
        self.impersonation_chain = impersonation_chain

    template_fields: Sequence[str] = (
        "blob_name",
        "container_name",
        "bucket_name",
        "object_name",
        "filename",
    )

    def execute(self, context: Context) -> str:
        azure_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
        gcs_hook = GCSHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )

        with tempfile.NamedTemporaryFile() as temp_file:
            self.log.info("Downloading data from blob: %s", self.blob_name)
            azure_hook.get_file(
                file_path=temp_file.name,
                container_name=self.container_name,
                blob_name=self.blob_name,
            )
            self.log.info(
                "Uploading data from blob's: %s into GCP bucket: %s", self.object_name, self.bucket_name
            )
            gcs_hook.upload(
                bucket_name=self.bucket_name,
                object_name=self.object_name,
                filename=temp_file.name,
                gzip=self.gzip,
            )
            self.log.info(
                "Resources have been uploaded from blob: %s to GCS bucket:%s",
                self.blob_name,
                self.bucket_name,
            )
            return f"gs://{self.bucket_name}/{self.object_name}"
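The `execute` method above follows a common download-then-upload pattern: stream the source blob into a local `NamedTemporaryFile`, hand that file to the destination hook, and return the destination URI. A minimal stdlib-only sketch of the same pattern, with in-memory dicts standing in for the Azure container and the GCS bucket (all names and data here are illustrative, not part of the commit):

```python
import tempfile

# Hypothetical in-memory stand-ins for the Azure container and GCS bucket.
azure_container = {"data/report.csv": b"id,value\n1,42\n"}
gcs_bucket: dict[str, bytes] = {}


def transfer_blob(blob_name: str, object_name: str) -> str:
    """Mirror the operator's flow: download to a temp file, then upload it."""
    with tempfile.NamedTemporaryFile() as temp_file:
        # "Download": write the source blob into the temporary file.
        temp_file.write(azure_container[blob_name])
        temp_file.flush()
        temp_file.seek(0)
        # "Upload": read the temporary file into the destination bucket.
        gcs_bucket[object_name] = temp_file.read()
    # Like the operator, return the resulting gs:// URI.
    return f"gs://my-bucket/{object_name}"


uri = transfer_blob("data/report.csv", "report.csv")
```

Using the context manager means the temporary file is deleted as soon as the upload finishes, which is why the real operator performs the upload inside the `with` block rather than after it.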

airflow/providers/google/provider.yaml

Lines changed: 4 additions & 1 deletion
@@ -1007,7 +1007,10 @@ transfers:
     target-integration-name: Google Cloud Storage (GCS)
     python-module: airflow.providers.google.cloud.transfers.mssql_to_gcs
     how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/mssql_to_gcs.rst
-
+  - source-integration-name: Microsoft Azure Blob Storage
+    target-integration-name: Google Cloud Storage (GCS)
+    python-module: airflow.providers.google.cloud.transfers.azure_blob_to_gcs
+    how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/azure_blob_to_gcs.rst

 connection-types:
   - hook-class-name: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

airflow/providers/microsoft/azure/provider.yaml

Lines changed: 4 additions & 1 deletion
@@ -242,12 +242,15 @@ transfers:
     python-module: airflow.providers.microsoft.azure.transfers.local_to_wasb
   - source-integration-name: Microsoft Azure Blob Storage
     target-integration-name: Google Cloud Storage (GCS)
-    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst
     python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
   - source-integration-name: SSH File Transfer Protocol (SFTP)
     target-integration-name: Microsoft Azure Blob Storage
     how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/sftp_to_wasb.rst
     python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb
+  - source-integration-name: Microsoft Azure Blob Storage
+    target-integration-name: Google Cloud Storage (GCS)
+    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst
+    python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs

 connection-types:

airflow/providers/microsoft/azure/transfers/azure_blob_to_gcs.py

Lines changed: 16 additions & 92 deletions
@@ -17,102 +17,26 @@
 # under the License.
 from __future__ import annotations

-import tempfile
-from typing import TYPE_CHECKING, Sequence
+import warnings

-from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.gcs import GCSHook
-from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import (
+    AzureBlobStorageToGCSOperator as AzureBlobStorageToGCSOperatorFromGoogleProvider,
+)

-if TYPE_CHECKING:
-    from airflow.utils.context import Context

-
-class AzureBlobStorageToGCSOperator(BaseOperator):
+class AzureBlobStorageToGCSOperator(AzureBlobStorageToGCSOperatorFromGoogleProvider):
     """
-    [the docstring, __init__, template_fields, and execute removed here are
-     identical to the implementation added under the Google provider above]
+    This class is deprecated.
+    Please use `airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`.
     """

+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use
+            `airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`.""",
+            AirflowProviderDeprecationWarning,
+            stacklevel=2,
+        )
+        super().__init__(*args, **kwargs)
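The old import path is kept working by a thin shim: a subclass that emits a deprecation warning and otherwise delegates everything to the relocated class. The pattern can be sketched with only the stdlib `warnings` module (class names below are illustrative, not Airflow's):

```python
import warnings


class RelocatedOperator:
    """Stands in for the class at its new import path."""

    def __init__(self, *, blob_name: str, **kwargs):
        self.blob_name = blob_name


class DeprecatedOperator(RelocatedOperator):
    """Old import path: warn once at construction, then delegate."""

    def __init__(self, *args, **kwargs):
        warnings.warn(
            "This class is deprecated. Please use RelocatedOperator.",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not at this shim
        )
        super().__init__(*args, **kwargs)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    op = DeprecatedOperator(blob_name="data/report.csv")

# The shim still behaves exactly like the new class...
print(op.blob_name)
# ...but constructing it recorded a DeprecationWarning.
print(caught[0].category.__name__)
```

`stacklevel=2` is the important detail: it attributes the warning to the user's DAG file rather than to the shim module, which is what makes the message actionable.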

docs/apache-airflow-providers-google/connections/gcp.rst

Lines changed: 1 addition & 1 deletion
@@ -298,4 +298,4 @@ Note that as domain-wide delegation is currently supported by most of the Google

 * All of Google Cloud operators and hooks.
 * Firebase hooks.
-* All transfer operators that involve Google cloud in different providers, for example: :class:`airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs`.
+* All transfer operators that involve Google cloud in different providers, for example: :class:`airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator`.
Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@

.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.

Azure Blob Storage to Google Cloud Storage (GCS) Transfer Operator
==================================================================

`Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) is used to store large amounts of data from many applications, as is `Azure Blob Storage <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`__.
This page shows how to transfer data from Azure Blob Storage to GCS.

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include:: ../_partials/prerequisite_tasks.rst

.. _howto/operator:AzureBlobStorageToGCSOperator:

Transfer Data from Azure Blob Storage to Google Cloud Storage
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Use the :class:`~airflow.providers.google.cloud.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`
to transfer data from Azure Blob Storage to a specified bucket in Google Cloud Storage.

Example usage:

.. exampleinclude:: /../../tests/system/providers/google/cloud/azure/example_azure_blob_to_gcs.py
    :language: python
    :start-after: [START how_to_azure_blob_to_gcs]
    :end-before: [END how_to_azure_blob_to_gcs]

Reference
^^^^^^^^^

For further information, look at:

* `GCS Client Library Documentation <https://googleapis.dev/python/storage/latest/index.html>`__
* `GCS Product Documentation <https://cloud.google.com/storage/docs/>`__
* `Azure Blob Storage Client Library Documentation <https://learn.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python>`__
Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 connections/index.rst connections/azure.rst
 secrets-backends/index.rst secrets-backends/azure-key-vault-secrets-backend.rst
 logging.rst logging/index.rst
+transfer/azure_blob_to_gcs.rst ../../apache-airflow-providers-google/latest/operators/transfer/azure_blob_to_gcs.rst
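Each line of the redirects file above maps an old documentation path to its new location, separated by whitespace. A small sketch of how such a file can be parsed into a mapping (the parsing code is illustrative; the docs build tooling does the equivalent internally):

```python
# The redirects file added in this commit, reproduced as input data.
redirects_txt = """\
connections/index.rst connections/azure.rst
secrets-backends/index.rst secrets-backends/azure-key-vault-secrets-backend.rst
logging.rst logging/index.rst
transfer/azure_blob_to_gcs.rst ../../apache-airflow-providers-google/latest/operators/transfer/azure_blob_to_gcs.rst
"""


def parse_redirects(text: str) -> dict[str, str]:
    """Parse 'old-path new-path' lines into a dict, skipping blanks/comments."""
    mapping: dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        old, new = line.split(None, 1)  # split on first whitespace run only
        mapping[old] = new.strip()
    return mapping


redirects = parse_redirects(redirects_txt)
```

The new entry is what keeps old links to the Azure-provider page working after the guide moved into the Google provider's docs tree.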
Lines changed: 2 additions & 42 deletions
@@ -1,4 +1,3 @@
-
 .. Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
@@ -16,45 +15,6 @@
    specific language governing permissions and limitations
    under the License.

-Azure Blob Storage to Google Cloud Storage (GCS) Transfer Operator
-==================================================================
-The Blob service stores text and binary data as objects in the cloud.
-The Blob service offers the following three resources: the storage account, containers, and blobs.
-Within your storage account, containers provide a way to organize sets of blobs.
-For more information about the service visit `Azure Blob Storage API documentation <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`_.
-
-Before you begin
-^^^^^^^^^^^^^^^^
-Before using Blob Storage within Airflow you need to authenticate your account with Token, Login and Password.
-Please follow Azure
-`instructions <https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal>`_
-to do it.
-
-TOKEN should be added to the Connection in Airflow in JSON format, Login and Password as plain text.
-You can check `how to do such connection <https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/index.html#editing-a-connection-with-the-ui>`_.
-
-See following example.
-Set values for these fields:
-
-.. code-block::
-
-   Connection Id: wasb_default
-   Login: Storage Account Name
-   Password: KEY1
-   Extra: {"sas_token": "TOKEN"}
-
-.. _howto/operator:AzureBlobStorageToGCSOperator:
-
-Transfer Data from Blob Storage to Google Cloud Storage
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Operator transfers data from Azure Blob Storage to specified bucket in Google Cloud Storage
-
-To get information about jobs within a Azure Blob Storage use:
-:class:`~airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs.AzureBlobStorageToGCSOperator`
-
-Example usage:
-
-.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_blob_to_gcs.py
-   :language: python
-   :start-after: [START how_to_azure_blob_to_gcs]
-   :end-before: [END how_to_azure_blob_to_gcs]
+Upload data from Azure Blob Storage to Google Cloud Storage (Moved to Google Providers)
+=======================================================================================

0 commit comments