Vorlage "Pub/Sub-Abo fรผr BigQuery"

Die Vorlage "Pub/Sub-Abo fรผr BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Abo liest und in eine BigQuery-Tabelle schreibt. Sie kรถnnen die Vorlage als schnelle Lรถsung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.

Bevor Sie eine Dataflow-Pipeline fรผr dieses Szenario ausfรผhren, sollten Sie prรผfen, ob ein Pub/Sub-BigQuery-Abo mit einer nutzerdefinierten Funktion Ihren Anforderungen entspricht.

Pipelineanforderungen

  • Das data-Feld mit Pub/Sub-Nachrichten muss das JSON-Format verwenden, das in diesem JSON-Leitfaden beschrieben wird. Beispielsweise kรถnnen Nachrichten mit Werten im data-Feld, die als {"k1":"v1", "k2":"v2"} formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namens k1 und k2 mit einem Stringdatentyp eingefรผgt werden.
  • Die Ausgabetabelle muss vorhanden sein, bevor Sie die Pipeline ausfรผhren. Das Tabellenschema muss mit den JSON-Eingabeobjekten รผbereinstimmen.

Vorlagenparameter

Erforderliche Parameter

  • outputTableSpec: Der Speicherort der BigQuery-Ausgabetabelle im Format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription: Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll, im Format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Optionale Parameter

  • outputDeadletterTable: Die BigQuery-Tabelle, die fรผr Nachrichten verwendet werden soll, die die Ausgabetabelle nicht erreichen, im Format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Wenn die Tabelle nicht vorhanden ist, wird sie wรคhrend der Pipelineausfรผhrung erstellt. Falls nicht angegeben, wird OUTPUT_TABLE_SPEC_error_records verwendet.
  • javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele fรผr JavaScript-UDFs finden Sie unter โ€žUDF-Beispieleโ€œ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Definieren Sie das Intervall, in dem die Worker mรถglicherweise nach JavaScript-UDF-ร„nderungen suchen, um die Dateien neu zu laden. Die Standardeinstellung ist 0.

Benutzerdefinierte Funktion

Optional kรถnnen Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF fรผr jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen fรผr Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle รผbereinstimmt.
  • Fรผhren Sie die Vorlage aus.

    Console

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellenโ€œ
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wรคhlen Sie fรผr Regionaler Endpunkt einen Wert aus dem Drop-down-Menรผ aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausfรผhren kรถnnen, finden Sie unter Dataflow-Standorte.

    5. Wรคhlen Sie im Drop-down-Menรผ Dataflow-Vorlage die Option the Pub/Sub Subscription to BigQuery templateaus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Optional: Wรคhlen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
    8. Klicken Sie auf Job ausfรผhren.

    gcloud

    Fรผhren Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    Ersetzen Sie Folgendes:

    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen mรถchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden mรถchten

      Sie kรถnnen die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten รผbergeordneten Ordner im Bucket verfรผgbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten รผbergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • STAGING_LOCATION: der Speicherort fรผr das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
    • DATASET: Ihr BigQuery-Dataset
    • TABLE_NAME: Ihr BigQuery-Tabellenname

    API

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszufรผhren. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausfรผhren mรถchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen mรถchten, z. B. us-central1
    • VERSION: Die Version der Vorlage, die Sie verwenden mรถchten

      Sie kรถnnen die folgenden Werte verwenden:

      • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten รผbergeordneten Ordner im Bucket verfรผgbar ist: gs://dataflow-templates-REGION_NAME/latest/
      • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten รผbergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
    • STAGING_LOCATION: der Speicherort fรผr das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
    • DATASET: Ihr BigQuery-Dataset
    • TABLE_NAME: Ihr BigQuery-Tabellenname

    Nรคchste Schritte