diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py
index 6b9fa308d8..3ca983c2fe 100644
--- a/bigframes/blob/_functions.py
+++ b/bigframes/blob/_functions.py
@@ -69,7 +69,7 @@ def _output_bq_type(self):
     def _create_udf(self):
         """Create Python UDF in BQ. Return name of the UDF."""
         udf_name = str(
-            self._session._loader._storage_manager.generate_unique_resource_id()
+            self._session._anon_dataset_manager.generate_unique_resource_id()
         )

         func_body = inspect.getsource(self._func)
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index bc56a53196..2056c192ad 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -3775,7 +3775,7 @@ def to_gbq(

             # The client code owns this table reference now
             temp_table_ref = (
-                self._session._temp_storage_manager.generate_unique_resource_id()
+                self._session._anon_dataset_manager.generate_unique_resource_id()
             )
             destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index c0eebc0299..330c585419 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -70,13 +70,14 @@
 import bigframes.dtypes
 import bigframes.functions._function_session as bff_session
 import bigframes.functions.function as bff
+from bigframes.session import bigquery_session
 import bigframes.session._io.bigquery as bf_io_bigquery
+import bigframes.session.anonymous_dataset
 import bigframes.session.clients
 import bigframes.session.executor
 import bigframes.session.loader
 import bigframes.session.metrics
 import bigframes.session.planner
-import bigframes.session.temp_storage
 import bigframes.session.validation

 # Avoid circular imports.
@@ -247,14 +248,26 @@ def __init__(
         self._metrics = bigframes.session.metrics.ExecutionMetrics()
         self._function_session = bff_session.FunctionSession()

-        self._temp_storage_manager = (
-            bigframes.session.temp_storage.AnonymousDatasetManager(
+        self._anon_dataset_manager = (
+            bigframes.session.anonymous_dataset.AnonymousDatasetManager(
                 self._clients_provider.bqclient,
                 location=self._location,
                 session_id=self._session_id,
                 kms_key=self._bq_kms_key_name,
             )
         )
+        # Session temp tables don't support specifying kms key, so use anon dataset if kms key specified
+        self._session_resource_manager = (
+            bigquery_session.SessionResourceManager(
+                self.bqclient,
+                self._location,
+            )
+            if (self._bq_kms_key_name is None)
+            else None
+        )
+        self._temp_storage_manager = (
+            self._session_resource_manager or self._anon_dataset_manager
+        )
         self._executor: bigframes.session.executor.Executor = (
             bigframes.session.executor.BigQueryCachingExecutor(
                 bqclient=self._clients_provider.bqclient,
@@ -375,7 +388,7 @@ def _allows_ambiguity(self) -> bool:

     @property
     def _anonymous_dataset(self):
-        return self._temp_storage_manager.dataset
+        return self._anon_dataset_manager.dataset

     def __hash__(self):
         # Stable hash needed to use in expression tree
@@ -388,9 +401,11 @@ def close(self):
         # Protect against failure when the Session is a fake for testing or
         # failed to initialize.
-        temp_storage_manager = getattr(self, "_temp_storage_manager", None)
-        if temp_storage_manager:
-            self._temp_storage_manager.clean_up_tables()
+        if anon_dataset_manager := getattr(self, "_anon_dataset_manager", None):
+            anon_dataset_manager.close()
+
+        if session_resource_manager := getattr(self, "_session_resource_manager", None):
+            session_resource_manager.close()

         remote_function_session = getattr(self, "_function_session", None)
         if remote_function_session:
@@ -906,8 +921,6 @@ def read_csv(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager.allocate_temp_table()
-
         if engine is not None and engine == "bigquery":
             if any(param is not None for param in (dtype, names)):
                 not_supported = ("dtype", "names")
@@ -967,9 +980,7 @@ def read_csv(
                 )

             job_config = bigquery.LoadJobConfig()
-            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
             job_config.source_format = bigquery.SourceFormat.CSV
-            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
             job_config.autodetect = True
             job_config.field_delimiter = sep
             job_config.encoding = encoding
@@ -983,9 +994,8 @@ def read_csv(
             elif header > 0:
                 job_config.skip_leading_rows = header

-            return self._loader._read_bigquery_load_job(
+            return self._loader.read_bigquery_load_job(
                 filepath_or_buffer,
-                table,
                 job_config=job_config,
                 index_col=index_col,
                 columns=columns,
@@ -1052,18 +1062,12 @@ def read_parquet(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager.allocate_temp_table()
-
         if engine == "bigquery":
             job_config = bigquery.LoadJobConfig()
-            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
             job_config.source_format = bigquery.SourceFormat.PARQUET
-            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
             job_config.labels = {"bigframes-api": "read_parquet"}

-            return self._loader._read_bigquery_load_job(
-                path, table, job_config=job_config
-            )
+            return self._loader.read_bigquery_load_job(path, job_config=job_config)
         else:
             if "*" in path:
                 raise ValueError(
@@ -1106,8 +1110,6 @@ def read_json(
             engine=engine,
             write_engine=write_engine,
         )
-        table = self._temp_storage_manager.allocate_temp_table()
-
         if engine == "bigquery":

             if dtype is not None:
@@ -1131,16 +1133,13 @@ def read_json(
                 )

             job_config = bigquery.LoadJobConfig()
-            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
             job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
-            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
             job_config.autodetect = True
             job_config.encoding = encoding
             job_config.labels = {"bigframes-api": "read_json"}

-            return self._loader._read_bigquery_load_job(
+            return self._loader.read_bigquery_load_job(
                 path_or_buf,
-                table,
                 job_config=job_config,
             )
         else:
@@ -1713,7 +1712,7 @@ def _start_query_ml_ddl(

     def _create_object_table(self, path: str, connection: str) -> str:
         """Create a random id Object Table from the input path and connection."""
-        table = str(self._loader._storage_manager.generate_unique_resource_id())
+        table = str(self._anon_dataset_manager.generate_unique_resource_id())

         import textwrap

diff --git a/bigframes/session/temp_storage.py b/bigframes/session/anonymous_dataset.py
similarity index 89%
rename from bigframes/session/temp_storage.py
rename to bigframes/session/anonymous_dataset.py
index 3b2965efef..c5808aa63c 100644
--- a/bigframes/session/temp_storage.py
+++ b/bigframes/session/anonymous_dataset.py
@@ -18,13 +18,14 @@

 import google.cloud.bigquery as bigquery

-import bigframes.constants as constants
+from bigframes import constants
+from bigframes.session import temporary_storage
 import bigframes.session._io.bigquery as bf_io_bigquery

 _TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"


-class AnonymousDatasetManager:
+class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager):
     """
     Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
     """
@@ -38,10 +39,10 @@ def __init__(
         self,
         bqclient: bigquery.Client,
         *,
         location: str,
         session_id: str,
         kms_key: Optional[str] = None
     ):
         self.bqclient = bqclient
-        self.location = location
+        self._location = location
         self.dataset = bf_io_bigquery.create_bq_dataset_reference(
             self.bqclient,
-            location=self.location,
+            location=self._location,
             api_name="session-__init__",
         )
@@ -49,8 +50,12 @@ def __init__(
         self._table_ids: List[bigquery.TableReference] = []
         self._kms_key = kms_key

-    def allocate_and_create_temp_table(
-        self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str]
+    @property
+    def location(self):
+        return self._location
+
+    def create_temp_table(
+        self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
     ) -> bigquery.TableReference:
         """
         Allocates and and creates a table in the anonymous dataset.
@@ -99,7 +104,8 @@ def generate_unique_resource_id(self) -> bigquery.TableReference:
         )
         return self.dataset.table(table_id)

-    def clean_up_tables(self):
+    def close(self):
         """Delete tables that were created with this session's session_id."""
         for table_ref in self._table_ids:
             self.bqclient.delete_table(table_ref, not_found_ok=True)
+        self._table_ids.clear()
diff --git a/bigframes/session/bigquery_session.py b/bigframes/session/bigquery_session.py
index 28dfee7840..ae8dc88d43 100644
--- a/bigframes/session/bigquery_session.py
+++ b/bigframes/session/bigquery_session.py
@@ -23,6 +23,7 @@
 import google.cloud.bigquery as bigquery

 from bigframes.core.compile import googlesql
+from bigframes.session import temporary_storage

 KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0

@@ -32,21 +33,22 @@
 logger = logging.getLogger(__name__)


-class SessionResourceManager:
+class SessionResourceManager(temporary_storage.TemporaryStorageManager):
     """
     Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
""" - def __init__( - self, bqclient: bigquery.Client, location: str, *, kms_key: Optional[str] = None - ): + def __init__(self, bqclient: bigquery.Client, location: str): self.bqclient = bqclient - self.location = location - self._kms_key = kms_key + self._location = location self._session_id: Optional[str] = None self._sessiondaemon: Optional[RecurringTaskDaemon] = None self._session_lock = threading.RLock() + @property + def location(self): + return self._location + def create_temp_table( self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = [] ) -> bigquery.TableReference: @@ -56,17 +58,13 @@ def create_temp_table( with self._session_lock: table_ref = bigquery.TableReference( bigquery.DatasetReference(self.bqclient.project, "_SESSION"), - uuid.uuid4().hex, + f"bqdf_{uuid.uuid4()}", ) job_config = bigquery.QueryJobConfig( connection_properties=[ bigquery.ConnectionProperty("session_id", self._get_session_id()) ] ) - if self._kms_key: - job_config.destination_encryption_configuration = ( - bigquery.EncryptionConfiguration(kms_key_name=self._kms_key) - ) ibis_schema = ibis_bq.BigQuerySchema.to_ibis(list(schema)) @@ -87,7 +85,6 @@ def create_temp_table( ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}" job = self.bqclient.query(ddl, job_config=job_config) - job.result() # return the fully qualified table, so it can be used outside of the session return job.destination diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 07645c2a98..aabbbdcf5d 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -53,7 +53,7 @@ import bigframes.session._io.bigquery as bq_io import bigframes.session.metrics import bigframes.session.planner -import bigframes.session.temp_storage +import bigframes.session.temporary_storage # Max complexity that should be executed as a single query QUERY_COMPLEXITY_LIMIT = 1e7 @@ -195,7 +195,7 @@ class BigQueryCachingExecutor(Executor): def __init__( self, bqclient: bigquery.Client, - storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager, + storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager, bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient, *, strictly_ordered: bool = True, @@ -221,7 +221,7 @@ def to_sql( enable_cache: bool = True, ) -> str: if offset_column: - array_value, internal_offset_col = array_value.promote_offsets() + array_value, _ = array_value.promote_offsets() node = ( self.replace_cached_subtrees(array_value.node) if enable_cache @@ -248,7 +248,7 @@ def execute( job_config = bigquery.QueryJobConfig() # Use explicit destination to avoid 10GB limit of temporary table if use_explicit_destination: - destination_table = self.storage_manager.allocate_and_create_temp_table( + destination_table = self.storage_manager.create_temp_table( array_value.schema.to_bigquery(), cluster_cols=[] ) job_config.destination = destination_table @@ -392,7 +392,7 @@ def peek( job_config = bigquery.QueryJobConfig() # Use explicit destination to avoid 10GB limit of temporary table if use_explicit_destination: - destination_table = self.storage_manager.allocate_and_create_temp_table( + destination_table = self.storage_manager.create_temp_table( array_value.schema.to_bigquery(), cluster_cols=[] ) job_config.destination = destination_table @@ -645,9 +645,7 @@ def _sql_as_cached_temp_table( cluster_cols: Sequence[str], ) -> bigquery.TableReference: assert len(cluster_cols) <= 
-        temp_table = self.storage_manager.allocate_and_create_temp_table(
-            schema, cluster_cols
-        )
+        temp_table = self.storage_manager.create_temp_table(schema, cluster_cols)

         # TODO: Get default job config settings
         job_config = cast(
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 290e489bb0..bde3ff456f 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -20,13 +20,14 @@
 import itertools
 import os
 import typing
-from typing import Dict, Hashable, IO, Iterable, List, Optional, Sequence, Tuple, Union
+from typing import Dict, Hashable, IO, Iterable, List, Optional, Sequence, Tuple

 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import google.api_core.exceptions
 import google.auth.credentials
 import google.cloud.bigquery as bigquery
+import google.cloud.bigquery.job
 import google.cloud.bigquery.table
 import google.cloud.bigquery_connection_v1
 import google.cloud.bigquery_storage_v1
@@ -55,11 +56,12 @@
 import bigframes.session._io.bigquery as bf_io_bigquery
 import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
 import bigframes.session._io.pandas as bf_io_pandas
+import bigframes.session.anonymous_dataset
 import bigframes.session.clients
 import bigframes.session.executor
 import bigframes.session.metrics
 import bigframes.session.planner
-import bigframes.session.temp_storage
+import bigframes.session.temporary_storage
 import bigframes.session.time as session_time
 import bigframes.version

@@ -71,6 +73,9 @@
     import bigframes.session

 _MAX_CLUSTER_COLUMNS = 4
+_PLACEHOLDER_SCHEMA = (
+    google.cloud.bigquery.SchemaField("bf_loader_placeholder", "INTEGER"),
+)


 def _to_index_cols(
@@ -115,7 +120,7 @@ def __init__(
         self,
         session: bigframes.session.Session,
         bqclient: bigquery.Client,
-        storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
+        storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager,
         default_index_type: bigframes.enums.DefaultIndexKind,
         scan_index_uniqueness: bool,
         force_total_order: bool,
@@ -159,15 +164,18 @@ def read_pandas_load_job(
         job_config = bigquery.LoadJobConfig()
         job_config.schema = schema

-        # TODO: Remove this. It's likely that the slower load job due to
-        # clustering doesn't improve speed of queries because pandas tables are
-        # small.
         cluster_cols = [ordering_col]
         job_config.clustering_fields = cluster_cols

         job_config.labels = {"bigframes-api": api_name}
+        # Allow field addition, as we want to keep the clustered ordering_col field we already created
+        job_config.schema_update_options = [
+            google.cloud.bigquery.job.SchemaUpdateOption.ALLOW_FIELD_ADDITION
+        ]

-        load_table_destination = self._storage_manager.allocate_temp_table()
+        load_table_destination = self._storage_manager.create_temp_table(
+            [google.cloud.bigquery.SchemaField(ordering_col, "INTEGER")], [ordering_col]
+        )
         load_job = self._bqclient.load_table_from_dataframe(
             pandas_dataframe_copy,
             load_table_destination,
@@ -216,7 +224,7 @@ def read_pandas_streaming(
             index=True,
         )

-        destination = self._storage_manager.allocate_and_create_temp_table(
+        destination = self._storage_manager.create_temp_table(
             schema,
             [ordering_col],
         )
@@ -496,30 +504,28 @@ def read_gbq_table(
             df.sort_index()
         return df

-    def _read_bigquery_load_job(
+    def read_bigquery_load_job(
         self,
         filepath_or_buffer: str | IO["bytes"],
-        table: Union[bigquery.Table, bigquery.TableReference],
         *,
         job_config: bigquery.LoadJobConfig,
         index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
         columns: Iterable[str] = (),
     ) -> dataframe.DataFrame:
-        index_cols = _to_index_cols(index_col)
-
-        if not job_config.clustering_fields and index_cols:
-            job_config.clustering_fields = index_cols[:_MAX_CLUSTER_COLUMNS]
-
+        # Need to create session table beforehand
+        table = self._storage_manager.create_temp_table(_PLACEHOLDER_SCHEMA)
+        # but, we just overwrite the placeholder schema immediately with the load job
+        job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
         if isinstance(filepath_or_buffer, str):
             filepath_or_buffer = os.path.expanduser(filepath_or_buffer)
             if filepath_or_buffer.startswith("gs://"):
                 load_job = self._bqclient.load_table_from_uri(
-                    filepath_or_buffer, table, job_config=job_config
+                    filepath_or_buffer, destination=table, job_config=job_config
                 )
             elif os.path.exists(filepath_or_buffer):  # local file path
                 with open(filepath_or_buffer, "rb") as source_file:
                     load_job = self._bqclient.load_table_from_file(
-                        source_file, table, job_config=job_config
+                        source_file, destination=table, job_config=job_config
                     )
             else:
                 raise NotImplementedError(
@@ -528,21 +534,12 @@
                 )
         else:
             load_job = self._bqclient.load_table_from_file(
-                filepath_or_buffer, table, job_config=job_config
+                filepath_or_buffer, destination=table, job_config=job_config
             )

         self._start_generic_job(load_job)
         table_id = f"{table.project}.{table.dataset_id}.{table.table_id}"

-        # Update the table expiration so we aren't limited to the default 24
-        # hours of the anonymous dataset.
-        table_expiration = bigquery.Table(table_id)
-        table_expiration.expires = (
-            datetime.datetime.now(datetime.timezone.utc)
-            + bigframes.constants.DEFAULT_EXPIRATION
-        )
-        self._bqclient.update_table(table_expiration, ["expires"])
-
         # The BigQuery REST API for tables.get doesn't take a session ID, so we
         # can't get the schema for a temp table that way.
@@ -676,9 +673,7 @@ def _query_to_destination(
             )
         else:
             cluster_cols = []
-        temp_table = self._storage_manager.allocate_and_create_temp_table(
-            schema, cluster_cols
-        )
+        temp_table = self._storage_manager.create_temp_table(schema, cluster_cols)

         timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get(
             "timeoutMs"
diff --git a/bigframes/session/temporary_storage.py b/bigframes/session/temporary_storage.py
new file mode 100644
index 0000000000..0c2a36f3fe
--- /dev/null
+++ b/bigframes/session/temporary_storage.py
@@ -0,0 +1,32 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Protocol, Sequence
+
+from google.cloud import bigquery
+
+
+class TemporaryStorageManager(Protocol):
+    @property
+    def location(self) -> str:
+        ...
+
+    def create_temp_table(
+        self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
+    ) -> bigquery.TableReference:
+        ...
+
+    # implementations should be robust to repeatedly closing
+    def close(self) -> None:
+        ...
diff --git a/tests/system/large/test_session.py b/tests/system/large/test_session.py
index e117cf0327..1dac8c851e 100644
--- a/tests/system/large/test_session.py
+++ b/tests/system/large/test_session.py
@@ -72,12 +72,12 @@ def test_close(session: bigframes.Session):
     )
     full_id_1 = bigframes.session._io.bigquery.create_temp_table(
         session.bqclient,
-        session._temp_storage_manager.allocate_temp_table(),
+        session._anon_dataset_manager.allocate_temp_table(),
         expiration,
     )
     full_id_2 = bigframes.session._io.bigquery.create_temp_table(
         session.bqclient,
-        session._temp_storage_manager.allocate_temp_table(),
+        session._anon_dataset_manager.allocate_temp_table(),
         expiration,
     )

@@ -112,12 +112,12 @@ def test_clean_up_by_session_id():
     )
     bigframes.session._io.bigquery.create_temp_table(
         session.bqclient,
-        session._temp_storage_manager.allocate_temp_table(),
+        session._anon_dataset_manager.allocate_temp_table(),
         expiration,
     )
     bigframes.session._io.bigquery.create_temp_table(
         session.bqclient,
-        session._temp_storage_manager.allocate_temp_table(),
+        session._anon_dataset_manager.allocate_temp_table(),
         expiration,
     )

@@ -157,10 +157,11 @@ def test_clean_up_via_context_manager(session_creator):
     with session_creator() as session:
         bqclient = session.bqclient

-        full_id_1 = session._temp_storage_manager.allocate_and_create_temp_table(
+        full_id_1 = session._anon_dataset_manager.create_temp_table(
             [bigquery.SchemaField("a", "INT64")], cluster_cols=[]
         )
-        full_id_2 = session._temp_storage_manager.allocate_and_create_temp_table(
+        assert session._session_resource_manager is not None
+        full_id_2 = session._session_resource_manager.create_temp_table(
             [bigquery.SchemaField("b", "STRING")], cluster_cols=["b"]
         )

diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py
index fbaf4fcb49..9886102e2e 100644
--- a/tests/system/small/test_dataframe_io.py
+++ b/tests/system/small/test_dataframe_io.py
@@ -258,7 +258,7 @@ def test_to_pandas_override_global_option(scalars_df_index):
         scalars_df_index.to_pandas()
         table_id = scalars_df_index._query_job.destination.table_id
-        assert table_id.startswith("bqdf")
+        assert table_id is not None

     # When allow_large_results=False, a query_job object should not be created.
     # Therefore, the table_id should remain unchanged.
@@ -268,12 +268,11 @@ def test_to_arrow_override_global_option(scalars_df_index):
     # Direct call to_arrow uses global default setting (allow_large_results=True),
-    # table has 'bqdf' prefix.
     with bigframes.option_context("bigquery.allow_large_results", True):
         scalars_df_index.to_arrow()
         table_id = scalars_df_index._query_job.destination.table_id
-        assert table_id.startswith("bqdf")
+        assert table_id is not None

     # When allow_large_results=False, a query_job object should not be created.
     # Therefore, the table_id should remain unchanged.
diff --git a/tests/system/small/test_encryption.py b/tests/system/small/test_encryption.py
index 8ce53c218b..97f44694b0 100644
--- a/tests/system/small/test_encryption.py
+++ b/tests/system/small/test_encryption.py
@@ -84,36 +84,6 @@ def test_session_query_job(bq_cmek, session_with_bq_cmek):
     assert table.encryption_configuration.kms_key_name == bq_cmek


-def test_session_load_job(bq_cmek, session_with_bq_cmek):
-    if not bq_cmek:  # pragma: NO COVER
-        pytest.skip("no cmek set for testing")  # pragma: NO COVER
-
-    # Session should have cmek set in the default query and load job configs
-    load_table = session_with_bq_cmek._temp_storage_manager.allocate_temp_table()
-
-    df = pandas.DataFrame({"col0": [1, 2, 3]})
-    load_job_config = bigquery.LoadJobConfig()
-    load_job_config.schema = [
-        bigquery.SchemaField(df.columns[0], bigquery.enums.SqlTypeNames.INT64)
-    ]
-
-    load_job = session_with_bq_cmek.bqclient.load_table_from_dataframe(
-        df,
-        load_table,
-        job_config=load_job_config,
-    )
-    load_job.result()
-
-    assert load_job.destination == load_table
-    assert load_job.destination_encryption_configuration.kms_key_name.startswith(
-        bq_cmek
-    )
-
-    # The load destination table should be created with the intended encryption
-    table = session_with_bq_cmek.bqclient.get_table(load_job.destination)
-    assert table.encryption_configuration.kms_key_name == bq_cmek
-
-
 def test_read_gbq(bq_cmek, session_with_bq_cmek, scalars_table_id):
     if not bq_cmek:  # pragma: NO COVER
         pytest.skip("no cmek set for testing")  # pragma: NO COVER
@@ -194,7 +164,7 @@ def test_to_gbq(bq_cmek, session_with_bq_cmek, scalars_table_id):

     # Write the result to BQ custom table and assert encryption
     session_with_bq_cmek.bqclient.get_table(output_table_id)
-    output_table_ref = session_with_bq_cmek._temp_storage_manager.allocate_temp_table()
+    output_table_ref = session_with_bq_cmek._anon_dataset_manager.allocate_temp_table()
     output_table_id = str(output_table_ref)
     df.to_gbq(output_table_id)
     output_table = session_with_bq_cmek.bqclient.get_table(output_table_id)
@@ -232,7 +202,7 @@ def test_read_pandas_large(bq_cmek, session_with_bq_cmek):
     _assert_bq_table_is_encrypted(df, bq_cmek, session_with_bq_cmek)


-def test_bqml(bq_cmek, session_with_bq_cmek, penguins_table_id):
+def test_kms_encryption_bqml(bq_cmek, session_with_bq_cmek, penguins_table_id):
     if not bq_cmek:  # pragma: NO COVER
         pytest.skip("no cmek set for testing")  # pragma: NO COVER

diff --git a/tests/system/small/test_index_io.py b/tests/system/small/test_index_io.py
index 85001e4ec5..3bf9794f5a 100644
--- a/tests/system/small/test_index_io.py
+++ b/tests/system/small/test_index_io.py
@@ -20,10 +20,9 @@ def test_to_pandas_override_global_option(scalars_df_index):
     bf_index = scalars_df_index.index

     # Direct call to_pandas uses global default setting (allow_large_results=True),
-    # table has 'bqdf' prefix.
     bf_index.to_pandas()
     table_id = bf_index._query_job.destination.table_id
-    assert table_id.startswith("bqdf")
+    assert table_id is not None

     # When allow_large_results=False, a query_job object should not be created.
     # Therefore, the table_id should remain unchanged.
@@ -40,7 +39,7 @@ def test_to_numpy_override_global_option(scalars_df_index):
     # table has 'bqdf' prefix.
     bf_index.to_numpy()
     table_id = bf_index._query_job.destination.table_id
-    assert table_id.startswith("bqdf")
+    assert table_id is not None

     # When allow_large_results=False, a query_job object should not be created.
     # Therefore, the table_id should remain unchanged.
diff --git a/tests/system/small/test_series_io.py b/tests/system/small/test_series_io.py
index ae09a2cf5d..edc32f824f 100644
--- a/tests/system/small/test_series_io.py
+++ b/tests/system/small/test_series_io.py
@@ -19,11 +19,10 @@ def test_to_pandas_override_global_option(scalars_df_index):
     bf_series = scalars_df_index["int64_col"]

-    # Direct call to_pandas uses global default setting (allow_large_results=True),
-    # table has 'bqdf' prefix.
+    # Direct call to_pandas uses global default setting (allow_large_results=True)
     bf_series.to_pandas()
     table_id = bf_series._query_job.destination.table_id
-    assert table_id.startswith("bqdf")
+    assert table_id is not None

     session = bf_series._block.session
     execution_count = session._metrics.execution_count