Distribution de charges de travail qui récupÚrent des résultats avec le connecteur Snowflake pour Python¶

Si vous utilisez un environnement distribuĂ© pour parallĂ©liser les charges de travail, vous pouvez utiliser le connecteur Snowflake pour Python pour rĂ©partir le travail d’extraction et de traitement des rĂ©sultats.

Dans ce chapitre :

Introduction¶

AprĂšs avoir utilisĂ© l’objet Curseur pour exĂ©cuter une requĂȘte, vous pouvez rĂ©partir le travail de rĂ©cupĂ©ration des rĂ©sultats en utilisant des lots de rĂ©sultats. Un lot de rĂ©sultats encapsule une fonction qui rĂ©cupĂšre un sous-ensemble de rĂ©sultats. Vous pouvez affecter diffĂ©rents employĂ©s Ă  l’utilisation de diffĂ©rents lots de rĂ©sultats pour rĂ©cupĂ©rer et traiter les rĂ©sultats en parallĂšle.

Récupération de la liste des lots de résultats¶

AprĂšs avoir exĂ©cutĂ© une requĂȘte, vous pouvez rĂ©cupĂ©rer les rĂ©sultats dans l’un des formats suivants :

  • Objets ResultBatch.

    Pour ce faire, appelez la mĂ©thode obtenir_lot_rĂ©sultats() dans l’objet Curseur. Cela renvoie une liste d’objets ResultBatch que vous pouvez affecter Ă  diffĂ©rents employĂ©s pour le traitement. Par exemple :

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[:: 2]
            result_batch_list_2 = result_batch_list[1 :: 2]
    
    Copy
  • Tables PyArrow.

    Pour plus d’informations, voir les tables PyArrow.

    Vous pouvez utiliser les mĂ©thodes suivantes pour rĂ©cupĂ©rer les lots de rĂ©sultats en tant que tables PyArrow :

    • rĂ©cupĂ©rer_arrow_tout() : appelez cette mĂ©thode pour renvoyer une table PyArrow contenant tous les rĂ©sultats.

    • rĂ©cupĂ©rer_arrow_lots() : appelez cette mĂ©thode pour renvoyer un itĂ©rateur que vous pouvez utiliser pour renvoyer une table PyArrow pour chaque lot de rĂ©sultats.

    Par exemple :

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
    Copy
  • Objets DataFrame Pandas.

    Si vous avez installĂ© la version compatible avec Pandas du connecteur Snowflake pour Python, vous pouvez utiliser les mĂ©thodes suivantes pour rĂ©cupĂ©rer les lots de rĂ©sultats sous forme d’objets DataFrame Pandas :

    • fetch_pandas_all() : appelez cette mĂ©thode pour renvoyer un DataFrame Pandas contenant tous les rĂ©sultats.

    • fetch_pandas_batches() : appelez cette mĂ©thode pour renvoyer un itĂ©rateur que vous pouvez utiliser pour renvoyer un DataFrame Pandas pour chaque lot de rĂ©sultats.

    Par exemple :

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a pandas DataFrame containing all of the results.
            table = cur.fetch_pandas_all()
    
            # Iterate over a list of pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    
    Copy

Sérialisation des lots de résultats¶

Pour dĂ©placer les lots de rĂ©sultats vers d’autres employĂ©s ou nƓuds, vous pouvez sĂ©rialiser et dĂ©sĂ©rialiser les lots de rĂ©sultats. Exemple :

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
Copy

Utilisation de lots de résultats¶

Les sections suivantes expliquent comment travailler avec les objets ResultBatch :

ItĂ©ration sur les lignes d’un lot de rĂ©sultats¶

Avec un objet ResultBatch, vous pouvez itĂ©rer sur les lignes qui font partie de ce lot. Par exemple :

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)
Copy

Lorsque vous crĂ©ez un itĂ©rateur d’un objet ResultBatch, l’objet rĂ©cupĂšre et convertit le sous-ensemble de lignes pour ce lot.

MatĂ©rialisation des lignes d’un lot de rĂ©sultats¶

Pour matĂ©rialiser le sous-ensemble de lignes dans un lot de rĂ©sultats en passant cet objet ResultBatch Ă  la fonction list(). Par exemple :

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
Copy

Obtention du nombre de lignes et la taille d’un lot de rĂ©sultats¶

Si vous devez dĂ©terminer le nombre de lignes dans un lot de rĂ©sultats et la taille des donnĂ©es, vous pouvez utiliser les attributs rowcount, compressed_size et uncompressed_size de l’objet ResultBatch. Par exemple :

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
Copy

Notez que ces attributs sont disponibles avant que vous n’itĂ©riez sur le lot de rĂ©sultats. Il n’est pas nĂ©cessaire de rĂ©cupĂ©rer le sous-ensemble de lignes pour le lot afin d’obtenir les valeurs de ces attributs.

Conversion d’un lot de rĂ©sultats Arrow en une table PyArrow ou un DataFrame Pandas¶

Pour convertir un ArrowResultBatch en une table PyArrow ou un DataFrame Pandas, utilisez les mĂ©thodes suivantes :

Par exemple :

with conn_cnx as con:
  with con.cursor() as cur:
    cur.execute("select col1 from table")
    batches = cur.get_result_batches()

    # Get the row from the ResultBatch as a pandas DataFrame.
    dataframe = batches[0].to_pandas()

    # Get the row from the ResultBatch as a PyArrow table.
    table = batches[0].to_arrow()
Copy