Work with Dataflow data pipelines  |  Google Cloud


Overview

You can use Dataflow data pipelines for the following tasks:

  • Create recurrent job schedules.
  • Understand where resources are spent over multiple job executions.
  • Define and manage data freshness objectives.
  • Drill down into individual pipeline stages to fix and optimize your pipelines.

For API documentation, see the Data Pipelines reference.

Features

  • Create a recurring batch pipeline to run a batch job on a schedule.
  • Create a recurring incremental batch pipeline to run a batch job against the latest version of input data.
  • Use the pipeline summary scorecard to view the aggregated capacity usage and resource consumption of a pipeline.
  • View the data freshness of a streaming pipeline. This metric, which evolves over time, can be tied to an alert that notifies you when freshness falls below a specified objective.
  • Use pipeline metric graphs to compare batch pipeline jobs and find anomalies.

Limitations

  • Regional availability: You can create data pipelines in available Cloud Scheduler regions.

  • Quota:

    • Default number of pipelines per project: 500
    • Default number of pipelines per organization: 2500

      The organization-level quota is disabled by default. You can opt in to organization-level quotas; if you do so, each organization can have at most 2500 pipelines by default.

  • Labels: You can't use user-defined labels to label Dataflow data pipelines. However, when you use the additionalUserLabels field, those values are passed through to your Dataflow job. For more information about how labels apply to individual Dataflow jobs, see Pipeline options.

Types of data pipelines

Dataflow has two data pipeline types, streaming and batch. Both types of pipeline run jobs that are defined in Dataflow templates.

Streaming data pipeline
A streaming data pipeline runs a Dataflow streaming job immediately after it's created.
Batch data pipeline

A batch data pipeline runs a Dataflow batch job on a user-defined schedule. The batch pipeline input filename can be parameterized to allow for incremental batch pipeline processing.

Incremental batch pipelines

You can use datetime placeholders to specify an incremental input file format for a batch pipeline.

  • Placeholders for year, month, date, hour, minute, and second can be used, and must follow the strftime() format. Placeholders are preceded by the percentage symbol (%).
  • Parameter formatting is not verified during pipeline creation.
    • Example: If you specify "gs://bucket/Y" as the parameterized input path, it's evaluated as "gs://bucket/Y", because "Y" without a preceding "%" does not map to the strftime() format.

At each scheduled batch pipeline execution time, the placeholder portion of the input path is evaluated to the current (or time-shifted) datetime. Date values are evaluated using the current date in the time zone of the scheduled job. If the evaluated path matches the path of an input file, the file is picked up for processing by the batch pipeline at the scheduled time.

  • Example: A batch pipeline is scheduled to repeat at the start of each hour PST. If you parameterize the input path as gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, then on April 15, 2021, at 6 PM PST, the input path is evaluated to gs://bucket-name/2021-04-15/prefix-18_00.csv.
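
To preview how placeholder substitution resolves a parameterized path, the following minimal Python sketch uses the same strftime() conventions. It is illustrative only, not Dataflow's internal implementation; the bucket name and file prefix are placeholders.

    from datetime import datetime
    from zoneinfo import ZoneInfo

    # Illustrative parameterized input path; the bucket name is a placeholder.
    input_path_template = "gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv"

    # Evaluate the placeholders at a scheduled run time (6 PM PST on April 15, 2021).
    run_time = datetime(2021, 4, 15, 18, 0, tzinfo=ZoneInfo("America/Los_Angeles"))
    evaluated_path = run_time.strftime(input_path_template)

    print(evaluated_path)  # gs://bucket-name/2021-04-15/prefix-18_00.csv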

Use time shift parameters

You can use + or - minute or hour time shift parameters. To support matching an input path with an evaluated datetime that is shifted before or after the current datetime of the pipeline schedule, enclose these parameters in curly braces. Use the format {[+|-][0-9]+[m|h]}. The batch pipeline continues to repeat at its scheduled time, but the input path is evaluated with the specified time offset.

  • Example: A batch pipeline is scheduled to repeat at the start of each hour PST. If you parameterize the input path as gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, then on April 15, 2021, at 6 PM PST, the input path is evaluated to gs://bucket-name/2021-04-15/prefix-16_00.csv.
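
The time shift suffix can be previewed the same way. The following Python sketch parses the {-2h}-style suffix, applies the offset, and then expands the placeholders. It is a simplified illustration of the format described above, with placeholder paths, not Dataflow's implementation.

    import re
    from datetime import datetime, timedelta
    from zoneinfo import ZoneInfo

    def evaluate_input_path(template, run_time):
        """Apply an optional {[+|-]N[m|h]} time shift, then expand strftime placeholders."""
        match = re.search(r"\{([+-])(\d+)([mh])\}$", template)
        if match:
            sign, amount, unit = match.groups()
            delta = timedelta(minutes=int(amount)) if unit == "m" else timedelta(hours=int(amount))
            run_time = run_time + delta if sign == "+" else run_time - delta
            template = template[:match.start()]  # strip the time shift suffix
        return run_time.strftime(template)

    scheduled = datetime(2021, 4, 15, 18, 0, tzinfo=ZoneInfo("America/Los_Angeles"))
    print(evaluate_input_path("gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}", scheduled))
    # gs://bucket-name/2021-04-15/prefix-16_00.csv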

Data pipeline roles

For Dataflow data pipeline operations to succeed, you need the necessary IAM roles, as follows:

  1. You need the appropriate role to perform pipeline operations, such as creating or running a pipeline.

  2. The service account used by Cloud Scheduler needs to have the roles/iam.serviceAccountUser role, whether the service account is user-specified or the default Compute Engine service account.

  3. You need to be able to act as the service account used by Cloud Scheduler and Dataflow by being granted the roles/iam.serviceAccountUser role on that account. If you don't select a service account for Cloud Scheduler and Dataflow, the default Compute Engine service account is used.
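
For example, to let a user act as the service account that Cloud Scheduler and Dataflow use, you can grant roles/iam.serviceAccountUser on that service account. The service account and user emails below are placeholders:

    gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_EMAIL \
        --member="user:USER_EMAIL" \
        --role="roles/iam.serviceAccountUser"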

Create a data pipeline

You can create a Dataflow data pipeline in two ways:

  1. Import a job.
  2. Create a data pipeline.

The data pipelines setup page: When you first access the Dataflow pipelines feature in the Google Cloud console, a setup page opens. Enable the listed APIs to create data pipelines.
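
You can also enable the required APIs with gcloud. The exact list is shown on the setup page; as an assumption, it typically includes the Dataflow, Cloud Scheduler, and Data Pipelines services:

    gcloud services enable dataflow.googleapis.com \
        cloudscheduler.googleapis.com \
        datapipelines.googleapis.com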

Import a job

You can import a Dataflow batch or streaming job that is based on a classic or flex template and make it a data pipeline.

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    Go to Jobs

  2. Select a completed job, then on the Job Details page, select +Import as a pipeline.

  3. On the Create pipeline from template page, the parameters are populated with the options of the imported job.

  4. For a batch job, in the Schedule your pipeline section, provide a recurrence schedule. Providing a service account email address for Cloud Scheduler, which is used to schedule batch runs, is optional. If it's not specified, the default Compute Engine service account is used.

Create a data pipeline

  1. In the Google Cloud console, go to the Dataflow Data pipelines page.

    Go to Data pipelines

  2. Select +Create data pipeline.

  3. On the Create pipeline from template page, provide a pipeline name, and fill in the other template selection and parameter fields.

  4. For a batch job, in the Schedule your pipeline section, provide a recurrence schedule. Providing a service account email address for Cloud Scheduler, which is used to schedule batch runs, is optional. If a value is not specified, the default Compute Engine service account is used.
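
Batch schedules are executed through Cloud Scheduler, which uses unix-cron expressions. As an illustration (assuming standard unix-cron syntax), a schedule of hourly at minute 25 corresponds to:

    25 * * * *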

Create a batch data pipeline

To create this sample batch data pipeline, you must have access to a Cloud Storage bucket and a BigQuery dataset in your project.

This example pipeline uses the Cloud Storage Text to BigQuery batch pipeline template. This template reads files in CSV format from Cloud Storage, runs a transform, then inserts values into a BigQuery table with three columns.

  1. Create the following files on your local drive:

    1. A bq_three_column_table.json file that contains the following schema of the destination BigQuery table.

      { "BigQuery Schema": [ { "name": "col1", "type": "STRING" }, { "name": "col2", "type": "STRING" }, { "name": "col3", "type": "INT64" } ]}
    2. A split_csv_3cols.js JavaScript file, which implements a simple transformation on the input data before insertion into BigQuery.

      function transform(line) {
        // Split the CSV line into its three fields.
        var values = line.split(',');
        // Map each field to a column of the destination table.
        var obj = new Object();
        obj.col1 = values[0];
        obj.col2 = values[1];
        obj.col3 = values[2];
        // Return the row as a JSON string for loading into BigQuery.
        var jsonString = JSON.stringify(obj);
        return jsonString;
      }
    3. A file01.csv CSV file with several records that are inserted into the BigQuery table.

      b8e5087a,74,27531
      7a52c051,4a,25846
      672de80f,cd,76981
      111b92bf,2e,104653
      ff658424,f0,149364
      e6c17c75,84,38840
      833f5a69,8f,76892
      d8c833ff,7d,201386
      7d3da7fb,d5,81919
      3836d29b,70,181524
      ca66e6e5,d7,172076
      c8475eb6,03,247282
      558294df,f3,155392
      737b82a8,c7,235523
      82c8f5dc,35,468039
      57ab17f9,5e,480350
      cbcdaf84,bd,354127
      52b55391,eb,423078
      825b8863,62,88160
      26f16d4f,fd,397783
  2. Use the gcloud storage cp command to copy the files to folders in a Cloud Storage bucket in your project, as follows:

    1. Copy bq_three_column_table.json and split_csv_3cols.js to gs://BUCKET_ID/text_to_bigquery/

      gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
      gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
    2. Copy file01.csv to gs://BUCKET_ID/inputs/

      gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
  3. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  4. To create a tmp folder in your Cloud Storage bucket, select your bucket name to open the Bucket details page, then click Create folder.


  5. In the Google Cloud console, go to the Dataflow Data pipelines page.

    Go to Data pipelines

  6. Select Create data pipeline. Enter or select the following items on the Create pipeline from template page:

    1. For Pipeline name, enter text_to_bq_batch_data_pipeline.
    2. For Regional endpoint, select a Compute Engine region. The source and destination regions must match. Therefore, your Cloud Storage bucket and BigQuery table must be in the same region.
    3. For Dataflow template, in Process Data in Bulk (batch), select Text Files on Cloud Storage to BigQuery.

    4. For Schedule your pipeline, select a schedule, such as Hourly at minute 25, in your timezone. You can edit the schedule after you submit the pipeline. Providing a service account email address for Cloud Scheduler, which is used to schedule batch runs, is optional. If it's not specified, the default Compute Engine service account is used.

    5. In Required parameters, enter the following:

      1. For JavaScript UDF path in Cloud Storage:
        gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
      2. For JSON path:
        BUCKET_ID/text_to_bigquery/bq_three_column_table.json
      3. For JavaScript UDF name: transform
      4. For BigQuery output table:
        PROJECT_ID:DATASET_ID.three_column_table
      5. For Cloud Storage input path:
        BUCKET_ID/inputs/file01.csv
      6. For Temporary BigQuery directory:
        BUCKET_ID/tmp
      7. For Temporary location:
        BUCKET_ID/tmp
    6. Click Create pipeline.

  7. Confirm pipeline and template information and view current and previous history from the Pipeline details page.


You can also run a batch pipeline on demand using the Run button in the Dataflow Pipelines console.

Create a sample streaming data pipeline

You can create a sample streaming data pipeline by following the sample batch pipeline instructions, with the following differences:

  • For Pipeline schedule, don't specify a schedule for a streaming data pipeline. The Dataflow streaming job is started immediately.
  • For Dataflow template, in Process Data Continuously (stream), select Text Files on Cloud Storage to BigQuery.
  • For Worker machine type, the pipeline processes the initial set of files matching the gs://BUCKET_ID/inputs/file01.csv pattern and any additional files matching this pattern that you upload to the inputs/ folder. If the size of the CSV files exceeds several GB, to avoid possible out-of-memory errors, select a machine type with higher memory than the default n1-standard-4 machine type, such as n1-highmem-8.

Troubleshooting

This section shows you how to resolve issues with Dataflow data pipelines.

Data pipeline job fails to launch

When you use data pipelines to create a recurring job schedule, your Dataflow job might not launch, and a 503 status error appears in the Cloud Scheduler log files.

This issue occurs when Dataflow is temporarily unable to run the job.

To work around this issue, configure Cloud Scheduler to retry the job. Because the issue is temporary, the job might succeed when it's retried. For more information about setting retry values in Cloud Scheduler, see Create a job.
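
As a sketch, if you manage the Cloud Scheduler job directly, retries can be configured with gcloud. The job name and region are placeholders, and the exact flags should be confirmed against the Cloud Scheduler documentation:

    gcloud scheduler jobs update http JOB_NAME \
        --location=REGION \
        --max-retry-attempts=3 \
        --min-backoff=10s \
        --max-backoff=300s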

Investigate pipeline objectives violations

The following sections describe how to investigate pipelines that don't meet performance objectives.

Recurring batch pipelines

For an initial analysis of the health of your pipeline, on the Pipeline info page in the Google Cloud console, use the Individual job status and Thread time per step graphs. These graphs are located in the pipeline status panel.

Example investigation:

  1. You have a recurring batch pipeline that runs every hour at 3 minutes past the hour. Each job normally runs for approximately 9 minutes. You have an objective for all jobs to complete in less than 10 minutes.

  2. The job status graph shows that a job ran for more than 10 minutes.

  3. In the Update/Execution history table, find the job that ran during the hour of interest. Click through to the Dataflow job details page. On that page, find the longer-running stage, and then look in the logs for possible errors to determine the cause of the delay.

Streaming pipelines

For an initial analysis of the health of your pipeline, on the Pipeline Details page, in the Pipeline info tab, use the data freshness graph. This graph is located in the pipeline status panel.

Example investigation:

  1. You have a streaming pipeline that normally produces an output with a data freshness of 20 seconds.

  2. You set an objective of having a 30-second data freshness guarantee. When you review the data freshness graph, you notice that between 9 and 10 AM, data freshness jumped to almost 40 seconds.


  3. Switch to the Pipeline metrics tab, then view the CPU Utilization and Memory Utilization graphs for further analysis.

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2024-01-31 UTC.


FAQs

How does Cloud Dataflow work?

Dataflow uses a data pipeline model, where data moves through a series of stages. Stages can include reading data from a source, transforming and aggregating the data, and writing the results to a destination. Pipelines can range from very simple to more complex processing.

What is a pipeline in Dataflow?

Google Cloud Dataflow is a managed service used to execute data processing pipelines. It provides a unified model for defining parallel data processing pipelines that can run batch or streaming data. In Cloud Dataflow, a pipeline is a sequence of steps that reads, transforms, and writes data.
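
For context, the steps of a pipeline are defined with an SDK such as Apache Beam. The following minimal Python sketch (with placeholder Cloud Storage paths) shows the read-transform-write shape described above:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder paths; replace them with your own Cloud Storage locations.
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        (
            pipeline
            | "Read CSV lines" >> beam.io.ReadFromText("gs://example-bucket/inputs/*.csv")
            | "Parse columns" >> beam.Map(lambda line: line.split(","))
            | "Keep first column" >> beam.Map(lambda cols: cols[0])
            | "Write results" >> beam.io.WriteToText("gs://example-bucket/outputs/result")
        )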

How do you trigger a Dataflow job using a Cloud Function?

  1. Step 1: Let's create prerequisite resources. ...
  2. Step 2: Create the Dataflow pipeline. ...
  3. Step 3: Create the Cloud Function. ...
  4. Step 4: Launch it!

Is GCP Dataflow an ETL tool?

As a fully managed, fast, and cost-effective data processing tool used with Apache Beam, Cloud Dataflow allows users to develop and execute a range of data processing patterns, including Extract-Transform-Load (ETL) and batch and streaming processing.

What is the difference between Dataflow and Kafka?

Google Cloud Dataflow is a fully managed service for stream and batch processing, while Kafka is a distributed streaming platform used for building real-time data pipelines and applications.

What is the difference between Cloud Dataflow and Airflow?

Airflow relies on task parallelism, where multiple tasks can be executed simultaneously, while Google Cloud Dataflow leverages data parallelism, which allows processing multiple chunks of data in parallel. This makes Google Cloud Dataflow highly scalable for processing large datasets.

What are the three main stages in a data pipeline?

Data pipelines consist of three essential elements: a source or sources, processing steps, and a destination.

What is the difference between a data pipeline and Dataflow?

While the terms might sound interchangeable, data flow is a technical term for an evolved data pipeline that is more flexible and responsive to needs, unrestricted by the type of data system at the source or destination. For this reason, ETL, ELT, and other types of data pipelines aren't relevant to data flows.

What is a data pipeline vs. a CI/CD pipeline?

The goal of a data pipeline is to ensure the efficient and reliable flow of data through a series of orchestrated steps. A release pipeline, also known as a deployment pipeline or CI/CD pipeline, is primarily focused on automating the build, testing, and deployment of software applications.

How do I run Cloud Dataflow?

To run a custom template:
  1. Go to the Dataflow page in the Google Cloud console.
  2. Click CREATE JOB FROM TEMPLATE.
  3. Select Custom Template from the Dataflow template drop-down menu.
  4. Enter a job name in the Job Name field.
  5. Enter the Cloud Storage path to your template file in the template Cloud Storage path field.

How do I run Dataflow in Google Cloud?

For this GCP Dataflow tutorial, you must create a new GCP project for BigQuery and your GCP Dataflow pipeline.
  1. Step 1: Signup/Login. Through any supported web browser, navigate to the Google Cloud website, and create a new account if you don't have an existing one. ...
  2. Step 2: Creating the Project. ...
  3. Step 3: Add your APIs.

How do I start a Dataflow job?

  1. Before you begin.
  2. Set up your environment.
  3. Get the Apache Beam SDK.
  4. Run the pipeline locally.
  5. Run the pipeline on the Dataflow service.
  6. View your results.
  7. Modify the pipeline code.

What is the best cloud ETL tool?

33 Best ETL Tools In 2023
  1. Informatica PowerCenter – Cloud data management solution. ...
  2. Microsoft SQL Server Integration Services – Enterprise ETL platform. ...
  3. Talend Data Fabric – Enterprise data integration with open-source ETL tool. ...
  4. Integrate.io (XPlenty) – ETL tool for e-commerce. ...
  5. Stitch – Modern, managed ETL service.

What is the difference between Google Dataflow and AWS Data Pipeline?

Google Cloud Dataflow is designed to scale automatically based on the data processing needs. AWS Data Pipeline provides high throughput and low latency for batch processing of data. Google Cloud Dataflow provides high-performance data processing capabilities with near-real-time data processing.

What is the best ETL tool in GCP?

GCP Dataflow Use Cases for ETL

Dataflow is a powerful tool for ETL projects that require real-time or batch data processing, data enrichment, data migration, and data transformation. Working through such projects gives GCP data engineers hands-on experience with Dataflow and helps them build their skills.

What is the process of Dataflow?

Data flow (flow, dataflow) shows the transfer of information (sometimes also material) from one part of the system to another. The symbol of the flow is the arrow. The flow should have a name that determines what information (or what material) is being moved.

Why use Google Cloud Dataflow?

Key Takeaways. GCP Dataflow provides serverless, scalable data processing using Apache Beam and other GCP services like BigQuery and Pub/Sub. Dataflow reduces operational overhead but is limited by Apache Beam's capabilities and locks users into Google's ecosystem.

Is Cloud Dataflow serverless?

Unified stream and batch data processing that's serverless, fast, and cost-effective.

How does data flow analysis work?

Data-flow analysis is a technique for gathering information about the possible set of values calculated at various points in a computer program. A program's control-flow graph (CFG) is used to determine those parts of a program to which a particular value assigned to a variable might propagate.
