Las plantillas de agentes en Vertex AI Agent Engine se definen como clases de Python. En los siguientes pasos, se muestra cómo crear una plantilla personalizada para crear instancias de agentes que se puedan implementar en Vertex AI:
- Ejemplo básico
- (Opcional) Respuestas de transmisión
- Registra métodos personalizados (opcional)
- (Opcional) Proporciona anotaciones de tipos
- (Opcional) Envía seguimientos a Cloud Trace
- (Opcional) Trabaja con variables de entorno
- (Opcional) Integración con Secret Manager
- (Opcional) Cómo controlar las credenciales
- (Opcional) Control de errores
Ejemplo básico
Para dar un ejemplo básico, la siguiente clase de Python es una plantilla para crear instancias de agentes que se pueden implementar en Vertex AI (puedes asignar a la variable CLASS_NAME
un valor como MyAgent
):
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Consideraciones sobre la implementación
Cuando escribes tu clase de Python, los siguientes tres métodos son importantes:
__init__()
:- Usa este método solo para los parámetros de configuración del agente. Por ejemplo, puedes usar este método para recopilar los parámetros del modelo y los atributos de seguridad como argumentos de entrada de tus usuarios. También puedes usar este método para recopilar parámetros, como el ID del proyecto, la región, las credenciales de la aplicación y las claves de API.
- El constructor devuelve un objeto que debe ser "pickle-able" para que se pueda implementar en Vertex AI Agent Engine. Por lo tanto, debes inicializar los clientes de servicio y establecer conexiones con las bases de datos en el método
.set_up
, en lugar de hacerlo en el método__init__
. - Este método es opcional. Si no se especifica, Vertex AI usa el constructor de Python predeterminado para la clase.
set_up()
:- Debes usar este método para definir la lógica de inicialización del agente. Por ejemplo, usa este método para establecer conexiones con bases de datos o servicios dependientes, importar paquetes dependientes o datos de procesamiento previo que se usan para entregar consultas.
- Este método es opcional. Si no se especifica, Vertex AI supone que el agente no necesita llamar a un método
.set_up
antes de entregar consultas de los usuarios.
query()
/stream_query()
:- Usa
query()
para devolver la respuesta completa como un solo resultado. - Usa
stream_query()
para devolver la respuesta en fragmentos a medida que esté disponible, lo que permite una experiencia de transmisión. El métodostream_query
debe devolver un objeto iterable (por ejemplo, un generador) para habilitar la transmisión. - Puedes implementar ambos métodos si deseas admitir interacciones de respuesta única y de transmisión con tu agente.
- Debes proporcionar a este método una docstring clara que defina lo que hace,
documenta sus atributos y proporciona anotaciones de tipo para sus entradas.
Evita los argumentos variables en los métodos
query
ystream_query
.
- Usa
Crea una instancia del agente de forma local
Puedes crear una instancia local de tu agente con el siguiente código:
agent = CLASS_NAME(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Prueba el método query
Puedes probar el agente enviando consultas a la instancia local:
response = agent.query(
input="What is the exchange rate from US dollars to Swedish currency?"
)
print(response)
La respuesta es un diccionario similar al siguiente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Cómo realizar consultas de forma asíncrona
Para responder a las consultas de forma asíncrona, puedes definir un método (como async_query
) que devuelva una corrutina de Python. Como ejemplo, la siguiente plantilla extiende el ejemplo básico para responder de forma asíncrona y se puede implementar en Vertex AI:
class AsyncAgent(CLASS_NAME):
async def async_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.ainvoke(**kwargs):
yield dumpd(chunk)
agent = AsyncAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Prueba el método async_query
Puedes probar el agente de forma local llamando al método async_query
. Por ejemplo:
response = await agent.async_query(
input="What is the exchange rate from US dollars to Swedish Krona today?"
)
print(response)
La respuesta es un diccionario similar al siguiente:
{"input": "What is the exchange rate from US dollars to Swedish currency?",
# ...
"output": "For 1 US dollar you will get 10.7345 Swedish Krona."}
Respuestas de transmisión
Para transmitir respuestas a las consultas, puedes definir un método llamado stream_query
que genere respuestas. Como ejemplo, la siguiente plantilla extiende el ejemplo básico para transmitir respuestas y se puede implementar en Vertex AI:
from typing import Iterable
class StreamingAgent(CLASS_NAME):
def stream_query(self, **kwargs) -> Iterable:
from langchain.load.dump import dumpd
for chunk in self.graph.stream(**kwargs):
yield dumpd(chunk)
agent = StreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Estos son algunos aspectos clave que debes tener en cuenta cuando uses la API de transmisión:
- Tiempo de espera máximo: El tiempo de espera máximo para las respuestas de transmisión es de 10 minutos. Si tu agente requiere tiempos de procesamiento más largos, considera dividir la tarea en partes más pequeñas.
- Modelos y cadenas de transmisión: La interfaz Runnable de LangChain admite la transmisión, por lo que puedes transmitir respuestas no solo de agentes, sino también de modelos y cadenas.
- Compatibilidad con LangChain: Ten en cuenta que, por el momento, no se admiten métodos asíncronos, como el método
astream_event
de LangChain. - Limita la generación de contenido: Si tienes problemas de contrapresión (en los que el productor genera datos más rápido de lo que el consumidor puede procesarlos), debes limitar la tasa de generación de contenido. Esto puede ayudar a evitar desbordamientos de búfer y garantizar una experiencia de transmisión fluida.
Prueba el método stream_query
Puedes probar la consulta de transmisión de forma local llamando al método stream_query
y, luego, iterando los resultados. Por ejemplo:
import pprint
for chunk in agent.stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Este código imprime cada fragmento de la respuesta a medida que se genera. El resultado podría verse de la siguiente manera:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
En este ejemplo, cada fragmento contiene información diferente sobre la respuesta, como las acciones que realizó el agente, los mensajes intercambiados y el resultado final.
Transmite respuestas de forma asíncrona
Para transmitir respuestas de forma asíncrona, puedes definir un método (p.ej., async_stream_query
) que muestre un generador asíncrono. Como ejemplo, la siguiente plantilla extiende el ejemplo básico para transmitir respuestas de forma asíncrona y se puede implementar en Vertex AI:
class AsyncStreamingAgent(CLASS_NAME):
async def async_stream_query(self, **kwargs):
from langchain.load.dump import dumpd
for chunk in self.graph.astream(**kwargs):
yield dumpd(chunk)
agent = AsyncStreamingAgent(
model=model, # Required.
tools=[get_exchange_rate], # Optional.
project="PROJECT_ID",
location="LOCATION",
)
agent.set_up()
Prueba el método async_stream_query
De manera similar al código para probar consultas de transmisión, puedes probar el agente de forma local llamando al método async_stream_query
y, luego, iterando los resultados. Por ejemplo:
import pprint
async for chunk in agent.async_stream_query(
input="What is the exchange rate from US dollars to Swedish currency?"
):
# Use pprint with depth=1 for a more concise, high-level view of the
# streamed output.
# To see the full content of the chunk, use:
# print(chunk)
pprint.pprint(chunk, depth=1)
Este código imprime cada fragmento de la respuesta a medida que se genera. El resultado podría verse de la siguiente manera:
{'actions': [...], 'messages': [...]}
{'messages': [...], 'steps': [...]}
{'messages': [...],
'output': 'The exchange rate from US dollars to Swedish currency is 1 USD to '
'10.5751 SEK. \n'}
Cómo registrar métodos personalizados
De forma predeterminada, los métodos query
y stream_query
se registran como operaciones en el agente implementado.
Puedes anular el comportamiento predeterminado y definir el conjunto de operaciones que se registrarán con el método register_operations
.
Las operaciones se pueden registrar como modos de ejecución estándar (representados por una cadena vacía ""
) o de transmisión ("stream"
).
Para registrar varias operaciones, puedes definir un método llamado register_operations
que enumere los métodos que estarán disponibles para los usuarios cuando se implemente el agente. En el siguiente ejemplo de código, el método register_operations
hará que el agente implementado registre query
y get_state
como operaciones que se ejecutan de forma síncrona, y stream_query
y get_state_history
como operaciones que transmiten las respuestas:
from typing import Iterable
class CustomAgent(StreamingAgent):
def get_state(self) -> dict: # new synchronous method
return self.graph.get_state(**kwargs)._asdict()
def get_state_history(self) -> Iterable: # new streaming operation
for state_snapshot in self.graph.get_state_history(**kwargs):
yield state_snapshot._asdict()
def register_operations(self):
return {
# The list of synchronous operations to be registered
"": ["query", "get_state"],
# The list of streaming operations to be registered
"stream": ["stream_query", "get_state_history"],
}
Puedes probar los métodos personalizados llamándolos directamente en la instancia local del agente, de manera similar a como probarías los métodos query
y stream_query
.
Cómo proporcionar anotaciones de tipos
Puedes usar anotaciones de tipo para especificar los tipos de entrada y salida esperados de los métodos de tu agente. Cuando se implementa el agente, solo se admiten tipos serializables en JSON en la entrada y salida de las operaciones que admite el agente. Los esquemas de las entradas y salidas se pueden anotar con TypedDict
o modelos de Pydantic.
En el siguiente ejemplo, anotamos la entrada como un TypedDict
y convertimos el resultado sin procesar de .get_state
(que es un NamedTuple
) en un diccionario serializable con su método ._asdict()
:
from typing import Any, Dict, TypedDict
# schemas.py
class RunnableConfig(TypedDict, total=False):
metadata: Dict[str, Any]
configurable: Dict[str, Any]
# agents.py
class AnnotatedAgent(CLASS_NAME):
def get_state(self, config: RunnableConfig) -> dict:
return self.graph.get_state(config=config)._asdict()
def register_operations(self):
return {"": ["query", "get_state"]}
Envía seguimientos a Cloud Trace
Para enviar seguimientos a Cloud Trace con bibliotecas de instrumentación que admiten OpenTelemetry, puedes importarlas e inicializarlas en el método .set_up
. Para los frameworks de agentes comunes, es posible que puedas usar la integración de OpenTelemetry Google Cloud en combinación con un framework de instrumentación como OpenInference o OpenLLMetry.
Por ejemplo, la siguiente plantilla es una modificación del ejemplo básico para exportar seguimientos a Cloud Trace:
OpenInference
Primero, instala el paquete requerido con pip
ejecutando
pip install openinference-instrumentation-langchain==0.1.34
A continuación, importa e inicializa el instrumentador:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangChainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
OpenLLMetry
Primero, instala el paquete requerido con pip
ejecutando
pip install opentelemetry-instrumentation-langchain==0.38.10
A continuación, importa e inicializa el instrumentador:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
location: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.location = location
def set_up(self):
# The additional code required for tracing instrumentation.
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
import google.cloud.trace_v2 as cloud_trace_v2
import google.auth
credentials, _ = google.auth.default()
trace.set_tracer_provider(TracerProvider())
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=self.project,
client=cloud_trace_v2.TraceServiceClient(
credentials=credentials.with_quota_project(self.project),
),
)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(cloud_trace_exporter)
)
LangchainInstrumentor().instrument()
# end of additional code required
import vertexai
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
vertexai.init(project=self.project, location=self.location)
model = ChatVertexAI(model_name=self.model_name)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Trabaja con variables de entorno
Para configurar variables de entorno, asegúrate de que estén disponibles a través de os.environ
durante el desarrollo y sigue las instrucciones en Define las variables de entorno cuando implementes el agente.
Integración con Secret Manager
Sigue estos pasos para realizar la integración con Secret Manager:
Instala la biblioteca cliente ejecutando el siguiente comando:
pip install google-cloud-secret-manager
Sigue las instrucciones en Otorga roles para un agente implementado para otorgar a la cuenta de servicio el rol de "Descriptor de acceso a secretos de Secret Manager" (
roles/secretmanager.secretAccessor
) a través de la consola de Google Cloud .Importa e inicializa el cliente en el método
.set_up
y obtén el secreto correspondiente cuando sea necesario. Como ejemplo, la siguiente plantilla es una modificación del ejemplo básico para usar una clave de API paraChatAnthropic
que se almacenó en Secret Manager:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str,
tools: Sequence[Callable],
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.secret_id = secret_id # <- new
def set_up(self):
from google.cloud import secretmanager
from langchain_anthropic import ChatAnthropic
from langgraph.prebuilt import create_react_agent
# Get the API Key from Secret Manager here.
self.secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = self.secret_manager_client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
# Use the API Key from Secret Manager here.
model = ChatAnthropic(
model_name=self.model_name,
model_kwargs={"api_key": secret_version.payload.data.decode()}, # <- new
)
self.graph = create_react_agent(model, tools=self.tools)
def query(self, **kwargs):
return self.graph.invoke(**kwargs)
Cómo controlar las credenciales
Cuando se implementa el agente, es posible que deba controlar diferentes tipos de credenciales:
- Credenciales predeterminadas de la aplicación (ADC) que suelen surgir de las cuentas de servicio
- OAuth, que suele surgir de las cuentas de usuario
- Proveedores de identidad para credenciales de cuentas externas (federación de identidades para cargas de trabajo)
Credenciales predeterminadas de la aplicación
import google.auth
credentials, project = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
Se puede usar en el código de la siguiente manera:
from typing import Callable, Sequence
class CLASS_NAME:
def __init__(
self,
model: str = "meta/llama3-405b-instruct-maas",
tools: Sequence[Callable],
location: str,
project: str,
):
self.model_name = model
self.tools = tools
self.project = project
self.endpoint = f"https://{location}-aiplatform.googleapis.com"
self.base_url = f'{self.endpoint}/v1beta1/projects/{project}/locations/{location}/endpoints/openapi'
def query(self, **kwargs):
import google.auth
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Note: the credential lives for 1 hour by default.
# After expiration, it must be refreshed.
creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
creds.refresh(google.auth.transport.requests.Request())
model = ChatOpenAI(
model=self.model_name,
base_url=self.base_url,
api_key=creds.token, # Use the token from the credentials here.
)
graph = create_react_agent(model, tools=self.tools)
return graph.invoke(**kwargs)
Para obtener más detalles, consulta Cómo funcionan las credenciales predeterminadas de la aplicación.
OAuth
Por lo general, las credenciales del usuario se obtienen con OAuth 2.0.
Si tienes un token de acceso (p.ej., de oauthlib
), puedes crear una instancia de google.oauth2.credentials.Credentials
. Además, si obtienes un token de actualización, también puedes especificar el URI del token y el token de actualización para permitir que las credenciales se actualicen automáticamente:
credentials = google.oauth2.credentials.Credentials(
token="ACCESS_TOKEN",
refresh_token="REFRESH_TOKEN", # Optional
token_uri="TOKEN_URI", # E.g. "https://oauth2.googleapis.com/token"
client_id="CLIENT_ID", # Optional
client_secret="CLIENT_SECRET" # Optional
)
Aquí, TOKEN_URI
, CLIENT_ID
y CLIENT_SECRET
se basan en Crea una credencial de cliente de OAuth.
Si no tienes un token de acceso, puedes usar google_auth_oauthlib.flow
para realizar el flujo de concesión de autorización de OAuth 2.0 y obtener una instancia de google.oauth2.credentials.Credentials
correspondiente:
from google.cloud import secretmanager
from google_auth_oauthlib.flow import InstalledAppFlow
import json
# Get the client config from Secret Manager here.
secret_manager_client = secretmanager.SecretManagerServiceClient()
secret_version = client.access_secret_version(request={
"name": "projects/PROJECT_ID/secrets/SECRET_ID/versions/SECRET_VERSION",
})
client_config = json.loads(secret_version.payload.data.decode())
# Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
flow = InstalledAppFlow.from_client_config(
client_config,
scopes=['https://www.googleapis.com/auth/cloud-platform'],
state="OAUTH_FLOW_STATE" # from flow.authorization_url(...)
)
# You can get the credentials from the flow object.
credentials: google.oauth2.credentials.Credentials = flow.credentials
# After obtaining the credentials, you can then authorize API requests on behalf
# of the given user or service account. For example, to authorize API requests
# to vertexai services, you'll specify it in vertexai.init(credentials=)
import vertexai
vertexai.init(
project="PROJECT_ID",
location="LOCATION",
credentials=credentials, # specify the credentials here
)
Para obtener más información, consulta la documentación del módulo google_auth_oauthlib.flow
.
Proveedor de identidad
Si quieres autenticar usuarios con correo electrónico/contraseña, número de teléfono, proveedores de redes sociales como Google, Facebook o GitHub, o un mecanismo de autenticación personalizado, puedes usar Identity Platform o Firebase Authentication, o cualquier proveedor de identidad que admita OpenID Connect (OIDC).
Para obtener más detalles, consulta Acceder a los recursos desde un proveedor de identidad de OIDC.
Maneja los errores
Para garantizar que los errores de la API se muestren en un formato JSON estructurado, te recomendamos que implementes el manejo de errores en el código de tu agente con un bloque try...except
, que se puede abstraer en un decorador.
Si bien Vertex AI Agent Engine puede controlar varios códigos de estado de forma interna, Python no tiene una forma estandarizada de representar errores con códigos de estado HTTP asociados en todos los tipos de excepciones. Intentar asignar todas las excepciones posibles de Python a los estados HTTP dentro del servicio subyacente sería complejo y difícil de mantener.
Un enfoque más escalable es detectar de forma explícita las excepciones pertinentes dentro de los métodos del agente o usar un decorador reutilizable como error_wrapper
. Luego, puedes asociar códigos de estado adecuados (por ejemplo, agregando atributos code
y error
a excepciones personalizadas o controlando excepciones estándar de forma específica) y darle formato al error como un diccionario JSON para el valor de devolución. Esto requiere un cambio mínimo de código en los métodos del agente, y, a menudo, solo se requiere que agregues el decorador.
A continuación, se muestra un ejemplo de cómo puedes implementar el manejo de errores en tu agente:
from functools import wraps
import json
def error_wrapper(func):
@wraps(func) # Preserve original function metadata
def wrapper(*args, **kwargs):
try:
# Execute the original function with its arguments
return func(*args, **kwargs)
except Exception as err:
error_code = getattr(err, 'code')
error_message = getattr(err, 'error')
# Construct the error response dictionary
error_response = {
"error": {
"code": error_code,
"message": f"'{func.__name__}': {error_message}"
}
}
# Return the Python dictionary directly.
return error_response
return wrapper
# Example exception
class SessionNotFoundError(Exception):
def __init__(self, session_id, message="Session not found"):
self.code = 404
self.error = f"{message}: {session_id}"
super().__init__(self.error)
# Example Agent Class
class MyAgent:
@error_wrapper
def get_session(self, session_id: str):
# Simulate the condition where the session isn't found
raise SessionNotFoundError(session_id=session_id)
# Example Usage: Session Not Found
agent = MyAgent()
error_result = agent.get_session(session_id="nonexistent_session_123")
print(json.dumps(error_result, indent=2))
El código anterior genera el siguiente resultado:
json
{
"error": {
"code": 404,
"message": "Invocation error in 'get_session': Session not found: nonexistent_session_123"
}
}
¿Qué sigue?
- Evalúa un agente.
- Implementa un agente.
- Soluciona problemas de desarrollo de un agente.
- Obtén asistencia.