Streaming data to a BigQuery table with GCP

Practicing DatScy
7 min read · Aug 9, 2023

My obsession with BigQuery continues! BigQuery is very useful as a centralized location for structured data, and ingestion on GCP is straightforward: the ‘bq load’ command line tool uploads local .csv files automatically, similar to other cloud platforms. For newly created data from website/application activity, Pub/Sub and Dataflow are the solutions for storing it in either BigQuery or Google Cloud Storage. This post shows how to set up the basics of streaming data on GCP into a BigQuery table.
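As a point of comparison with the streaming setup below, loading a local .csv with ‘bq load’ is a single command. A minimal sketch, where the dataset, table, file, and schema names are placeholders:

# Load a local CSV file into a BigQuery table (placeholder names)
bq load --source_format=CSV --skip_leading_rows=1 \
  my_dataset.my_table ./local_file.csv url:STRING,review:STRING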

An outline of which items to use to save data from a website URL.

Specify locations and default parameters

Below are the required parameters to set for creating the Cloud Scheduler and Dataflow jobs that stream data from a URL. I live in a region where creating these jobs is not possible, so I organized the workflow using two locations: location_where_DATAFLOW_is_possible (for Dataflow and Cloud Scheduler) and location_closest_to_my_physical_location (for the project, bucket, and BigQuery dataset).

# Select a Location for DataFlow
export DATAFLOW_REGION=$(echo "location_where_DATAFLOW_is_possible")

# Locations for Project and BigQuery (dataset, Table): Set region or location
export LOCATION=$(echo "location_closest_to_my_physical_location")

export ZONE_outside=$(echo "location_closest_to_my_physical_location-a")
export ZONE_dataflow=$(echo "location_where_DATAFLOW_is_possible-a")

export REGION_outside=$(echo "location_closest_to_my_physical_location")
export REGION_dataflow=$(echo "location_where_DATAFLOW_is_possible")

# ---------------------------------------------

# Required default parameters
# Set project
export PROJECT_ID=$(echo "XXXXXXXX")
gcloud config set project $PROJECT_ID

You can list the available regions using the command:

# List the region
gcloud compute regions list

Note that not every region returned by this command works for this workflow; if the chosen region does not support Dataflow or Cloud Scheduler, the “gcloud scheduler jobs create pubsub” command will return an error.
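Cloud Scheduler also has its own list of supported locations, which can be checked before choosing a value for DATAFLOW_REGION (this check is not in reference [1], but it helps avoid the error above):

# List the locations where Cloud Scheduler jobs can be created
gcloud scheduler locations list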

Enable API Services

# IAM
gcloud services enable iam.googleapis.com

# Dataflow
gcloud services enable dataflow.googleapis.com

# Compute Engine
gcloud services enable compute.googleapis.com

# Logging
gcloud services enable logging.googleapis.com

# Cloud Storage
gcloud services enable storage-component.googleapis.com

# ****** For Streaming to BigQuery ******
gcloud services enable bigquery.googleapis.com

# Pub/Sub
gcloud services enable pubsub.googleapis.com

# Resource Manager
gcloud services enable cloudresourcemanager.googleapis.com

# Cloud Scheduler
gcloud services enable cloudscheduler.googleapis.com

# Cloud Run API
gcloud services enable run.googleapis.com

# ****** For Streaming to Cloud Storage ******
gcloud services enable storage-api.googleapis.com

# List the services the current project has enabled
gcloud services list --enabled

Create a Service Account and grant the Service Account permission to use resources

A service account is an account used by a GCP resource or application to obtain permission to use the GCP resources associated with it; in other words, a service account allows GCP resources to share information with each other, because they cannot do so directly. For example, Pub/Sub cannot send information to Cloud Scheduler simply because a cloudscheduler.admin role exists somewhere in the project; the exchange has to happen under a service account that holds that role. A service account is also associated with a unique email address. In this case, we create a service account that has permission to use the following GCP resources: Dataflow, Pub/Sub, Storage, BigQuery, Cloud Scheduler, Cloud Run, and Compute Engine. This means that these resources, under this specific project, can share information with each other using the service account credentials.

# ---------------------------------------------
# Give permission to the Project to use PubSub
# ---------------------------------------------
export SERVICE_ACCOUNT_ID=$(echo "dataflow-practice") # select a name
export SERVICE_ACCOUNT_EMAIL=$(echo "XXXXXX@gmail.com") # your own user account email, used for the "user:" role bindings below
export SERVICE_ACCOUNT=$SERVICE_ACCOUNT_ID@$PROJECT_ID.iam.gserviceaccount.com

# [0] Create a custom service account (ONLY HAVE TO DO ONCE)
gcloud iam service-accounts create $SERVICE_ACCOUNT_ID --description="Creating a service account for Pub Sub" --display-name="Pub Sub practice"


# [1] Then, grant access to the existing service account

# dataflow.worker - serviceaccount
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$SERVICE_ACCOUNT --role="roles/dataflow.worker"

# dataflow.admin - serviceaccount
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$SERVICE_ACCOUNT --role="roles/dataflow.admin"

# dataflow.admin - user (may not be necessary)
gcloud projects add-iam-policy-binding $PROJECT_ID --member="user:"$SERVICE_ACCOUNT_EMAIL --role="roles/dataflow.admin"

# ------------------

# pubsub.admin - serviceaccount
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$SERVICE_ACCOUNT --role="roles/pubsub.admin"

# pubsub.admin - user (may not be necessary)
gcloud projects add-iam-policy-binding $PROJECT_ID --member="user:"$SERVICE_ACCOUNT_EMAIL --role='roles/pubsub.admin'

# ------------------

# storage.objectAdmin - serviceaccount
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$SERVICE_ACCOUNT --role="roles/storage.objectAdmin"

# bigquery.dataEditor - serviceaccount
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$SERVICE_ACCOUNT --role="roles/bigquery.dataEditor"

# ------------------

# cloudscheduler.admin - serviceaccount
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$SERVICE_ACCOUNT --role="roles/cloudscheduler.admin"

# cloudscheduler.admin - user (may not be necessary)
gcloud projects add-iam-policy-binding $PROJECT_ID --member="user:"$SERVICE_ACCOUNT_EMAIL --role='roles/cloudscheduler.admin'

# ------------------

# run.admin [Cloud Run Job] - serviceaccount
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$SERVICE_ACCOUNT --role="roles/run.admin"

# ------------------

# compute.admin - serviceaccount
gcloud projects add-iam-policy-binding $PROJECT_ID --member=serviceAccount:$SERVICE_ACCOUNT --role="roles/compute.admin"

# List the IAM roles granted to the service account in the Project
gcloud projects get-iam-policy $PROJECT_ID --flatten="bindings[].members" --format='table(bindings.role)' --filter="bindings.members:"$SERVICE_ACCOUNT_ID

If the permissions are added correctly, the output should be:

ROLE
roles/bigquery.dataEditor
roles/cloudscheduler.admin
roles/compute.admin
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.admin
roles/pubsub.editor
roles/run.admin
roles/storage.objectAdmin

Create a Bucket

The bucket name is needed as the staging location in the Dataflow pipeline command below; however, nothing is saved to the bucket itself when the pipeline output is written to a BigQuery table.

export BUCKET_NAME=$(echo "the_bucket0")
export STORAGE_CLASS=$(echo "STANDARD")
gcloud storage buckets create gs://$BUCKET_NAME --project=$PROJECT_ID \
--default-storage-class=$STORAGE_CLASS \
--location=$LOCATION \
--uniform-bucket-level-access

Create a BigQuery dataset and table

# Create a BigQuery dataset in the project
export dataset_name=$(echo "pubsub_dataset") # select a name
bq --location=$LOCATION mk $PROJECT_ID:$dataset_name

# Create an empty table in the dataset
export TABLE_name=$(echo "pubsub_table") # select a name
bq mk --table $PROJECT_ID:$dataset_name.$TABLE_name url:STRING,review:STRING
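Create a Pub/Sub topic and subscription

The Cloud Scheduler and Dataflow commands below reference a Pub/Sub topic through $TOPIC_ID, so the topic must exist first (reference [1] covers this step). A minimal sketch, where the topic name pubsub_topic0 is a placeholder and the subscription name matches the one pulled from in the tests at the end of this post:

# Create the topic that the Scheduler jobs publish to and the Dataflow template reads from
export TOPIC_ID=$(echo "pubsub_topic0") # select a name
gcloud pubsub topics create $TOPIC_ID

# Optional: a subscription for manually inspecting messages on the topic
gcloud pubsub subscriptions create pubsub_subscription_id0 --topic=$TOPIC_ID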

Create and run Cloud Scheduler jobs

Cloud Scheduler jobs are used to publish messages to the Pub/Sub topic for each expected user response; in this example each message stands in for an app response and carries the schema fields url and review.

Cloud Scheduler can also be used to publish different kinds of logged information (ie: incorrectly logged data, positive review, negative review) so that it can later be sorted into an appropriate database. The code below creates two jobs that send two types of data to the topic. At first I guessed there must be two buttons/inputs on the https://beam.apache.org/ site where one can push/input “positive” or “negative”; I looked on the site, additionally using the web developer view, and did not see anything related to “positive” or “negative”. In fact the values come from the fixed JSON payload given in each job’s --message-body flag; the url is just a string field inside that payload, and nothing is fetched from the site itself.

# [a] create a publisher for "positive ratings" that publishes every minute
gcloud scheduler jobs create pubsub positive-ratings-publisher --schedule="* * * * *" --location=$DATAFLOW_REGION --topic=$TOPIC_ID --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'

# [b] start the cloud scheduler Job:positive
gcloud scheduler jobs run --location=$DATAFLOW_REGION positive-ratings-publisher

# [c] create a publisher for "negative ratings" that publishes every minute
gcloud scheduler jobs create pubsub negative-ratings-publisher --schedule="* * * * *" --location=$DATAFLOW_REGION --topic=$TOPIC_ID --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'

# [d] start the cloud scheduler Job:negative
gcloud scheduler jobs run --location=$DATAFLOW_REGION negative-ratings-publisher
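To confirm that both jobs were created in the chosen location, they can be listed (a quick check, not part of reference [1]):

# List the Cloud Scheduler jobs in the Dataflow region
gcloud scheduler jobs list --location=$DATAFLOW_REGION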

Run the Dataflow pipeline

The “gcloud dataflow jobs run” command starts a real-time streaming pipeline that moves the published messages into BigQuery. I used the basic Dataflow template that Google offers (PubSub_to_BigQuery), which transfers the message data to BigQuery by matching schemas: each message carries the fields [url, review], which are matched to the BigQuery table schema [url, review]. As in reference [1], you can also supply your own secondary JavaScript template (a UDF) to filter streaming values, for example to separate erroneous data from reliable data.

export JOB_NAME=$(echo "send_pubsub_data2bigquery")
export WORKER_MACHINE_TYPE=$(echo "n1-standard-1") # example machine type
export WORKER_REGION=$(echo $REGION_dataflow) # or $REGION_outside, whichever region has available worker capacity
export WORKER_ZONE=$(echo $ZONE_dataflow)

gcloud dataflow jobs run $JOB_NAME \
--gcs-location gs://dataflow-templates-$DATAFLOW_REGION/latest/PubSub_to_BigQuery \
--region $DATAFLOW_REGION \
--staging-location gs://$BUCKET_NAME/temp \
--parameters inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID,outputTableSpec=$PROJECT_ID:$dataset_name.$TABLE_name \
--num-workers=1 \
--worker-machine-type=$WORKER_MACHINE_TYPE \
--worker-region=$WORKER_REGION

# OR

# Code for adding an optional secondary filtering javascript template
gcloud dataflow jobs run $JOB_NAME \
--gcs-location gs://dataflow-templates-$DATAFLOW_REGION/latest/PubSub_to_BigQuery \
--region $DATAFLOW_REGION \
--staging-location gs://$BUCKET_NAME/temp \
--parameters inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID,outputTableSpec=$PROJECT_ID:$dataset_name.$TABLE_name,javascriptTextTransformGcsPath=gs://$BUCKET_NAME/dataflow_udf_transform.js,javascriptTextTransformFunctionName=process \
--num-workers=1 \
--worker-machine-type=$WORKER_MACHINE_TYPE \
--worker-region=$WORKER_REGION
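If the optional UDF variant above is used, the file referenced by javascriptTextTransformGcsPath must already exist in the bucket. Below is a minimal sketch of creating and uploading such a file; the normalization logic inside process() is only illustrative and not taken from reference [1]:

# Write a simple UDF; the template calls process() with each Pub/Sub message payload as a JSON string
cat > dataflow_udf_transform.js <<'EOF'
function process(inJson) {
  var obj = JSON.parse(inJson);
  if (obj.review) {
    obj.review = obj.review.toLowerCase().trim(); // normalize the review value
  }
  return JSON.stringify(obj); // returned JSON is written to the BigQuery table
}
EOF

# Copy the UDF to the bucket path used by javascriptTextTransformGcsPath
gcloud storage cp dataflow_udf_transform.js gs://$BUCKET_NAME/dataflow_udf_transform.js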

I found that the key parameters to set in the commands above were “num-workers” and “worker-region”; without them the job would fail, reporting ‘unavailable resources’ and being unable to assign workers to complete the job. When I specified “num-workers” and “worker-region”, the job ran successfully.
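One way to check that the job actually started and is not stuck waiting for workers is to list the active Dataflow jobs in the region:

# List active Dataflow jobs in the Dataflow region
gcloud dataflow jobs list --region=$DATAFLOW_REGION --status=active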

View accumulated data in the BigQuery table

After running “gcloud dataflow jobs run”, the url and review values accumulated in the BigQuery table!
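To view the accumulated rows from the command line, a simple query works (using the variable names defined above):

# Query the streaming table
bq query --use_legacy_sql=false "SELECT url, review FROM $dataset_name.$TABLE_name LIMIT 10"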

Understanding of workflow

I find that this streaming workflow is interesting because it allows one to obtain application (app) data (ie: questionnaire responses, user interaction per page) and organize it directly into a database.

As someone who has little experience in app design, I found it unfortunate that reference [1] did not explain or give references for how a real application would send this data, instead of simulating it with Scheduler messages that merely mention https://beam.apache.org/, so that readers could have confidence about the data source and the data itself. I can make HTML/JavaScript webpages, have used Python Flask and Streamlit, deployed Google Apps Script-coded figures to HTTP endpoints, used numerous visualization/app programs like Tableau/Power BI/Looker, and made many API endpoints. However, I still lack a fundamental understanding of how dynamic websites function.

By performing several tests, I tried to understand how the data is sent to and received by Google Cloud, to have more confidence about where the data comes from. I did the following tests:

  1. As a simple test, I changed the url to my GitHub website and reran the Dataflow job; it did not record any data, as I assumed.
  2. I removed the negative-ratings-publisher and only ran [a] and [b] under ‘Create and run Cloud Scheduler jobs’. The data did not stream to the BigQuery table (the table stayed empty), but I could see that data was available on the topic using the command below.
gcloud pubsub subscriptions pull pubsub_subscription_id0 --auto-ack

# ┌───────────────────────────────────────────────────────────┬──────────────────┬──────────────┬────────────┬──────────────────┬────────────┐
# │ DATA │ MESSAGE_ID │ ORDERING_KEY │ ATTRIBUTES │ DELIVERY_ATTEMPT │ ACK_STATUS │
# ├───────────────────────────────────────────────────────────┼──────────────────┼──────────────┼────────────┼──────────────────┼────────────┤
# │ {"url": "https://beam.apache.org/", "review": "positive"} │ 8039289608009670 │ │ │ │ SUCCESS │
# └───────────────────────────────────────────────────────────┴──────────────────┴──────────────┴────────────┴──────────────────┴────────────┘

Also, I could send information to the BigQuery table by publishing directly to the topic with the command below.

gcloud pubsub topics publish $TOPIC_ID --message='{"url": "https://beam.apache.org/", "review": "positive"}'

These tests show that the data arriving on the Pub/Sub topic comes from the Cloud Scheduler jobs, which publish their fixed --message-body payload every minute and are received by the gcloud pubsub topic; nothing is actually sent by https://beam.apache.org/ itself, since the url is only a string inside the payload. In my tests, however, the BigQuery table only filled when both publishers were running, even though a single publisher should, in principle, be enough to save data to the table.

References

  1. Stream from Pub/Sub to BigQuery: https://cloud.google.com/dataflow/docs/tutorials/dataflow-stream-to-bigquery

Happy practicing! 👋

I hope that this will give readers some experience in Dataflow. And hopefully in a future post I will practice dynamic websites!

🎁 Donate | 💻 GitHub | 🔔 Subscribe
