Quickstart: Create a Dataflow pipeline using Python | Google Cloud

In this quickstart, you learn how to use the Apache Beam SDK for Python to build a program that defines a pipeline. Then, you run the pipeline by using a direct local runner or a cloud-based runner such as Dataflow.

For step-by-step guidance for this task directly in the Google Cloud console, click Guide me:

Guide me

The following sections take you through the same steps as clicking Guide me.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install and initialize the Google Cloud CLI.
  3. Create or select a Google Cloud project.

    • Create a Cloud project:

      gcloud projects create PROJECT_ID
    • Select the Cloud project that you created:

      gcloud config set project PROJECT_ID
  4. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  5. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  6. Create authentication credentials for your Google Account:

    gcloud auth application-default login
  7. Grant roles to your Google Account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace EMAIL_ADDRESS with your email address.
    • Replace ROLE with each individual role.
  8. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to Standard.
    • Set the storage location to the following: US (United States).
    • Replace BUCKET_NAME with a unique bucket name. Don't include sensitive information in the bucket name because the bucket namespace is global and publicly visible.
    gsutil mb -c STANDARD -l US gs://BUCKET_NAME
  9. Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles: roles/dataflow.admin, roles/dataflow.worker, and roles/storage.objectAdmin.

    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace PROJECT_NUMBER with your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
    • Replace SERVICE_ACCOUNT_ROLE with each individual role.
  10. Copy the Google Cloud project ID and the Cloud Storage bucket name. You need these values later in this document.

Set up your environment

In this section, use the command prompt to set up an isolated Python virtual environment to run your pipeline project by using venv. This process lets you isolate the dependencies of one project from the dependencies of other projects.

If you don't have a command prompt readily available, you can use Cloud Shell. Cloud Shell already has the package manager for Python 3 installed, so you can skip to creating a virtual environment.

To install Python and then create a virtual environment, follow these steps:

  1. Check that you have Python 3 and pip running in your system:
    python --version
    python -m pip --version
  2. If required, install Python 3 and then set up a Python virtual environment: follow the instructions provided in the Installing Python and Setting up venv sections of the Setting up a Python development environment page.

After you complete the quickstart, you can deactivate the virtual environment by running deactivate.

Get the Apache Beam SDK

The Apache Beam SDK is an open source programming model for data pipelines. You define apipeline with an Apache Beam program and then choose a runner, such as Dataflow, to run your pipeline.
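
For orientation, a Beam program has roughly the following shape. This is a minimal sketch, not part of the quickstart steps: the runner is just a pipeline option, so the same code can run locally with DirectRunner or on the Dataflow service with DataflowRunner.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # DirectRunner executes the pipeline on the local machine. Switching to
    # runner="DataflowRunner" (plus project, region, and temp_location options)
    # submits the same pipeline to the Dataflow service instead.
    options = PipelineOptions(runner="DirectRunner")

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create" >> beam.Create(["to be", "or not", "to be"])
            | "Upper" >> beam.Map(str.upper)
            | "Print" >> beam.Map(print)
        )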

To download and install the Apache Beam SDK, follow these steps:

  1. Verify that you are in the Python virtual environment that you created in the preceding section. Ensure that the prompt starts with <env_name>, where env_name is the name of the virtual environment.
  2. Install the Python wheel packaging standard:
    pip install wheel
  3. Install the latest version of the Apache Beam SDK for Python:
    pip install 'apache-beam[gcp]'

    Depending on the connection, your installation might take a while.
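
Optionally, confirm which SDK version is installed by running two lines in a Python interpreter inside the virtual environment:

    import apache_beam
    print(apache_beam.__version__)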

Run the pipeline locally

To see how a pipeline runs locally, use a ready-made Python module for the wordcount example that is included with the apache_beam package.

The wordcount pipeline example does the following:

  1. Takes a text file as input.

    This text file is located in a Cloud Storage bucket with the resource name gs://dataflow-samples/shakespeare/kinglear.txt.

  2. Parses each line into words.
  3. Performs a frequency count on the tokenized words.
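
For reference, the core of that logic corresponds roughly to the following Beam transforms. This is a simplified sketch, not the example's exact source; the bundled module also adds argument parsing and a DoFn for splitting.

    import apache_beam as beam

    # Read the text, split each line into words, count occurrences, and write the results.
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "Read" >> beam.io.ReadFromText(
                "gs://dataflow-samples/shakespeare/kinglear.txt")
            | "Split" >> beam.FlatMap(lambda line: line.split())
            | "PairWithOne" >> beam.Map(lambda word: (word, 1))
            | "GroupAndSum" >> beam.CombinePerKey(sum)
            | "Format" >> beam.MapTuple(lambda word, count: f"{word}: {count}")
            | "Write" >> beam.io.WriteToText("outputs")
        )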

To stage the wordcount pipeline locally, follow these steps:

  1. From your local terminal, run the wordcount example:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. View the output of the pipeline:
    more outputs*
  3. To exit, press q.

Running the pipeline locally lets you test and debug your Apache Beam program. You can view the wordcount.py source code on Apache Beam GitHub.

Run the pipeline on the Dataflow service

In this section, run the wordcount example pipeline from the apache_beam package on the Dataflow service. This example specifies DataflowRunner as the parameter for --runner.
  • Run the pipeline:
    python -m apache_beam.examples.wordcount \
      --region DATAFLOW_REGION \
      --input gs://dataflow-samples/shakespeare/kinglear.txt \
      --output gs://STORAGE_BUCKET/results/outputs \
      --runner DataflowRunner \
      --project PROJECT_ID \
      --temp_location gs://STORAGE_BUCKET/tmp/

    Replace the following:

    • DATAFLOW_REGION: the regional endpoint where you want to deploy the Dataflow job—for example, europe-west1

      The --region flag overrides the default region that is set in the metadata server, your local client, or environment variables.

    • STORAGE_BUCKET: the Cloud Storage bucket name that you copied earlier
    • PROJECT_ID: the Google Cloud project ID that you copied earlier
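
If you write your own pipeline instead of invoking the bundled module, you can supply the same settings in code through PipelineOptions. A sketch, using the placeholder values above (replace them with your own):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder values: substitute your project ID, region, and bucket name.
    options = PipelineOptions(
        runner="DataflowRunner",
        project="PROJECT_ID",
        region="DATAFLOW_REGION",
        temp_location="gs://STORAGE_BUCKET/tmp/",
    )
    # Pass these options when constructing the pipeline:
    # beam.Pipeline(options=options)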

View your results

When you run a pipeline using Dataflow, your results are stored in a Cloud Storage bucket.In this section, verify that the pipeline is running by using either the Google Cloud console or the local terminal.

Google Cloud console

To view your results in the Google Cloud console, follow these steps:

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    Go to Jobs

    The Jobs page displays details of your wordcount job, including a status of Running at first, and then Succeeded.

  2. Go to the Cloud Storage Browser page.

    Go to Browser

  3. From the list of buckets in your project, click the storage bucket that you created earlier.

    In the wordcount directory, the output files that your job created are displayed.

Local terminal

To view the results from your terminal, use the gsutil tool. You can also run the commands from Cloud Shell.

  1. List the output files:
    gsutil ls -lh "gs://STORAGE_BUCKET/results/outputs*"

    Replace STORAGE_BUCKET with the name of the Cloud Storage bucket used in the pipeline program.

  2. View the results in the output files:
    gsutil cat "gs://STORAGE_BUCKET/results/outputs*"

Modify the pipeline code

The wordcount pipeline in the previous examples distinguishes between uppercase and lowercase words. The following steps show how to modify the pipeline so that the wordcount pipeline is not case-sensitive.
  1. On your local machine, download the latest copy of the wordcount code from the Apache Beam GitHub repository.
  2. From the local terminal, run the pipeline:
    python wordcount.py --output outputs
  3. View the results:
    more outputs*
  4. To exit, press q.
  5. In an editor of your choice, open the wordcount.py file.
  6. Inside the run function, examine the pipeline steps:
    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    After the Split step, each line has been split into individual words, represented as strings.

  7. To lowercase the strings, modify the line after split:
    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'lowercase' >> beam.Map(str.lower)
        | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    This modification maps the str.lower function onto every word. This line is equivalent to beam.Map(lambda word: str.lower(word)). A quick standalone check of this step is sketched after this procedure.
  8. Save the file and run the modified wordcount job:
    python wordcount.py --output outputs
  9. View the results of the modified pipeline:
    more outputs*
  10. To exit, press q.
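
To sanity-check the lowercase step from step 7 in isolation, you can use Beam's testing utilities in a short local script. This is a sketch, not part of wordcount.py:

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    # Verify that lowercasing folds "King" and "king" into a single count.
    with TestPipeline() as pipeline:
        counts = (
            pipeline
            | beam.Create(["King", "king", "Lear"])
            | "lowercase" >> beam.Map(str.lower)
            | "PairWithOne" >> beam.Map(lambda x: (x, 1))
            | "GroupAndSum" >> beam.CombinePerKey(sum)
        )
        assert_that(counts, equal_to([("king", 2), ("lear", 1)]))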

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources, or keep the project and delete the individual resources as described in the following steps.

  1. In the Google Cloud console, go to the Cloud Storage Browser page.

    Go to Browser

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click delete Delete, and then follow the instructions.
  4. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  5. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke
  6. If you keep your project, revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles: roles/dataflow.admin, roles/dataflow.worker, and roles/storage.objectAdmin.

    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --role=ROLE

What's next

  • Read about the Apache Beam programming model.
  • Interactively develop a pipeline using an Apache Beam notebook.
  • Learn how to design and create your own pipeline.
  • Work through the WordCount and Mobile Gaming examples.

Apache Beam™ is a trademark of The Apache Software Foundation or its affiliates in the United States and/or other countries.


