Distributing workloads that fetch results with the Snowflake Connector for Python

๋ถ„์‚ฐ ํ™˜๊ฒฝ์„ ์‚ฌ์šฉํ•˜์—ฌ ์›Œํฌ๋กœ๋“œ๋ฅผ ๋ณ‘๋ ฌํ™”ํ•˜๋Š” ๊ฒฝ์šฐ Python์šฉ Snowflake ์ปค๋„ฅํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ฒฐ๊ณผ ๊ฐ€์ ธ์˜ค๊ธฐ ๋ฐ ์ฒ˜๋ฆฌ ์ž‘์—…์„ ๋ถ„์‚ฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด ํ•ญ๋ชฉ์˜ ๋‚ด์šฉ:

Introduction

Cursor ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ฟผ๋ฆฌ๋ฅผ ์‹คํ–‰ํ•œ ํ›„, ๊ฒฐ๊ณผ ๋ฐฐ์น˜๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ€์ ธ์˜ค๋Š” ์ž‘์—…์„ ๋ถ„์‚ฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฒฐ๊ณผ ๋ฐฐ์น˜ ๋Š” ๊ฒฐ๊ณผ์˜ ํ•˜์œ„ ์„ธํŠธ๋ฅผ ๊ฒ€์ƒ‰ํ•˜๋Š” ํ•จ์ˆ˜๋ฅผ ์บก์Аํ™”ํ•ฉ๋‹ˆ๋‹ค. ๋‹ค์–‘ํ•œ ์ž‘์—…์ž๋ฅผ ํ• ๋‹นํ•˜์—ฌ ๋‹ค์–‘ํ•œ ๊ฒฐ๊ณผ ๋ฐฐ์น˜๋ฅผ ์‚ฌ์šฉํ•ด ๊ฒฐ๊ณผ๋ฅผ ๋ณ‘๋ ฌ๋กœ ๊ฐ€์ ธ์˜ค๊ณ  ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Retrieving a list of result batches

After executing a query, you can retrieve the results in one of the following formats:

  • ResultBatch objects.

    To do this, call the get_result_batches() method of the Cursor object. This method returns a list of ResultBatch objects that you can assign to different workers for processing. For example:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[0::2]
            result_batch_list_2 = result_batch_list[1::2]
    
  • PyArrow tables.

    ์ž์„ธํ•œ ๋‚ด์šฉ์€ PyArrow ํ…Œ์ด๋ธ” ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

    ๋‹ค์Œ ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ฒฐ๊ณผ ๋ฐฐ์น˜๋ฅผ PyArrow ํ…Œ์ด๋ธ”๋กœ ๊ฒ€์ƒ‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

    • fetch_arrow_all(): Call this method to return a PyArrow table containing all of the results.

    • fetch_arrow_batches(): Call this method to return an iterator that you can use to return a PyArrow table for each result batch.

    For example:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
  • pandas DataFrame objects.

    If you have installed the pandas-compatible version of the Snowflake Connector for Python, you can use the following methods to retrieve the result batches as pandas DataFrame objects:

    • fetch_pandas_all(): Call this method to return a pandas DataFrame containing all of the results.

    • fetch_pandas_batches(): Call this method to return an iterator that you can use to return a pandas DataFrame for each result batch.

    For example:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a pandas DataFrame containing all of the results.
            dataframe = cur.fetch_pandas_all()
    
            # Iterate over a list of pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    
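The round-robin slicing used in the first example above can be illustrated with a plain list of stand-in values (not real ResultBatch objects); the two slices together cover every batch exactly once:

```python
# Stand-ins for result batches; real code would use the list returned
# by cur.get_result_batches().
result_batch_list = ["batch0", "batch1", "batch2", "batch3", "batch4"]

# Round-robin split: even-indexed batches go to one worker,
# odd-indexed batches to the other.
result_batch_list_1 = result_batch_list[0::2]
result_batch_list_2 = result_batch_list[1::2]

print(result_batch_list_1)  # ['batch0', 'batch2', 'batch4']
print(result_batch_list_2)  # ['batch1', 'batch3']
```

Any other partitioning (for example, contiguous chunks) works just as well; all that matters is that each batch is assigned to exactly one worker.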

๊ฒฐ๊ณผ ๋ฐฐ์น˜ ์ง๋ ฌํ™”ํ•˜๊ธฐยถ

To move a result batch to a different worker or node, you can serialize and deserialize the result batch. For example:

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
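Because a pickled result batch is just a bytes object, it can be handed to workers through any mechanism that passes serialized data around. The round trip can be sketched with plain lists standing in for ResultBatch objects; the pickle calls are the same, and ThreadPoolExecutor here is an illustrative choice, not part of the connector:

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

def process_batch(pickled_batch):
    # Deserialize the batch, then process its rows (here: sum them).
    batch = pickle.loads(pickled_batch)
    return sum(batch)

# Stand-ins for result batches; real code would pickle ResultBatch objects.
batches = [list(range(0, 10)), list(range(10, 20))]
pickled_batches = [pickle.dumps(b) for b in batches]

# Hand one serialized batch to each worker.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(process_batch, pickled_batches))

print(results)  # [45, 145]
```

Each worker deserializes its own batch and processes the rows locally, so no worker ever has to hold the full result set.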

Working with result batches

๋‹ค์Œ ์„น์…˜์—์„œ๋Š” ResultBatch ์˜ค๋ธŒ์ ํŠธ๋กœ ์ž‘์—…ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์„ค๋ช…ํ•ฉ๋‹ˆ๋‹ค.

๊ฒฐ๊ณผ ๋ฐฐ์น˜์˜ ํ–‰์— ๋Œ€ํ•ด ๋ฐ˜๋ณตํ•˜๊ธฐยถ

ResultBatch ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํ•ด๋‹น ๋ฐฐ์น˜์— ์†ํ•œ ํ–‰์— ๋Œ€ํ•ด ๋ฐ˜๋ณตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)

ResultBatch ์˜ค๋ธŒ์ ํŠธ์˜ ๋ฐ˜๋ณต๊ธฐ๋ฅผ ๋งŒ๋“ค ๋•Œ ์ด ์˜ค๋ธŒ์ ํŠธ๋Š” ํ•ด๋‹น ๋ฐฐ์น˜์— ๋Œ€ํ•œ ํ–‰์˜ ํ•˜์œ„ ์„ธํŠธ๋ฅผ ๊ฐ€์ ธ์™€ ๋ณ€ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

๊ฒฐ๊ณผ ๋ฐฐ์น˜์˜ ํ–‰ ๊ตฌ์ฒดํ™”ํ•˜๊ธฐยถ

To materialize the subset of rows in a result batch, pass that ResultBatch object to the list() function. For example:

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[0]
first_result_batch_data = list(first_result_batch)

๊ฒฐ๊ณผ ๋ฐฐ์น˜์˜ ํ–‰ ์ˆ˜์™€ ํฌ๊ธฐ ๊ฐ€์ ธ์˜ค๊ธฐยถ

๊ฒฐ๊ณผ ๋ฐฐ์น˜์˜ ํ–‰ ์ˆ˜์™€ ๋ฐ์ดํ„ฐ ํฌ๊ธฐ๋ฅผ ๊ฒฐ์ •ํ•ด์•ผ ํ•˜๋Š” ๊ฒฝ์šฐ ResultBatch ์˜ค๋ธŒ์ ํŠธ์˜ ํ–‰ ์ˆ˜, ์••์ถ•_ํฌ๊ธฐ, ๋น„์••์ถ•_ํฌ๊ธฐ ์†์„ฑ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size

You can use these attributes before iterating over the result batch. You do not need to fetch the subset of rows for the batch in order to get the values of these attributes.
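Because these attributes are available before any rows are fetched, they can drive work assignment. A sketch of one possible strategy, greedy balancing by rowcount, using a hypothetical named-tuple stand-in in place of real ResultBatch objects:

```python
import heapq
from collections import namedtuple

# Hypothetical stand-in exposing the rowcount attribute described above.
Batch = namedtuple("Batch", ["name", "rowcount"])
batches = [Batch("a", 500), Batch("b", 300), Batch("c", 200), Batch("d", 400)]

# Greedy assignment: always give the next-largest batch to the
# least-loaded worker.
num_workers = 2
heap = [(0, worker_id, []) for worker_id in range(num_workers)]  # (rows, id, batches)
heapq.heapify(heap)
for batch in sorted(batches, key=lambda b: b.rowcount, reverse=True):
    total, worker_id, assigned = heapq.heappop(heap)
    assigned.append(batch.name)
    heapq.heappush(heap, (total + batch.rowcount, worker_id, assigned))

for total, worker_id, assigned in sorted(heap, key=lambda t: t[1]):
    print(worker_id, total, assigned)
```

With real ResultBatch objects, the same loop would read batch.rowcount without triggering any data transfer; fetching only happens when a worker iterates over its assigned batches.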

Converting an Arrow result batch to a PyArrow table or pandas DataFrame

To convert an ArrowResultBatch to a PyArrow table or a pandas DataFrame, use the following methods:

  • to_arrow(): Call this method to return a PyArrow table containing the rows from the result batch.

  • to_pandas(): Call this method to return a pandas DataFrame containing the rows from the result batch.

For example:

with connect(...) as conn:
    with conn.cursor() as cur:
        cur.execute("select col1 from table")
        batches = cur.get_result_batches()

        # Get the rows from the ResultBatch as a pandas DataFrame.
        dataframe = batches[0].to_pandas()

        # Get the rows from the ResultBatch as a PyArrow table.
        table = batches[0].to_arrow()