Esegui Spark su un cluster Ray su Vertex AI

La libreria Python RayDP consente di eseguire Spark su un cluster Ray. Questo documento illustra l'installazione, la configurazione e l'esecuzione di RayDP su Ray on Vertex AI (cluster Ray su Vertex AI).

Installazione

Ray su Vertex AI consente agli utenti di eseguire le proprie applicazioni utilizzando il framework Ray open source. RayDP fornisce API per l'esecuzione di Spark su Ray. Le immagini container predefinite disponibili per creare un cluster Ray su Vertex AI non vengono fornite con RayDP preinstallato. Ciรฒ significa che devi creare un'immagine del cluster Ray personalizzato su Vertex AI per consentire al cluster Ray su Vertex AI di eseguire applicazioni RayDP sul cluster Ray su Vertex AI. La sezione seguente spiega come creare un'immagine personalizzata RayDP.

Crea un'immagine container personalizzata di Ray su Vertex AI

Utilizza questo Dockerfile per creare un'immagine container personalizzata per Ray su Vertex AI con RayDP installato.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Puoi utilizzare l'ultima immagine predefinita del cluster Ray su Vertex AI per creare l'immagine personalizzata RayDP. Puoi anche installare altri pacchetti Python che prevedi di utilizzare nelle tue applicazioni. pyarrow==14.0 รจ dovuto a un vincolo di dipendenza di Ray 2.9.3.

Crea ed esegui il push dell'immagine container personalizzata

Crea un repository Docker in Artifact Registry prima di creare la tua immagine personalizzata (consulta Utilizzare le immagini container per informazioni su come creare e configurare il repository Docker). Dopo aver creato il repository Docker, crea ed esegui il push dell'immagine container personalizzata utilizzando il Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Dove:

  • LOCATION: la posizione Cloud Storage (ad esempio us-central1) che hai creato in Artifact Registry.
  • PROJECT_ID: il tuo ID progetto Google Cloud .
  • DOCKER_REPOSITORY: il nome del repository Docker che hai creato.
  • IMAGE_NAME: Il nome delle immagini container personalizzate.

Crea un cluster Ray su Vertex AI

Utilizza l'immagine container personalizzata creata nel passaggio precedente per creare un cluster Ray su Vertex AI. Puoi utilizzare l'SDK Vertex AI per Python per creare un cluster Ray su Vertex AI.

Se non l'hai ancora fatto, installa le librerie Python richieste.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Configura i nodi head e worker e crea il cluster utilizzando l'SDK Vertex AI per Python. Ad esempio:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Dove:

  • CUSTOM_CONTAINER_IMAGE_URI: l'URI dell'immagine container personalizzata di cui รจ stato eseguito il push in Artifact Registry.
  • CLUSTER_NAME: il nome del tuo cluster Ray su Vertex AI.

Cluster Spark su Ray su Vertex AI

Prima di eseguire l'applicazione Spark, crea una sessione Spark utilizzando l'API RayDP. Puoi utilizzare il client Ray per eseguire questa operazione in modo interattivo o utilizzare l'API Ray Job. L'API Ray Job รจ consigliata, soprattutto per le applicazioni di produzione e a lunga esecuzione. L'API RayDP fornisce parametri per configurare la sessione Spark e supporta la configurazione Spark. Scopri di piรน sull'API RayDP per la creazione di sessioni Spark in Affinitร  dei nodi degli attori master Spark.

RayDP con il client Ray

Puoi utilizzare Task o Actor di Ray per creare un cluster e una sessione Spark sul cluster Ray su Vertex AI. Ray Task o Actor รจ necessario per utilizzare un client Ray per creare una sessione Spark sul cluster Ray su Vertex AI. Il seguente codice mostra come un attore Ray puรฒ creare una sessione Spark, eseguire un'applicazione Spark e arrestare un cluster Spark su un cluster Ray su Vertex AI utilizzando RayDP.

Per connetterti in modo interattivo al cluster Ray su Vertex AI, consulta Connettersi a un cluster Ray tramite Ray Client.

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP con l'API Ray Job

Il client Ray รจ utile per piccoli esperimenti che richiedono una connessione interattiva con il cluster Ray. L'API Ray Job รจ il modo consigliato per eseguire job di produzione e di lunga durata su un cluster Ray. Questo vale anche per l'esecuzione di applicazioni Spark sul cluster Ray su Vertex AI.

Crea uno script Python che contenga il codice dell'applicazione Spark. Ad esempio:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Invia il job per eseguire lo script Python utilizzando l'API Ray Job. Ad esempio:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Dove:

  • SCRIPT_NAME: Il nome file dello script che hai creato.

Lettura dei file Cloud Storage dall'applicazione Spark

รˆ prassi comune archiviare i file di dati in un bucket Google Cloud Storage. Puoi leggere questi file in diversi modi da un'applicazione Spark in esecuzione sul cluster Ray su Vertex AI. Questa sezione illustra due tecniche per leggere i file Cloud Storage dalle applicazioni Spark in esecuzione su Ray Cluster su Vertex AI.

Utilizzare il connettore Google Cloud Storage

Puoi utilizzare il Google Cloud Connector per Hadoop per leggere i file da un bucket Cloud Storage dalla tua applicazione Spark. Dopo aver creato una sessione Spark utilizzando RayDP, puoi leggere i file utilizzando alcuni parametri di configurazione. Il seguente codice mostra come leggere un file CSV archiviato in un bucket Cloud Storage da un'applicazione Spark sul cluster Ray su Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Dove:

  • GCS_FILE_URI: l'URI di un file archiviato in un bucket Cloud Storage. Ad esempio: gs://my-bucket/my-file.csv.

Utilizzare i dati Ray

Il connettore Google Cloud fornisce un modo per leggere i file da un bucket Google Cloud e potrebbe essere sufficiente per la maggior parte dei casi d'uso. Potresti voler utilizzare Ray Data per leggere i file dal bucket Google Cloud quando devi utilizzare l'elaborazione distribuita di Ray per leggere i dati o quando riscontri problemi di lettura del fileGoogle Cloud con il connettore Google Cloud . Ciรฒ potrebbe verificarsi a causa di conflitti di dipendenze Java quando alcune dipendenze di altre applicazioni vengono aggiunte al classpath Java di Spark utilizzando spark.jars.packages o spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF Pandas Pyspark sul cluster Ray su Vertex AI

Le UDF Pandas Pyspark a volte richiedono codice aggiuntivo quando le utilizzi nell'applicazione Spark in esecuzione su un cluster Ray su Vertex AI. Questo รจ in genere obbligatorio quando la UDF Pandas utilizza una libreria Python non disponibile nel cluster Ray su Vertex AI. Puoi pacchettizzare le dipendenze Python di un'applicazione utilizzando l'ambiente di runtime con l'API Ray Job. Dopo aver inviato il job Ray al cluster, Ray installa queste dipendenze nell'ambiente virtuale Python che crea per l'esecuzione del job. Le UDF Pandas, tuttavia, non utilizzano lo stesso ambiente virtuale. Utilizzano invece l'ambiente Python System predefinito. Se questa dipendenza non รจ disponibile nell'ambiente di sistema, potrebbe essere necessario installarla all'interno della funzione definita dall'utente Pandas. Nell'esempio seguente, installa la libreria statsmodels all'interno della UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':

    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()