Cloud Composer in GCP
Cloud Composer: A Managed Apache Airflow Service
Cloud Composer is a managed implementation of Apache Airflow. The key advantage of Cloud Composer being managed is that Google handles much of the infrastructure and maintenance for you. But you do still have to configure certain things, such as the scaling parameters. You could think of Composer as "low-ops" rather than "no-ops."
Airflow is an open-source, programmatic framework designed to create, schedule, manage, and monitor data workflows. It was actually originally developed by Airbnb before being added to the Apache ecosystem.
Why Airflow?
So, why was Airflow developed? What purpose does it serve, such that it was developed into a managed service in GCP?
Apache Airflow is useful because big data pipelines are often complex and involve multiple steps. You often have to create resources across multiple services, and modern data workflows rarely rely on just one service or system. The steps in a pipeline often have complex dependencies, where certain ones must be completed before others can start. You also often have to automatically clean up resources once they're no longer needed, which is crucial for keeping your cloud environment efficient and cost-effective. Data teams need a central view where the entire team can monitor workflows. And a lot of data pipelines have complex scheduling, whether it's time-based—like running tasks at specific intervals—or event-driven, where certain conditions trigger workflows.
Airflow helps with all of these needs.
Key Concepts
Directed Acyclic Graph (DAG)
In Cloud Composer, one of the central concepts is the DAG, which stands for Directed Acyclic Graph. A DAG is a collection of tasks that you want to run, and it's organized in a way that reflects the dependencies and relationships between those tasks. Each task in a DAG can have upstream or downstream dependencies. In other words, some tasks must be completed before others can begin, and this structure ensures the proper execution order. DAGs are essentially Python files that allow you to define this.
A DAG allows you to define a sequence of operations in a logical flow, ensuring tasks are executed in the correct order, based on their dependencies. This makes it really useful for orchestrating complex workflows in Cloud Composer.
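To make this concrete, here's a minimal sketch of what a DAG file might look like, assuming Airflow 2 and the Bash operator; the DAG ID, schedule, and commands are made up for illustration.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical two-step pipeline: extract, then transform.
with DAG(
    dag_id="example_daily_pipeline",   # made-up DAG ID
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(
        task_id="extract",
        bash_command="echo 'extracting data'",
    )
    transform = BashOperator(
        task_id="transform",
        bash_command="echo 'transforming data'",
    )

    # transform is downstream of extract: it only runs after extract succeeds.
    extract >> transform
```

The >> operator is what encodes the dependency: Airflow parses this Python file and builds the graph from those relationships.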
Airflow Monitoring Dashboard
The Airflow Monitoring Dashboard is available in Cloud Composer. You can view and manage all your DAGs in one central place. On this screen, you can see a list of all your DAGs, along with details like when each DAG last ran, the next scheduled run, and the status of recent tasks. You can also filter your DAGs, pause or unpause them, and access more specific actions for each DAG.
A few of the things you can do in the dashboard are:
- View and manage your DAGs: You can monitor all your workflows at a glance, checking their current state—whether they're active, paused, or failed.
- Monitor task and workflow status: The dashboard allows you to see the real-time status of each task in your DAG, which helps you track the progress of your workflows and identify any issues.
- Rerun failed tasks: If a task in a workflow fails, you can rerun that task directly from this dashboard, making it easy to troubleshoot and resolve failures without needing to re-run the entire DAG.
The Airflow Monitoring Dashboard gives you full visibility into your workflows and makes it easy to manage your DAGs and tasks.
Composer API vs. Airflow API
It's important to understand the difference between the Composer API and the Airflow API.
The Composer API is what you use to manage the Composer environment itself. Think of it as handling the infrastructure and environment settings. For example, with the Composer API, you can create a new Composer environment, update the Composer version, and scale workers, meaning you can increase or decrease the number of workers to handle tasks as needed.
The Airflow API focuses on managing the Airflow pipelines themselves. So, while the Composer API handles the infrastructure, the Airflow API manages the workflows. For example, you would use the Airflow API to trigger DAGs, monitor runs (checking the status of specific DAGs or tasks), and manage individual tasks (like rerunning a failed task or checking a task's status).
In short, the Composer API is for managing the overall environment, and the Airflow API is for managing the DAGs and tasks that run inside of the environment.
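As a rough sketch of the Airflow side, here's one way a DAG could be triggered through the Airflow 2 stable REST API exposed by a Composer environment. The web server URL and DAG ID are placeholders, and the exact authentication flow varies by Composer version, so treat this as illustrative only.

```python
import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholders: your environment's Airflow web server URL and a DAG ID.
WEB_SERVER_URL = "https://example-airflow-web-server-url"  # hypothetical
DAG_ID = "example_daily_pipeline"                          # hypothetical

# Authenticate with Application Default Credentials; the required scope and
# auth flow can differ between Composer versions.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

# The Airflow 2 stable REST API exposes POST /api/v1/dags/{dag_id}/dagRuns
# to create (i.e. trigger) a new DAG run.
response = session.post(
    f"{WEB_SERVER_URL}/api/v1/dags/{DAG_ID}/dagRuns",
    json={"conf": {"source": "manual-api-trigger"}},
)
response.raise_for_status()
print(response.json())
```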
Cloud Composer Architecture
Cloud Composer is basically these three technologies together: Apache Airflow, combined with Google Kubernetes Engine, combined with Google Cloud Storage.
Let's go through the role each plays.
- As you already know, Airflow provides the software framework that Composer uses to define, schedule, and monitor workflows, which are represented as Directed Acyclic Graphs, or DAGs.
- A Google Kubernetes Engine (GKE) cluster manages the containers that Composer runs Airflow in. When we say that workers or nodes are added or removed, it is this cluster that they are added to or removed from.
- Cloud Storage stores important data like DAG files, logs, and configuration files. Airflow reads from Cloud Storage, so your workflows and logs are kept outside of the runtime environment itself. This gives Composer and Airflow persistent, scalable storage and makes the entire system more reliable and flexible.
In summary, Airflow defines and manages the workflows, GKE handles the resource scaling, and GCS stores all the necessary files and logs.
Hosting Pipeline Files
Let's talk about how exactly you host pipeline files for Composer and Airflow to use.
Every Composer environment gets its own dedicated storage bucket, automatically created by Cloud Composer when the environment is set up. The bucket's name always follows a specific format, and it's dedicated to that environment. Pipeline task files and DAGs are hosted in this bucket and automatically read by Composer and Airflow: Composer is essentially constantly checking the bucket for updates or new files, so when you upload a DAG file, it's detected almost immediately and is ready to run. You don't need to manually tell Composer to look for the file.
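For illustration, here's a minimal sketch of uploading a DAG file into the environment's bucket with the Cloud Storage Python client; the bucket name and file names are placeholders.

```python
from google.cloud import storage

# Placeholder: the bucket that Composer created for your environment.
ENV_BUCKET = "your-composer-environment-bucket"  # hypothetical name

client = storage.Client()
bucket = client.bucket(ENV_BUCKET)

# DAG files go under the dags/ prefix of the environment bucket;
# Composer picks them up automatically after upload.
blob = bucket.blob("dags/example_daily_pipeline.py")
blob.upload_from_filename("example_daily_pipeline.py")
print(f"Uploaded to gs://{ENV_BUCKET}/{blob.name}")
```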
Triggering DAGs
There are three main ways to trigger a DAG in Airflow:
- Scheduling it. Airflow uses cron notation, which allows you to specify any interval for the DAG to run—daily, weekly at specific times, monthly, every hour at the 5-minute mark, or any custom time interval based on your needs (see the sketch after this list).
- Triggering it programmatically. For instance, you can use Cloud Functions to trigger a DAG when a specific event occurs, or trigger it upon new data being ingested somewhere. In general, you can trigger DAGs through the Airflow REST API, which means you can do it from basically anywhere.
- Triggering it manually, on demand. This is done through the Airflow UI hosted in the Composer environment. There's a "Trigger DAG" button next to each DAG, letting you start a DAG immediately without waiting for its schedule or a programmatic trigger.
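Here's a small sketch of the scheduling option, assuming a cron expression for "every hour at the 5-minute mark"; the DAG ID and task are made up.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# "5 * * * *" in cron notation means: at minute 5 of every hour.
with DAG(
    dag_id="hourly_at_five_past",      # made-up DAG ID
    start_date=datetime(2024, 1, 1),
    schedule_interval="5 * * * *",
    catchup=False,
) as dag:
    run_report = BashOperator(
        task_id="run_report",
        bash_command="echo 'running hourly report'",
    )
```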
Cloud Storage Events Triggering DAGs
Uploading an object to Cloud Storage can trigger a series of actions, starting with a Cloud Function and ultimately kicking off a Cloud Composer DAG.
Here's how it works:
- An object is uploaded to a storage bucket. This is the initial event.
- When a file is uploaded or changed in Cloud Storage, it generates a Cloud Storage Notification, which triggers a Cloud Function.
- The Cloud Function then kicks off its script. The function makes a call to the Airflow API to trigger a DAG within Cloud Composer. This DAG is responsible for processing the new data that was uploaded to the bucket.
Essentially, Cloud Storage notifications are the starting point for this workflow, and the process is automated from there. This kind of setup can be useful for automating data pipelines or any time you need to process new data dynamically as it comes into your storage bucket.
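As a rough sketch of the middle step, a background Cloud Function triggered by a Cloud Storage finalize event might look something like this; the web server URL and DAG ID are placeholders, and the authentication details depend on your Composer version, as in the earlier REST API sketch.

```python
import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholders for this sketch.
WEB_SERVER_URL = "https://example-airflow-web-server-url"  # hypothetical
DAG_ID = "process_new_file"                                # hypothetical

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)


def trigger_dag_on_upload(event, context):
    """Background Cloud Function entry point for a GCS 'finalize' event."""
    bucket = event["bucket"]
    name = event["name"]

    # Trigger the DAG, passing the uploaded object's location in as conf
    # so the DAG knows which file to process.
    response = session.post(
        f"{WEB_SERVER_URL}/api/v1/dags/{DAG_ID}/dagRuns",
        json={"conf": {"bucket": bucket, "object": name}},
    )
    response.raise_for_status()
```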
Orchestrating Google Cloud Services
Let's talk about how Cloud Composer can help orchestrate complex workflows by integrating different Google Cloud services. Keep in mind that these are just some examples to give you an idea of how Composer works as a high-level orchestration tool.
- Dataflow: Using Cloud Composer, you can ensure that your Dataflow job only starts after a previous BigQuery task has completed successfully. This helps maintain the integrity of the workflow and ensures that Dataflow only processes the correct data.
- BigQuery: You could transfer data to BigQuery from Cloud Storage at regular intervals. Composer could automate this process, so you don't have to manually initiate these transfers.
- Retries: With Cloud Composer, you can automate retries for tasks across services. This automation is crucial for maintaining workflow reliability without constant oversight.
- Vertex AI: After data ingestion completes, you can use Composer to trigger model training in Vertex AI. This ensures your ML pipelines proceed smoothly without manual intervention.
The most important takeaway is that Cloud Composer is uniquely capable of managing dependencies between different services and steps in your process. It's not just orchestrating tasks; it's ensuring each part of your pipeline works together in the right way.
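To illustrate that dependency management, here's a rough sketch of a DAG where a Dataflow template job only starts after a BigQuery job succeeds, using operators from the Google provider package; the project, dataset, bucket, and template names are placeholders, and the exact operator arguments can vary by provider version.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

with DAG(
    dag_id="bq_then_dataflow",          # made-up DAG ID
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    prepare_table = BigQueryInsertJobOperator(
        task_id="prepare_table",
        configuration={
            "query": {
                "query": "SELECT * FROM `my_project.my_dataset.raw`",  # placeholder query
                "useLegacySql": False,
            }
        },
    )

    run_dataflow = DataflowTemplatedJobStartOperator(
        task_id="run_dataflow",
        template="gs://my-bucket/templates/my_template",   # placeholder template path
        parameters={"input": "gs://my-bucket/input"},       # placeholder parameters
        location="us-central1",
    )

    # Dataflow only starts after the BigQuery job has completed successfully.
    prepare_table >> run_dataflow
```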
Multi-Cloud Orchestration
With Cloud Composer, you're not limited to orchestrating workflows just within Google Cloud. Composer is built on Apache Airflow, which is a cloud-agnostic orchestration tool, meaning it can work across multiple cloud platforms. Airflow has pre-built operators for many major cloud providers, including Google Cloud, AWS, and Microsoft Azure. This allows you to manage workflows that span different cloud environments.
The key here is that Composer helps you manage dependencies between tasks running in multiple clouds, ensuring that each step happens in the right order. This flexibility is especially useful for companies using multiple cloud providers or migrating workloads between clouds.
Task Management in Airflow
Handling Task Successes and Failures
Let's dive into how you can handle task successes and failures in Airflow. One of the most important ways to do so is through Airflow callbacks. There are two that you need to know:
- on_failure_callback: This is triggered when a task fails, and it allows you to run a custom function when that happens, such as sending alerts or retrying workflows.
- on_success_callback: This triggers a custom function when a task completes successfully. You can use this to log success events or even trigger downstream tasks.
These callbacks can be used in an overall strategy with Cloud Monitoring. You would use on_failure_callback to monitor and alert on task failures across workflows, while on_success_callback helps log successes and track overall performance, which you can view in Cloud Monitoring dashboards. This gives you greater control over how your tasks are managed, whether they succeed or fail.
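Here's a minimal sketch of attaching both callbacks to a task; the callback bodies just print, but in practice they might publish alerts or write logs that Cloud Monitoring picks up.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def alert_on_failure(context):
    # Airflow passes the task context to the callback; here we just print,
    # but this could send an alert or write to Cloud Logging instead.
    task_id = context["task_instance"].task_id
    print(f"Task {task_id} failed")


def record_success(context):
    task_id = context["task_instance"].task_id
    print(f"Task {task_id} succeeded")


with DAG(
    dag_id="callbacks_example",         # made-up DAG ID
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    task = PythonOperator(
        task_id="do_work",
        python_callable=lambda: print("doing work"),
        on_failure_callback=alert_on_failure,
        on_success_callback=record_success,
    )
```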
BigQuery Operators
BigQuery operators in Airflow allow you to manage BigQuery jobs as part of your workflows in Cloud Composer, automating interactions with BigQuery. Although Cloud Composer wraps Airflow in a managed environment, it's Airflow that directly connects to and controls BigQuery through these operators.
Airflow operators make it possible to execute jobs or queries, manage datasets, manage tables, and even validate BigQuery data.
Probably the most important BigQuery operator you need to know about is the BigQueryInsertJobOperator. This operator enables you to execute various BigQuery jobs, such as querying, loading data, or exporting data, all from your Airflow pipeline.
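For example, a load job might look roughly like this; the project, bucket, dataset, and table names are placeholders, and the configuration block mirrors the BigQuery jobs API.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id="load_csv_to_bq_example",    # made-up DAG ID
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Hypothetical load job: load CSV exports from Cloud Storage into a table.
    load_csv_to_bq = BigQueryInsertJobOperator(
        task_id="load_csv_to_bq",
        configuration={
            "load": {
                "sourceUris": ["gs://my-bucket/exports/*.csv"],
                "destinationTable": {
                    "projectId": "my-project",
                    "datasetId": "my_dataset",
                    "tableId": "my_table",
                },
                "sourceFormat": "CSV",
                "writeDisposition": "WRITE_APPEND",
            }
        },
    )
```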
Email On Failure
email_on_failure is an important feature in Airflow and Cloud Composer for monitoring task failures in your pipelines. It's a parameter rather than an operator, and it is defined in your DAG file. If it is set to True, an email is sent automatically to the recipients defined in the DAG's email argument. This ensures you or your team can be alerted the moment a failure happens, helping you take immediate action. It's another way to get real-time notifications whenever a task fails.
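A minimal sketch, assuming the settings are supplied through default_args and that the environment has email delivery configured; the recipient address and DAG ID are made up.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical recipients; actual delivery requires email/SMTP settings
# to be configured for the environment.
default_args = {
    "email": ["data-team@example.com"],
    "email_on_failure": True,
}

with DAG(
    dag_id="email_on_failure_example",  # made-up DAG ID
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args=default_args,
) as dag:
    fragile_task = BashOperator(
        task_id="fragile_task",
        bash_command="exit 1",  # fails on purpose to trigger the email
    )
```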
Retries
Retries are an important feature in Airflow. They can be added to any task to help ensure your workflows run smoothly even when something goes wrong. For any task, you can configure how many times Airflow should attempt a retry after a failure. This is especially helpful for tasks that might fail due to temporary issues, such as network delays or transient errors.
In addition to setting the number of retries, you can also configure the retry delay, which is the amount of time Airflow waits between retry attempts. This could be a fixed amount of time or follow an exponential backoff strategy, where the delay grows with each attempt.
So, when you're designing workflows, adding retries gives you flexibility in handling task failures and making sure that minor hiccups don't cause the entire pipeline to fail. It helps to keep things moving forward.
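As a sketch, retries and the retry delay are set per task; the DAG ID and command here are placeholders, and exponential backoff is optional.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="retries_example",           # made-up DAG ID
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    flaky_api_call = BashOperator(
        task_id="flaky_api_call",
        bash_command="curl --fail https://example.com/api",  # placeholder command
        retries=3,                                # attempt up to 3 retries
        retry_delay=timedelta(minutes=5),         # wait 5 minutes between attempts
        retry_exponential_backoff=True,           # grow the delay on each retry
        max_retry_delay=timedelta(minutes=30),    # cap the backoff
    )
```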
Handling Notifications When Using Third-Party Services
Whenever you integrate an external service, such as an API or another cloud platform, into your Airflow pipeline, you might want to be notified each time it's invoked. One of the most effective ways to track these invocations is by logging an event in Cloud Logging. And then after logging these events, you can configure a Cloud Monitoring metric to track the frequency of those logs.
To do this, use the Google Cloud Logging client to send logs from the task each time it is invoked. Initialize the logging client and create a logger. Inside the task function, log the specific invocation with details like the task's ID, which comes from the task_instance field in Airflow's execution context. This context can be automatically passed, allowing you to access metadata like the task ID and log it effectively.
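A minimal sketch of what such a log_third_party_task helper might look like, assuming the google-cloud-logging client library; the log name is a made-up example.

```python
import google.cloud.logging

# Initialize the Cloud Logging client and create a named logger.
logging_client = google.cloud.logging.Client()
logger = logging_client.logger("third_party_service_invocations")  # hypothetical log name


def log_third_party_task(**context):
    # The task instance comes from Airflow's execution context.
    task_id = context["task_instance"].task_id

    # Write a structured log entry that a log-based metric in
    # Cloud Monitoring can count.
    logger.log_struct(
        {
            "event": "third_party_service_invoked",
            "task_id": task_id,
        }
    )
```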
This log_third_party_task function could then be called within the Airflow operator or task that executes the third-party service, ensuring that logs are sent to Cloud Logging each time the task is executed.
This approach allows you to monitor the usage of the third-party service, track performance, and get notified in real time about its invocation by leveraging Cloud Logging and Cloud Monitoring together.
Key IAM Roles
Let's go over the key IAM roles that you should know for Cloud Composer:
- Composer Admin: This role provides full control over the creation, updating, and management of Composer environments.
- Composer Developer: This allows users to deploy, modify, and manage DAGs within Composer environments, making it essential for those involved in pipeline development.
- Composer Viewer: This grants read-only access to view Composer environments and configurations. This is ideal for users who need visibility but not the ability to make changes.
- Composer User: This lets users run and schedule DAGs without modifying the Composer environment itself. It's perfect for operational users who manage workflows but don't need to adjust the infrastructure.
- Composer Environment and Storage Accessor: This role is for those who need access to the Cloud Storage buckets where Composer environments store DAGs, logs, and other files.
- Composer Worker: This gives permissions to run Composer environment VMs. It's for service accounts, not user accounts.
All these roles are granted at the project level to control the Composer environments effectively.