gcpdiag.queries.bigquery
def get_project_policy(project_id: str):
  """Fetches the IAM policy object for a project."""
  root_logger = logging.getLogger()
  original_level = root_logger.level

  try:
    root_logger.setLevel(logging.ERROR)
    policy = iam.get_project_policy(project_id, raise_error_if_fails=False)
    return policy
  except utils.GcpApiError:
    return None
  finally:
    root_logger.setLevel(original_level)
Fetches the IAM policy object for a project.
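A minimal usage sketch (the project ID is a placeholder; on API errors the helper returns None rather than raising):

# Minimal sketch: 'example-project' is a placeholder project ID.
policy = get_project_policy('example-project')
if policy is None:
  print('Project IAM policy could not be fetched.')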
def get_organization_policy(organization_id: str):
  """Fetches the IAM policy object for an organization."""
  root_logger = logging.getLogger()
  original_level = root_logger.level

  try:
    root_logger.setLevel(logging.ERROR)
    policy = iam.get_organization_policy(organization_id,
                                         raise_error_if_fails=False)
    return policy
  except utils.GcpApiError as err:
    if ('doesn\'t have access to' in err.message.lower() or
        'denied on resource' in err.message.lower()):
      op.info(
          'User does not have access to the organization policy. Investigation'
          ' completeness and accuracy might depend on the presence of'
          ' organization level permissions.')
    return None
  finally:
    root_logger.setLevel(original_level)
Fetches the IAM policy object for an organization.
def check_permissions_for_principal(
    policy: PolicyObject, principal: str,
    permissions_to_check: Set[str]) -> Dict[str, bool]:
  """Uses a policy object to check a set of permissions for a principal.

  Returns a dictionary mapping each permission to a boolean indicating its
  presence.
  """
  return {
      permission: policy.has_permission(principal, permission)
      for permission in permissions_to_check
  }
Uses a policy object to check a set of permissions for a principal.
Returns a dictionary mapping each permission to a boolean indicating its presence.
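A short sketch of how this combines with get_project_policy (the principal and permission names are illustrative):

# Illustrative sketch; the principal and permissions are placeholders.
policy = get_project_policy('example-project')
if policy:
  results = check_permissions_for_principal(
      policy, 'user:alice@example.com',
      {'bigquery.jobs.create', 'bigquery.jobs.get'})
  # e.g. {'bigquery.jobs.create': True, 'bigquery.jobs.get': False}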
def get_missing_permissions(required_permissions: Set[str],
                            actual_permissions: Dict[str, bool]) -> Set[str]:
  """Compares a set of required permissions against a dictionary of actual
  permissions and returns the set of missing ones.
  """
  return {
      perm for perm in required_permissions if not actual_permissions.get(perm)
  }
Compares a set of required permissions against a dictionary of actual permissions and returns the set of missing ones.
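This helper is meant to be chained with check_permissions_for_principal; a hedged sketch, reusing the policy and principal from the previous example:

required = {'bigquery.jobs.create', 'bigquery.jobs.listAll'}
actual = check_permissions_for_principal(policy, 'user:alice@example.com',
                                         required)
missing = get_missing_permissions(required, actual)
if missing:
  print(f'Missing permissions: {sorted(missing)}')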
class BigQueryTable:
  """Represents a BigQuery Table object."""

  project_id: str
  dataset_id: str
  table_id: str

  def __init__(self, project_id: str, dataset_id: str, table_id: str):
    self.project_id = project_id
    self.dataset_id = dataset_id
    self.table_id = table_id

  @property
  def table_identifier(self) -> str:
    return f'{self.project_id}:{self.dataset_id}.{self.table_id}'
Represents a BigQuery Table object.
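The table_identifier property renders the legacy 'project:dataset.table' form, for example:

# Placeholder identifiers, for illustration only.
table = BigQueryTable('example-project', 'my_dataset', 'my_table')
print(table.table_identifier)  # example-project:my_dataset.my_table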
class BigQueryRoutine:
  """Represents a BigQuery Routine object."""

  project_id: str
  dataset_id: str
  routine_id: str

  def __init__(self, project_id: str, dataset_id: str, routine_id: str):
    self.project_id = project_id
    self.dataset_id = dataset_id
    self.routine_id = routine_id

  @property
  def routine_identifier(self) -> str:
    return f'{self.project_id}:{self.dataset_id}.{self.routine_id}'
Represents a BigQuery Routine object.
class BigQueryJob(models.Resource):
  """Represents a BigQuery Job object."""

  _job_api_resource_data: dict[str, Any]
  _information_schema_job_metadata: dict[str, Any]
  project_id: str

  def __init__(
      self,
      project_id: str,
      job_api_resource_data: dict[str, Any],
      information_schema_job_metadata: dict[str, str],
  ):
    super().__init__(project_id)
    self._job_api_resource_data = job_api_resource_data
    self._information_schema_job_metadata = (information_schema_job_metadata or
                                             {})

  @property
  def full_path(self) -> str:
    # returns 'https://content-bigquery.googleapis.com/bigquery/v2/
    # projects/<PROJECT_ID>/jobs/<JOBID>?location=<REGION>'
    return self._job_api_resource_data.get('selfLink', '')

  @property
  def id(self) -> str:
    # returns <PROJECT>:<REGION>.<JobID>
    return self._job_api_resource_data.get('id', '')

  @property
  def short_path(self) -> str:
    # returns <PROJECT>:<REGION>.<JobID>
    return self.id

  @property
  def user_email(self) -> str:
    return self._job_api_resource_data.get('user_email', '')

  @property
  def _job_configuration(self) -> dict[str, Any]:
    return self._job_api_resource_data.get('configuration', {})

  @property
  def _query(self) -> dict[str, Any]:
    return self._job_configuration.get('query', {})

  @property
  def _stats(self) -> dict[str, Any]:
    """Safely access the 'statistics' dictionary."""
    return self._job_api_resource_data.get('statistics', {})

  @property
  def _query_stats(self) -> dict[str, Any]:
    """Safely access the 'statistics.query' dictionary."""
    return self._stats.get('query', {})

  @property
  def _query_info(self) -> dict[str, Any]:
    return self._query_stats.get('queryInfo', {})

  @property
  def _status(self) -> dict[str, Any]:
    return self._job_api_resource_data.get('status', {})

  @property
  def job_type(self) -> str:
    return self._job_configuration.get('jobType', '')

  @property
  def query_sql(self) -> str:
    return self._query.get('query', '')

  @property
  def use_legacy_sql(self) -> bool:
    return self._query.get('useLegacySql', False)

  @property
  def priority(self) -> str:
    return self._query.get('priority', '')

  @property
  def edition(self) -> str:
    edition_value = self._query.get('edition')
    return str(edition_value) if edition_value else ''

  @property
  def creation_time(self) -> Optional[int]:
    time_str = self._stats.get('creationTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def start_time(self) -> Optional[int]:
    time_str = self._stats.get('startTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def end_time(self) -> Optional[int]:
    time_str = self._stats.get('endTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def total_bytes_processed(self) -> int:
    bytes_str = self._stats.get('totalBytesProcessed', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def total_bytes_billed(self) -> int:
    bytes_str = self._query_stats.get('totalBytesBilled', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def total_slot_ms(self) -> int:
    ms_str = self._stats.get('totalSlotMs', '0')
    return int(ms_str) if isinstance(ms_str, str) and ms_str.isdigit() else 0

  @property
  def cache_hit(self) -> bool:
    return self._query_stats.get('cacheHit') is True

  @property
  def quota_deferments(self) -> list[str]:
    deferments_dict = self._stats.get('quotaDeferments', {})
    if isinstance(deferments_dict, dict):
      deferment_list = deferments_dict.get('', [])
      if isinstance(deferment_list, list) and all(
          isinstance(s, str) for s in deferment_list):
        return deferment_list
    return []

  @property
  def query_plan(self) -> list[dict[str, Any]]:
    plan = self._query_stats.get('queryPlan', [])
    return plan if isinstance(plan, list) else []

  @property
  def total_partitions_processed(self) -> int:
    partitions_str = self._query_stats.get('totalPartitionsProcessed', '0')
    return (int(partitions_str) if isinstance(partitions_str, str) and
            partitions_str.isdigit() else 0)

  @property
  def referenced_tables(self) -> list[BigQueryTable]:
    tables_list = self._query_stats.get('referencedTables', [])
    referenced_tables = []
    if isinstance(tables_list, list):
      for item in tables_list:
        if isinstance(item, dict):
          project_id = item.get('projectId')
          dataset_id = item.get('datasetId')
          table_id = item.get('tableId')
          if (isinstance(project_id, str) and project_id and
              isinstance(dataset_id, str) and dataset_id and
              isinstance(table_id, str) and table_id):
            referenced_tables.append(
                BigQueryTable(project_id, dataset_id, table_id))
    return referenced_tables

  @property
  def referenced_routines(self) -> list[BigQueryRoutine]:
    routines_list = self._query_stats.get('referencedRoutines', [])
    referenced_routines = []
    if isinstance(routines_list, list):
      for item in routines_list:
        if isinstance(item, dict):
          project_id = item.get('projectId')
          dataset_id = item.get('datasetId')
          routine_id = item.get('routineId')
          if (isinstance(project_id, str) and project_id and
              isinstance(dataset_id, str) and dataset_id and
              isinstance(routine_id, str) and routine_id):
            referenced_routines.append(
                BigQueryRoutine(project_id, dataset_id, routine_id))
    return referenced_routines

  @property
  def num_affected_dml_rows(self) -> int:
    rows_str = self._query_stats.get('numDmlAffectedRows', '0')
    return (int(rows_str)
            if isinstance(rows_str, str) and rows_str.isdigit() else 0)

  @property
  def dml_stats(self) -> dict[str, int]:
    stats = self._query_stats.get('dmlStats')
    if not isinstance(stats, dict):
      return {}
    inserted_str = stats.get('insertedRowCount', '0')
    deleted_str = stats.get('deletedRowCount', '0')
    updated_str = stats.get('updatedRowCount', '0')
    return {
        'insertedRowCount': (int(inserted_str) if isinstance(inserted_str, str)
                             and inserted_str.isdigit() else 0),
        'deletedRowCount': (int(deleted_str) if isinstance(deleted_str, str)
                            and deleted_str.isdigit() else 0),
        'updatedRowCount': (int(updated_str) if isinstance(updated_str, str)
                            and updated_str.isdigit() else 0),
    }

  @property
  def statement_type(self) -> str:
    stype = self._query_stats.get('statementType', '')
    return stype if isinstance(stype, str) else ''

  @property
  def bi_engine_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('biEngineStatistics')
    if not isinstance(stats, dict):
      return {}
    reasons_list = stats.get('accelerationMode', {}).get('biEngineReasons', [])
    bi_engine_reasons = []
    if isinstance(reasons_list, list):
      for item in reasons_list:
        if isinstance(item, dict):
          bi_engine_reasons.append({
              'code': str(item.get('code', '')),
              'message': item.get('message', ''),
          })
    return {
        'biEngineMode': str(stats.get('biEngineMode', '')),
        'accelerationMode': str(stats.get('accelerationMode', '')),
        'biEngineReasons': bi_engine_reasons,
    }

  @property
  def vector_search_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('vectorSearchStatistics')
    if not isinstance(stats, dict):
      return {}
    reasons_list = stats.get('indexUnusedReasons', [])
    index_unused_reasons = []
    if isinstance(reasons_list, list):
      for item in reasons_list:
        if isinstance(item, dict):
          base_table_data = item.get('baseTable')
          base_table_obj = None
          if isinstance(base_table_data, dict):
            project_id = base_table_data.get('projectId')
            dataset_id = base_table_data.get('datasetId')
            table_id = base_table_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              base_table_obj = BigQueryTable(project_id, dataset_id, table_id)
          index_unused_reasons.append({
              'code': str(item.get('code', '')),
              'message': item.get('message', ''),
              'indexName': item.get('indexName', ''),
              'baseTable': base_table_obj,
          })
    return {
        'indexUsageMode': str(stats.get('indexUsageMode', '')),
        'indexUnusedReasons': index_unused_reasons,
    }

  @property
  def performance_insights(self) -> dict[str, Any]:
    insights = self._query_stats.get('performanceInsights')
    if not isinstance(insights, dict):
      return {}
    standalone_list = insights.get('stagePerformanceStandaloneInsights', [])
    stage_performance_standalone_insights = []
    if isinstance(standalone_list, list):
      for item in standalone_list:
        if isinstance(item, dict):
          stage_performance_standalone_insights.append({
              'stageId': item.get('stageId', ''),
          })
    change_list = insights.get('stagePerformanceChangeInsights', [])
    stage_performance_change_insights = []
    if isinstance(change_list, list):
      for item in change_list:
        if isinstance(item, dict):
          stage_performance_change_insights.append({
              'stageId': item.get('stageId', ''),
          })
    avg_ms_str = insights.get('avgPreviousExecutionMs', '0')
    return {
        'avgPreviousExecutionMs': (int(avg_ms_str) if isinstance(
            avg_ms_str, str) and avg_ms_str.isdigit() else 0),
        'stagePerformanceStandaloneInsights':
            stage_performance_standalone_insights,
        'stagePerformanceChangeInsights': stage_performance_change_insights,
    }

  @property
  def optimization_details(self) -> Any:
    return self._query_info.get('optimizationDetails')

  @property
  def export_data_statistics(self) -> dict[str, int]:
    stats = self._query_stats.get('exportDataStatistics')
    if not isinstance(stats, dict):
      return {}
    file_count_str = stats.get('fileCount', '0')
    row_count_str = stats.get('rowCount', '0')
    return {
        'fileCount': (int(file_count_str) if isinstance(file_count_str, str)
                      and file_count_str.isdigit() else 0),
        'rowCount': (int(row_count_str) if isinstance(row_count_str, str)
                     and row_count_str.isdigit() else 0),
    }

  @property
  def load_query_statistics(self) -> dict[str, int]:
    stats = self._query_stats.get('loadQueryStatistics')
    if not isinstance(stats, dict):
      return {}
    input_files_str = stats.get('inputFiles', '0')
    input_bytes_str = stats.get('inputFileBytes', '0')
    output_rows_str = stats.get('outputRows', '0')
    output_bytes_str = stats.get('outputBytes', '0')
    bad_records_str = stats.get('badRecords', '0')
    return {
        'inputFiles': (int(input_files_str) if isinstance(input_files_str, str)
                       and input_files_str.isdigit() else 0),
        'inputFileBytes': (int(input_bytes_str)
                           if isinstance(input_bytes_str, str) and
                           input_bytes_str.isdigit() else 0),
        'outputRows': (int(output_rows_str) if isinstance(output_rows_str, str)
                       and output_rows_str.isdigit() else 0),
        'outputBytes': (int(output_bytes_str)
                        if isinstance(output_bytes_str, str) and
                        output_bytes_str.isdigit() else 0),
        'badRecords': (int(bad_records_str) if isinstance(bad_records_str, str)
                       and bad_records_str.isdigit() else 0),
    }

  @property
  def spark_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('sparkStatistics')
    if not isinstance(stats, dict):
      return {}
    logging_info_dict = stats.get('loggingInfo', {})
    logging_info = ({
        'resourceType': logging_info_dict.get('resourceType', ''),
        'projectId': logging_info_dict.get('projectId', ''),
    } if isinstance(logging_info_dict, dict) else {})
    return {
        'endpoints': stats.get('endpoints', {}),
        'sparkJobId': stats.get('sparkJobId', ''),
        'sparkJobLocation': stats.get('sparkJobLocation', ''),
        'kmsKeyName': stats.get('kmsKeyName', ''),
        'gcsStagingBucket': stats.get('gcsStagingBucket', ''),
        'loggingInfo': logging_info,
    }

  @property
  def transferred_bytes(self) -> int:
    bytes_str = self._query_stats.get('transferredBytes', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def reservation_id(self) -> str:
    res_id = self._stats.get('reservation_id', '')
    return res_id if isinstance(res_id, str) else ''

  @property
  def reservation_admin_project_id(self) -> Optional[str]:
    if not self.reservation_id:
      return None
    try:
      parts = self.reservation_id.split('/')
      if parts[0] == 'projects' and len(parts) >= 2:
        return parts[1]
      else:
        logging.warning(
            'Could not parse project ID from reservation_id: %s',
            self.reservation_id,
        )
        return None
    except (IndexError, AttributeError):
      logging.warning(
          'Could not parse project ID from reservation_id: %s',
          self.reservation_id,
      )
      return None

  @property
  def num_child_jobs(self) -> int:
    num_str = self._stats.get('numChildJobs', '0')
    return int(num_str) if isinstance(num_str, str) and num_str.isdigit() else 0

  @property
  def parent_job_id(self) -> str:
    parent_id = self._stats.get('parentJobId', '')
    return parent_id if isinstance(parent_id, str) else ''

  @property
  def row_level_security_applied(self) -> bool:
    rls_stats = self._stats.get('RowLevelSecurityStatistics', {})
    return (rls_stats.get('rowLevelSecurityApplied') is True
            if isinstance(rls_stats, dict) else False)

  @property
  def data_masking_applied(self) -> bool:
    masking_stats = self._stats.get('dataMaskingStatistics', {})
    return (masking_stats.get('dataMaskingApplied') is True
            if isinstance(masking_stats, dict) else False)

  @property
  def session_id(self) -> str:
    session_info = self._stats.get('sessionInfo', {})
    session_id_val = (session_info.get('sessionId', '')
                      if isinstance(session_info, dict) else '')
    return session_id_val if isinstance(session_id_val, str) else ''

  @property
  def final_execution_duration_ms(self) -> int:
    duration_str = self._stats.get('finalExecutionDurationMs', '0')
    return (int(duration_str)
            if isinstance(duration_str, str) and duration_str.isdigit() else 0)

  @property
  def job_state(self) -> str:
    state = self._status.get('state', '')
    return state if isinstance(state, str) else ''

  @property
  def job_error_result(self) -> dict[str, Optional[str]]:
    error_result = self._status.get('errorResult')
    if not isinstance(error_result, dict):
      return {}
    return {
        'reason': error_result.get('reason'),
        'location': error_result.get('location'),
        'debugInfo': error_result.get('debugInfo'),
        'message': error_result.get('message'),
    }

  @property
  def job_errors(self) -> list[dict[str, Optional[str]]]:
    errors_list = self._status.get('errors', [])
    errors_iterable = []
    if isinstance(errors_list, list):
      for item in errors_list:
        if isinstance(item, dict):
          errors_iterable.append({
              'reason': item.get('reason'),
              'location': item.get('location'),
              'debugInfo': item.get('debugInfo'),
              'message': item.get('message'),
          })
    return errors_iterable

  @property
  def materialized_view_statistics(self) -> dict[str, Any]:
    stats_list = self._query_stats.get('materializedViewStatistics')
    materialized_view = []
    if isinstance(stats_list, list):
      for item in stats_list:
        if isinstance(item, dict):
          table_ref_data = item.get('tableReference')
          table_ref_obj = None
          if isinstance(table_ref_data, dict):
            project_id = table_ref_data.get('projectId')
            dataset_id = table_ref_data.get('datasetId')
            table_id = table_ref_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
          chosen = item.get('chosen') is True
          saved_str = item.get('estimatedBytesSaved', '0')
          estimated_bytes_saved = (int(saved_str)
                                   if isinstance(saved_str, str) and
                                   saved_str.isdigit() else 0)
          rejected_reason = str(item.get('rejectedReason', ''))
          materialized_view.append({
              'chosen': chosen,
              'estimatedBytesSaved': estimated_bytes_saved,
              'rejectedReason': rejected_reason,
              'tableReference': table_ref_obj,
          })
    return {'materializedView': materialized_view}

  @property
  def metadata_cache_statistics(self) -> dict[str, Any]:
    stats_list = self._query_stats.get('metadataCacheStatistics')
    metadata_cache = []
    if isinstance(stats_list, list):
      for item in stats_list:
        if isinstance(item, dict):
          table_ref_data = item.get('tableReference')
          table_ref_obj = None
          if isinstance(table_ref_data, dict):
            project_id = table_ref_data.get('projectId')
            dataset_id = table_ref_data.get('datasetId')
            table_id = table_ref_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
          metadata_cache.append({
              'explanation': item.get('explanation', ''),
              'unusedReason': str(item.get('unusedReason', '')),
              'tableReference': table_ref_obj,
          })
    return {'tableMetadataCacheUsage': metadata_cache}

  # Properties derived from _information_schema_job_metadata
  @property
  def information_schema_user_email(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('user_email')

  @property
  def information_schema_start_time_str(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('start_time_str')

  @property
  def information_schema_end_time_str(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('end_time_str')

  @property
  def information_schema_query(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('query')

  @property
  def information_schema_total_modified_partitions(self) -> Union[int, str]:
    """The total number of partitions the job modified.

    This field is populated for LOAD and QUERY jobs.
    """
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    try:
      total_modified_partitions = self._information_schema_job_metadata[
          'total_modified_partitions']
      return total_modified_partitions
    except KeyError:
      return C_NOT_AVAILABLE

  @property
  def information_schema_resource_warning(self) -> str:
    """The warning message that appears if the resource usage during query
    processing is above the internal threshold of the system.
    """
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    try:
      resource_warning = self._information_schema_job_metadata['query_info'][
          'resource_warning']
      return resource_warning
    except KeyError:
      return C_NOT_AVAILABLE

  @property
  def information_schema_normalized_literals(self) -> str:
    """Contains the hashes of the query."""
    try:
      query_hashes = self._information_schema_job_metadata['query_info'][
          'query_hashes']['normalized_literals']
      return query_hashes
    except KeyError:
      return C_NOT_AVAILABLE
Represents a BigQuery Job object.
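A sketch of how the wrapper exposes the raw jobs.get payload as typed properties (the payload below is hypothetical and heavily truncated):

# Hypothetical, truncated jobs.get payload used only for illustration.
job = BigQueryJob(
    project_id='example-project',
    job_api_resource_data={
        'id': 'example-project:US.job_123',
        'status': {'state': 'DONE'},
        'statistics': {'totalSlotMs': '1200', 'query': {'cacheHit': False}},
    },
    information_schema_job_metadata={},
)
print(job.job_state)      # 'DONE'
print(job.total_slot_ms)  # 1200
print(job.cache_hit)      # False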
  @property
  def project_id(self) -> str:
    """Project id (not project number)."""
    return self._project_id
Project id (not project number).
  @property
  def full_path(self) -> str:
    # returns 'https://content-bigquery.googleapis.com/bigquery/v2/
    # projects/<PROJECT_ID>/jobs/<JOBID>?location=<REGION>'
    return self._job_api_resource_data.get('selfLink', '')
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
  @property
  def short_path(self) -> str:
    # returns <PROJECT>:<REGION>.<JobID>
    return self.id
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
  @property
  def information_schema_total_modified_partitions(self) -> Union[int, str]:
    """The total number of partitions the job modified.

    This field is populated for LOAD and QUERY jobs.
    """
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    try:
      total_modified_partitions = self._information_schema_job_metadata[
          'total_modified_partitions']
      return total_modified_partitions
    except KeyError:
      return C_NOT_AVAILABLE
The total number of partitions the job modified.
This field is populated for LOAD and QUERY jobs.
  @property
  def information_schema_resource_warning(self) -> str:
    """The warning message that appears if the resource usage during query
    processing is above the internal threshold of the system.
    """
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    try:
      resource_warning = self._information_schema_job_metadata['query_info'][
          'resource_warning']
      return resource_warning
    except KeyError:
      return C_NOT_AVAILABLE
The warning message that appears if the resource usage during query processing is above the internal threshold of the system.
  @property
  def information_schema_normalized_literals(self) -> str:
    """Contains the hashes of the query."""
    try:
      query_hashes = self._information_schema_job_metadata['query_info'][
          'query_hashes']['normalized_literals']
      return query_hashes
    except KeyError:
      return C_NOT_AVAILABLE
Contains the hashes of the query.
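All information_schema_* properties share the same fallback pattern: when the INFORMATION_SCHEMA metadata is absent, or a key is missing, they return the module's C_NOT_AVAILABLE sentinel instead of raising, so callers can branch on it; a short sketch:

# Sketch: branch on the C_NOT_AVAILABLE sentinel instead of catching errors.
warning = job.information_schema_resource_warning
if warning != C_NOT_AVAILABLE:
  print(f'Resource warning: {warning}')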
@caching.cached_api_call
def get_bigquery_job_api_resource_data(
    project_id: str,
    region: str,
    job_id: str,
) -> Union[dict[str, Any], None]:
  """Fetch a specific BigQuery job's raw API resource data."""
  api = apis.get_api('bigquery', 'v2', project_id)
  query_job = api.jobs().get(projectId=project_id,
                             location=region,
                             jobId=job_id)

  try:
    resp = query_job.execute(num_retries=config.API_RETRIES)
    return resp
  except errors.HttpError as err:
    raise utils.GcpApiError(err) from err
Fetch a specific BigQuery job's raw API resource data.
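Unlike the policy helpers above, this function propagates API failures as utils.GcpApiError; a hedged usage sketch (all identifiers are placeholders):

try:
  data = get_bigquery_job_api_resource_data('example-project', 'US', 'job_123')
except utils.GcpApiError as err:
  print(f'jobs.get failed: {err}')
else:
  print(data.get('status', {}).get('state'))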
@caching.cached_api_call
def get_information_schema_job_metadata(
    project_id: str,
    region: str,
    job_id: str,
    creation_time_milis: Optional[int] = None,
    skip_permission_check: bool = False,
) -> Optional[dict[str, Any]]:
  """Fetch metadata about a BigQuery job from the INFORMATION_SCHEMA."""
  if not apis.is_enabled(project_id, 'bigquery'):
    return None
  user_email = ''
  try:
    user_email = apis.get_user_email()
  except (RuntimeError, exceptions.DefaultCredentialsError):
    pass
  except AttributeError as err:
    if (('has no attribute' in str(err)) and
        ('with_quota_project' in str(err))):
      op.info('Running the investigation within the GCA context.')
  user = 'user:' + user_email
  if not skip_permission_check:
    try:
      policy = iam.get_project_policy(project_id)
      if (not policy.has_permission(user, 'bigquery.jobs.create')) or (
          not policy.has_permission(user, 'bigquery.jobs.listAll')):
        op.info(
            'WARNING: Unable to run INFORMATION_SCHEMA view analysis due to'
            f' missing permissions.\nMake sure to grant {user_email}'
            ' "bigquery.jobs.create" and "bigquery.jobs.listAll".'
            '\nContinuing the investigation with the BigQuery job metadata'
            ' obtained from the API.')
        return None
    except utils.GcpApiError:
      op.info(
          'Attempting to query INFORMATION_SCHEMA with no knowledge of project'
          ' level permissions \n(due to missing'
          ' resourcemanager.projects.get permission).')
  else:
    op.info(
        'Attempting to query INFORMATION_SCHEMA without checking project'
        ' level permissions.')
  try:
    creation_time_milis_filter = ' '
    if creation_time_milis:
      creation_time_milis_filter = (
          f'AND creation_time = TIMESTAMP_MILLIS({creation_time_milis})')
    query = f"""
        SELECT
          user_email, start_time, end_time, query
        FROM
          `{project_id}`.`region-{region}`.INFORMATION_SCHEMA.JOBS
        WHERE
          job_id = '{job_id}'
          {creation_time_milis_filter}
        LIMIT 1
    """
    results = get_query_results(
        project_id=project_id,
        query=query,
        location=region,
        timeout_sec=30,
        poll_interval_sec=2,  # Short poll interval
    )
    if not results or len(results) != 1:
      # We cannot raise an exception otherwise tests that use
      # get_bigquery_job would fail:
      # raise ValueError(f"Job {job_id} not found in INFORMATION_SCHEMA")
      return None
    return results[0]
  except errors.HttpError as err:
    logging.warning(
        'Failed to retrieve INFORMATION_SCHEMA job metadata for job %s: %s',
        job_id,
        err,
    )
    return None
  except KeyError as err:
    logging.warning(
        'Failed to parse INFORMATION_SCHEMA response for job %s: %s',
        job_id,
        err,
    )
    return None
Fetch metadata about a BigQuery job from the INFORMATION_SCHEMA.
def get_bigquery_job(
    project_id: str,
    region: str,
    job_id: str,
    skip_permission_check: bool = False) -> Union[BigQueryJob, None]:
  """Fetch a BigQuery job, combining API and INFORMATION_SCHEMA data."""
  try:
    job_api_resource_data = get_bigquery_job_api_resource_data(
        project_id, region, job_id)
    if not job_api_resource_data:
      return None
  except utils.GcpApiError as err:
    # This will be returned when permissions to fetch a job are missing.
    if 'permission' in err.message.lower():
      user_email = ''
      try:
        user_email = apis.get_user_email()
      except (RuntimeError, AttributeError,
              exceptions.DefaultCredentialsError) as error:
        if (('has no attribute' in str(error)) and
            ('with_quota_project' in str(error))):
          op.info('Running the investigation within the GCA context.')
      logging.debug(
          'Could not retrieve BigQuery job %s. Make sure to grant the'
          ' bigquery.jobs.get and bigquery.jobs.create permissions to %s',
          project_id + ':' + region + '.' + job_id, user_email)
      raise err
    # This will be returned when a job is not found.
    elif 'not found' in err.message.lower():
      job_id_string = project_id + ':' + region + '.' + job_id
      logging.debug('Could not find BigQuery job %s', job_id_string)
      return None
    else:
      logging.debug(
          'Could not retrieve BigQuery job %s due to an issue calling the'
          ' API. Please restart the investigation.',
          project_id + ':' + region + '.' + job_id)
      return None
  information_schema_job_metadata = {}
  job_creation_millis = None
  creation_time_str = job_api_resource_data.get('statistics',
                                                {}).get('creationTime')
  if creation_time_str:
    try:
      job_creation_millis = int(creation_time_str)
    except (ValueError, TypeError):
      pass
  information_schema_job_metadata = get_information_schema_job_metadata(
      project_id, region, job_id, job_creation_millis, skip_permission_check)
  return BigQueryJob(
      project_id=project_id,
      job_api_resource_data=job_api_resource_data,
      information_schema_job_metadata=information_schema_job_metadata)
Fetch a BigQuery job, combining API and INFORMATION_SCHEMA data.
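Putting the two data sources together, a typical call site might look like this (project, region, and job ID are placeholders):

job = get_bigquery_job('example-project', 'us-central1', 'job_123')
if job is None:
  print('Job not found or not retrievable.')
elif job.job_state == 'DONE' and not job.job_error_result:
  print(f'{job.id} consumed {job.total_slot_ms} slot-ms')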
def get_query_results(
    project_id: str,
    query: str,
    location: Optional[str] = None,
    timeout_sec: int = 30,
    poll_interval_sec: int = 2,
) -> Optional[List[dict[str, Any]]]:
  """Executes a BigQuery query, waits for completion, and returns the results.

  Args:
    project_id: The GCP project ID where the query should run.
    query: The SQL query string to execute.
    location: The location (e.g., 'US', 'EU', 'us-central1') where the job
      should run. If None, BigQuery defaults might apply, often based on
      dataset locations if referenced.
    timeout_sec: Maximum time in seconds to wait for the query job to
      complete.
    poll_interval_sec: Time in seconds between polling the job status.

  Returns:
    A list of dictionaries representing the result rows, or None if the
    query fails, times out, or the API is disabled.

  Raises:
    utils.GcpApiError: If an unrecoverable API error occurs during job
      insertion, status check, or result fetching.
  """
  if not apis.is_enabled(project_id, 'bigquery'):
    logging.warning('BigQuery API is not enabled in project %s.', project_id)
    return None
  api = apis.get_api('bigquery', 'v2', project_id)
  job_id = f'gcpdiag_query_{uuid.uuid4()}'
  job_body = {
      'jobReference': {
          'projectId': project_id,
          'jobId': job_id,
          'location': location,  # Location can be None.
      },
      'configuration': {
          'query': {
              'query': query,
              'useLegacySql': False,
              # Consider adding priority, destinationTable, etc. if needed.
          }
      },
  }
  try:
    logging.debug(
        'Starting BigQuery job %s in project %s, location %s',
        job_id,
        project_id,
        location or 'default',
    )
    insert_request = api.jobs().insert(projectId=project_id, body=job_body)
    insert_response = insert_request.execute(num_retries=config.API_RETRIES)
    job_ref = insert_response['jobReference']
    actual_job_id = job_ref['jobId']
    actual_location = job_ref.get('location')  # Location assigned by BigQuery.
    logging.debug('Job %s created. Polling for completion...', actual_job_id)
    start_time = time.time()
    while True:
      # Check for timeout.
      if time.time() - start_time > timeout_sec:
        logging.error(
            'BigQuery job %s timed out after %d seconds.',
            actual_job_id,
            timeout_sec,
        )
        return None
      # Get job status.
      get_request = api.jobs().get(
          projectId=job_ref['projectId'],
          jobId=actual_job_id,
          location=actual_location,
      )
      job_status_response = get_request.execute(num_retries=config.API_RETRIES)
      status = job_status_response.get('status', {})
      logging.debug('Job %s state: %s', actual_job_id, status.get('state'))
      if status.get('state') == 'DONE':
        if status.get('errorResult'):
          error_info = status['errorResult']
          message = error_info.get('message', '')
          if 'User does not have permission to query table' in message:
            op.info(
                message[15:] +
                '\nContinuing the investigation with the job metadata'
                ' obtained from the API.')
          else:
            logging.error(
                'BigQuery job %s failed. Reason: %s, Message: %s',
                actual_job_id,
                error_info.get('reason'),
                message,
            )
            # Log detailed errors if available.
            for error in status.get('errors', []):
              logging.error(
                  ' - Detail: %s (Location: %s)',
                  error.get('message'),
                  error.get('location'),
              )
          return None
        else:
          logging.debug('BigQuery job %s completed successfully.',
                        actual_job_id)
          break  # Job finished successfully.
      elif status.get('state') in ['PENDING', 'RUNNING']:
        # Job still running, wait and poll again.
        time.sleep(poll_interval_sec)
      else:
        # Unexpected state.
        logging.error(
            'BigQuery job %s entered unexpected state: %s',
            actual_job_id,
            status.get('state', 'UNKNOWN'),
        )
        return None
    # Fetch results.
    logging.debug('Fetching results for job %s...', actual_job_id)
    results_request = api.jobs().getQueryResults(
        projectId=job_ref['projectId'],
        jobId=actual_job_id,
        location=actual_location,
        # Add startIndex, maxResults for pagination if needed.
    )
    results_response = results_request.execute(num_retries=config.API_RETRIES)
    # getQueryResults might return before the job is fully DONE.
    if not results_response.get('jobComplete', False):
      logging.warning(
          'getQueryResults returned jobComplete=False for job %s, results'
          ' might be incomplete.',
          actual_job_id,
      )
    rows = []
    if 'rows' in results_response and 'schema' in results_response:
      schema_fields = results_response['schema'].get('fields')
      if not schema_fields:
        return []
      for row_data in results_response['rows']:
        if 'f' in row_data:
          rows.append(_parse_row(schema_fields, row_data['f']))
    if results_response.get('pageToken'):
      logging.warning(
          'Query results for job %s are paginated, but pagination'
          ' is not yet implemented.',
          actual_job_id,
      )
    return rows
  except errors.HttpError as err:
    logging.error('API error during BigQuery query execution for job %s: %s',
                  job_id, err)
    raise utils.GcpApiError(err) from err
  except Exception as e:
    logging.exception(
        'Unexpected error during BigQuery query execution for job %s: %s',
        job_id,
        e,
    )
    raise
Executes a BigQuery query, waits for completion, and returns the results.
Arguments:
- project_id: The GCP project ID where the query should run.
- query: The SQL query string to execute.
- location: The location (e.g., 'US', 'EU', 'us-central1') where the job should run. If None, BigQuery defaults might apply, often based on dataset locations if referenced.
- timeout_sec: Maximum time in seconds to wait for the query job to complete.
- poll_interval_sec: Time in seconds between polling the job status.
Returns:
A list of dictionaries representing the result rows, or None if the query fails, times out, or the API is disabled.
Raises:
- utils.GcpApiError: If an unrecoverable API error occurs during job insertion, status check, or result fetching.
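A short sketch of running an ad-hoc query through this helper (project and location are placeholders):

rows = get_query_results(
    project_id='example-project',
    query='SELECT 1 AS x',
    location='US',
    timeout_sec=60,
)
if rows is not None:
  for row in rows:
    print(row)  # each row is a dict keyed by column name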
@caching.cached_api_call
def get_bigquery_project(project_id: str) -> crm.Project:
  """Attempts to retrieve project details for the supplied BigQuery project id or number.

  If the project is found/accessible, it returns a Project object with the
  resource data. If the project cannot be retrieved, the application raises
  one of the exceptions below. The get_bigquery_project method avoids
  printing the error message unnecessarily, keeping the tool's user interface
  clean and focused on meaningful investigation results. Corresponding errors
  are handled gracefully downstream.

  Args:
    project_id (str): The project id or number of the project
      (e.g., "123456789", "example-project").

  Returns:
    Project: An object representing the BigQuery project's full details.

  Raises:
    utils.GcpApiError: If the GCP API call fails with an HTTP error.

  Usage:
    When using a project identifier from gcpdiag.models.Context:

      project = crm.get_project(context.project_id)

    An unknown project identifier:

      try:
        project = crm.get_project('123456789')
      except utils.GcpApiError:
        # Handle the exception.
        ...
      else:
        # Use the project data.
        ...
  """
  try:
    logging.debug('retrieving project %s', project_id)
    crm_api = apis.get_api('cloudresourcemanager', 'v3', project_id)
    request = crm_api.projects().get(name=f'projects/{project_id}')
    response = request.execute(num_retries=config.API_RETRIES)
  except errors.HttpError as e:
    error = utils.GcpApiError(response=e)
    raise error from e
  else:
    return crm.Project(resource_data=response)
Attempts to retrieve project details for the supplied BigQuery project id or number.
If the project is found/accessible, it returns a Project object with the resource data. If the project cannot be retrieved, the application raises one of the exceptions below. The get_bigquery_project method avoids printing the error message unnecessarily, keeping the tool's user interface clean and focused on meaningful investigation results. Corresponding errors are handled gracefully downstream.
Arguments:
- project_id (str): The project id or number of the project (e.g., "123456789", "example-project").
Returns:
Project: An object representing the BigQuery project's full details.
Raises:
- utils.GcpApiError: If the GCP API call fails with an HTTP error.
Usage:
When using a project identifier from gcpdiag.models.Context:
project = crm.get_project(context.project_id)
An unknown project identifier:

  try:
    project = crm.get_project('123456789')
  except utils.GcpApiError:
    # Handle the exception.
    ...
  else:
    # Use the project data.
    ...