Créer un pipeline de traitement en flux continu à l'aide d'un modèle Dataflow
Ce guide de démarrage rapide explique comment créer un pipeline de traitement en flux continu à l'aide d'un modèle Dataflow fourni par Google. Plus précisément, nous utiliserons ici le modèle Pub/Sub vers BigQuery à titre d'exemple.
Le modèle Pub/Sub vers BigQuery est un pipeline de flux de données capable de lire des messages au format JSON à partir d'un sujet Pub/Sub et de les écrire dans une table BigQuery.
Pour obtenir des instructions détaillées sur cette tâche directement dans la console Google Cloud , cliquez sur Visite guidée :
Avant de commencer
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
- Créez un bucket Cloud Storage :
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
-
In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select
Add cross-bucket replication via Storage Transfer Service and
follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
In the Replication settings section, click Configure to configure settings for the replication job.
The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
-
In the Choose how to store your data section, do the following:
- In the Set a default class section, select the following: Standard.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
-
In the Choose how to protect object data section, do the
following:
- Select any of the options under Data protection that you
want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section (Data encryption method. ), and select a
- Select any of the options under Data protection that you
want to set for your bucket.
- Click Create.
- Copiez les éléments suivants, car vous en aurez besoin dans une section ultérieure :
- Le nom de votre bucket Cloud Storage.
- L'ID de votre projet Google Cloud .
Pour trouver cet ID, consultez la section Identifier des projets.
Pour suivre la procédure de ce guide de démarrage rapide, votre compte utilisateur doit disposer du rôle Administrateur Dataflow et du rôle Utilisateur du compte de service. Le compte de service Compute Engine par défaut doit disposer des rôles Nœud de calcul Dataflow, Administrateur des objets de l'espace de stockage, Éditeur Pub/Sub, Éditeur de données BigQuery et Lecteur. Pour ajouter les rôles requis dans la console Google Cloud :
- Accédez à la page IAM et sélectionnez votre projet.
Accéder à IAM - Sur la ligne contenant votre compte utilisateur, cliquez sur Modifier le compte principal. Cliquez sur Ajouter un autre rôle, puis ajoutez les rôles Administrateur Dataflow et Utilisateur du compte de service.
- Cliquez sur Enregistrer.
- Sur la ligne contenant le compte de service Compute Engine par défaut (PROJECT_NUMBER-compute@developer.gserviceaccount.com), cliquez sur Modifier le compte principal.
- Cliquez sur Ajouter un autre rôle, puis ajoutez les rôles suivants : Nœud de calcul Dataflow, Administrateur des objets Storage, Éditeur Pub/Sub, Éditeur de données BigQuery et Lecteur.
Cliquez sur Enregistrer.
Pour en savoir plus sur l'attribution de rôles, consultez la page Attribuer un rôle IAM à l'aide de la console.
- Accédez à la page IAM et sélectionnez votre projet.
- Par défaut, chaque nouveau projet démarre avec un réseau par défaut.
Si le réseau par défaut de votre projet est désactivé ou a été supprimé, vous devez disposer d'un réseau dans votre projet pour lequel votre compte utilisateur dispose du Rôle d'utilisateur de réseau de Compute (
roles/compute.networkUser
).
Créer un ensemble de données et une table BigQuery
Créez un ensemble de données et une table BigQuery selon le schéma approprié à votre sujet Pub/Sub à l'aide de la console Google Cloud .
Dans cet exemple, le nom de l'ensemble de données est taxirides
et le nom de la table est realtime
. Pour créer cet ensemble de données et cette table, procédez comme suit :
- Accédez à la page BigQuery.
Accéder à BigQuery - Dans le panneau Explorateur, à côté du projet dans lequel vous souhaitez créer l'ensemble de données, cliquez sur Afficher les actions, puis cliquez sur Créer un ensemble de données.
- Dans le panneau Créer un ensemble de données, procédez comme suit :
- Dans le champ ID de l'ensemble de données, saisissez
taxirides
. Les ID des ensembles de données sont uniques pour chaque projet Google Cloud . - Pour Type d'emplacement, choisissez Multirégional, puis sélectionnez US (plusieurs régions aux États-Unis). Les ensembles de données publics sont stockés dans l'emplacement multirégional
US
. Par souci de simplicité, utilisez le même emplacement pour votre ensemble de données. - Conservez les autres paramètres par défaut, puis cliquez sur Créer un ensemble de données.
- Dans le panneau
Explorateur , développez votre projet. - À côté de votre ensemble de données
taxirides
, cliquez sur Afficher les actions, puis sur Créer une table. - Dans le panneau Créer une table, procédez comme suit :
- Dans la section Source, sous Créer une table à partir de, sélectionnez Table vide.
- Pour le champ Table de la section Destination, saisissez
realtime
. - Dans la section Schéma, cliquez sur le bouton Modifier sous forme de texte et collez la définition de schéma suivante dans la zone :
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- Dans la section Paramètres de partitionnement et de clustering, sous Partitionnement, sélectionnez le champ Code temporel.
- Ne modifiez aucun autre paramètre par défaut et cliquez sur Créer une table.
Exécuter le pipeline
Exécutez un pipeline de flux de données à l'aide du modèle Pub/Sub vers BigQuery fourni par Google. Le pipeline reçoit des données entrantes à partir du sujet d'entrée.
- Accédez à la page Tâches Dataflow.
Accéder aux tâches - Cliquez sur
Créer une tâche à partir d'un modèle . - Saisissez
taxi-data
comme nom de la tâche pour votre tâche Dataflow. - Pour Modèle Dataflow, sélectionnez le modèle Pub/Sub vers BigQuery.
- Dans le champ Table BigQuery de sortie, saisissez ce qui suit :
PROJECT_ID:taxirides.realtime
Remplacez
PROJECT_ID
par l'ID du projet dans lequel vous avez créé l'ensemble de données BigQuery. - Dans la section Paramètres source facultatifs, cliquez sur Saisir le sujet manuellement pour le champ Sujet Pub/Sub d'entrée.
- Dans la boîte de dialogue, saisissez les éléments suivants comme Nom du sujet, puis cliquez sur Enregistrer :
projects/pubsub-public-data/topics/taxirides-realtime
Ce sujet Pub/Sub disponible publiquement est basé sur l'ensemble de données ouvert de la NYC Taxi & Limousine Commission. Voici un exemple de message de cet article au format JSON :
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- Dans le champ Emplacement temporaire, saisissez ce qui suit :
gs://BUCKET_NAME/temp/
Remplacez
BUCKET_NAME
par le nom de votre bucket Cloud Storage. Le dossiertemp
stocke les fichiers temporaires, comme la tâche de pipeline en préproduction. - Si votre projet ne possède pas de réseau par défaut, saisissez un réseau et un sous-réseau. Pour plus d'informations, consultez la section Spécifier un réseau et un sous-réseau.
- Cliquez sur Exécuter la tâche.
Afficher les résultats
Pour afficher les données écrites dans la tablerealtime
, procédez comme suit :
Accédez à la page BigQuery.
Cliquez sur
Saisir une nouvelle requête. Un onglet Éditeur s'affiche.SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
Remplacez
PROJECT_ID
par l'ID du projet dans lequel vous avez créé l'ensemble de données BigQuery. L'affichage des données dans votre tableau peut prendre jusqu'à cinq minutes.Cliquez sur Exécuter.
La requête renvoie les lignes qui ont été ajoutées à votre table au cours des dernières 24 heures. Vous pouvez également exécuter des requêtes en SQL standard.
Effectuer un nettoyage
Pour éviter que les ressources utilisées dans cette démonstration soient facturées sur votre compte Google Cloud , procédez comme suit :
Supprimer le projet
Le moyen le plus simple d'empêcher la facturation est de supprimer le projet Google Cloud que vous avez créé pour ce guide de démarrage rapide.- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Supprimer les ressources individuelles
Si vous souhaitez conserver le projet Google Cloud que vous avez utilisé dans ce guide de démarrage rapide, supprimez les ressources individuelles :
- Accédez à la page Tâches Dataflow.
Accéder aux tâches - Sélectionnez votre tâche de traitement par flux dans la liste des tâches.
- Dans la barre de navigation, cliquez sur Arrêter.
- Dans la boîte de dialogue Arrêter la tâche, annulez ou drainez votre pipeline, puis cliquez sur Arrêter la tâche.
- Accédez à la page BigQuery.
Accéder à BigQuery - Sur le panneau Explorateur, développez votre projet.
- À côté de l'ensemble de données que vous souhaitez supprimer, cliquez sur Afficher les actions, puis sur Ouvrir.
- Dans le panneau des détails, cliquez sur Supprimer l'ensemble de données, puis suivez les instructions.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.