Guide to Integrating Google Pub/Sub with Apache Airflow

Google Pub/Sub is a fully managed, real-time messaging service that lets applications send and receive messages between independent systems. Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. Integrating Google Pub/Sub with Apache Airflow adds robust event-driven capabilities to data pipeline management: users can automate workflows around specific events rather than relying on fixed schedules alone. By the end of this guide, readers will be able to set up and use the apache-airflow-providers-google package effectively.

Prerequisites

Knowledge Requirements

Basic understanding of Google Cloud Platform

A basic understanding of Google Cloud Platform (GCP) is essential. Familiarity with GCP services, such as creating projects and managing resources, will facilitate the integration process. Users should know how to navigate the Google Cloud Console and understand fundamental concepts like IAM roles and permissions.

Familiarity with Apache Airflow

Knowledge of Apache Airflow is also necessary. Users should understand how to create Directed Acyclic Graphs (DAGs) and manage tasks within Airflow. Familiarity with Airflow's user interface and its core components, such as operators and sensors, will be beneficial.

Tools and Accounts

Google Cloud account setup

To proceed, users need a Google Cloud account. Follow these steps to set up an account:

  1. Visit the Google Cloud Console.
  2. Sign in with a Google account or create a new one.
  3. Create a new project by selecting the project dropdown and clicking "New Project."
  4. Enable billing for the project to access all GCP services.

Ensure that the project has the necessary permissions to create and manage Pub/Sub topics and subscriptions.
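
To confirm that the active credentials actually have access, a quick check can be run from Python. This is a minimal sketch, assuming the google-cloud-pubsub client library is installed (pip install google-cloud-pubsub) and application default credentials are configured; the project ID is a placeholder:

# Optional sanity check: listing topics requires the pubsub.topics.list permission,
# so an error here usually points to missing IAM roles on the project.
from google.cloud import pubsub_v1

project_id = "your-gcp-project-id"  # placeholder: replace with your project ID

publisher = pubsub_v1.PublisherClient()
for topic in publisher.list_topics(project=f"projects/{project_id}"):
    print(topic.name)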

Apache Airflow installed

Users must have Apache Airflow installed on their system. Follow these steps for installation:

  1. Install Airflow using pip:

    pip install apache-airflow
    
  2. Initialize the Airflow database:

    airflow db init
    
  3. Start the Airflow web server:

    airflow webserver --port 8080
    
  4. Start the Airflow scheduler:

    airflow scheduler
    
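After installation, a quick import check from Python confirms that Airflow is available in the current environment (a minimal sketch; the printed version string will vary):

# Verify that Airflow is importable and report its version
import airflow
print(airflow.__version__)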

Setting Up Google Pub/Sub

Creating a Pub/Sub Topic

Steps to create a topic in Google Cloud Console

  1. Open the Google Cloud Console.
  2. Navigate to the Pub/Sub section by selecting the hamburger menu and choosing "Pub/Sub" under the "Big Data" category.
  3. Click on the Create Topic button.
  4. Enter a unique name for the topic in the Topic ID field.
  5. Optionally, add labels to help organize and manage topics.
  6. Click Create to finalize the topic creation.
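
The same topic can also be created programmatically. The following is a minimal sketch using the google-cloud-pubsub client library, assuming it is installed and credentials are configured; the project and topic names are placeholders:

from google.cloud import pubsub_v1

project_id = "your-gcp-project-id"  # placeholder
topic_id = "your-topic-name"        # placeholder

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

# Creates the topic; raises AlreadyExists if a topic with this name already exists.
topic = publisher.create_topic(name=topic_path)
print(f"Created topic: {topic.name}")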

Configuring topic settings

  1. After creating the topic, navigate to the topic details page.
  2. Click on the Edit button to modify settings.
  3. Configure message retention duration to specify how long messages should be retained.
  4. Set permissions by adding IAM roles to control access to the topic.
  5. Enable message ordering if message sequence is crucial for the application.
  6. Save changes to apply the new settings.

Creating a Pub/Sub Subscription

Steps to create a subscription

  1. In the Google Cloud Console, navigate to the Pub/Sub section.
  2. Select the topic for which you want to create a subscription.
  3. Click on the Create Subscription button.
  4. Enter a unique name for the subscription in the Subscription ID field.
  5. Choose the desired delivery type (e.g., Pull or Push).
  6. For Push subscriptions, provide an endpoint URL to receive messages.
  7. Click Create to finalize the subscription creation.

Configuring subscription settings

  1. Navigate to the subscription details page.
  2. Click on the Edit button to modify settings.
  3. Configure the acknowledgment deadline to specify how long subscribers have to acknowledge received messages.
  4. Set the message retention duration to determine how long unacknowledged messages are retained.
  5. Enable dead-letter policy to handle undeliverable messages.
  6. Adjust retry policy settings to manage message redelivery attempts.
  7. Save changes to apply the new settings.
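
Subscriptions can likewise be created and tuned from code. Below is a minimal sketch with the google-cloud-pubsub client library (placeholder names; the acknowledgment deadline mirrors the setting described above and can also be adjusted later in the console):

from google.cloud import pubsub_v1

project_id = "your-gcp-project-id"          # placeholder
topic_id = "your-topic-name"                # placeholder
subscription_id = "your-subscription-name"  # placeholder

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Creates a pull subscription with a 30-second acknowledgment deadline.
subscription = subscriber.create_subscription(
    name=subscription_path,
    topic=topic_path,
    ack_deadline_seconds=30,
)
print(f"Created subscription: {subscription.name}")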

By following these steps, users can set up Google Pub/Sub topics and subscriptions effectively. This setup forms the foundation for integrating with Apache Airflow, enabling robust event-driven workflows.

Configuring Apache Airflow

Installing Necessary Packages

Installing Google Cloud libraries

To integrate Google Pub/Sub with Apache Airflow, install the necessary Google Cloud libraries. Use the following command to install the apache-airflow-providers-google package:

pip install apache-airflow-providers-google

This package includes operators, hooks, and sensors for interacting with Google Cloud services, including Pub/Sub.

Verifying installation

Verify the installation to ensure that the required packages are correctly installed. Use the following command to list the installed packages:

pip list | grep apache-airflow-providers-google

The output should display the installed version of the apache-airflow-providers-google package. This confirms that the package is ready for use in Apache Airflow.
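
A quick import check from Python also confirms that the provider's Pub/Sub operators are available. This is a minimal sketch; run it in the same environment where Airflow is installed:

# If this import succeeds, the Google provider package is installed correctly.
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubPublishMessageOperator,
    PubSubPullOperator,
)

print("Google Cloud Pub/Sub operators are available")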

Setting Up Airflow Connections

Configuring Google Cloud connection in Airflow

Configure a connection in Apache Airflow to enable communication with Google Cloud. Follow these steps:

  1. Open the Airflow web interface by navigating to http://localhost:8080 in a web browser.

  2. Click on the Admin tab and select Connections.

  3. Click on the + button to add a new connection.

  4. Fill in the connection details:

    • Conn Id: Enter a unique identifier for the connection (e.g., google_cloud_default).
    • Conn Type: Select Google Cloud.
    • Project Id: Enter the Google Cloud project ID.
    • Keyfile Path: Provide the path to the service account key file (JSON format).
    • Scopes: Enter the required scopes (e.g., https://www.googleapis.com/auth/pubsub).

Save the connection details to establish a link between Apache Airflow and Google Cloud.

Testing the connection

Test the configured connection to ensure proper setup. Use the following steps:

  1. In the Airflow web interface, navigate to the Admin tab and select Connections.
  2. Locate the newly created connection and click on the Test button.
  3. Airflow will attempt to connect to Google Cloud using the provided credentials.

A successful test indicates that the connection is correctly configured. This connection allows Apache Airflow to interact with Google Pub/Sub, enabling seamless integration for event-driven workflows.
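
For a programmatic check, the provider's PubSubHook can be instantiated with the connection ID configured above. This is a minimal sketch, assuming the google_cloud_default connection exists and the key file is readable:

from airflow.providers.google.cloud.hooks.pubsub import PubSubHook

# Uses the Airflow connection configured above to build a Pub/Sub client.
hook = PubSubHook(gcp_conn_id="google_cloud_default")
client = hook.get_conn()  # returns a google.cloud.pubsub_v1 PublisherClient
print(type(client))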

Integrating Google Pub/Sub with Airflow

Creating an Airflow DAG

Writing the DAG code

Creating a Directed Acyclic Graph (DAG) in Apache Airflow allows users to define workflows. Start by importing necessary modules and defining default arguments:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.pubsub import PubSubPublishMessageOperator, PubSubPullOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

dag = DAG(
    'pubsub_example_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval='@daily',
)

Define tasks within the DAG. Use PubSubPublishMessageOperator to publish messages to a Google Pub/Sub topic:

publish_task = PubSubPublishMessageOperator(
    task_id='publish_message',
    project_id='your-gcp-project-id',
    topic='your-topic-name',
    messages=[{'data': b'Hello, World!'}],
    dag=dag,
)

Use PubSubPullOperator to pull messages from a subscription:

pull_task = PubSubPullOperator(
    task_id='pull_message',
    project_id='your-gcp-project-id',
    subscription='your-subscription-name',
    max_messages=5,
    dag=dag,
)

Set task dependencies to ensure proper execution order:

publish_task >> pull_task

Scheduling the DAG

Schedule the DAG to run at specific intervals. Use the schedule_interval parameter in the DAG definition. For example, to run the DAG daily, set schedule_interval='@daily'. This ensures that the workflow executes automatically based on the defined schedule.
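
Presets such as @daily and @hourly, cron expressions, and timedelta objects are all accepted. For example, a sketch of the same DAG definition scheduled for 06:00 UTC every day:

dag = DAG(
    'pubsub_example_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval='0 6 * * *',  # cron expression: run daily at 06:00 UTC
)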

Using Google Pub/Sub Operators

Implementing PubSubPublishMessageOperator

The PubSubPublishMessageOperator publishes messages to a Google Pub/Sub topic. Define the operator within the DAG code. Specify parameters such as project_id, topic, and messages. Here is an example:

publish_task = PubSubPublishMessageOperator(
    task_id='publish_message',
    project_id='your-gcp-project-id',
    topic='your-topic-name',
    messages=[{'data': b'Hello, World!'}],
    dag=dag,
)

The task_id parameter assigns a unique identifier to the task. The project_id parameter specifies the Google Cloud project ID. The topic parameter identifies the Pub/Sub topic. The messages parameter contains the message data to publish.

Implementing PubSubPullOperator

The PubSubPullOperator pulls messages from a Google Pub/Sub subscription. Define the operator within the DAG code. Specify parameters such as project_id, subscription, and max_messages. Here is an example:

pull_task = PubSubPullOperator(
    task_id='pull_message',
    project_id='your-gcp-project-id',
    subscription='your-subscription-name',
    max_messages=5,
    dag=dag,
)

The task_id parameter assigns a unique identifier to the task. The project_id parameter specifies the Google Cloud project ID. The subscription parameter identifies the Pub/Sub subscription. The max_messages parameter sets the maximum number of messages to pull.

By following these steps, users can effectively integrate Google Pub/Sub with Apache Airflow. This integration enables robust event-driven workflows, enhancing data pipeline management.

Example Use Case

Real-World Scenario

Description of the use case

Consider a scenario where a company needs to process real-time data from multiple sources. The company uses Google Pub/Sub to ingest events and Apache Airflow to orchestrate the data pipeline. The goal involves processing incoming messages and storing the results in a data warehouse.

Steps to implement the use case

  1. Create a Pub/Sub Topic: Open the Google Cloud Console. Navigate to the Pub/Sub section. Click on "Create Topic." Enter a unique name for the topic. Click "Create."

  2. Create a Pub/Sub Subscription: Select the created topic. Click on "Create Subscription." Enter a unique name for the subscription. Choose "Pull" as the delivery type. Click "Create."

  3. Install Necessary Packages: Open a terminal. Run the command:

    pip install apache-airflow-providers-google
    
  4. Configure Airflow Connection: Open the Airflow web interface. Navigate to the "Admin" tab. Select "Connections." Click on the "+" button. Fill in the connection details:

    • Conn Id: google_cloud_default
    • Conn Type: Google Cloud
    • Project Id: Enter the Google Cloud project ID.
    • Keyfile Path: Provide the path to the service account key file.
    • Scopes: https://www.googleapis.com/auth/pubsub
  5. Write the DAG Code: Create a new Python file for the DAG. Import necessary modules:

    from airflow import DAG
    from airflow.utils.dates import days_ago
    from airflow.providers.google.cloud.operators.pubsub import PubSubPublishMessageOperator, PubSubPullOperator
    

    Define default arguments and the DAG:

    default_args = {
        'owner': 'airflow',
        'start_date': days_ago(1),
        'retries': 1,
    }

    dag = DAG(
        'pubsub_example_dag',
        default_args=default_args,
        description='A simple tutorial DAG',
        schedule_interval='@daily',
    )
    

    Define tasks within the DAG:

    publish_task = PubSubPublishMessageOperator(
        task_id='publish_message',
        project_id='your-gcp-project-id',
        topic='your-topic-name',
        messages=[{'data': b'Hello, World!'}],
        dag=dag,
    )

    pull_task = PubSubPullOperator(
        task_id='pull_message',
        project_id='your-gcp-project-id',
        subscription='your-subscription-name',
        max_messages=5,
        dag=dag,
    )

    publish_task >> pull_task
    
  6. Deploy the DAG: Save the Python file in the Airflow DAGs folder. Airflow will automatically detect the new DAG.
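
Since the use case ends with storing results in a data warehouse, the example DAG can be extended with a processing task between pulling and loading. The following is a minimal sketch with a hypothetical task name (process_messages); it assumes that PubSubPullOperator's return value, the pulled messages, is available through XCom, which is the default behavior when a task returns a value:

from airflow.operators.python import PythonOperator

def process_messages(**context):
    # Read the messages returned by the pull task from XCom.
    pulled = context['ti'].xcom_pull(task_ids='pull_message') or []
    for message in pulled:
        # Each entry mirrors a Pub/Sub ReceivedMessage; decode and transform it here
        # before loading the results into the warehouse.
        print(message)

process_task = PythonOperator(
    task_id='process_messages',
    python_callable=process_messages,
    dag=dag,
)

pull_task >> process_task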

Monitoring and Troubleshooting

Monitoring the workflow

Monitoring the workflow ensures that tasks execute as expected. Open the Airflow web interface. Navigate to the "DAGs" tab. Select the created DAG. View the status of each task. The interface provides visual indicators for task success, failure, and retries.

Common issues and solutions

  1. Connection Errors: Verify the connection details in the Airflow web interface. Ensure the correct project ID and keyfile path.
  2. Permission Issues: Check IAM roles in the Google Cloud Console. Ensure the service account has the necessary permissions for Pub/Sub.
  3. Message Acknowledgment Failures: Adjust the acknowledgment deadline in the Pub/Sub subscription settings. Increase the duration if necessary.
  4. Task Failures: Review the task logs in the Airflow web interface. Identify the error messages. Modify the DAG code to address the issues.

By following these steps, users can implement a real-world use case for integrating Google Pub/Sub with Apache Airflow. This integration facilitates efficient data pipeline management and real-time data processing.

The integration of Google Pub/Sub with Apache Airflow provides a powerful solution for managing event-driven workflows. Users can leverage the capabilities of both technologies to automate and streamline data pipelines. This combination offers enhanced scalability, reliability, and flexibility.

Integrating these tools allows for real-time data processing and efficient resource management. Users can benefit from automated task scheduling and robust monitoring features. This setup ensures seamless communication between independent systems.

Exploring further use cases and advanced configurations will unlock additional potential. Users are encouraged to experiment with different scenarios to fully utilize the capabilities of Google Pub/Sub and Apache Airflow.
