The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto
data from a Pub/Sub subscription into a BigQuery table.
Any errors that occur while writing to the BigQuery table are streamed into a
Pub/Sub unprocessed topic.
A JavaScript user-defined function (UDF) can be provided to transform the data. Errors that occur
while executing the UDF can be sent either to a separate Pub/Sub topic or to the same unprocessed
topic as the BigQuery errors.
Before running a Dataflow pipeline for this scenario, consider whether a Pub/Sub BigQuery subscription with a UDF meets your requirements.
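If that simpler path is sufficient, a BigQuery subscription can be created directly. The following is a minimal sketch with hypothetical topic, subscription, and table names; it omits the optional UDF transform.

# Sketch: write messages from an existing topic straight into a BigQuery
# table by using a Pub/Sub BigQuery subscription (no Dataflow pipeline).
gcloud pubsub subscriptions create your-bq-subscription \
  --topic=your-topic \
  --bigquery-table=your-project-id:your_dataset.your_table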
Pipeline requirements
- The input Pub/Sub subscription must exist.
- The schema file for the Proto records must exist on Cloud Storage.
- The output Pub/Sub topic must exist.
- The output BigQuery dataset must exist.
- If the BigQuery table exists, it must have a schema that matches the proto data, regardless of the createDisposition value.
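For illustration, the commands below show one way to create these prerequisites. This is a sketch with hypothetical resource names; uploading the proto descriptor set is shown with the protoc example after the parameter list.

# Sketch: create the input topic and subscription and the unprocessed
# topic (hypothetical names; replace with your own).
gcloud pubsub topics create your-input-topic
gcloud pubsub subscriptions create your-input-subscription \
  --topic=your-input-topic
gcloud pubsub topics create your-unprocessed-topic

# The output dataset must exist; the table itself can be created by the
# template when createDisposition is CREATE_IF_NEEDED.
bq mk --dataset your-project-id:your_dataset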
Template parameters
Required Parameters
- protoSchemaPath (Cloud Storage Path to the Proto Schema File): Cloud Storage path to a self-contained descriptor set file, for example gs://MyBucket/schema.pb. The schema.pb file can be generated by adding --descriptor_set_out=schema.pb to the protoc command that compiles the protos. The --include_imports flag can be used to guarantee that the file is self-contained (see the example command after this list).
- fullMessageName (Full Proto Message Name): The full message name, for example package.name.MessageName. If the message is nested inside another message, include all messages with the '.' delimiter, for example package.name.OuterMessage.InnerMessage. 'package.name' should come from the package statement, not the java_package statement.
- inputSubscription (Pub/Sub input subscription): The Pub/Sub subscription to read the input from, in the format projects/your-project-id/subscriptions/your-subscription-name.
- outputTableSpec (BigQuery output table): The BigQuery table location to write the output to, in the format <project>:<dataset>.<table_name>. The table's schema must match the input objects.
- outputTopic (Output Pub/Sub topic): The name of the topic to which data should be published, in the format projects/your-project-id/topics/your-topic-name.
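For reference, a descriptor set like the one described above might be generated and uploaded as follows. This is a sketch; the proto file and bucket names are placeholders.

# Compile the protos and emit a self-contained descriptor set.
# --include_imports bundles every imported .proto definition into schema.pb.
protoc \
  --proto_path=. \
  --descriptor_set_out=schema.pb \
  --include_imports \
  your_message.proto

# Copy the descriptor set to Cloud Storage.
gsutil cp schema.pb gs://MyBucket/schema.pb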
Optional Parameters
- preserveProtoFieldNames (Preserve Proto Field Names): Flag to control whether proto field names are kept or converted to lowerCamelCase. If the table already exists, this should be based on what matches the table's schema; otherwise, it determines the column names of the created table. Set to true to preserve the proto snake_case names; set to false to convert them to lowerCamelCase. (Default: false).
- bigQueryTableSchemaPath (BigQuery Table Schema Path): Cloud Storage path to the BigQuery schema JSON file. If this is not set, then the schema is inferred from the Proto schema. (Example: gs://MyBucket/bq_schema.json).
- udfOutputTopic (Pub/Sub output topic for UDF failures): An optional output topic to send UDF failures to. If this option is not set, then failures will be written to the same topic as the BigQuery failures. (Example: projects/your-project-id/topics/your-topic-name).
- writeDisposition (Write Disposition to use for BigQuery): BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE. Defaults to: WRITE_APPEND.
- createDisposition (Create Disposition to use for BigQuery): BigQuery CreateDisposition. For example, CREATE_IF_NEEDED, CREATE_NEVER. Defaults to: CREATE_IF_NEEDED.
- javascriptTextTransformGcsPath (Cloud Storage path to Javascript UDF source): The Cloud Storage path pattern for the JavaScript code containing your user-defined functions. (Example: gs://your-bucket/your-function.js).
- javascriptTextTransformFunctionName (UDF Javascript Function Name): The name of the function to call from your JavaScript file. Use only letters, digits, and underscores. (Example: 'transform' or 'transform_udf1').
- javascriptTextTransformReloadIntervalMinutes (JavaScript UDF auto-reload interval (minutes)): The interval, in minutes, at which workers check for changes to the JavaScript UDF file and reload it. Defaults to: 0.
- useStorageWriteApi (Use BigQuery Storage Write API): If true, the pipeline uses the Storage Write API when writing the data to BigQuery (see https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). The default value is false. When using the Storage Write API in exactly-once mode, you must set the following parameters: "Number of streams for BigQuery Storage Write API" and "Triggering frequency in seconds for BigQuery Storage Write API" (see the example after this list). If you enable Dataflow at-least-once mode or set the useStorageWriteApiAtLeastOnce parameter to true, then you don't need to set the number of streams or the triggering frequency.
- useStorageWriteApiAtLeastOnce (Use at-least-once semantics in BigQuery Storage Write API): This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled, at-least-once semantics are used for the Storage Write API; otherwise, exactly-once semantics are used. Defaults to: false.
- numStorageWriteApiStreams (Number of streams for BigQuery Storage Write API): The number of streams defines the parallelism of the BigQueryIO Write transform and roughly corresponds to the number of Storage Write API streams used by the pipeline. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. Defaults to: 0.
- storageWriteApiTriggeringFrequencySec (Triggering frequency in seconds for BigQuery Storage Write API): The triggering frequency determines how soon the data is visible for querying in BigQuery. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values.
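For illustration, the following gcloud sketch shows how the Storage Write API parameters might be appended to the template parameters at launch time. The stream count and triggering frequency are hypothetical values; the other placeholders are the same ones defined in the gcloud section later on this page.

# Sketch: launch the template with the Storage Write API enabled in
# exactly-once mode; 3 streams and a 5-second triggering frequency are
# illustrative values only.
gcloud dataflow flex-template run JOB_NAME \
  --region=REGION_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
  --parameters \
protoSchemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=3,\
storageWriteApiTriggeringFrequencySec=5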
User-defined function
Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.
Function specification
The UDF has the following specification:
- Input: the element payload, serialized as a JSON string.
- Output: a JSON string that matches the schema of the BigQuery destination table.
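As a minimal sketch, a UDF for this template might look like the following JavaScript. The processed_at field is a hypothetical example; any field the UDF emits must have a matching column in the destination BigQuery table.

/**
 * Minimal UDF sketch: parse the JSON payload, add a processing-time
 * field, and return the transformed element as a JSON string.
 * The processed_at field name is a hypothetical example.
 */
function transform(inJson) {
  var obj = JSON.parse(inJson);
  obj.processed_at = new Date().toISOString();
  return JSON.stringify(obj);
}

Upload the file to Cloud Storage, then pass its path in javascriptTextTransformGcsPath and the function name (transform in this sketch) in javascriptTextTransformFunctionName.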
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub Proto to BigQuery template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
  --region=REGION_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
  --parameters \
protoSchemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
Replace the following:
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
- PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
- SUBSCRIPTION_NAME: the Pub/Sub input subscription name
- BIGQUERY_TABLE: the BigQuery output table name
- UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch. An example curl invocation follows the list of values below.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
    "parameters": {
      "protoSchemaPath": "SCHEMA_PATH",
      "fullMessageName": "PROTO_MESSAGE_NAME",
      "inputSubscription": "SUBSCRIPTION_NAME",
      "outputTableSpec": "BIGQUERY_TABLE",
      "outputTopic": "UNPROCESSED_TOPIC"
    }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
- PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
- SUBSCRIPTION_NAME: the Pub/Sub input subscription name
- BIGQUERY_TABLE: the BigQuery output table name
- UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
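For example, after substituting the values above, you might send the request with curl. This is a sketch that assumes the gcloud CLI is installed and authenticated and that the request body has been saved to a local file named request.json (a hypothetical name).

# Sketch: launch the flex template through the REST API with curl.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d @request.json \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"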
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.