gcpdiag.queries.bigquery

Queries related to BigQuery.
BIGQUERY_REGIONS = ['me-central1', 'me-central2', 'me-west1', 'africa-south1', 'us', 'eu', 'us-east1', 'us-east4', 'us-east5', 'us-west1', 'us-west2', 'us-west3', 'us-west4', 'us-central1', 'us-south1', 'northamerica-northeast1', 'northamerica-northeast2', 'southamerica-east1', 'southamerica-west1', 'asia-east1', 'asia-east2', 'asia-south1', 'asia-south2', 'asia-northeast1', 'asia-northeast2', 'asia-northeast3', 'asia-southeast1', 'asia-southeast2', 'australia-southeast1', 'australia-southeast2', 'europe-north1', 'europe-southwest1', 'europe-central2', 'europe-west1', 'europe-west10', 'europe-west2', 'europe-west3', 'europe-west4', 'europe-west6', 'europe-west8', 'europe-west9', 'europe-west12']
C_NOT_AVAILABLE = 'N/A'
def get_project_policy(project_id: str):
def get_project_policy(project_id: str):
  """Fetches the IAM policy object for a project."""
  root_logger = logging.getLogger()
  original_level = root_logger.level

  try:
    root_logger.setLevel(logging.ERROR)
    policy = iam.get_project_policy(project_id, raise_error_if_fails=False)
    return policy
  except utils.GcpApiError:
    return None
  finally:
    root_logger.setLevel(original_level)

Fetches the IAM policy object for a project.

def get_organization_policy(organization_id: str):
def get_organization_policy(organization_id: str):
  """Fetches the IAM policy object for an organization."""
  root_logger = logging.getLogger()
  original_level = root_logger.level

  try:
    root_logger.setLevel(logging.ERROR)
    policy = iam.get_organization_policy(organization_id,
                                         raise_error_if_fails=False)
    return policy
  except utils.GcpApiError as err:
    if 'doesn\'t have access to' in err.message.lower(
    ) or 'denied on resource' in err.message.lower():
      op.info(
          'User does not have access to the organization policy. Investigation'
          ' completeness and accuracy might depend on the presence of'
          ' organization level permissions.')
    return None
  finally:
    root_logger.setLevel(original_level)

Fetches the IAM policy object for an organization.

def check_permissions_for_principal( policy: Union[gcpdiag.queries.iam.ProjectPolicy, gcpdiag.queries.iam.OrganizationPolicy], principal: str, permissions_to_check: Set[str]) -> Dict[str, bool]:
def check_permissions_for_principal(
    policy: PolicyObject, principal: str,
    permissions_to_check: Set[str]) -> Dict[str, bool]:
  """Uses a policy object to check a set of permissions for a principal.

  Returns a dictionary mapping each permission to a boolean indicating its
  presence.
  """
  return {
      permission: policy.has_permission(principal, permission)
      for permission in permissions_to_check
  }

Uses a policy object to check a set of permissions for a principal.

Returns a dictionary mapping each permission to a boolean indicating its presence.

def get_missing_permissions( required_permissions: Set[str], actual_permissions: Dict[str, bool]) -> Set[str]:
def get_missing_permissions(required_permissions: Set[str],
                            actual_permissions: Dict[str, bool]) -> Set[str]:
  """Compares a set of required permissions against a dictionary of actual

  permissions and returns the set of missing ones.
  """
  return {
      perm for perm in required_permissions if not actual_permissions.get(perm)
  }

Compares a set of required permissions against a dictionary of actual permissions and returns the set of missing ones.
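
For illustration, a minimal sketch of how these three helpers compose; the project ID and principal below are hypothetical:

# Check two BigQuery job permissions for a principal and report what is missing.
required = {'bigquery.jobs.create', 'bigquery.jobs.listAll'}
policy = get_project_policy('example-project')
if policy:
  actual = check_permissions_for_principal(policy, 'user:analyst@example.com',
                                           required)
  missing = get_missing_permissions(required, actual)
  if missing:
    print('missing permissions:', sorted(missing))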

class BigQueryTable:
class BigQueryTable:
  """Represents a BigQuery Table object."""

  project_id: str
  dataset_id: str
  table_id: str

  def __init__(self, project_id: str, dataset_id: str, table_id: str):
    self.project_id = project_id
    self.dataset_id = dataset_id
    self.table_id = table_id

  @property
  def table_identifier(self) -> str:
    return f'{self.project_id}:{self.dataset_id}.{self.table_id}'

Represents a BigQuery Table object.

BigQueryTable(project_id: str, dataset_id: str, table_id: str)
project_id: str
dataset_id: str
table_id: str
table_identifier: str
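
A short usage sketch with hypothetical IDs; table_identifier renders the '<project>:<dataset>.<table>' form used elsewhere in this module:

table = BigQueryTable('example-project', 'sales', 'orders')
print(table.table_identifier)  # example-project:sales.orders
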
class BigQueryRoutine:
class BigQueryRoutine:
  """Represents a BigQuery Routine object."""

  project_id: str
  dataset_id: str
  routine_id: str

  def __init__(self, project_id: str, dataset_id: str, routine_id: str):
    self.project_id = project_id
    self.dataset_id = dataset_id
    self.routine_id = routine_id

  @property
  def routine_identifier(self) -> str:
    return f'{self.project_id}:{self.dataset_id}.{self.routine_id}'

Represents a BigQuery Routine object.

BigQueryRoutine(project_id: str, dataset_id: str, routine_id: str)
project_id: str
dataset_id: str
routine_id: str
routine_identifier: str
class BigQueryJob(gcpdiag.models.Resource):
class BigQueryJob(models.Resource):
  """Represents a BigQuery Job object."""

  _job_api_resource_data: dict[str, Any]
  _information_schema_job_metadata: dict[str, Any]
  project_id: str

  def __init__(
      self,
      project_id: str,
      job_api_resource_data: dict[str, Any],
      information_schema_job_metadata: dict[str, str],
  ):
    super().__init__(project_id)
    self._job_api_resource_data = job_api_resource_data
    self._information_schema_job_metadata = (information_schema_job_metadata or
                                             {})

  @property
  def full_path(self) -> str:
    # returns 'https://content-bigquery.googleapis.com/bigquery/v2/
    # projects/<PROJECT_ID>/jobs/<JOBID>?location=<REGION>'
    return self._job_api_resource_data.get('selfLink', '')

  @property
  def id(self) -> str:
    # returns <PROJECT>:<REGION>.<JobID>
    return self._job_api_resource_data.get('id', '')

  @property
  def short_path(self) -> str:
    # returns <PROJECT>:<REGION>.<JobID>
    return self.id

  @property
  def user_email(self) -> str:
    return self._job_api_resource_data.get('user_email', '')

  @property
  def _job_configuration(self) -> dict[str, Any]:
    return self._job_api_resource_data.get('configuration', {})

  @property
  def _query(self) -> dict[str, Any]:
    return self._job_configuration.get('query', {})

  @property
  def _stats(self) -> dict[str, Any]:
    """Safely access the 'statistics' dictionary."""
    return self._job_api_resource_data.get('statistics', {})

  @property
  def _query_stats(self) -> dict[str, Any]:
    """Safely access the 'statistics.query' dictionary."""
    return self._stats.get('query', {})

  @property
  def _query_info(self) -> dict[str, Any]:
    return self._query_stats.get('queryInfo', {})

  @property
  def _status(self) -> dict[str, Any]:
    return self._job_api_resource_data.get('status', {})

  @property
  def job_type(self) -> str:
    return self._job_configuration.get('jobType', '')

  @property
  def query_sql(self) -> str:
    return self._query.get('query', '')

  @property
  def use_legacy_sql(self) -> bool:
    return self._query.get('useLegacySql', False)

  @property
  def priority(self) -> str:
    return self._query.get('priority', '')

  @property
  def edition(self) -> str:
    edition_value = self._query.get('edition')
    return str(edition_value) if edition_value else ''

  @property
  def creation_time(self) -> Optional[int]:
    time_str = self._stats.get('creationTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def start_time(self) -> Optional[int]:
    time_str = self._stats.get('startTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def end_time(self) -> Optional[int]:
    time_str = self._stats.get('endTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def total_bytes_processed(self) -> int:
    bytes_str = self._stats.get('totalBytesProcessed', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def total_bytes_billed(self) -> int:
    bytes_str = self._query_stats.get('totalBytesBilled', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def total_slot_ms(self) -> int:
    ms_str = self._stats.get('totalSlotMs', '0')
    return int(ms_str) if isinstance(ms_str, str) and ms_str.isdigit() else 0

  @property
  def cache_hit(self) -> bool:
    return self._query_stats.get('cacheHit') is True

  @property
  def quota_deferments(self) -> list[str]:
    deferments_dict = self._stats.get('quotaDeferments', {})
    if isinstance(deferments_dict, dict):
      deferment_list = deferments_dict.get('', [])
      if isinstance(deferment_list, list) and all(
          isinstance(s, str) for s in deferment_list):
        return deferment_list
    return []

  @property
  def query_plan(self) -> list[dict[str, Any]]:
    plan = self._query_stats.get('queryPlan', [])
    return plan if isinstance(plan, list) else []

  @property
  def total_partitions_processed(self) -> int:
    partitions_str = self._query_stats.get('totalPartitionsProcessed', '0')
    return (int(partitions_str) if isinstance(partitions_str, str) and
            partitions_str.isdigit() else 0)

  @property
  def referenced_tables(self) -> list[BigQueryTable]:
    tables_list = self._query_stats.get('referencedTables', [])
    referenced_tables = []
    if isinstance(tables_list, list):
      for item in tables_list:
        if isinstance(item, dict):
          project_id = item.get('projectId')
          dataset_id = item.get('datasetId')
          table_id = item.get('tableId')
          if (isinstance(project_id, str) and project_id and
              isinstance(dataset_id, str) and dataset_id and
              isinstance(table_id, str) and table_id):
            referenced_tables.append(
                BigQueryTable(project_id, dataset_id, table_id))
    return referenced_tables

  @property
  def referenced_routines(self) -> list[BigQueryRoutine]:
    routines_list = self._query_stats.get('referencedRoutines', [])
    referenced_routines = []
    if isinstance(routines_list, list):
      for item in routines_list:
        if isinstance(item, dict):
          project_id = item.get('projectId')
          dataset_id = item.get('datasetId')
          routine_id = item.get('routineId')
          if (isinstance(project_id, str) and project_id and
              isinstance(dataset_id, str) and dataset_id and
              isinstance(routine_id, str) and routine_id):
            referenced_routines.append(
                BigQueryRoutine(project_id, dataset_id, routine_id))
    return referenced_routines

  @property
  def num_affected_dml_rows(self) -> int:
    rows_str = self._query_stats.get('numDmlAffectedRows', '0')
    return (int(rows_str)
            if isinstance(rows_str, str) and rows_str.isdigit() else 0)

  @property
  def dml_stats(self) -> dict[str, int]:
    stats = self._query_stats.get('dmlStats')
    if not isinstance(stats, dict):
      return {}
    inserted_str = stats.get('insertedRowCount', '0')
    deleted_str = stats.get('deletedRowCount', '0')
    updated_str = stats.get('updatedRowCount', '0')
    return {
        'insertedRowCount':
            (int(inserted_str) if isinstance(inserted_str, str) and
             inserted_str.isdigit() else 0),
        'deletedRowCount': (int(deleted_str) if isinstance(deleted_str, str) and
                            deleted_str.isdigit() else 0),
        'updatedRowCount': (int(updated_str) if isinstance(updated_str, str) and
                            updated_str.isdigit() else 0),
    }

  @property
  def statement_type(self) -> str:
    stype = self._query_stats.get('statementType', '')
    return stype if isinstance(stype, str) else ''

  @property
  def bi_engine_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('biEngineStatistics')
    if not isinstance(stats, dict):
      return {}
    reasons_list = stats.get('accelerationMode', {}).get('biEngineReasons', [])
    bi_engine_reasons = []
    if isinstance(reasons_list, list):
      for item in reasons_list:
        if isinstance(item, dict):
          bi_engine_reasons.append({
              'code': str(item.get('code', '')),
              'message': item.get('message', ''),
          })
    return {
        'biEngineMode': str(stats.get('biEngineMode', '')),
        'accelerationMode': str(stats.get('accelerationMode', '')),
        'biEngineReasons': bi_engine_reasons,
    }

  @property
  def vector_search_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('vectorSearchStatistics')
    if not isinstance(stats, dict):
      return {}
    reasons_list = stats.get('indexUnusedReasons', [])
    index_unused_reasons = []
    if isinstance(reasons_list, list):
      for item in reasons_list:
        if isinstance(item, dict):
          base_table_data = item.get('baseTable')
          base_table_obj = None
          if isinstance(base_table_data, dict):
            project_id = base_table_data.get('projectId')
            dataset_id = base_table_data.get('datasetId')
            table_id = base_table_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              base_table_obj = BigQueryTable(project_id, dataset_id, table_id)
          index_unused_reasons.append({
              'code': str(item.get('code', '')),
              'message': item.get('message', ''),
              'indexName': item.get('indexName', ''),
              'baseTable': base_table_obj,
          })
    return {
        'indexUsageMode': str(stats.get('indexUsageMode', '')),
        'indexUnusedReasons': index_unused_reasons,
    }

  @property
  def performance_insights(self) -> dict[str, Any]:
    insights = self._query_stats.get('performanceInsights')
    if not isinstance(insights, dict):
      return {}
    standalone_list = insights.get('stagePerformanceStandaloneInsights', [])
    stage_performance_standalone_insights = []
    if isinstance(standalone_list, list):
      for item in standalone_list:
        if isinstance(item, dict):
          stage_performance_standalone_insights.append({
              'stageId': item.get('stageId', ''),
          })
    change_list = insights.get('stagePerformanceChangeInsights', [])
    stage_performance_change_insights = []
    if isinstance(change_list, list):
      for item in change_list:
        if isinstance(item, dict):
          stage_performance_change_insights.append({
              'stageId': item.get('stageId', ''),
          })
    avg_ms_str = insights.get('avgPreviousExecutionMs', '0')
    return {
        'avgPreviousExecutionMs':
            (int(avg_ms_str)
             if isinstance(avg_ms_str, str) and avg_ms_str.isdigit() else 0),
        'stagePerformanceStandaloneInsights':
            (stage_performance_standalone_insights),
        'stagePerformanceChangeInsights': stage_performance_change_insights,
    }

  @property
  def optimization_details(self) -> Any:
    return self._query_info.get('optimizationDetails')

  @property
  def export_data_statistics(self) -> dict[str, int]:
    stats = self._query_stats.get('exportDataStatistics')
    if not isinstance(stats, dict):
      return {}
    file_count_str = stats.get('fileCount', '0')
    row_count_str = stats.get('rowCount', '0')
    return {
        'fileCount': (int(file_count_str) if isinstance(file_count_str, str) and
                      file_count_str.isdigit() else 0),
        'rowCount': (int(row_count_str) if isinstance(row_count_str, str) and
                     row_count_str.isdigit() else 0),
    }

  @property
  def load_query_statistics(self) -> dict[str, int]:
    stats = self._query_stats.get('loadQueryStatistics')
    if not isinstance(stats, dict):
      return {}
    input_files_str = stats.get('inputFiles', '0')
    input_bytes_str = stats.get('inputFileBytes', '0')
    output_rows_str = stats.get('outputRows', '0')
    output_bytes_str = stats.get('outputBytes', '0')
    bad_records_str = stats.get('badRecords', '0')
    return {
        'inputFiles':
            (int(input_files_str) if isinstance(input_files_str, str) and
             input_files_str.isdigit() else 0),
        'inputFileBytes':
            (int(input_bytes_str) if isinstance(input_bytes_str, str) and
             input_bytes_str.isdigit() else 0),
        'outputRows':
            (int(output_rows_str) if isinstance(output_rows_str, str) and
             output_rows_str.isdigit() else 0),
        'outputBytes':
            (int(output_bytes_str) if isinstance(output_bytes_str, str) and
             output_bytes_str.isdigit() else 0),
        'badRecords':
            (int(bad_records_str) if isinstance(bad_records_str, str) and
             bad_records_str.isdigit() else 0),
    }

  @property
  def spark_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('sparkStatistics')
    if not isinstance(stats, dict):
      return {}
    logging_info_dict = stats.get('loggingInfo', {})
    logging_info = ({
        'resourceType': logging_info_dict.get('resourceType', ''),
        'projectId': logging_info_dict.get('projectId', ''),
    } if isinstance(logging_info_dict, dict) else {})
    return {
        'endpoints': stats.get('endpoints', {}),
        'sparkJobId': stats.get('sparkJobId', ''),
        'sparkJobLocation': stats.get('sparkJobLocation', ''),
        'kmsKeyName': stats.get('kmsKeyName', ''),
        'gcsStagingBucket': stats.get('gcsStagingBucket', ''),
        'loggingInfo': logging_info,
    }

  @property
  def transferred_bytes(self) -> int:
    bytes_str = self._query_stats.get('transferredBytes', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def reservation_id(self) -> str:
    res_id = self._stats.get('reservation_id', '')
    return res_id if isinstance(res_id, str) else ''

  @property
  def reservation_admin_project_id(self) -> Optional[str]:
    if not self.reservation_id:
      return None
    try:
      parts = self.reservation_id.split('/')
      if parts[0] == 'projects' and len(parts) >= 2:
        return parts[1]
      else:
        logging.warning(
            'Could not parse project ID from reservation_id: %s',
            self.reservation_id,
        )
        return None
    except (IndexError, AttributeError):
      logging.warning(
          'Could not parse project ID from reservation_id: %s',
          self.reservation_id,
      )
      return None

  @property
  def num_child_jobs(self) -> int:
    num_str = self._stats.get('numChildJobs', '0')
    return int(num_str) if isinstance(num_str, str) and num_str.isdigit() else 0

  @property
  def parent_job_id(self) -> str:
    parent_id = self._stats.get('parentJobId', '')
    return parent_id if isinstance(parent_id, str) else ''

  @property
  def row_level_security_applied(self) -> bool:
    rls_stats = self._stats.get('RowLevelSecurityStatistics', {})
    return (rls_stats.get('rowLevelSecurityApplied') is True if isinstance(
        rls_stats, dict) else False)

  @property
  def data_masking_applied(self) -> bool:
    masking_stats = self._stats.get('dataMaskingStatistics', {})
    return (masking_stats.get('dataMaskingApplied') is True if isinstance(
        masking_stats, dict) else False)

  @property
  def session_id(self) -> str:
    session_info = self._stats.get('sessionInfo', {})
    session_id_val = (session_info.get('sessionId', '') if isinstance(
        session_info, dict) else '')
    return session_id_val if isinstance(session_id_val, str) else ''

  @property
  def final_execution_duration_ms(self) -> int:
    duration_str = self._stats.get('finalExecutionDurationMs', '0')
    return (int(duration_str)
            if isinstance(duration_str, str) and duration_str.isdigit() else 0)

  @property
  def job_state(self) -> str:
    state = self._status.get('state', '')
    return state if isinstance(state, str) else ''

  @property
  def job_error_result(self) -> dict[str, Optional[str]]:
    error_result = self._status.get('errorResult')
    if not isinstance(error_result, dict):
      return {}
    return {
        'reason': error_result.get('reason'),
        'location': error_result.get('location'),
        'debugInfo': error_result.get('debugInfo'),
        'message': error_result.get('message'),
    }

  @property
  def job_errors(self) -> list[dict[str, Optional[str]]]:
    errors_list = self._status.get('errors', [])
    errors_iterable = []
    if isinstance(errors_list, list):
      for item in errors_list:
        if isinstance(item, dict):
          errors_iterable.append({
              'reason': item.get('reason'),
              'location': item.get('location'),
              'debugInfo': item.get('debugInfo'),
              'message': item.get('message'),
          })
    return errors_iterable

  @property
  def materialized_view_statistics(self) -> dict[str, Any]:
    stats_list = self._query_stats.get('materializedViewStatistics')
    materialized_view = []
    if isinstance(stats_list, list):
      for item in stats_list:
        if isinstance(item, dict):
          table_ref_data = item.get('tableReference')
          table_ref_obj = None
          if isinstance(table_ref_data, dict):
            project_id = table_ref_data.get('projectId')
            dataset_id = table_ref_data.get('datasetId')
            table_id = table_ref_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
          chosen = item.get('chosen') is True
          saved_str = item.get('estimatedBytesSaved', '0')
          estimated_bytes_saved = (int(saved_str)
                                   if isinstance(saved_str, str) and
                                   saved_str.isdigit() else 0)
          rejected_reason = str(item.get('rejectedReason', ''))
          materialized_view.append({
              'chosen': chosen,
              'estimatedBytesSaved': estimated_bytes_saved,
              'rejectedReason': rejected_reason,
              'tableReference': table_ref_obj,
          })
    return {'materializedView': materialized_view}

  @property
  def metadata_cache_statistics(self) -> dict[str, Any]:
    stats_list = self._query_stats.get('metadataCacheStatistics')
    metadata_cache = []
    if isinstance(stats_list, list):
      for item in stats_list:
        if isinstance(item, dict):
          table_ref_data = item.get('tableReference')
          table_ref_obj = None
          if isinstance(table_ref_data, dict):
            project_id = table_ref_data.get('projectId')
            dataset_id = table_ref_data.get('datasetId')
            table_id = table_ref_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
          metadata_cache.append({
              'explanation': item.get('explanation', ''),
              'unusedReason': str(item.get('unusedReason', '')),
              'tableReference': table_ref_obj,
          })
    return {'tableMetadataCacheUsage': metadata_cache}

  # Properties derived from _information_schema_job_metadata
  @property
  def information_schema_user_email(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('user_email')

  @property
  def information_schema_start_time_str(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('start_time_str')

  @property
  def information_schema_end_time_str(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('end_time_str')

  @property
  def information_schema_query(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('query')

  @property
  def information_schema_total_modified_partitions(self) -> Union[int, str]:
    """The total number of partitions the job modified.

    This field is populated for LOAD and QUERY jobs.
    """
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    try:
      total_modified_partitions = self._information_schema_job_metadata[
          'total_modified_partitions']
      return total_modified_partitions
    except KeyError:
      return C_NOT_AVAILABLE

  @property
  def information_schema_resource_warning(self) -> str:
    """The warning message that appears if the resource usage during query

    processing is above the internal threshold of the system.
    """
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    try:
      resource_warning = self._information_schema_job_metadata['query_info'][
          'resource_warning']
      return resource_warning
    except KeyError:
      return C_NOT_AVAILABLE

  @property
  def information_schema_normalized_literals(self) -> str:
    """Contains the hashes of the query."""
    try:
      query_hashes = self._information_schema_job_metadata['query_info'][
          'query_hashes']['normalized_literals']
      return query_hashes
    except KeyError:
      return C_NOT_AVAILABLE

Represents a BigQuery Job object.

BigQueryJob( project_id: str, job_api_resource_data: dict[str, typing.Any], information_schema_job_metadata: dict[str, str])
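
A minimal construction sketch; the dictionary below is a hypothetical fragment of a jobs.get API response, and any fields it omits simply fall back to the property defaults:

job = BigQueryJob(
    project_id='example-project',
    job_api_resource_data={
        'id': 'example-project:US.bquxjob_1234',
        'configuration': {'jobType': 'QUERY'},
        'statistics': {'totalSlotMs': '4200'},
        'status': {'state': 'DONE'},
    },
    information_schema_job_metadata={},
)
print(job.job_type, job.total_slot_ms, job.job_state)  # QUERY 4200 DONE
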
project_id: str
@property
def project_id(self) -> str:
  """Project id (not project number)."""
  return self._project_id

Project id (not project number).

full_path: str

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

id: str
short_path: str

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

user_email: str
job_type: str
query_sql: str
use_legacy_sql: bool
priority: str
edition: str
creation_time: Optional[int]
start_time: Optional[int]
end_time: Optional[int]
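
These three timestamps are epoch values in milliseconds (the API returns them as strings, which the properties parse to int). A conversion sketch, assuming job is a fetched BigQueryJob:

from datetime import datetime, timezone

if job.start_time and job.end_time:
  started = datetime.fromtimestamp(job.start_time / 1000, tz=timezone.utc)
  ended = datetime.fromtimestamp(job.end_time / 1000, tz=timezone.utc)
  print(f'job ran for {(ended - started).total_seconds():.1f}s')
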
total_bytes_processed: int
total_bytes_billed: int
total_slot_ms: int
cache_hit: bool
quota_deferments: list[str]
query_plan: list[dict[str, typing.Any]]
total_partitions_processed: int
referenced_tables: list[BigQueryTable]
referenced_routines: list[BigQueryRoutine]
num_affected_dml_rows: int
dml_stats: dict[str, int]
statement_type: str
bi_engine_statistics: dict[str, typing.Any]
vector_search_statistics: dict[str, typing.Any]
performance_insights: dict[str, typing.Any]
optimization_details: Any
export_data_statistics: dict[str, int]
load_query_statistics: dict[str, int]
spark_statistics: dict[str, typing.Any]
transferred_bytes: int
reservation_id: str
reservation_admin_project_id: Optional[str]
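
As the reservation_admin_project_id parser in the class source implies, reservation_id is expected to start with 'projects/<PROJECT_ID>/...', so the admin project is the second path segment. A small sketch, assuming job is a fetched BigQueryJob:

if job.reservation_id:
  # e.g. reservation_id = 'projects/admin-project/locations/us/reservations/prod'
  print('reservation admin project:', job.reservation_admin_project_id)
else:
  print('no reservation recorded; the job likely ran on on-demand slots')
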
num_child_jobs: int
parent_job_id: str
row_level_security_applied: bool
data_masking_applied: bool
session_id: str
final_execution_duration_ms: int
job_state: str
job_error_result: dict[str, typing.Optional[str]]
job_errors: list[dict[str, typing.Optional[str]]]
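
A sketch of inspecting the final status with these properties; in the BigQuery API, a job in the DONE state failed when errorResult is set:

if job.job_state == 'DONE':
  error = job.job_error_result
  if error:
    print(f"job failed: {error.get('reason')}: {error.get('message')}")
    for item in job.job_errors:  # all errors encountered while running the job
      print(' -', item.get('message'))
  else:
    print('job completed successfully')
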
materialized_view_statistics: dict[str, typing.Any]
metadata_cache_statistics: dict[str, typing.Any]
information_schema_user_email: str | None
information_schema_start_time_str: str | None
information_schema_end_time_str: str | None
information_schema_query: str | None
information_schema_total_modified_partitions: Union[int, str]

The total number of partitions the job modified.

This field is populated for LOAD and QUERY jobs.

information_schema_resource_warning: str

The warning message that appears if the resource usage during query processing is above the internal threshold of the system.

information_schema_normalized_literals: str

Contains the hashes of the query.
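
Because the information_schema_* properties fall back to the C_NOT_AVAILABLE sentinel ('N/A') instead of raising when the INFORMATION_SCHEMA metadata is missing, callers should compare against the sentinel, for example:

warning = job.information_schema_resource_warning
if warning != C_NOT_AVAILABLE:
  print('resource warning:', warning)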

@caching.cached_api_call
def get_bigquery_job_api_resource_data(project_id: str, region: str, job_id: str) -> Optional[dict[str, Any]]:
@caching.cached_api_call
def get_bigquery_job_api_resource_data(
    project_id: str,
    region: str,
    job_id: str,
) -> Union[dict[str, Any], None]:
  """Fetch a specific BigQuery job's raw API resource data."""
  api = apis.get_api('bigquery', 'v2', project_id)
  query_job = api.jobs().get(projectId=project_id,
                             location=region,
                             jobId=job_id)

  try:
    resp = query_job.execute(num_retries=config.API_RETRIES)
    return resp
  except errors.HttpError as err:
    raise utils.GcpApiError(err) from err

Fetch a specific BigQuery job's raw API resource data.
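
A usage sketch with placeholder identifiers; note that, unlike the higher-level helpers below, this function raises utils.GcpApiError on HTTP failures rather than returning None:

try:
  data = get_bigquery_job_api_resource_data('example-project', 'us-central1',
                                            'bquxjob_1234')
  if data:
    print(data.get('status', {}).get('state'))
except utils.GcpApiError as err:
  print('could not fetch job:', err)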

@caching.cached_api_call
def get_information_schema_job_metadata( project_id: str, region: str, job_id: str, creation_time_milis: Optional[int] = None, skip_permission_check: bool = False) -> Optional[dict[str, Any]]:
@caching.cached_api_call
def get_information_schema_job_metadata(
    project_id: str,
    region: str,
    job_id: str,
    creation_time_milis: Optional[int] = None,
    skip_permission_check: bool = False,
) -> Optional[dict[str, Any]]:
  """Fetch metadata about a BigQuery job from the INFORMATION_SCHEMA."""
  if not apis.is_enabled(project_id, 'bigquery'):
    return None
  user_email = ''
  try:
    user_email = apis.get_user_email()
  except (RuntimeError, exceptions.DefaultCredentialsError):
    pass
  except AttributeError as err:
    if (('has no attribute' in str(err)) and
        ('with_quota_project' in str(err))):
      op.info('Running the investigation within the GCA context.')
  user = 'user:' + user_email
  if not skip_permission_check:
    try:
      policy = iam.get_project_policy(project_id)
      if (not policy.has_permission(user, 'bigquery.jobs.create')) or (
          not policy.has_permission(user, 'bigquery.jobs.listAll')):
        op.info(
            f'WARNING: Unable to run INFORMATION_SCHEMA view analysis due to missing permissions.\
            \nMake sure to grant {user_email} "bigquery.jobs.create" and "bigquery.jobs.listAll".\
            \nContinuing the investigation with the BigQuery job metadata obtained from the API.'
        )
        return None
    except utils.GcpApiError:
      op.info(
          'Attempting to query INFORMATION_SCHEMA with no knowledge of project'
          ' level permissions        \n(due to missing'
          ' resourcemanager.projects.get permission).')
  else:
    op.info(
        'Attempting to query INFORMATION_SCHEMA without checking project level permissions.'
    )
  try:
    creation_time_milis_filter = ' '
    if creation_time_milis:
      creation_time_milis_filter = (
          f'AND creation_time = TIMESTAMP_MILLIS({creation_time_milis})')
    query = f"""
    SELECT
        user_email, start_time, end_time, query
      FROM
        `{project_id}`.`region-{region}`.INFORMATION_SCHEMA.JOBS
      WHERE
        job_id = '{job_id}'
        {creation_time_milis_filter}
      LIMIT 1
    """
    results = get_query_results(
        project_id=project_id,
        query=query,
        location=region,
        timeout_sec=30,
        poll_interval_sec=2,  # Short poll interval
    )
    if not results or len(results) != 1:
      # We cannot raise an exception otherwise tests that use get_bigquery_job would fail
      # raise ValueError(f"Job {job_id} not found in INFORMATION_SCHEMA")
      return None
    return results[0]
  except errors.HttpError as err:
    logging.warning(
        'Failed to retrieve INFORMATION_SCHEMA job metadata for job %s: %s',
        job_id,
        err,
    )
    return None
  except KeyError as err:
    logging.warning(
        'Failed to parse INFORMATION_SCHEMA response for job %s: %s',
        job_id,
        err,
    )
    return None

Fetch metadata about a BigQuery job from the INFORMATION_SCHEMA.

def get_bigquery_job( project_id: str, region: str, job_id: str, skip_permission_check: bool = False) -> Optional[BigQueryJob]:
840def get_bigquery_job(
841    project_id: str,
842    region: str,
843    job_id: str,
844    skip_permission_check: bool = False) -> Union[BigQueryJob, None]:
845  """Fetch a BigQuery job, combining API and INFORMATION_SCHEMA data."""
846  try:
847    job_api_resource_data = get_bigquery_job_api_resource_data(
848        project_id, region, job_id)
849    if not job_api_resource_data:
850      return None
851  except utils.GcpApiError as err:
852    # This will be returned when permissions to fetch a job are missing.
853    if 'permission' in err.message.lower():
854      user_email = ''
855      try:
856        user_email = apis.get_user_email()
857      except (RuntimeError, AttributeError,
858              exceptions.DefaultCredentialsError) as error:
859        if (('has no attribute' in str(error)) and
860            ('with_quota_project' in str(error))):
861          op.info('Running the investigation within the GCA context.')
862      logging.debug(
863          'Could not retrieve BigQuery job %s; make sure %s has the bigquery.jobs.get'
864          ' and bigquery.jobs.create permissions.', project_id + ':' + region + '.' + job_id, user_email)
865      raise utils.GcpApiError(err)
866    # This will be returned when a job is not found.
867    elif 'not found' in err.message.lower():
868      job_id_string = project_id + ':' + region + '.' + job_id
869      logging.debug('Could not find BigQuery job %s', job_id_string)
870      return None
871    else:
872      logging.debug(
873          'Could not retrieve BigQuery job %s due to an issue calling the API.'
874          ' Please restart the investigation.',
875          project_id + ':' + region + '.' + job_id)
876      return None
877  information_schema_job_metadata = {}
878  job_creation_millis = None
879  creation_time_str = job_api_resource_data.get('statistics',
880                                                {}).get('creationTime')
881  if creation_time_str:
882    try:
883      job_creation_millis = int(creation_time_str)
884    except (ValueError, TypeError):
885      pass
886  information_schema_job_metadata = get_information_schema_job_metadata(
887      project_id, region, job_id, job_creation_millis, skip_permission_check)
888  return BigQueryJob(
889      project_id=project_id,
890      job_api_resource_data=job_api_resource_data,
891      information_schema_job_metadata=information_schema_job_metadata)

Fetch a BigQuery job, combining API and INFORMATION_SCHEMA data.
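
A short sketch of the combined lookup under the same hypothetical identifiers; the returned BigQueryJob wraps the API resource data together with whatever INFORMATION_SCHEMA metadata could be fetched:

  from gcpdiag.queries import bigquery

  job = bigquery.get_bigquery_job(
      project_id='example-project',
      region='us',
      job_id='bquxjob_1234abcd_0123456789ab',
  )
  if job is None:
    print('Job not found, or the API call failed.')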

def get_query_results( project_id: str, query: str, location: Optional[str] = None, timeout_sec: int = 30, poll_interval_sec: int = 2) -> Optional[List[dict[str, Any]]]:
 931def get_query_results(
 932    project_id: str,
 933    query: str,
 934    location: Optional[str] = None,
 935    timeout_sec: int = 30,
 936    poll_interval_sec: int = 2,
 937) -> Optional[List[dict[str, Any]]]:
 938  """Executes a BigQuery query, waits for completion, and returns the results.
 939
 940  Args:
 941      project_id: The GCP project ID where the query should run.
 942      query: The SQL query string to execute.
 943      location: The location (e.g., 'US', 'EU', 'us-central1') where the job
 944        should run. If None, BigQuery defaults might apply, often based on
 945        dataset locations if referenced.
 946      timeout_sec: Maximum time in seconds to wait for the query job to
 947        complete.
 948      poll_interval_sec: Time in seconds between polling the job status.
 949
 950  Returns:
 951      A list of dictionaries representing the result rows, or None if the
 952      query fails, times out, or the API is disabled.
 953  Raises:
 954      utils.GcpApiError: If an unrecoverable API error occurs during job
 955                         insertion, status check, or result fetching.
 956  """
 957  if not apis.is_enabled(project_id, 'bigquery'):
 958    logging.warning('BigQuery API is not enabled in project %s.', project_id)
 959    return None
 960  api = apis.get_api('bigquery', 'v2', project_id)
 961  job_id = f'gcpdiag_query_{uuid.uuid4()}'
 962  job_body = {
 963      'jobReference': {
 964          'projectId': project_id,
 965          'jobId': job_id,
 966          'location': location,  # Location can be None
 967      },
 968      'configuration': {
 969          'query': {
 970              'query': query,
 971              'useLegacySql': False,
 972              # Consider adding priority, destinationTable, etc. if needed
 973          }
 974      },
 975  }
 976  try:
 977    logging.debug(
 978        'Starting BigQuery job %s in project %s, location %s',
 979        job_id,
 980        project_id,
 981        location or 'default',
 982    )
 983    insert_request = api.jobs().insert(projectId=project_id, body=job_body)
 984    insert_response = insert_request.execute(num_retries=config.API_RETRIES)
 985    job_ref = insert_response['jobReference']
 986    actual_job_id = job_ref['jobId']
 987    actual_location = job_ref.get('location')  # Get location assigned by BQ
 988    logging.debug('Job %s created. Polling for completion...', actual_job_id)
 989    start_time = time.time()
 990    while True:
 991      # Check for timeout
 992      if time.time() - start_time > timeout_sec:
 993        logging.error(
 994            'BigQuery job %s timed out after %d seconds.',
 995            actual_job_id,
 996            timeout_sec,
 997        )
 998        return None
 999      # Get job status
1000      logging.debug('Getting job status for %s', actual_job_id)
1001      get_request = api.jobs().get(
1002          projectId=job_ref['projectId'],
1003          jobId=actual_job_id,
1004          location=actual_location,
1005      )
1006      job_status_response = get_request.execute(num_retries=config.API_RETRIES)
1007      status = job_status_response.get('status', {})
1008      logging.debug('Job status: %s', status.get('state'))
1009      if status.get('state') == 'DONE':
1010        if status.get('errorResult'):
1011          error_info = status['errorResult']
1012          error_message = error_info.get('message', '')
1013          if 'User does not have permission to query table' in error_message:
1014            op.info(  # strip the leading 'Access Denied: ' prefix
1015                error_message.removeprefix('Access Denied: ') +
1016                '\nContinuing the investigation with the job metadata obtained from the API.'
1017            )
1018          else:
1019            # The job failed for another reason; log the error details.
1020            logging.error(
1021                'BigQuery job %s failed. Reason: %s, Message: %s',
1022                actual_job_id,
1023                error_info.get('reason'),
1024                error_info.get('message'),
1025            )
1026            # Log detailed errors if available
1027            for error in status.get('errors', []):
1028              logging.error(
1029                  '  - Detail: %s (Location: %s)',
1030                  error.get('message'),
1031                  error.get('location'),
1032              )
1033          return None
1034        else:
1035          logging.debug('BigQuery job %s completed successfully.',
1036                        actual_job_id)
1037          break  # Job finished successfully
1038      elif status.get('state') in ['PENDING', 'RUNNING']:
1039        logging.debug('Job still running; sleeping before the next poll...')
1040        # Job still running, wait and poll again
1041        time.sleep(poll_interval_sec)
1042      else:
1043        # Unexpected state
1044        logging.error(
1045            'BigQuery job %s entered unexpected state: %s',
1046            actual_job_id,
1047            status.get('state', 'UNKNOWN'),
1048        )
1049        return None
1050    # Fetch results
1051    logging.debug('Fetching results for job %s...',
1052                  actual_job_id)
1053    results_request = api.jobs().getQueryResults(
1054        projectId=job_ref['projectId'],
1055        jobId=actual_job_id,
1056        location=actual_location,
1057        # Add startIndex, maxResults for pagination if needed
1058    )
1059    results_response = results_request.execute(num_retries=config.API_RETRIES)
1060    # Check if job actually completed (getQueryResults might return before DONE sometimes)
1061    if not results_response.get('jobComplete', False):
1062      logging.warning(
1063          'getQueryResults returned jobComplete=False for job %s, results might'
1064          ' be incomplete.',
1065          actual_job_id,
1066      )
1067      # Results may be partial at this point; callers should treat them as best-effort.
1068    rows = []
1069    if 'rows' in results_response and 'schema' in results_response:
1070      schema_fields = results_response['schema'].get('fields')
1071      if not schema_fields:
1072        return []
1073      for row_data in results_response['rows']:
1074        if 'f' in row_data:
1075          rows.append(_parse_row(schema_fields, row_data['f']))
1076    if results_response.get('pageToken'):
1077      logging.warning(
1078          'Query results for job %s are paginated, but pagination '
1079          'is not yet implemented.',
1080          actual_job_id,
1081      )
1082    return rows
1083  except errors.HttpError as err:
1084    logging.error('API error during BigQuery query execution for job %s: %s',
1085                  job_id, err)
1086    # Raise specific GcpApiError if needed for upstream handling
1087    raise utils.GcpApiError(err) from err
1088  except Exception as e:
1089    logging.exception(
1090        'Unexpected error during BigQuery query execution for job %s: %s',
1091        job_id,
1092        e,
1093    )
1094    # Re-raise or handle as appropriate
1095    raise

Executes a BigQuery query, waits for completion, and returns the results.

Arguments:
  • project_id: The GCP project ID where the query should run.
  • query: The SQL query string to execute.
  • location: The location (e.g., 'US', 'EU', 'us-central1') where the job should run. If None, BigQuery defaults might apply, often based on dataset locations if referenced.
  • timeout_sec: Maximum time in seconds to wait for the query job to complete.
  • poll_interval_sec: Time in seconds between polling the job status.
Returns:

A list of dictionaries representing the result rows, or None if the query fails, times out, or the API is disabled.

Raises:
  • utils.GcpApiError: If an unrecoverable API error occurs during job insertion, status check, or result fetching.
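
A brief usage sketch with a hypothetical project; each returned row is a dict keyed by column name, as produced by the schema-aware row parsing above:

  from gcpdiag.queries import bigquery

  rows = bigquery.get_query_results(
      project_id='example-project',
      query=('SELECT job_id, state'
             ' FROM `example-project`.`region-us`.INFORMATION_SCHEMA.JOBS'
             ' LIMIT 10'),
      location='us',
      timeout_sec=60,
      poll_interval_sec=2,
  )
  for row in rows or []:
    print(row.get('job_id'), row.get('state'))

Note that pagination is not implemented: if the response carries a pageToken, only the first page of rows is returned. Callers that need complete result sets could loop over jobs().getQueryResults() with its pageToken parameter, which the BigQuery v2 REST API supports.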
@caching.cached_api_call
def get_bigquery_project(project_id: str) -> gcpdiag.queries.crm.Project:
1098@caching.cached_api_call
1099def get_bigquery_project(project_id: str) -> crm.Project:
1100  """Attempts to retrieve project details for the supplied BigQuery project id or number.
1101
1102    If the project is found/accessible, it returns a Project object with the resource data.
1103    If the project cannot be retrieved, the application raises one of the exceptions below.
1104    The get_bigquery_project method avoids unnecessary printing of the error message to keep
1105    the user interface of the tool cleaner to focus on meaningful investigation results.
1106    Corresponding errors are handled gracefully downstream.
1107
1108    Args:
1109        project_id (str): The project id or number of
1110          the project (e.g., "123456789", "example-project").
1111
1112    Returns:
1113        Project: An object representing the BigQuery project's full details.
1114
1115    Raises:
1116        utils.GcpApiError: If there is an HTTP error calling the GCP API.
1117
1118    Usage:
1119        When using a project identifier from gcpdiag.models.Context:
1120
1121        project = get_bigquery_project(context.project_id)
1122
1123        With an unknown project identifier:
1124        try:
1125          project = get_bigquery_project('123456789')
1126        except utils.GcpApiError:
1127          ...  # Handle exception
1128        else:
1129          ...  # use project data
1130  """
1131  try:
1132    logging.debug('retrieving project %s ', project_id)
1133    crm_api = apis.get_api('cloudresourcemanager', 'v3', project_id)
1134    request = crm_api.projects().get(name=f'projects/{project_id}')
1135    response = request.execute(num_retries=config.API_RETRIES)
1136  except errors.HttpError as e:
1137    error = utils.GcpApiError(response=e)
1138    raise error from e
1139  else:
1140    return crm.Project(resource_data=response)

Attempts to retrieve project details for the supplied BigQuery project id or number.

If the project is found and accessible, it returns a Project object with the resource data. If the project cannot be retrieved, one of the exceptions below is raised. get_bigquery_project avoids printing the error message itself, keeping the tool's user interface clean and focused on meaningful investigation results. Corresponding errors are handled gracefully downstream.

Arguments:
  • project_id (str): The project id or number of the project (e.g., "123456789", "example-project").
Returns:

Project: An object representing the BigQuery project's full details.

Raises:
  • utils.GcpApiError: If there is an HTTP error calling the GCP API.
Usage:

When using a project identifier from gcpdiag.models.Context:

  project = get_bigquery_project(context.project_id)

With an unknown project identifier:

  try:
    project = get_bigquery_project('123456789')
  except utils.GcpApiError:
    ...  # handle the exception
  else:
    ...  # use the project data
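
Downstream callers can distinguish failure modes by inspecting GcpApiError.message, mirroring the pattern get_bigquery_job uses above; a hedged sketch with a hypothetical project number:

  from gcpdiag import utils
  from gcpdiag.queries import bigquery

  try:
    project = bigquery.get_bigquery_project('123456789')
  except utils.GcpApiError as err:
    if 'permission' in err.message.lower():
      print('Grant resourcemanager.projects.get on the project.')
    elif 'not found' in err.message.lower():
      print('Project does not exist or is not visible.')
    project = None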