Optimizing GCP Dataflow Pipelines with Flex Templates

Learn how to optimize GCP Dataflow pipelines using Flex Templates. Practical tips for parallel processing, custom I/O connectors, and Apache Beam best practices from real production experience.

In today’s data-driven world, efficient ETL pipelines are essential for processing large volumes of data quickly. GCP Dataflow and Apache Beam together provide powerful tools for building such pipelines.

The SZNS team recently worked with GCP Dataflow in a project that involved moving large sets of data, performing transformations, and outputting the results into a database. Through that experience, I encountered several learnings that I wanted to share.

In this post, we will go over best practices and tips for optimizing data pipelines using Apache Beam and Dataflow. We will focus specifically on Flex Templates, packaging dependencies, and optimizing parallel processing, all crucial to ensuring your pipeline runs in a scalable and repeatable environment.

Foundational Concepts to Understand

Before diving into pipeline development, it helps to get familiar with Apache Beam’s core concepts. Dataflow is Google Cloud's managed service for running pipelines built with the open-source Apache Beam SDK, so understanding these concepts will help you write more efficient pipelines:

PCollection - The data set that your Beam pipeline operates on

PTransform - A data processing operation, or step, in your pipeline

ParDo - A Beam transform for generic parallel processing that applies your logic to each element in a collection

DoFn - The user-defined function that a ParDo applies to each element. DoFns are often the most important pieces of code you’ll write because they define your pipeline’s data processing tasks

Inside the DoFn you can define key processing functions like setup, start_bundle, process, and finish_bundle. These functions define how data is processed at different stages in your pipeline. For example, setup() is called once per DoFn instance, before any bundle of elements is processed, so it is a good place to connect to database instances or other resources. In process() you apply your processing logic to each element.

See the Apache Beam Programming Guide, linked in the References section, for more details.

Learning #1: Choosing the Right Template / Packaging an Apache Beam Project for GCP

First is choosing the right template for your Dataflow pipeline. Dataflow templates allow you to package a Dataflow pipeline for deployment. If you’re creating a new Dataflow template, Google recommends using Flex Templates. With Flex Templates, your data pipelines are packaged as Docker images which can be stored in and deployed from GCP's Artifact Registry. When you start a Dataflow job, a VM instance is launched and pulls the Docker image specified in the template to run your pipeline.

Running custom Dataflow jobs that are not supported by GCP’s provided template jobs means you will need to package your code and dependencies and containerize them. This step can be a bit tricky, especially if you’re working with multiple files and Python dependencies. In our case, we wanted to organize our code into multiple files and import code from helper/util files.

Note that you will need a setup.py file to achieve this. I learned this the hard way: most examples in Google’s documentation use a single file with just a requirements.txt, which works fine for simple setups. The setup.py ensures that your code is installed as a package when the Docker container is built. Without it, you might run into import errors when trying to reference code in other files.

Here is an example of the folder structure for our Dataflow pipeline:

dataflow_project/
│
├── Dockerfile
├── setup.py
├── requirements.txt
├── dataflow_pipeline/
│   ├── main.py               # Main pipeline file
│   ├── utils/
│   │   ├── helpers.py
│   │   └── utils.py
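For a layout like the one above, a minimal setup.py might look like the following sketch. The package name, version, and dependency list are placeholders rather than values from our project, and it assumes each package directory (dataflow_pipeline/ and utils/) contains an __init__.py so that find_packages() can discover it.

```python
# setup.py (minimal sketch; name, version, and dependencies are placeholders)
from setuptools import find_packages, setup

setup(
    name="dataflow_pipeline",
    version="0.1.0",
    # Picks up every directory that contains an __init__.py file.
    packages=find_packages(),
    install_requires=[
        # List the runtime dependencies your pipeline needs on the workers.
    ],
)
```

With the project installed as a package, main.py can use absolute imports such as `from dataflow_pipeline.utils import helpers`.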

We also need to set these two environment variables when containerizing our pipeline:

  • FLEX_TEMPLATE_PYTHON_PY_FILE: Specifies which Python file to run to launch the Flex Template.
  • FLEX_TEMPLATE_PYTHON_SETUP_FILE: Specifies the path to the pipeline package setup.py file.

Your Flex Template Dockerfile might include content similar to the following example:

# Stage 1: provides the Flex Template launcher binary
FROM gcr.io/dataflow-templates-base/python311-template-launcher-base:latest AS template_launcher
# Stage 2: Beam SDK image that the pipeline runs in
FROM apache/beam_python3.10_sdk:2.59.0

COPY --from=template_launcher /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher
WORKDIR /pipeline
COPY requirements.txt /pipeline/

ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="/pipeline/setup.py"
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"
# Copy the whole project and install it as a package using setup.py
COPY . /pipeline/
RUN pip install .
# Path to the main pipeline file; adjust it to match your project layout
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/pipeline/dataflow_jobs/hello_world/main.py"

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

Learning #2: Design for Parallel Processing

Now that we’ve covered deployment, let’s dive into one of the most important concepts when building your pipeline: parallel processing, and in particular how Apache Beam will process your elements. Dataflow does not guarantee the order in which elements are processed. By default, Apache Beam prioritizes processing elements in parallel, which makes it difficult to perform sequential actions like “assign a sequence number to each element in a PCollection”. This is intentional: Beam cannot parallelize such actions when it is forced to process elements in order, so they would cause scalability problems.

Dataflow splits the elements into groups (bundles) and tries to process them in parallel. Each worker processes the bundles assigned to it, but not necessarily in order.

Now let’s look at an example where parallelism is constrained. In one of our projects, we faced a situation where we needed previous elements to be inserted into a database before processing the next set of elements. This dependency on the order of operations limited Apache Beam's ability to parallelize effectively because each new element was dependent on the previous one already being processed and stored.

import apache_beam as beam


class ProcessInOrderFn(beam.DoFn):
    def __init__(self, db_client, operation):
        # db_client must be picklable so Beam can ship this DoFn to the workers
        self.db_client = db_client
        self.operation = operation

    def process(self, element):
        # Insert the element into the database first
        if self.operation == "insert_element":
            self.db_client.insert_db(element)
            yield element
        # Second operation is called in a later pipeline step after all the inserts
        elif self.operation == "calculate_total":
            yield self.calculate_balance(element)

    def calculate_balance(self, element):
        # Placeholder for the balance calculation, which is not shown in this post
        raise NotImplementedError


# db_client is assumed to be a database client created before the pipeline is built
with beam.Pipeline() as p:
    # Read data from BigQuery, group it by key, and order each group by field
    # (the table name and 'group_by_field' are placeholders)
    grouped_data = (
        p
        | 'Read Data' >> beam.io.ReadFromBigQuery(table='my-project:my_dataset.my_table')
        | 'Key by Field' >> beam.Map(lambda row: (row['group_by_field'], row))
        | 'Group by Field' >> beam.GroupByKey()
        | 'Sort by Order Field' >> beam.FlatMap(
            # GroupByKey emits (key, values); sort the values of each group
            lambda group: sorted(group[1], key=lambda x: x['order_by_field'])
        )
    )
    # Process the sorted elements in order
    result = (
        grouped_data
        | 'Insert in Order' >> beam.ParDo(ProcessInOrderFn(db_client, operation="insert_element"))
    )
    # Note that this takes in the "result" output from the previous step and is dependent on it
    final_result = (
        result
        | 'Calculate Total' >> beam.ParDo(ProcessInOrderFn(db_client, operation="calculate_total"))
        | 'Write Output' >> beam.io.WriteToText('gs://my-bucket/output_balance.txt')
    )

Because each step in this example (the insert_element and calculate_total operations) depends on the completion of the previous step, parallel processing cannot be utilized effectively. The pipeline is forced to process elements one by one in the order defined by order_by_field. This ensures data consistency but comes at the cost of reduced performance due to the lack of parallelism.

While we want to maximize the parallel processing capabilities of Apache Beam, one key thing we learned while developing a pipeline is that it's important to balance the parallelism of the job. Too little parallelism can make the job slow: data builds up in the source and wall time increases, which slows down the entire pipeline. On the other hand, too much parallelism can overwhelm your data sink with too many requests, causing issues like rate limits.

To help with this, Dataflow lets you set the number of workers for each job. Each worker is a VM that the Dataflow pipeline uses.

Another important concept is that Dataflow tries to parallelize processing even on a single worker. So even if you set num_workers to 1, that worker will still process elements in parallel to speed up the job. This means that setting the worker count to 1 does not guarantee elements will be processed in order.

If you need to process elements in a specific order, you can group the elements, sort them within each group, and make sure the dependent pipeline steps run sequentially. But that limits Dataflow’s true potential, so use strict ordering only when it is necessary, because it significantly slows the job down.

Learning #3: Building Custom I/O Connectors

Apache Beam comes with built-in connectors for a wide array of commonly used data sources, such as BigQuery, Cloud Storage, Pub/Sub, and more. These connectors allow developers to streamline integration with popular data storage and messaging systems without extensive setup. The full list of supported connectors is linked in the References section.

There are scenarios, however, where you may need to connect to a data source or sink that is not supported natively by Beam’s library. This is especially relevant for unique, legacy, or proprietary data systems that lack native support or where you need enhanced control over how data is processed. Examples include:

  • Connecting to databases or APIs with specific authentication or performance requirements.
  • Executing complex SQL queries.
  • Integrating with data platforms that do not already have a public interface.

For our use case, we wanted to use IAM authentication for secure access to our database, which was not yet supported by Beam’s built-in I/O connectors. This limitation led us to build a custom I/O connector that connects securely to Cloud SQL using IAM authentication. The example below opens the connection in setup, inserts each batch of records in process, and closes the connection in teardown.

import apache_beam as beam


class MySQLConnector(beam.DoFn):
    def __init__(self, instance_connection_name, db_user, db_name, use_iam, private_ip):
        self.instance_connection_name = instance_connection_name
        self.db_user = db_user
        self.db_name = db_name
        self.use_iam = use_iam
        self.private_ip = private_ip
        self.mysql_client = None

    def setup(self):
        # Import and connect on the worker, once per DoFn instance
        from dataflow_jobs.clients.mysql import MySQLClient
        self.mysql_client = MySQLClient(
            instance_connection_name=self.instance_connection_name,
            user=self.db_user,
            database=self.db_name,
            use_iam=self.use_iam,
            private_ip=self.private_ip,
        )
        self.mysql_client.connect()

    def process(self, batch):
        # Write one batch of records per call
        self.mysql_client.insert_records("test_db", batch)

    def teardown(self):
        # Release the connection when the DoFn instance is discarded
        if self.mysql_client:
            self.mysql_client.close()

Additional Learnings:

1. Dataflow Console Visualization

Once your Dataflow job is up and running, its graph is generated automatically. You can track the progress of your jobs in the Dataflow console, which helps visualize stages and identify bottlenecks.

[Image: example of a Dataflow job graph]

2. Local Testing with Direct Runner

For local testing of your pipeline, you can use the Apache Beam Direct Runner. It lets you test the pipeline logic without deploying to GCP every time. When you are ready, replace --runner=DirectRunner with --runner=DataflowRunner to kick off a job in GCP.

python3 dataflow_jobs/hello_world/main.py \
--runner=DirectRunner \
--project="my-gcp-project" \
--region="us-central1" \
--temp_location="gs://sui-common-data/temp"

Conclusion

Optimizing data pipelines with GCP Dataflow and Flex Templates offers powerful capabilities for processing large volumes of data efficiently. Key takeaways from our experience include:

  • Choosing the right template and properly packaging your Apache Beam project is crucial for smooth deployment.
  • Understanding and leveraging parallel processing is essential for maximizing pipeline performance, but it's important to balance parallelism to avoid overwhelming data sinks.
  • Local testing with the Direct Runner is invaluable for iterating on pipeline logic before deployment.
  • Familiarity with Apache Beam's core concepts, such as DoFn and its lifecycle methods, is fundamental to building effective pipelines.

If you found this post helpful, give us a shoutout by tagging us on LinkedIn or shooting us an email at engineering@szns.solutions! Your feedback and questions are always welcome!

References

Here are a few resources to help you get started with Apache Beam and GCP Dataflow:

Basics of the Beam model
Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes.

Apache Beam Core Concepts

I/O Connectors

List of Supported Apache Beam I/O Connectors

Beam Programming Guide

Apache Beam Getting Started Programming Guide

Dataflow overview | Google Cloud

GCP Dataflow Overview

Build and run a Flex Template | Cloud Dataflow | Google Cloud
This tutorial shows how to build and run a Flex Template in Dataflow.

GCP Flex Templates