Python์ฉ Snowflake ์ปค๋ฅํฐ๋ก ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ ธ์ค๋ ์ํฌ๋ก๋ ๋ถ์ฐํ๊ธฐยถ
๋ถ์ฐ ํ๊ฒฝ์ ์ฌ์ฉํ์ฌ ์ํฌ๋ก๋๋ฅผ ๋ณ๋ ฌํํ๋ ๊ฒฝ์ฐ Python์ฉ Snowflake ์ปค๋ฅํฐ๋ฅผ ์ฌ์ฉํ์ฌ ๊ฒฐ๊ณผ ๊ฐ์ ธ์ค๊ธฐ ๋ฐ ์ฒ๋ฆฌ ์์ ์ ๋ถ์ฐํ ์ ์์ต๋๋ค.
์ด ํญ๋ชฉ์ ๋ด์ฉ:
์๊ฐยถ
Cursor
์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ ์ฟผ๋ฆฌ๋ฅผ ์คํํ ํ, ๊ฒฐ๊ณผ ๋ฐฐ์น๋ฅผ ์ฌ์ฉํ์ฌ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ ธ์ค๋ ์์
์ ๋ถ์ฐํ ์ ์์ต๋๋ค. ๊ฒฐ๊ณผ ๋ฐฐ์น ๋ ๊ฒฐ๊ณผ์ ํ์ ์ธํธ๋ฅผ ๊ฒ์ํ๋ ํจ์๋ฅผ ์บก์ํํฉ๋๋ค. ๋ค์ํ ์์
์๋ฅผ ํ ๋นํ์ฌ ๋ค์ํ ๊ฒฐ๊ณผ ๋ฐฐ์น๋ฅผ ์ฌ์ฉํด ๊ฒฐ๊ณผ๋ฅผ ๋ณ๋ ฌ๋ก ๊ฐ์ ธ์ค๊ณ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
๊ฒฐ๊ณผ ๋ฐฐ์น ๋ชฉ๋ก ๊ฒ์ํ๊ธฐยถ
์ฟผ๋ฆฌ๋ฅผ ์คํํ ํ ๋ค์ ํ์ ์ค ํ๋๋ก ๊ฒฐ๊ณผ๋ฅผ ๊ฒ์ํ ์ ์์ต๋๋ค.
ResultBatch ์ค๋ธ์ ํธ.
์ด๋ ๊ฒ ํ๋ ค๋ฉด Cursor ์ค๋ธ์ ํธ์์
get_result_batches()
๋ฉ์๋๋ฅผ ํธ์ถํ์ญ์์ค. ์ด ๋ฉ์๋๋ ์ฒ๋ฆฌ๋ฅผ ์ํด ๋ค๋ฅธ ์์ ์์ ํ ๋นํ ์ ์๋ResultBatch
์ค๋ธ์ ํธ์ ๋ชฉ๋ก์ ๋ฐํํฉ๋๋ค. ์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Get the list of result batches result_batch_list = cur.get_result_batches() # Get the number of result batches in the list. num_result_batches = len(result_batch_list) # Split the list of result batches into two # to distribute the work of fetching results # between two workers. result_batch_list_1 = result_batch_list[:: 2] result_batch_list_2 = result_batch_list[1 :: 2]
PyArrow ํ ์ด๋ธ.
์์ธํ ๋ด์ฉ์ PyArrow ํ ์ด๋ธ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋ค์ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ๊ฒฐ๊ณผ ๋ฐฐ์น๋ฅผ PyArrow ํ ์ด๋ธ๋ก ๊ฒ์ํ ์ ์์ต๋๋ค.
fetch_arrow_all()
: ๋ชจ๋ ๊ฒฐ๊ณผ๊ฐ ํฌํจ๋ PyArrow ํ ์ด๋ธ์ ๋ฐํํ๋ ค๋ฉด ์ด ๋ฉ์๋๋ฅผ ํธ์ถํ์ญ์์ค.fetch_arrow_batches()
: ๊ฐ ๊ฒฐ๊ณผ ๋ฐฐ์น์ PyArrow ํ ์ด๋ธ์ ๋ฐํํ๋ ๋ฐ ์ฌ์ฉํ ์ ์๋ ๋ฐ๋ณต๊ธฐ๋ฅผ ๋ฐํํ๋ ค๋ฉด ์ด ๋ฉ์๋๋ฅผ ํธ์ถํ์ญ์์ค.
์:
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a PyArrow table containing all of the results. table = cur.fetch_arrow_all() # Iterate over a list of PyArrow tables for result batches. for table_for_batch in cur.fetch_arrow_batches(): my_pyarrow_table_processing_function(table_for_batch)
pandas DataFrame ์ค๋ธ์ ํธ.
Python์ฉ Snowflake ์ปค๋ฅํฐ์ pandas ํธํ ๋ฒ์ ์ ์ค์นํ ๊ฒฝ์ฐ, ๋ค์ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ๊ฒฐ๊ณผ ๋ฐฐ์น๋ฅผ pandas DataFrame ์ค๋ธ์ ํธ๋ก ๊ฒ์ํ ์ ์์ต๋๋ค.
fetch_pandas_all()
: ๋ชจ๋ ๊ฒฐ๊ณผ๊ฐ ํฌํจ๋ pandas DataFrame์ ๋ฐํํ๋ ค๋ฉด ์ด ๋ฉ์๋๋ฅผ ํธ์ถํ์ญ์์ค.fetch_pandas_batches()
: ๊ฐ ๊ฒฐ๊ณผ ๋ฐฐ์น์ pandas DataFrame์ ๋ฐํํ๋ ๋ฐ ์ฌ์ฉํ ์ ์๋ ๋ฐ๋ณต๊ธฐ๋ฅผ ๋ฐํํ๋ ค๋ฉด ์ด ๋ฉ์๋๋ฅผ ํธ์ถํ์ญ์์ค.
์:
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a pandas DataFrame containing all of the results. table = cur.fetch_pandas_all() # Iterate over a list of pandas DataFrames for result batches. for dataframe_for_batch in cur.fetch_pandas_batches(): my_dataframe_processing_function(dataframe_for_batch)
๊ฒฐ๊ณผ ๋ฐฐ์น ์ง๋ ฌํํ๊ธฐยถ
๊ฒฐ๊ณผ ๋ฐฐ์น๋ฅผ ๋ค๋ฅธ ์์ ์๋ ๋ ธ๋๋ก ์ด๋ํ๊ธฐ ์ํด ๊ฒฐ๊ณผ ๋ฐฐ์น๋ฅผ ์ง๋ ฌํ ๋ฐ ์ญ์ง๋ ฌํํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
import pickle
# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])
# At this point, you can move the serialized data to
# another worker/node.
...
# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
๊ฒฐ๊ณผ ๋ฐฐ์น๋ก ์์ ํ๊ธฐยถ
๋ค์ ์น์ ์์๋ ResultBatch ์ค๋ธ์ ํธ๋ก ์์ ํ๋ ๋ฐฉ๋ฒ์ ์ค๋ช ํฉ๋๋ค.
๊ฒฐ๊ณผ ๋ฐฐ์น์ ํ์ ๋ํด ๋ฐ๋ณตํ๊ธฐยถ
ResultBatch
์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ ํด๋น ๋ฐฐ์น์ ์ํ ํ์ ๋ํด ๋ฐ๋ณตํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
# Iterate over the list of result batches.
for batch in result_batch_list_1:
# Iterate over the subset of rows in a result batch.
for row in batch:
print(row)
ResultBatch
์ค๋ธ์ ํธ์ ๋ฐ๋ณต๊ธฐ๋ฅผ ๋ง๋ค ๋ ์ด ์ค๋ธ์ ํธ๋ ํด๋น ๋ฐฐ์น์ ๋ํ ํ์ ํ์ ์ธํธ๋ฅผ ๊ฐ์ ธ์ ๋ณํํฉ๋๋ค.
๊ฒฐ๊ณผ ๋ฐฐ์น์ ํ ๊ตฌ์ฒดํํ๊ธฐยถ
ํด๋น ResultBatch
์ค๋ธ์ ํธ๋ฅผ list()
ํจ์์ ์ ๋ฌํ์ฌ ๊ฒฐ๊ณผ ๋ฐฐ์น์ ํ ํ์ ์ธํธ๋ฅผ ๊ตฌ์ฒดํํฉ๋๋ค. ์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
๊ฒฐ๊ณผ ๋ฐฐ์น์ ํ ์์ ํฌ๊ธฐ ๊ฐ์ ธ์ค๊ธฐยถ
๊ฒฐ๊ณผ ๋ฐฐ์น์ ํ ์์ ๋ฐ์ดํฐ ํฌ๊ธฐ๋ฅผ ๊ฒฐ์ ํด์ผ ํ๋ ๊ฒฝ์ฐ ResultBatch
์ค๋ธ์ ํธ์ ํ ์, ์์ถ_ํฌ๊ธฐ, ๋น์์ถ_ํฌ๊ธฐ ์์ฑ์ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount
# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
๊ฒฐ๊ณผ ๋ฐฐ์น๋ฅผ ๋ฐ๋ณตํ๊ธฐ ์ ์ ์ด๋ฌํ ์์ฑ์ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์ด๋ฌํ ์์ฑ์ ๊ฐ์ ๊ฐ์ ธ์ค๋ ค๊ณ ๋ฐฐ์น์ ๋ํ ํ์ ํ์ ์ธํธ๋ฅผ ๊ฐ์ ธ์ฌ ํ์๋ ์์ต๋๋ค.
Arrow ๊ฒฐ๊ณผ ๋ฐฐ์น๋ฅผ PyArrow ํ ์ด๋ธ ๋๋ pandas DataFrame๋ก ๋ณํํ๊ธฐยถ
ArrowResultBatch
๋ฅผ PyArrow ํ
์ด๋ธ ๋๋ pandas DataFrame์ผ๋ก ๋ณํํ๋ ค๋ฉด ๋ค์ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ญ์์ค.
to_pandas()
: Python์ฉ Snowflake ์ปค๋ฅํฐ์ pandas ํธํ ๋ฒ์ ์ ์ค์นํ ๊ฒฝ์ฐ,ArrowResultBatch
์ ํ์ด ํฌํจ๋ pandas DataFrame์ ๋ฐํํ๋ ค๋ฉด ์ด ๋ฉ์๋๋ฅผ ํธ์ถํ์ญ์์ค.to_arrow()
:ResultBatch
์ ํ์ด ํฌํจ๋ PyArrow ํ ์ด๋ธ์ ๋ฐํํ๋ ค๋ฉด ์ด ๋ฉ์๋๋ฅผ ํธ์ถํ์ญ์์ค.
์:
with conn_cnx as con:
with con.cursor() as cur:
cur.execute("select col1 from table")
batches = cur.get_result_batches()
# Get the row from the ResultBatch as a pandas DataFrame.
dataframe = batches[0].to_pandas()
# Get the row from the ResultBatch as a PyArrow table.
table = batches[0].to_arrow()