21
21
which allows you to connect to Google Cloud Tasks service,
22
22
performing actions to queues or tasks.
23
23
"""
24
-
25
24
from typing import Dict , List , Optional , Sequence , Tuple , Union
26
25
27
26
from google .api_core .retry import Retry
28
- from google .cloud .tasks_v2 import CloudTasksClient
29
- from google .cloud .tasks_v2 .types import Queue , Task
30
- from google .protobuf .field_mask_pb2 import FieldMask
27
+ from google .cloud .tasks_v2 import CloudTasksClient , enums
28
+ from google .cloud .tasks_v2 .types import FieldMask , Queue , Task
31
29
32
30
from airflow .exceptions import AirflowException
33
31
from airflow .providers .google .common .hooks .base_google import GoogleBaseHook
@@ -122,19 +120,20 @@ def create_queue(
122
120
client = self .get_conn ()
123
121
124
122
if queue_name :
125
- full_queue_name = f"projects/ { project_id } /locations/ { location } /queues/ { queue_name } "
123
+ full_queue_name = CloudTasksClient . queue_path ( project_id , location , queue_name )
126
124
if isinstance (task_queue , Queue ):
127
125
task_queue .name = full_queue_name
128
126
elif isinstance (task_queue , dict ):
129
127
task_queue ['name' ] = full_queue_name
130
128
else :
131
129
raise AirflowException ('Unable to set queue_name.' )
132
- full_location_path = f"projects/ { project_id } /locations/ { location } "
130
+ full_location_path = CloudTasksClient . location_path ( project_id , location )
133
131
return client .create_queue (
134
- request = {'parent' : full_location_path , 'queue' : task_queue },
132
+ parent = full_location_path ,
133
+ queue = task_queue ,
135
134
retry = retry ,
136
135
timeout = timeout ,
137
- metadata = metadata or () ,
136
+ metadata = metadata ,
138
137
)
139
138
140
139
@GoogleBaseHook .fallback_to_default_project_id
@@ -168,7 +167,7 @@ def update_queue(
168
167
:param update_mask: A mast used to specify which fields of the queue are being updated.
169
168
If empty, then all fields will be updated.
170
169
If a dict is provided, it must be of the same form as the protobuf message.
171
- :type update_mask: dict or google.protobuf.field_mask_pb2 .FieldMask
170
+ :type update_mask: dict or google.cloud.tasks_v2.types .FieldMask
172
171
:param retry: (Optional) A retry object used to retry requests.
173
172
If None is specified, requests will not be retried.
174
173
:type retry: google.api_core.retry.Retry
@@ -183,18 +182,19 @@ def update_queue(
183
182
client = self .get_conn ()
184
183
185
184
if queue_name and location :
186
- full_queue_name = f"projects/ { project_id } /locations/ { location } /queues/ { queue_name } "
185
+ full_queue_name = CloudTasksClient . queue_path ( project_id , location , queue_name )
187
186
if isinstance (task_queue , Queue ):
188
187
task_queue .name = full_queue_name
189
188
elif isinstance (task_queue , dict ):
190
189
task_queue ['name' ] = full_queue_name
191
190
else :
192
191
raise AirflowException ('Unable to set queue_name.' )
193
192
return client .update_queue (
194
- request = {'queue' : task_queue , 'update_mask' : update_mask },
193
+ queue = task_queue ,
194
+ update_mask = update_mask ,
195
195
retry = retry ,
196
196
timeout = timeout ,
197
- metadata = metadata or () ,
197
+ metadata = metadata ,
198
198
)
199
199
200
200
@GoogleBaseHook .fallback_to_default_project_id
@@ -230,10 +230,8 @@ def get_queue(
230
230
"""
231
231
client = self .get_conn ()
232
232
233
- full_queue_name = f"projects/{ project_id } /locations/{ location } /queues/{ queue_name } "
234
- return client .get_queue (
235
- request = {'name' : full_queue_name }, retry = retry , timeout = timeout , metadata = metadata or ()
236
- )
233
+ full_queue_name = CloudTasksClient .queue_path (project_id , location , queue_name )
234
+ return client .get_queue (name = full_queue_name , retry = retry , timeout = timeout , metadata = metadata )
237
235
238
236
@GoogleBaseHook .fallback_to_default_project_id
239
237
def list_queues (
@@ -272,12 +270,14 @@ def list_queues(
272
270
"""
273
271
client = self .get_conn ()
274
272
275
- full_location_path = f"projects/ { project_id } /locations/ { location } "
273
+ full_location_path = CloudTasksClient . location_path ( project_id , location )
276
274
queues = client .list_queues (
277
- request = {'parent' : full_location_path , 'filter' : results_filter , 'page_size' : page_size },
275
+ parent = full_location_path ,
276
+ filter_ = results_filter ,
277
+ page_size = page_size ,
278
278
retry = retry ,
279
279
timeout = timeout ,
280
- metadata = metadata or () ,
280
+ metadata = metadata ,
281
281
)
282
282
return list (queues )
283
283
@@ -313,10 +313,8 @@ def delete_queue(
313
313
"""
314
314
client = self .get_conn ()
315
315
316
- full_queue_name = f"projects/{ project_id } /locations/{ location } /queues/{ queue_name } "
317
- client .delete_queue (
318
- request = {'name' : full_queue_name }, retry = retry , timeout = timeout , metadata = metadata or ()
319
- )
316
+ full_queue_name = CloudTasksClient .queue_path (project_id , location , queue_name )
317
+ client .delete_queue (name = full_queue_name , retry = retry , timeout = timeout , metadata = metadata )
320
318
321
319
@GoogleBaseHook .fallback_to_default_project_id
322
320
def purge_queue (
@@ -351,10 +349,8 @@ def purge_queue(
351
349
"""
352
350
client = self .get_conn ()
353
351
354
- full_queue_name = f"projects/{ project_id } /locations/{ location } /queues/{ queue_name } "
355
- return client .purge_queue (
356
- request = {'name' : full_queue_name }, retry = retry , timeout = timeout , metadata = metadata or ()
357
- )
352
+ full_queue_name = CloudTasksClient .queue_path (project_id , location , queue_name )
353
+ return client .purge_queue (name = full_queue_name , retry = retry , timeout = timeout , metadata = metadata )
358
354
359
355
@GoogleBaseHook .fallback_to_default_project_id
360
356
def pause_queue (
@@ -389,10 +385,8 @@ def pause_queue(
389
385
"""
390
386
client = self .get_conn ()
391
387
392
- full_queue_name = f"projects/{ project_id } /locations/{ location } /queues/{ queue_name } "
393
- return client .pause_queue (
394
- request = {'name' : full_queue_name }, retry = retry , timeout = timeout , metadata = metadata or ()
395
- )
388
+ full_queue_name = CloudTasksClient .queue_path (project_id , location , queue_name )
389
+ return client .pause_queue (name = full_queue_name , retry = retry , timeout = timeout , metadata = metadata )
396
390
397
391
@GoogleBaseHook .fallback_to_default_project_id
398
392
def resume_queue (
@@ -427,10 +421,8 @@ def resume_queue(
427
421
"""
428
422
client = self .get_conn ()
429
423
430
- full_queue_name = f"projects/{ project_id } /locations/{ location } /queues/{ queue_name } "
431
- return client .resume_queue (
432
- request = {'name' : full_queue_name }, retry = retry , timeout = timeout , metadata = metadata or ()
433
- )
424
+ full_queue_name = CloudTasksClient .queue_path (project_id , location , queue_name )
425
+ return client .resume_queue (name = full_queue_name , retry = retry , timeout = timeout , metadata = metadata )
434
426
435
427
@GoogleBaseHook .fallback_to_default_project_id
436
428
def create_task (
@@ -440,7 +432,7 @@ def create_task(
440
432
task : Union [Dict , Task ],
441
433
project_id : str ,
442
434
task_name : Optional [str ] = None ,
443
- response_view : Optional = None ,
435
+ response_view : Optional [ enums . Task . View ] = None ,
444
436
retry : Optional [Retry ] = None ,
445
437
timeout : Optional [float ] = None ,
446
438
metadata : Optional [Sequence [Tuple [str , str ]]] = None ,
@@ -463,7 +455,7 @@ def create_task(
463
455
:type task_name: str
464
456
:param response_view: (Optional) This field specifies which subset of the Task will
465
457
be returned.
466
- :type response_view: google.cloud.tasks_v2.Task.View
458
+ :type response_view: google.cloud.tasks_v2.enums. Task.View
467
459
:param retry: (Optional) A retry object used to retry requests.
468
460
If None is specified, requests will not be retried.
469
461
:type retry: google.api_core.retry.Retry
@@ -478,21 +470,21 @@ def create_task(
478
470
client = self .get_conn ()
479
471
480
472
if task_name :
481
- full_task_name = (
482
- f"projects/{ project_id } /locations/{ location } /queues/{ queue_name } /tasks/{ task_name } "
483
- )
473
+ full_task_name = CloudTasksClient .task_path (project_id , location , queue_name , task_name )
484
474
if isinstance (task , Task ):
485
475
task .name = full_task_name
486
476
elif isinstance (task , dict ):
487
477
task ['name' ] = full_task_name
488
478
else :
489
479
raise AirflowException ('Unable to set task_name.' )
490
- full_queue_name = f"projects/ { project_id } /locations/ { location } /queues/ { queue_name } "
480
+ full_queue_name = CloudTasksClient . queue_path ( project_id , location , queue_name )
491
481
return client .create_task (
492
- request = {'parent' : full_queue_name , 'task' : task , 'response_view' : response_view },
482
+ parent = full_queue_name ,
483
+ task = task ,
484
+ response_view = response_view ,
493
485
retry = retry ,
494
486
timeout = timeout ,
495
- metadata = metadata or () ,
487
+ metadata = metadata ,
496
488
)
497
489
498
490
@GoogleBaseHook .fallback_to_default_project_id
@@ -502,7 +494,7 @@ def get_task(
502
494
queue_name : str ,
503
495
task_name : str ,
504
496
project_id : str ,
505
- response_view : Optional = None ,
497
+ response_view : Optional [ enums . Task . View ] = None ,
506
498
retry : Optional [Retry ] = None ,
507
499
timeout : Optional [float ] = None ,
508
500
metadata : Optional [Sequence [Tuple [str , str ]]] = None ,
@@ -521,7 +513,7 @@ def get_task(
521
513
:type project_id: str
522
514
:param response_view: (Optional) This field specifies which subset of the Task will
523
515
be returned.
524
- :type response_view: google.cloud.tasks_v2.Task.View
516
+ :type response_view: google.cloud.tasks_v2.enums. Task.View
525
517
:param retry: (Optional) A retry object used to retry requests.
526
518
If None is specified, requests will not be retried.
527
519
:type retry: google.api_core.retry.Retry
@@ -535,12 +527,13 @@ def get_task(
535
527
"""
536
528
client = self .get_conn ()
537
529
538
- full_task_name = f"projects/ { project_id } /locations/ { location } /queues/ { queue_name } /tasks/ { task_name } "
530
+ full_task_name = CloudTasksClient . task_path ( project_id , location , queue_name , task_name )
539
531
return client .get_task (
540
- request = {'name' : full_task_name , 'response_view' : response_view },
532
+ name = full_task_name ,
533
+ response_view = response_view ,
541
534
retry = retry ,
542
535
timeout = timeout ,
543
- metadata = metadata or () ,
536
+ metadata = metadata ,
544
537
)
545
538
546
539
@GoogleBaseHook .fallback_to_default_project_id
@@ -549,7 +542,7 @@ def list_tasks(
549
542
location : str ,
550
543
queue_name : str ,
551
544
project_id : str ,
552
- response_view : Optional = None ,
545
+ response_view : Optional [ enums . Task . View ] = None ,
553
546
page_size : Optional [int ] = None ,
554
547
retry : Optional [Retry ] = None ,
555
548
timeout : Optional [float ] = None ,
@@ -567,7 +560,7 @@ def list_tasks(
567
560
:type project_id: str
568
561
:param response_view: (Optional) This field specifies which subset of the Task will
569
562
be returned.
570
- :type response_view: google.cloud.tasks_v2.Task.View
563
+ :type response_view: google.cloud.tasks_v2.enums. Task.View
571
564
:param page_size: (Optional) The maximum number of resources contained in the
572
565
underlying API response.
573
566
:type page_size: int
@@ -583,12 +576,14 @@ def list_tasks(
583
576
:rtype: list[google.cloud.tasks_v2.types.Task]
584
577
"""
585
578
client = self .get_conn ()
586
- full_queue_name = f"projects/ { project_id } /locations/ { location } /queues/ { queue_name } "
579
+ full_queue_name = CloudTasksClient . queue_path ( project_id , location , queue_name )
587
580
tasks = client .list_tasks (
588
- request = {'parent' : full_queue_name , 'response_view' : response_view , 'page_size' : page_size },
581
+ parent = full_queue_name ,
582
+ response_view = response_view ,
583
+ page_size = page_size ,
589
584
retry = retry ,
590
585
timeout = timeout ,
591
- metadata = metadata or () ,
586
+ metadata = metadata ,
592
587
)
593
588
return list (tasks )
594
589
@@ -627,10 +622,8 @@ def delete_task(
627
622
"""
628
623
client = self .get_conn ()
629
624
630
- full_task_name = f"projects/{ project_id } /locations/{ location } /queues/{ queue_name } /tasks/{ task_name } "
631
- client .delete_task (
632
- request = {'name' : full_task_name }, retry = retry , timeout = timeout , metadata = metadata or ()
633
- )
625
+ full_task_name = CloudTasksClient .task_path (project_id , location , queue_name , task_name )
626
+ client .delete_task (name = full_task_name , retry = retry , timeout = timeout , metadata = metadata )
634
627
635
628
@GoogleBaseHook .fallback_to_default_project_id
636
629
def run_task (
@@ -639,7 +632,7 @@ def run_task(
639
632
queue_name : str ,
640
633
task_name : str ,
641
634
project_id : str ,
642
- response_view : Optional = None ,
635
+ response_view : Optional [ enums . Task . View ] = None ,
643
636
retry : Optional [Retry ] = None ,
644
637
timeout : Optional [float ] = None ,
645
638
metadata : Optional [Sequence [Tuple [str , str ]]] = None ,
@@ -658,7 +651,7 @@ def run_task(
658
651
:type project_id: str
659
652
:param response_view: (Optional) This field specifies which subset of the Task will
660
653
be returned.
661
- :type response_view: google.cloud.tasks_v2.Task.View
654
+ :type response_view: google.cloud.tasks_v2.enums. Task.View
662
655
:param retry: (Optional) A retry object used to retry requests.
663
656
If None is specified, requests will not be retried.
664
657
:type retry: google.api_core.retry.Retry
@@ -672,10 +665,11 @@ def run_task(
672
665
"""
673
666
client = self .get_conn ()
674
667
675
- full_task_name = f"projects/ { project_id } /locations/ { location } /queues/ { queue_name } /tasks/ { task_name } "
668
+ full_task_name = CloudTasksClient . task_path ( project_id , location , queue_name , task_name )
676
669
return client .run_task (
677
- request = {'name' : full_task_name , 'response_view' : response_view },
670
+ name = full_task_name ,
671
+ response_view = response_view ,
678
672
retry = retry ,
679
673
timeout = timeout ,
680
- metadata = metadata or () ,
674
+ metadata = metadata ,
681
675
)
0 commit comments