"""
Example Airflow DAG (system test) exercising ``SFTPToGCSOperator``.

The DAG provisions a GCS bucket, unpacks a local fixture archive, runs four
SFTP-to-GCS transfer variants (single-file copy, single-file move with an
explicit destination path, directory copy via ``*`` glob, and selective move
via ``*.bin`` glob), then tears the bucket down.

NOTE(review): this file was reconstructed from a rendered diff; the
``[END howto_operator_sftp_to_gcs_move_single_file_destination]`` marker was
elided in the diff view and has been restored to match the file's
START/END convention — confirm against the upstream source.
"""

import os
from datetime import datetime
from pathlib import Path

from airflow import models
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

# Environment identifiers supplied by the system-test harness.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")

DAG_ID = "example_sftp_to_gcs"
# Bucket name embeds ENV_ID so concurrent test environments don't collide.
BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}"

TMP_PATH = "tmp"
DIR = "tests_sftp_hook_dir"
SUBDIR = "subdir"

OBJECT_SRC_1 = "parent-1.bin"
OBJECT_SRC_2 = "parent-2.bin"

# Fixture data lives in a "resources" folder next to this file; the tar
# archive below is unpacked into it by the unzip_data_file task.
CURRENT_FOLDER = Path(__file__).parent
LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")

FILE_LOCAL_PATH = str(Path(LOCAL_PATH) / TMP_PATH / DIR)
FILE_NAME = "tmp.tar.gz"


with models.DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:

    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)

    # Unpack the fixture archive so the SFTP source paths below exist.
    unzip_file = BashOperator(
        task_id="unzip_data_file", bash_command=f"tar xvf {LOCAL_PATH}/{FILE_NAME} -C {LOCAL_PATH}"
    )

    # [START howto_operator_sftp_to_gcs_copy_single_file]
    copy_file_from_sftp_to_gcs = SFTPToGCSOperator(
        task_id="file-copy-sftp-to-gcs",
        source_path=f"{FILE_LOCAL_PATH}/{OBJECT_SRC_1}",
        destination_bucket=BUCKET_NAME,
    )
    # [END howto_operator_sftp_to_gcs_copy_single_file]

    # [START howto_operator_sftp_to_gcs_move_single_file_destination]
    move_file_from_sftp_to_gcs_destination = SFTPToGCSOperator(
        task_id="file-move-sftp-to-gcs-destination",
        source_path=f"{FILE_LOCAL_PATH}/{OBJECT_SRC_2}",
        destination_bucket=BUCKET_NAME,
        destination_path="destination_dir/destination_filename.bin",
        move_object=True,
    )
    # [END howto_operator_sftp_to_gcs_move_single_file_destination]

    # [START howto_operator_sftp_to_gcs_copy_directory]
    copy_directory_from_sftp_to_gcs = SFTPToGCSOperator(
        task_id="dir-copy-sftp-to-gcs",
        source_path=f"{FILE_LOCAL_PATH}/{SUBDIR}/*",
        destination_bucket=BUCKET_NAME,
    )
    # [END howto_operator_sftp_to_gcs_copy_directory]

    # [START howto_operator_sftp_to_gcs_move_specific_files]
    move_specific_files_from_gcs_to_sftp = SFTPToGCSOperator(
        task_id="dir-move-specific-files-sftp-to-gcs",
        source_path=f"{FILE_LOCAL_PATH}/{SUBDIR}/*.bin",
        destination_bucket=BUCKET_NAME,
        destination_path="specific_files/",
        move_object=True,
    )
    # [END howto_operator_sftp_to_gcs_move_specific_files]

    # ALL_DONE so the bucket is cleaned up even when a test-body task fails.
    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
    )

    chain(
        # TEST SETUP
        create_bucket,
        unzip_file,
        # TEST BODY
        copy_file_from_sftp_to_gcs,
        move_file_from_sftp_to_gcs_destination,
        copy_directory_from_sftp_to_gcs,
        move_specific_files_from_gcs_to_sftp,
        # TEST TEARDOWN
        delete_bucket,
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)