Skip to content

Commit 5c4cfea

Browse files
TobKedpotiuk
authored andcommitted
[AIRFLOW-5718] Add SFTPToGoogleCloudStorageOperator (#6393)
1 parent 8e5e9c1 commit 5c4cfea

File tree

7 files changed

+774
-0
lines changed

7 files changed

+774
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
"""
20+
Example Airflow DAG for Google Cloud Storage to SFTP transfer operators.
21+
"""
22+
23+
import os
24+
25+
import airflow
26+
from airflow import models
27+
from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGoogleCloudStorageOperator
28+
29+
default_args = {"start_date": airflow.utils.dates.days_ago(1)}
30+
31+
BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-sftp-gcs")
32+
33+
TMP_PATH = "/tmp"
34+
DIR = "tests_sftp_hook_dir"
35+
SUBDIR = "subdir"
36+
37+
OBJECT_SRC_1 = "parent-1.bin"
38+
OBJECT_SRC_2 = "parent-2.bin"
39+
OBJECT_SRC_3 = "parent-3.txt"
40+
41+
42+
with models.DAG(
43+
"example_sftp_to_gcs", default_args=default_args, schedule_interval=None
44+
) as dag:
45+
# [START howto_operator_sftp_to_gcs_copy_single_file]
46+
copy_file_from_sftp_to_gcs = SFTPToGoogleCloudStorageOperator(
47+
task_id="file-copy-sftp-to-gcs",
48+
source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_1),
49+
destination_bucket=BUCKET_SRC,
50+
)
51+
# [END howto_operator_sftp_to_gcs_copy_single_file]
52+
53+
# [START howto_operator_sftp_to_gcs_move_single_file_destination]
54+
move_file_from_sftp_to_gcs_destination = SFTPToGoogleCloudStorageOperator(
55+
task_id="file-move-sftp-to-gcs-destination",
56+
source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_2),
57+
destination_bucket=BUCKET_SRC,
58+
destination_path="destination_dir/destination_filename.bin",
59+
move_object=True,
60+
)
61+
# [END howto_operator_sftp_to_gcs_move_single_file_destination]
62+
63+
# [START howto_operator_sftp_to_gcs_copy_directory]
64+
copy_directory_from_sftp_to_gcs = SFTPToGoogleCloudStorageOperator(
65+
task_id="dir-copy-sftp-to-gcs",
66+
source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*"),
67+
destination_bucket=BUCKET_SRC,
68+
)
69+
# [END howto_operator_sftp_to_gcs_copy_directory]
70+
71+
# [START howto_operator_sftp_to_gcs_move_specific_files]
72+
move_specific_files_from_gcs_to_sftp = SFTPToGoogleCloudStorageOperator(
73+
task_id="dir-move-specific-files-sftp-to-gcs",
74+
source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*.bin"),
75+
destination_bucket=BUCKET_SRC,
76+
destination_path="specific_files/",
77+
move_object=True,
78+
)
79+
# [END howto_operator_sftp_to_gcs_move_specific_files]
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
"""
19+
This module contains SFTP to Google Cloud Storage operator.
20+
"""
21+
import os
22+
from tempfile import NamedTemporaryFile
23+
from typing import Optional, Union
24+
25+
from airflow import AirflowException
26+
from airflow.contrib.hooks.sftp_hook import SFTPHook
27+
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
28+
from airflow.models import BaseOperator
29+
from airflow.utils.decorators import apply_defaults
30+
31+
WILDCARD = "*"
32+
33+
34+
class SFTPToGoogleCloudStorageOperator(BaseOperator):
35+
"""
36+
Transfer files to Google Cloud Storage from SFTP server.
37+
38+
.. seealso::
39+
For more information on how to use this operator, take a look at the guide:
40+
:ref:`howto/operator:SFTPToGoogleCloudStorageOperator`
41+
42+
:param source_path: The sftp remote path. This is the specified file path
43+
for downloading the single file or multiple files from the SFTP server.
44+
You can use only one wildcard within your path. The wildcard can appear
45+
inside the path or at the end of the path.
46+
:type source_path: str
47+
:param destination_bucket: The bucket to upload to.
48+
:type destination_bucket: str
49+
:param destination_path: The destination name of the object in the
50+
destination Google Cloud Storage bucket.
51+
If destination_path is not provided file/files will be placed in the
52+
main bucket path.
53+
If a wildcard is supplied in the destination_path argument, this is the
54+
prefix that will be prepended to the final destination objects' paths.
55+
:type destination_path: str
56+
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
57+
:type gcp_conn_id: str
58+
:param sftp_conn_id: The sftp connection id. The name or identifier for
59+
establishing a connection to the SFTP server.
60+
:type sftp_conn_id: str
61+
:param delegate_to: The account to impersonate, if any
62+
:type delegate_to: str
63+
:param mime_type: The mime-type string
64+
:type mime_type: str
65+
:param gzip: Allows for file to be compressed and uploaded as gzip
66+
:type gzip: bool
67+
:param move_object: When move object is True, the object is moved instead
68+
of copied to the new location. This is the equivalent of a mv command
69+
as opposed to a cp command.
70+
:type move_object: bool
71+
"""
72+
73+
template_fields = ("source_path", "destination_path", "destination_bucket")
74+
75+
@apply_defaults
76+
def __init__(
77+
self,
78+
source_path: str,
79+
destination_bucket: str,
80+
destination_path: Optional[str] = None,
81+
gcp_conn_id: str = "google_cloud_default",
82+
sftp_conn_id: str = "ssh_default",
83+
delegate_to: Optional[str] = None,
84+
mime_type: str = "application/octet-stream",
85+
gzip: bool = False,
86+
move_object: bool = False,
87+
*args,
88+
**kwargs
89+
) -> None:
90+
super().__init__(*args, **kwargs)
91+
92+
self.source_path = source_path
93+
self.destination_path = self._set_destination_path(destination_path)
94+
self.destination_bucket = self._set_bucket_name(destination_bucket)
95+
self.gcp_conn_id = gcp_conn_id
96+
self.mime_type = mime_type
97+
self.delegate_to = delegate_to
98+
self.gzip = gzip
99+
self.sftp_conn_id = sftp_conn_id
100+
self.move_object = move_object
101+
102+
def execute(self, context):
103+
gcs_hook = GoogleCloudStorageHook(
104+
gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
105+
)
106+
107+
sftp_hook = SFTPHook(self.sftp_conn_id)
108+
109+
if WILDCARD in self.source_path:
110+
total_wildcards = self.source_path.count(WILDCARD)
111+
if total_wildcards > 1:
112+
raise AirflowException(
113+
"Only one wildcard '*' is allowed in source_path parameter. "
114+
"Found {} in {}.".format(total_wildcards, self.source_path)
115+
)
116+
117+
prefix, delimiter = self.source_path.split(WILDCARD, 1)
118+
base_path = os.path.dirname(prefix)
119+
120+
files, _, _ = sftp_hook.get_tree_map(
121+
base_path, prefix=prefix, delimiter=delimiter
122+
)
123+
124+
for file in files:
125+
destination_path = file.replace(base_path, self.destination_path, 1)
126+
self._copy_single_object(gcs_hook, sftp_hook, file, destination_path)
127+
128+
else:
129+
destination_object = (
130+
self.destination_path
131+
if self.destination_path
132+
else self.source_path.rsplit("/", 1)[1]
133+
)
134+
self._copy_single_object(
135+
gcs_hook, sftp_hook, self.source_path, destination_object
136+
)
137+
138+
def _copy_single_object(
139+
self,
140+
gcs_hook: GoogleCloudStorageHook,
141+
sftp_hook: SFTPHook,
142+
source_path: str,
143+
destination_object: str,
144+
) -> None:
145+
"""
146+
Helper function to copy single object.
147+
"""
148+
self.log.info(
149+
"Executing copy of %s to gs://%s/%s",
150+
source_path,
151+
self.destination_bucket,
152+
destination_object,
153+
)
154+
155+
with NamedTemporaryFile("w") as tmp:
156+
sftp_hook.retrieve_file(source_path, tmp.name)
157+
158+
gcs_hook.upload(
159+
bucket_name=self.destination_bucket,
160+
object_name=destination_object,
161+
filename=tmp.name,
162+
mime_type=self.mime_type,
163+
)
164+
165+
if self.move_object:
166+
self.log.info("Executing delete of %s", source_path)
167+
sftp_hook.delete_file(source_path)
168+
169+
@staticmethod
170+
def _set_destination_path(path: Union[str, None]) -> str:
171+
if path is not None:
172+
return path.lstrip("/") if path.startswith("/") else path
173+
return ""
174+
175+
@staticmethod
176+
def _set_bucket_name(name: str) -> str:
177+
bucket = name if not name.startswith("gs://") else name[5:]
178+
return bucket.strip("/")
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
19+
SFTP to Google Cloud Storage Transfer Operator
20+
==============================================
21+
22+
Google has a service `Google Cloud Storage <https://cloud.google.com/storage/>`__. This service is
23+
used to store large data from various applications.
24+
SFTP (SSH File Transfer Protocol) is a secure file transfer protocol.
25+
It runs over the SSH protocol. It supports the full security and authentication functionality of the SSH.
26+
27+
28+
.. contents::
29+
:depth: 1
30+
:local:
31+
32+
Prerequisite Tasks
33+
^^^^^^^^^^^^^^^^^^
34+
35+
.. include:: _partials/prerequisite_tasks.rst
36+
37+
.. _howto/operator:SFTPToGoogleCloudStorageOperator:
38+
39+
Operator
40+
^^^^^^^^
41+
42+
Transfer files between SFTP and Google Storage is performed with the
43+
:class:`~airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGoogleCloudStorageOperator` operator.
44+
45+
Use :ref:`Jinja templating <jinja-templating>` with
46+
:template-fields:`airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGoogleCloudStorageOperator`
47+
to define values dynamically.
48+
49+
Copying single files
50+
--------------------
51+
52+
The following Operator copies a single file.
53+
54+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py
55+
:language: python
56+
:dedent: 4
57+
:start-after: [START howto_operator_sftp_to_gcs_copy_single_file]
58+
:end-before: [END howto_operator_sftp_to_gcs_copy_single_file]
59+
60+
Moving a single file
61+
--------------------
62+
63+
To move the file use the ``move_object`` parameter. Once the file is copied to Google Storage,
64+
the original file from the SFTP is deleted.
65+
The ``destination_path`` parameter defines the full path of the file in the bucket.
66+
67+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py
68+
:language: python
69+
:dedent: 4
70+
:start-after: [START howto_operator_sftp_to_gcs_move_single_file_destination]
71+
:end-before: [END howto_operator_sftp_to_gcs_move_single_file_destination]
72+
73+
74+
Copying directory
75+
-----------------
76+
77+
Use the ``wildcard`` in ``source_path`` parameter to copy the directory.
78+
79+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py
80+
:language: python
81+
:dedent: 4
82+
:start-after: [START howto_operator_sftp_to_gcs_copy_directory]
83+
:end-before: [END howto_operator_sftp_to_gcs_copy_directory]
84+
85+
Moving specific files
86+
---------------------
87+
88+
Use the ``wildcard`` in ``source_path`` parameter to move the specific files.
89+
You can use only one wildcard within your path.
90+
The ``destination_path`` defines the path that is prefixed to all copied files,
91+
e.g. ``tests_sftp_hook_dir/subdir/parent-1.bin`` is copied to ``specific_files/parent-1.bin``,
92+
and ``tests_sftp_hook_dir/subdir/parent-2.bin`` is copied to ``specific_files/parent-2.bin`` .
93+
``tests_sftp_hook_dir/subdir/parent-3.txt`` is skipped.
94+
95+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py
96+
:language: python
97+
:dedent: 4
98+
:start-after: [START howto_operator_sftp_to_gcs_move_specific_files]
99+
:end-before: [END howto_operator_sftp_to_gcs_move_specific_files]
100+
101+
Reference
102+
^^^^^^^^^
103+
104+
For more information, see
105+
106+
* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__

β€Ždocs/operators-and-hooks-ref.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,11 @@ These integrations allow you to copy data from/to Google Cloud Platform.
777777
-
778778
- :mod:`airflow.operators.postgres_to_gcs`
779779

780+
* - SFTP
781+
- `Google Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
782+
- :doc:`How to use <howto/operator/gcp/sftp_to_gcs>`
783+
- :mod:`airflow.providers.google.cloud.operators.sftp_to_gcs`
784+
780785
* - SQL
781786
- `Cloud Storage (GCS) <https://cloud.google.com/gcs/>`__
782787
-

0 commit comments

Comments
 (0)