Commit ce5eebd

Fix system test for MetastoreHivePartitionSensor (#32861)
1 parent c422920

3 files changed: +190 -14 lines


airflow/providers/google/provider.yaml (1 addition, 1 deletion)

@@ -94,7 +94,7 @@ dependencies:
   - google-cloud-dataform>=0.5.0
   - google-cloud-dataplex>=1.4.2
   - google-cloud-dataproc>=5.4.0
-  - google-cloud-dataproc-metastore>=1.10.0
+  - google-cloud-dataproc-metastore>=1.12.0
   - google-cloud-dlp>=3.12.0
   - google-cloud-kms>=2.15.0
   - google-cloud-language>=2.9.0
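
This commit raises the google-cloud-dataproc-metastore floor to 1.12.0; the same pin is mirrored in generated/provider_dependencies.json below. A minimal sketch (not part of the commit, assuming importlib.metadata and packaging are available locally) for checking that an environment satisfies the new floor:

# Minimal sketch, not part of the commit: confirm the locally installed
# google-cloud-dataproc-metastore distribution meets the new >=1.12.0 pin.
from importlib.metadata import version

from packaging.version import Version

installed = Version(version("google-cloud-dataproc-metastore"))
assert installed >= Version("1.12.0"), f"found {installed}, need >=1.12.0"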

generated/provider_dependencies.json (1 addition, 1 deletion)

@@ -426,7 +426,7 @@
       "google-cloud-dataflow-client>=0.8.2",
       "google-cloud-dataform>=0.5.0",
       "google-cloud-dataplex>=1.4.2",
-      "google-cloud-dataproc-metastore>=1.10.0",
+      "google-cloud-dataproc-metastore>=1.12.0",
       "google-cloud-dataproc>=5.4.0",
       "google-cloud-dlp>=3.12.0",
       "google-cloud-kms>=2.15.0",

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py (188 additions, 12 deletions)

@@ -19,45 +19,221 @@
 Example Airflow DAG that show how to check Hive partitions existence
 using Dataproc Metastore Sensor.
 
-Note that Metastore service must be configured to use gRPC endpoints,
+Note that Metastore service must be configured to use gRPC endpoints.
 """
 from __future__ import annotations
 
 import datetime
 import os
 
 from airflow import models
+from airflow.decorators import task
+from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+from airflow.providers.google.cloud.operators.dataproc import (
+    DataprocCreateClusterOperator,
+    DataprocDeleteClusterOperator,
+    DataprocSubmitJobOperator,
+)
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+    DataprocMetastoreCreateServiceOperator,
+    DataprocMetastoreDeleteServiceOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
 from airflow.providers.google.cloud.sensors.dataproc_metastore import MetastoreHivePartitionSensor
+from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
-DAG_ID = "dataproc_metastore_hive_partition_sensor"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "hive_partition_sensor"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
+REGION = "us-central1"
+NETWORK = "default"
 
-SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-")
-REGION = "europe-west1"
-TABLE_NAME = "test_table"
-PARTITION_1 = "column1=value1"
-PARTITION_2 = "column2=value2/column3=value3"
+METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
+METASTORE_TIMEOUT = 2400
+METASTORE_SERVICE = {
+    "name": METASTORE_SERVICE_ID,
+    "hive_metastore_config": {
+        "endpoint_protocol": "GRPC",
+    },
+    "network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
+}
+METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
+DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
+DATAPROC_CLUSTER_CONFIG = {
+    "master_config": {
+        "num_instances": 1,
+        "machine_type_uri": "n1-standard-2",
+        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
+    },
+    "worker_config": {
+        "num_instances": 2,
+        "machine_type_uri": "n1-standard-2",
+        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
+    },
+    "metastore_config": {
+        "dataproc_metastore_service": METASTORE_SERVICE_QFN,
+    },
+    "gce_cluster_config": {
+        "service_account_scopes": [
+            "https://www.googleapis.com/auth/cloud-platform",
+        ],
+    },
+}
 
+TABLE_NAME = "transactions_partitioned"
+COLUMN = "TransactionType"
+PARTITION_1 = f"{COLUMN}=credit".lower()
+PARTITION_2 = f"{COLUMN}=debit".lower()
+SOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+SOURCE_DATA_PATH = "dataproc/hive"
+SOURCE_DATA_FILE_NAME = "part-00000.parquet"
+EXTERNAL_TABLE_BUCKET = "{{task_instance.xcom_pull(task_ids='get_hive_warehouse_bucket_task', key='bucket')}}"
+QUERY_CREATE_EXTERNAL_TABLE = f"""
+CREATE EXTERNAL TABLE IF NOT EXISTS transactions
+(SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
+STORED AS PARQUET
+LOCATION 'gs://{EXTERNAL_TABLE_BUCKET}/{SOURCE_DATA_PATH}';
+"""
+QUERY_CREATE_PARTITIONED_TABLE = f"""
+CREATE EXTERNAL TABLE IF NOT EXISTS {TABLE_NAME}
+(SubmissionDate DATE, TransactionAmount DOUBLE)
+PARTITIONED BY ({COLUMN} STRING);
+"""
+QUERY_COPY_DATA_WITH_PARTITIONS = f"""
+SET hive.exec.dynamic.partition.mode=nonstrict;
+INSERT INTO TABLE {TABLE_NAME} PARTITION ({COLUMN})
+SELECT SubmissionDate,TransactionAmount,TransactionType FROM transactions;
+"""
 
 with models.DAG(
     DAG_ID,
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@once",
     catchup=False,
-    tags=["example", "dataproc", "metastore"],
+    tags=["example", "dataproc", "metastore", "partition", "hive", "sensor"],
 ) as dag:
 
+    create_metastore_service = DataprocMetastoreCreateServiceOperator(
+        task_id="create_metastore_service",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service=METASTORE_SERVICE,
+        service_id=METASTORE_SERVICE_ID,
+        timeout=METASTORE_TIMEOUT,
+    )
+
+    create_cluster = DataprocCreateClusterOperator(
+        task_id="create_cluster",
+        cluster_name=DATAPROC_CLUSTER_NAME,
+        project_id=PROJECT_ID,
+        cluster_config=DATAPROC_CLUSTER_CONFIG,
+        region=REGION,
+    )
+
+    @task(task_id="get_hive_warehouse_bucket_task")
+    def get_hive_warehouse_bucket(**kwargs):
+        """Returns Hive Metastore Warehouse GCS bucket name."""
+        ti = kwargs["ti"]
+        metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
+        config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
+        destination_dir: str = config_overrides["hive.metastore.warehouse.dir"]
+        bucket, _ = _parse_gcs_url(destination_dir)
+        ti.xcom_push(key="bucket", value=bucket)
+
+    get_hive_warehouse_bucket_task = get_hive_warehouse_bucket()
+
+    copy_source_data = GCSToGCSOperator(
+        task_id="copy_source_data",
+        source_bucket=SOURCE_DATA_BUCKET,
+        source_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}",
+        destination_bucket=EXTERNAL_TABLE_BUCKET,
+        destination_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}",
+    )
+
+    create_external_table = DataprocSubmitJobOperator(
+        task_id="create_external_table",
+        job={
+            "reference": {"project_id": PROJECT_ID},
+            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+            "hive_job": {"query_list": {"queries": [QUERY_CREATE_EXTERNAL_TABLE]}},
+        },
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
+    create_partitioned_table = DataprocSubmitJobOperator(
+        task_id="create_partitioned_table",
+        job={
+            "reference": {"project_id": PROJECT_ID},
+            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+            "hive_job": {"query_list": {"queries": [QUERY_CREATE_PARTITIONED_TABLE]}},
+        },
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
+    partition_data = DataprocSubmitJobOperator(
+        task_id="partition_data",
+        job={
+            "reference": {"project_id": PROJECT_ID},
+            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+            "hive_job": {"query_list": {"queries": [QUERY_COPY_DATA_WITH_PARTITIONS]}},
+        },
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
     # [START how_to_cloud_dataproc_metastore_hive_partition_sensor]
-    sensor = MetastoreHivePartitionSensor(
+    hive_partition_sensor = MetastoreHivePartitionSensor(
         task_id="hive_partition_sensor",
-        service_id=SERVICE_ID,
+        service_id=METASTORE_SERVICE_ID,
         region=REGION,
         table=TABLE_NAME,
         partitions=[PARTITION_1, PARTITION_2],
     )
     # [END how_to_cloud_dataproc_metastore_hive_partition_sensor]
 
+    delete_dataproc_cluster = DataprocDeleteClusterOperator(
+        task_id="delete_dataproc_cluster",
+        cluster_name=DATAPROC_CLUSTER_NAME,
+        project_id=PROJECT_ID,
+        region=REGION,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
+        task_id="delete_metastore_service",
+        service_id=METASTORE_SERVICE_ID,
+        project_id=PROJECT_ID,
+        region=REGION,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_warehouse_bucket = GCSDeleteBucketOperator(
+        task_id="delete_warehouse_bucket",
+        bucket_name=EXTERNAL_TABLE_BUCKET,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # TEST SETUP
+    (
+        create_metastore_service
+        >> create_cluster
+        >> get_hive_warehouse_bucket_task
+        >> copy_source_data
+        >> create_external_table
+        >> create_partitioned_table
+        >> partition_data
+    )
+    (
+        create_metastore_service
+        # TEST BODY
+        >> hive_partition_sensor
+        # TEST TEARDOWN
+        >> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket]
+    )
+
     from tests.system.utils.watcher import watcher
 
     # This test needs watcher in order to properly mark success/failure
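
For context on the get_hive_warehouse_bucket task added above: a minimal sketch (not part of the commit, assuming the Google provider package is installed) of how _parse_gcs_url splits the metastore warehouse directory URL into a bucket name and an object path. The gs:// URL below is a placeholder, not a real bucket.

from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url

# _parse_gcs_url returns a (bucket, object) tuple; the task above keeps only
# the bucket and pushes it to XCom under the "bucket" key.
bucket, prefix = _parse_gcs_url("gs://example-warehouse-bucket/hive-warehouse")
print(bucket)  # example-warehouse-bucket
print(prefix)  # hive-warehouse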
