Skip to content
30 changes: 30 additions & 0 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,36 @@ def _validate_result_schema(self, result_df: pd.DataFrame):
f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}"
)

def to_arrow(
    self,
    *,
    ordered: bool = True,
) -> Tuple[pa.Table, bigquery.QueryJob]:
    """Execute this block's query and download the result as a pyarrow Table.

    Args:
        ordered: If True, the generated SQL preserves the block's
            deterministic row ordering.

    Returns:
        A tuple of the downloaded pyarrow Table and the QueryJob that
        produced it.
    """
    # pa.Table.from_pandas puts index columns last, so update the expression to match.
    reordered_expr = self.expr.select_columns(
        list(self.value_columns) + list(self.index_columns)
    )

    _, query_job = self.session._query_to_destination(
        self.session._to_sql(reordered_expr, ordered=ordered),
        list(self.index_columns),
        api_name="cached",
        do_clustering=False,
    )
    arrow_table = query_job.result().to_arrow()

    # Non-string index labels get pandas-style positional placeholder names
    # (the same convention pa.Table.from_pandas uses for unnamed index levels).
    placeholder_labels = [
        label if isinstance(label, str) else f"__index_level_{level}__"
        for level, label in enumerate(self._index_labels)
    ]

    arrow_table = arrow_table.rename_columns(
        list(self.column_labels) + placeholder_labels
    )
    return arrow_table, query_job

def to_pandas(
self,
max_download_size: Optional[int] = None,
Expand Down
29 changes: 29 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import numpy
import pandas
import pandas.io.formats.format
import pyarrow
import tabulate

import bigframes
Expand Down Expand Up @@ -1183,6 +1184,34 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame:

return DataFrame(frame._block.calculate_pairwise_metric(agg_ops.CovOp()))

def to_arrow(
    self,
    *,
    ordered: Optional[bool] = None,
) -> pyarrow.Table:
    """Write DataFrame to an Arrow table / record batch.

    Args:
        ordered (bool, default None):
            Whether the resulting Arrow table is deterministically ordered.
            Unordered execution may produce a faster query in some cases.
            Any value other than None overrides the Session default.

    Returns:
        pyarrow.Table: A pyarrow Table with all rows and columns of this DataFrame.
    """
    warnings.warn(
        "to_arrow is in preview. Types and unnamed / duplicate name columns may change in future.",
        category=bigframes.exceptions.PreviewWarning,
    )

    self._optimize_query_complexity()
    # Fall back to the session-level ordering default when unspecified.
    if ordered is None:
        effective_ordering = self._session._strictly_ordered
    else:
        effective_ordering = ordered
    arrow_table, query_job = self._block.to_arrow(ordered=effective_ordering)
    self._set_internal_query_job(query_job)
    return arrow_table

def to_pandas(
self,
max_download_size: Optional[int] = None,
Expand Down
40 changes: 40 additions & 0 deletions samples/polars/create_polars_df_with_to_arrow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def test_create_polars_df() -> None:
    """Doc sample: run a BigQuery DataFrames query and convert the Arrow result to polars."""
    # [START bigquery_dataframes_to_polars]
    import polars

    import bigframes.enums
    import bigframes.pandas as bpd

    bf_df = bpd.read_gbq_table(
        "bigquery-public-data.usa_names.usa_1910_current",
        # Setting index_col to either a unique column or NULL will give the
        # best performance.
        index_col=bigframes.enums.DefaultIndexKind.NULL,
    )
    # TODO(developer): Do some analysis using BigQuery DataFrames.
    # ...

    # Run the query and download the results as an Arrow table to convert into
    # a Polars DataFrame. Use ordered=False if your polars analysis is OK with
    # non-deterministic ordering.
    arrow_table = bf_df.to_arrow(ordered=False)
    polars_df = polars.from_arrow(arrow_table)
    # [END bigquery_dataframes_to_polars]

    # Spot-check that the conversion preserved both shape and values.
    assert polars_df.shape == bf_df.shape
    assert polars_df["number"].sum() == bf_df["number"].sum()
Loading