Commit 057f3ae

[AIRFLOW-6670][depends on AIRFLOW-6669] Move contrib operators to providers package (#7286)
1 parent 1988a97 commit 057f3ae

52 files changed (+2367 / -1881 lines)
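
Each contrib module touched below is reduced to a re-export of its new providers location plus a module-level DeprecationWarning, so existing imports keep working. A minimal sketch of how that shows up from user code, assuming the Azure shim introduced in this commit; the warning is only recorded the first time the old module is imported, so the check at the end is illustrative:

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # The old contrib path still resolves to the same class, but the shim warns on import.
    from airflow.contrib.operators.file_to_wasb import FileToWasbOperator  # noqa: F401

# Any DeprecationWarning recorded during the first import of the shim module.
print([str(w.message) for w in caught if issubclass(w.category, DeprecationWarning)])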

β€Žairflow/contrib/example_dags/example_gcs_to_gdrive.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import os
2323

2424
from airflow import models
25-
from airflow.contrib.operators.gcs_to_gdrive_operator import GCSToGoogleDriveOperator
25+
from airflow.providers.google.suite.operators.gcs_to_gdrive_operator import GCSToGoogleDriveOperator
2626
from airflow.utils.dates import days_ago
2727

2828
GCS_TO_GDRIVE_BUCKET = os.environ.get("GCS_TO_DRIVE_BUCKET", "example-object")

β€Žairflow/contrib/operators/file_to_wasb.py

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,49 +16,14 @@
1616
# KIND, either express or implied. See the License for the
1717
# specific language governing permissions and limitations
1818
# under the License.
19-
#
20-
from airflow.models import BaseOperator
21-
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
22-
from airflow.utils.decorators import apply_defaults
23-
24-
25-
class FileToWasbOperator(BaseOperator):
26-
"""
27-
Uploads a file to Azure Blob Storage.
19+
"""This module is deprecated. Please use `airflow.providers.microsoft.azure.operators.file_to_wasb`."""
2820

29-
:param file_path: Path to the file to load. (templated)
30-
:type file_path: str
31-
:param container_name: Name of the container. (templated)
32-
:type container_name: str
33-
:param blob_name: Name of the blob. (templated)
34-
:type blob_name: str
35-
:param wasb_conn_id: Reference to the wasb connection.
36-
:type wasb_conn_id: str
37-
:param load_options: Optional keyword arguments that
38-
`WasbHook.load_file()` takes.
39-
:type load_options: dict
40-
"""
41-
template_fields = ('file_path', 'container_name', 'blob_name')
21+
import warnings
4222

43-
@apply_defaults
44-
def __init__(self, file_path, container_name, blob_name,
45-
wasb_conn_id='wasb_default', load_options=None, *args,
46-
**kwargs):
47-
super().__init__(*args, **kwargs)
48-
if load_options is None:
49-
load_options = {}
50-
self.file_path = file_path
51-
self.container_name = container_name
52-
self.blob_name = blob_name
53-
self.wasb_conn_id = wasb_conn_id
54-
self.load_options = load_options
23+
# pylint: disable=unused-import
24+
from airflow.providers.microsoft.azure.operators.file_to_wasb import FileToWasbOperator # noqa
5525

56-
def execute(self, context):
57-
"""Upload a file to Azure Blob Storage."""
58-
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
59-
self.log.info(
60-
'Uploading %s to wasb://%s as %s',
61-
self.file_path, self.container_name, self.blob_name,
62-
)
63-
hook.load_file(self.file_path, self.container_name,
64-
self.blob_name, **self.load_options)
26+
warnings.warn(
27+
"This module is deprecated. Please use `airflow.providers.microsoft.azure.operators.file_to_wasb`.",
28+
DeprecationWarning, stacklevel=2
29+
)
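
The old import path keeps working through the shim above but now warns at import time; new DAGs should import from the providers package directly. A minimal sketch of the updated usage, based on the operator's documented arguments (file_path, container_name, blob_name, wasb_conn_id); the DAG id, schedule, and values are illustrative, not part of this commit:

from airflow import models
from airflow.providers.microsoft.azure.operators.file_to_wasb import FileToWasbOperator
from airflow.utils.dates import days_ago

with models.DAG("example_file_to_wasb", schedule_interval=None, start_date=days_ago(1)) as dag:
    upload_file = FileToWasbOperator(
        task_id="upload_file",
        file_path="/tmp/report.csv",        # local file to upload (templated)
        container_name="my-container",      # target Azure Blob Storage container (templated)
        blob_name="reports/report.csv",     # blob name inside the container (templated)
        wasb_conn_id="wasb_default",        # connection configured in Airflow
    )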

β€Žairflow/contrib/operators/gcs_to_gdrive_operator.py

Lines changed: 9 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -16,132 +16,15 @@
1616
# KIND, either express or implied. See the License for the
1717
# specific language governing permissions and limitations
1818
# under the License.
19-
"""
20-
This module contains a Google Cloud Storage operator.
21-
"""
22-
import tempfile
23-
from typing import Optional
19+
"""This module is deprecated. Please use `airflow.providers.google.suite.operators.gcs_to_gdrive_operator`."""
2420

25-
from airflow.exceptions import AirflowException
26-
from airflow.gcp.hooks.gcs import GCSHook
27-
from airflow.models import BaseOperator
28-
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
29-
from airflow.utils.decorators import apply_defaults
21+
import warnings
3022

31-
WILDCARD = "*"
23+
# pylint: disable=unused-import
24+
from airflow.providers.google.suite.operators.gcs_to_gdrive_operator import GCSToGoogleDriveOperator # noqa
3225

33-
34-
class GCSToGoogleDriveOperator(BaseOperator):
35-
"""
36-
Copies objects from a Google Cloud Storage service service to Google Drive service, with renaming
37-
if requested.
38-
39-
Using this operator requires the following OAuth 2.0 scope:
40-
41-
.. code-block:: none
42-
43-
https://www.googleapis.com/auth/drive
44-
45-
:param source_bucket: The source Google Cloud Storage bucket where the object is. (templated)
46-
:type source_bucket: str
47-
:param source_object: The source name of the object to copy in the Google cloud
48-
storage bucket. (templated)
49-
You can use only one wildcard for objects (filenames) within your bucket. The wildcard can appear
50-
inside the object name or at the end of the object name. Appending a wildcard to the bucket name
51-
is unsupported.
52-
:type source_object: str
53-
:param destination_object: The destination name of the object in the destination Google Drive
54-
service. (templated)
55-
If a wildcard is supplied in the source_object argument, this is the prefix that will be prepended
56-
to the final destination objects' paths.
57-
Note that the source path's part before the wildcard will be removed;
58-
if it needs to be retained it should be appended to destination_object.
59-
For example, with prefix ``foo/*`` and destination_object ``blah/``, the file ``foo/baz`` will be
60-
copied to ``blah/baz``; to retain the prefix write the destination_object as e.g. ``blah/foo``, in
61-
which case the copied file will be named ``blah/foo/baz``.
62-
:type destination_object: str
63-
:param move_object: When move object is True, the object is moved instead of copied to the new location.
64-
This is the equivalent of a mv command as opposed to a cp command.
65-
:type move_object: bool
66-
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
67-
:type gcp_conn_id: str
68-
:param delegate_to: The account to impersonate, if any.
69-
For this to work, the service account making the request must have domain-wide delegation enabled.
70-
:type delegate_to: str
71-
"""
72-
73-
template_fields = ("source_bucket", "source_object", "destination_object")
74-
ui_color = "#f0eee4"
75-
76-
@apply_defaults
77-
def __init__(
78-
self,
79-
source_bucket: str,
80-
source_object: str,
81-
destination_object: Optional[str] = None,
82-
move_object: bool = False,
83-
gcp_conn_id: str = "google_cloud_default",
84-
delegate_to: Optional[str] = None,
85-
*args,
86-
**kwargs
87-
):
88-
super().__init__(*args, **kwargs)
89-
90-
self.source_bucket = source_bucket
91-
self.source_object = source_object
92-
self.destination_object = destination_object
93-
self.move_object = move_object
94-
self.gcp_conn_id = gcp_conn_id
95-
self.delegate_to = delegate_to
96-
self.gcs_hook = None # type: Optional[GCSHook]
97-
self.gdrive_hook = None # type: Optional[GoogleDriveHook]
98-
99-
def execute(self, context):
100-
101-
self.gcs_hook = GCSHook(
102-
google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
103-
)
104-
self.gdrive_hook = GoogleDriveHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
105-
106-
if WILDCARD in self.source_object:
107-
total_wildcards = self.source_object.count(WILDCARD)
108-
if total_wildcards > 1:
109-
error_msg = (
110-
"Only one wildcard '*' is allowed in source_object parameter. "
111-
"Found {} in {}.".format(total_wildcards, self.source_object)
112-
)
113-
114-
raise AirflowException(error_msg)
115-
116-
prefix, delimiter = self.source_object.split(WILDCARD, 1)
117-
objects = self.gcs_hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter)
118-
119-
for source_object in objects:
120-
if self.destination_object is None:
121-
destination_object = source_object
122-
else:
123-
destination_object = source_object.replace(prefix, self.destination_object, 1)
124-
125-
self._copy_single_object(source_object=source_object, destination_object=destination_object)
126-
else:
127-
self._copy_single_object(
128-
source_object=self.source_object, destination_object=self.destination_object
129-
)
130-
131-
def _copy_single_object(self, source_object, destination_object):
132-
self.log.info(
133-
"Executing copy of gs://%s/%s to gdrive://%s",
134-
self.source_bucket,
135-
source_object,
136-
destination_object,
137-
)
138-
139-
with tempfile.NamedTemporaryFile() as file:
140-
filename = file.name
141-
self.gcs_hook.download(
142-
bucket_name=self.source_bucket, object_name=source_object, filename=filename
143-
)
144-
self.gdrive_hook.upload_file(local_location=filename, remote_location=destination_object)
145-
146-
if self.move_object:
147-
self.gcs_hook.delete(self.source_bucket, source_object)
26+
warnings.warn(
27+
"This module is deprecated. "
28+
"Please use `airflow.providers.google.suite.operators.gcs_to_gdrive_operator`.",
29+
DeprecationWarning, stacklevel=2
30+
)
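
For reference, a minimal sketch of this operator used from its new providers location, following the documented arguments above (at most one wildcard in source_object); bucket, object, and DAG names are illustrative only:

from airflow import models
from airflow.providers.google.suite.operators.gcs_to_gdrive_operator import GCSToGoogleDriveOperator
from airflow.utils.dates import days_ago

with models.DAG("example_gcs_to_gdrive_copy", schedule_interval=None, start_date=days_ago(1)) as dag:
    copy_files = GCSToGoogleDriveOperator(
        task_id="copy_files",
        source_bucket="my-source-bucket",   # GCS bucket to read from (templated)
        source_object="reports/*.csv",      # only one '*' wildcard is allowed
        destination_object="backup/",       # prefix prepended to each matched object's path
        move_object=False,                  # True deletes the GCS object after copying (mv instead of cp)
        gcp_conn_id="google_cloud_default",
    )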

β€Žairflow/contrib/operators/oracle_to_azure_data_lake_transfer.py

Lines changed: 17 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -16,101 +16,20 @@
1616
# KIND, either express or implied. See the License for the
1717
# specific language governing permissions and limitations
1818
# under the License.
19-
20-
import os
21-
from tempfile import TemporaryDirectory
22-
23-
import unicodecsv as csv
24-
25-
from airflow.models import BaseOperator
26-
from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook
27-
from airflow.providers.oracle.hooks.oracle import OracleHook
28-
from airflow.utils.decorators import apply_defaults
29-
30-
31-
class OracleToAzureDataLakeTransfer(BaseOperator):
32-
"""
33-
Moves data from Oracle to Azure Data Lake. The operator runs the query against
34-
Oracle and stores the file locally before loading it into Azure Data Lake.
35-
36-
37-
:param filename: file name to be used by the csv file.
38-
:type filename: str
39-
:param azure_data_lake_conn_id: destination azure data lake connection.
40-
:type azure_data_lake_conn_id: str
41-
:param azure_data_lake_path: destination path in azure data lake to put the file.
42-
:type azure_data_lake_path: str
43-
:param oracle_conn_id: source Oracle connection.
44-
:type oracle_conn_id: str
45-
:param sql: SQL query to execute against the Oracle database. (templated)
46-
:type sql: str
47-
:param sql_params: Parameters to use in sql query. (templated)
48-
:type sql_params: str
49-
:param delimiter: field delimiter in the file.
50-
:type delimiter: str
51-
:param encoding: encoding type for the file.
52-
:type encoding: str
53-
:param quotechar: Character to use in quoting.
54-
:type quotechar: str
55-
:param quoting: Quoting strategy. See unicodecsv quoting for more information.
56-
:type quoting: str
57-
"""
58-
59-
template_fields = ('filename', 'sql', 'sql_params')
60-
ui_color = '#e08c8c'
61-
62-
@apply_defaults
63-
def __init__(
64-
self,
65-
filename,
66-
azure_data_lake_conn_id,
67-
azure_data_lake_path,
68-
oracle_conn_id,
69-
sql,
70-
sql_params=None,
71-
delimiter=",",
72-
encoding="utf-8",
73-
quotechar='"',
74-
quoting=csv.QUOTE_MINIMAL,
75-
*args, **kwargs):
76-
super().__init__(*args, **kwargs)
77-
if sql_params is None:
78-
sql_params = {}
79-
self.filename = filename
80-
self.oracle_conn_id = oracle_conn_id
81-
self.sql = sql
82-
self.sql_params = sql_params
83-
self.azure_data_lake_conn_id = azure_data_lake_conn_id
84-
self.azure_data_lake_path = azure_data_lake_path
85-
self.delimiter = delimiter
86-
self.encoding = encoding
87-
self.quotechar = quotechar
88-
self.quoting = quoting
89-
90-
def _write_temp_file(self, cursor, path_to_save):
91-
with open(path_to_save, 'wb') as csvfile:
92-
csv_writer = csv.writer(csvfile, delimiter=self.delimiter,
93-
encoding=self.encoding, quotechar=self.quotechar,
94-
quoting=self.quoting)
95-
csv_writer.writerow(map(lambda field: field[0], cursor.description))
96-
csv_writer.writerows(cursor)
97-
csvfile.flush()
98-
99-
def execute(self, context):
100-
oracle_hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
101-
azure_data_lake_hook = AzureDataLakeHook(
102-
azure_data_lake_conn_id=self.azure_data_lake_conn_id)
103-
104-
self.log.info("Dumping Oracle query results to local file")
105-
conn = oracle_hook.get_conn()
106-
cursor = conn.cursor()
107-
cursor.execute(self.sql, self.sql_params)
108-
109-
with TemporaryDirectory(prefix='airflow_oracle_to_azure_op_') as temp:
110-
self._write_temp_file(cursor, os.path.join(temp, self.filename))
111-
self.log.info("Uploading local file to Azure Data Lake")
112-
azure_data_lake_hook.upload_file(os.path.join(temp, self.filename),
113-
os.path.join(self.azure_data_lake_path,
114-
self.filename))
115-
cursor.close()
116-
conn.close()
19+
"""
20+
This module is deprecated.
21+
Please use `airflow.providers.microsoft.azure.operators.oracle_to_azure_data_lake_transfer`.
22+
"""
23+
24+
import warnings
25+
26+
# pylint: disable=unused-import
27+
from airflow.providers.microsoft.azure.operators.oracle_to_azure_data_lake_transfer import ( # noqa
28+
OracleToAzureDataLakeTransfer,
29+
)
30+
31+
warnings.warn(
32+
"This module is deprecated. "
33+
"Please use `airflow.providers.microsoft.azure.operators.oracle_to_azure_data_lake_transfer`.",
34+
DeprecationWarning, stacklevel=2
35+
)
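
A minimal sketch of the transfer operator used from its new providers location, with the documented arguments (filename, oracle_conn_id, sql, sql_params, azure_data_lake_conn_id, azure_data_lake_path); connection ids, paths, and the query are illustrative only:

from airflow import models
from airflow.providers.microsoft.azure.operators.oracle_to_azure_data_lake_transfer import (
    OracleToAzureDataLakeTransfer,
)
from airflow.utils.dates import days_ago

with models.DAG("example_oracle_to_adl", schedule_interval=None, start_date=days_ago(1)) as dag:
    extract_orders = OracleToAzureDataLakeTransfer(
        task_id="extract_orders",
        filename="orders.csv",                        # name of the CSV written to the lake
        oracle_conn_id="oracle_default",              # source Oracle connection
        sql="SELECT * FROM orders WHERE day = :day",  # query to dump (templated)
        sql_params={"day": "{{ ds }}"},               # bind parameters for the query (templated)
        azure_data_lake_conn_id="adl_default",        # destination Azure Data Lake connection
        azure_data_lake_path="raw/orders",            # destination folder in the lake
    )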
