Introduction
Apache Airflow is an open-source platform designed to programmatically author, schedule, and monitor workflows. It allows you to define complex directed acyclic graphs (DAGs) of tasks and their dependencies, execute them on a schedule, and distribute task execution across worker processes. Airflow provides a rich user interface, robust monitoring capabilities, and integration with various data systems, making it the go-to solution for orchestrating complex data pipelines and ETL processes.
Core Concepts of Apache Airflow
Basic Components
- DAG (Directed Acyclic Graph): A collection of tasks with directed dependencies
- Operator: A template for a predefined task (e.g., PythonOperator, BashOperator)
- Task/Task Instance: A parameterized instance of an operator
- Workflow: The combination of DAGs, operators, and schedules
- Scheduler: The component that triggers scheduled workflows
- Executor: Determines how tasks are executed (e.g., in parallel, on different machines)
- Worker: The process that executes the task
- Web Server: Provides the Airflow UI for monitoring and administration
- Metadata Database: Stores state information and execution history
Airflow Architecture
Component | Function | Key Characteristics |
---|---|---
Scheduler | Monitors and triggers tasks | Heart of Airflow, determines what to run when |
Web Server | Provides UI | Flask-based, shows DAGs, task status, logs |
Metadata DB | Stores workflow state | Usually PostgreSQL, MySQL, or SQLite |
Executor | Executes tasks | Different implementations for different environments |
DAG Directory | Stores DAG files | Python files that define workflows |
Executors
Executor | Best For | Characteristics |
---|---|---
SequentialExecutor | Development, testing | Single process, no parallelism, SQLite compatible |
LocalExecutor | Small deployments | Parallelism on single machine, requires PostgreSQL/MySQL |
CeleryExecutor | Production, scalability | Distributed tasks using Celery, horizontal scaling |
KubernetesExecutor | Container-based environments | Dynamic pods for each task, good isolation |
DaskExecutor | Data science workflows | Leverage Dask distributed computing |
DAG Definition & Configuration
Basic DAG Structure
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email': ['email@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG definition
dag = DAG(
    'my_example_dag',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval='0 0 * * *',  # Daily at midnight
    catchup=False,
    tags=['example'],
)

# Task definitions
def my_function():
    return "Hello from Airflow!"

task1 = PythonOperator(
    task_id='my_task',
    python_callable=my_function,
    dag=dag,
)

# Task dependencies (a single task has none to declare)
task1
```
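The same DAG can also be declared with a context manager, which avoids passing dag=dag to every operator. A minimal sketch (the dag id my_example_dag_ctx is illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    'my_example_dag_ctx',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Operators created inside the block are attached to this DAG automatically
    task1 = PythonOperator(
        task_id='my_task',
        python_callable=lambda: "Hello from Airflow!",
    )
```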
Common DAG Parameters
Parameter | Description | Example |
---|---|---
dag_id | Unique identifier for the DAG | 'etl_workflow' |
schedule_interval | When to run the DAG | '0 0 * * *' or @daily |
start_date | First execution date | datetime(2023, 1, 1) |
end_date | Date after which the DAG stops running | datetime(2023, 12, 31) |
catchup | Whether to backfill missed runs | False (default is True) |
default_args | Default parameters for tasks | Dictionary of parameters |
tags | Labels for filtering in the UI | ['production', 'data_warehouse'] |
max_active_runs | Maximum concurrent DAG runs | 1 (default depends on config) |
concurrency | Maximum task instances running concurrently across all runs of this DAG (renamed max_active_tasks in Airflow 2.2+) | 16 (default depends on config) |
description | Human-readable description | 'ETL workflow for user data' |
params | Runtime configuration parameters | {'table': 'users'} |
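A hedged sketch combining a few of these parameters, including reading params from a templated field (the dag id, variable names, and table value are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

param_dag = DAG(
    'param_example_dag',
    description='Demonstrates common DAG parameters',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,
    tags=['example'],
    params={'table': 'users'},  # available in templates as {{ params.table }}
)

show_table = BashOperator(
    task_id='show_table',
    bash_command='echo "Processing table: {{ params.table }}"',
    dag=param_dag,
)
```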
Schedule Interval Options
Expression | Description | Equivalent Cron |
---|---|---
@once | Schedule once and only once | N/A |
@hourly | Run once an hour at the beginning of the hour | 0 * * * * |
@daily | Run once a day at midnight | 0 0 * * * |
@weekly | Run once a week at midnight on Sunday | 0 0 * * 0 |
@monthly | Run once a month at midnight on the first day | 0 0 1 * * |
@yearly | Run once a year at midnight on January 1 | 0 0 1 1 * |
None | Only run manually triggered | N/A |
Cron expression | Custom schedule using cron syntax | */10 * * * * (every 10 mins) |
timedelta | Time interval between executions | timedelta(hours=1) |
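Both cron-style strings and timedelta objects are accepted; a brief sketch (the dag ids are illustrative):

```python
from datetime import datetime, timedelta

from airflow import DAG

# Runs once an hour
hourly_dag = DAG(
    'hourly_interval_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    catchup=False,
)

# Runs only when triggered manually or via the API
manual_dag = DAG(
    'manual_only_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)
```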
Task Definition & Operators
Common Operators
Operator | Purpose | Example |
---|---|---
BashOperator | Execute bash commands | BashOperator(task_id='print_date', bash_command='date') |
PythonOperator | Execute Python functions | PythonOperator(task_id='process', python_callable=process_func) |
EmptyOperator | Dummy operator for grouping | EmptyOperator(task_id='start') |
PostgresOperator | Execute SQL against Postgres | PostgresOperator(task_id='query', sql='SELECT * FROM table') |
MySqlOperator | Execute SQL against MySQL | MySqlOperator(task_id='query', sql='SELECT * FROM table') |
SqliteOperator | Execute SQL against SQLite | SqliteOperator(task_id='query', sql='SELECT * FROM table') |
DockerOperator | Run a command in a Docker container | DockerOperator(task_id='docker_task', image='python:3.9') |
SSHOperator | Execute commands on remote host | SSHOperator(task_id='ssh_task', ssh_conn_id='ssh_default', command='ls') |
EmailOperator | Send an email | EmailOperator(task_id='email', to='user@example.com', subject='Alert') |
SimpleHttpOperator | Send HTTP request | SimpleHttpOperator(task_id='get_data', endpoint='api/data') |
Specialized Operators
Category | Notable Operators |
---|---
Cloud Provider | S3ToRedshiftOperator, GoogleCloudStorageToBigQueryOperator, AzureDataFactoryRunPipelineOperator |
Database | SqlSensor, MongoToS3Operator, SnowflakeOperator, PrestoOperator |
File Transfer | FTPFileTransmitOperator, SFTPOperator, LocalFilesystemToS3Operator |
Big Data | SparkSubmitOperator, HiveOperator, PigOperator, PrestoOperator |
Messaging | PubSubPublishOperator, SlackAPIPostOperator, TelegramOperator |
Containers | KubernetesPodOperator, DockerOperator, ECSOperator |
Task Dependencies
Syntax | Description | Example |
---|---|---
>> | Set downstream: left task runs before right task | task1 >> task2 (task1 runs before task2) |
<< | Set upstream: left task runs after right task | task2 << task1 (equivalent to the above) |
List syntax | Multiple dependencies | task1 >> [task2, task3] >> task4 |
set_upstream() | Set task dependency | task2.set_upstream(task1) |
set_downstream() | Set task dependency | task1.set_downstream(task2) |
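A small sketch combining these dependency patterns, using EmptyOperator placeholders (the dag id and task ids are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    'dependency_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    extract = EmptyOperator(task_id='extract')
    transform_a = EmptyOperator(task_id='transform_a')
    transform_b = EmptyOperator(task_id='transform_b')
    load = EmptyOperator(task_id='load')

    # Fan-out then fan-in: extract feeds both transforms, both feed load
    extract >> [transform_a, transform_b] >> load

    # Equivalent declarations (any one of these does the same job):
    # load << [transform_a, transform_b] << extract
    # extract.set_downstream([transform_a, transform_b])
    # load.set_upstream([transform_a, transform_b])
```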
XComs: Cross-Communication Between Tasks
```python
# Task that pushes a value to XCom
def push_value(**context):
    value = "some_value"
    context['task_instance'].xcom_push(key='my_key', value=value)

# In Airflow 2.x the context is passed automatically;
# provide_context=True is only needed on Airflow 1.x
push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_value,
    dag=dag,
)

# Task that pulls a value from XCom
def pull_value(**context):
    value = context['task_instance'].xcom_pull(task_ids='push_task', key='my_key')
    print(f"Retrieved value: {value}")

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_value,
    dag=dag,
)

push_task >> pull_task
```
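XCom values can also be consumed from templated fields. A short sketch that builds on the push_task/pull_task example above (the echo_xcom task id is illustrative):

```python
from airflow.operators.bash import BashOperator

# ti.xcom_pull is available inside Jinja templates at runtime
echo_task = BashOperator(
    task_id='echo_xcom',
    bash_command="echo {{ ti.xcom_pull(task_ids='push_task', key='my_key') }}",
    dag=dag,
)

pull_task >> echo_task
```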
TaskFlow API (Airflow 2.0+)
Basic TaskFlow Example
```python
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['example'],
)
def taskflow_etl():
    @task
    def extract():
        data = {"1001": 301, "1002": 302}
        return data

    @task
    def transform(order_data):
        transformed = {k: v * 2 for k, v in order_data.items()}
        return transformed

    @task
    def load(transformed_data):
        print(f"Transformed data: {transformed_data}")

    order_data = extract()
    transformed_data = transform(order_data)
    load(transformed_data)

taskflow_etl_dag = taskflow_etl()
```
TaskFlow vs. Traditional Syntax
Feature | Traditional | TaskFlow API |
---|---|---
Task Definition | Create operator instances | Use Python decorators |
Dependencies | Explicit using >> or << | Implicit through function calls |
XComs | Manual push/pull | Automatic through return values |
Context | Passed via **kwargs (provide_context=True needed only in Airflow 1.x) | Available by default |
Multiple Outputs | Use multiple XComs | Return dictionary |
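For the multiple-outputs case, a decorated task can return a dictionary and expose each key as its own XCom. A short sketch, assuming illustrative dag/task names and keys:

```python
from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule_interval=None, start_date=datetime(2023, 1, 1), catchup=False)
def multiple_outputs_example():
    @task(multiple_outputs=True)
    def extract_order():
        # Each dictionary key becomes its own XCom
        return {"order_id": 1001, "amount": 301}

    @task
    def report(order_id, amount):
        print(f"Order {order_id}: {amount}")

    order = extract_order()
    # Indexing into the result references the individual XComs
    report(order["order_id"], order["amount"])

multiple_outputs_example_dag = multiple_outputs_example()
```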
Connection & Variable Management
Connections
Action | Web UI | CLI | Code |
---|---|---|---
Create | Admin → Connections → + | airflow connections add | Connection() object added via ORM session |
Use | N/A | N/A | BaseHook.get_connection('conn_id') |
List | Admin → Connections | airflow connections list | session.query(Connection) |
Test | Admin → Connections → Test | N/A | Custom code with try/except |
Connection Structure
Field | Description | Example |
---|---|---
Conn Id | Unique identifier | postgres_dwh |
Conn Type | Type of connection | postgres |
Host | Server hostname/IP | dwh.example.com |
Schema | Database schema | public |
Login | Username | airflow |
Password | Password | ******** |
Port | Connection port | 5432 |
Extra | JSON with additional params | {"sslmode": "require"} |
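Retrieving a connection's fields in task code, as a minimal sketch (assumes a connection with Conn Id postgres_dwh exists, as in the table above):

```python
from airflow.hooks.base import BaseHook

# Look up the connection object stored in the metadata database
conn = BaseHook.get_connection("postgres_dwh")

print(conn.host)                          # dwh.example.com
print(conn.login)                         # airflow
print(conn.port)                          # 5432
print(conn.extra_dejson.get("sslmode"))   # "require"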
Variables
Action | Web UI | CLI | Code |
---|---|---|---
Create | Admin → Variables → + | airflow variables set | Variable.set() |
Get | Admin → Variables | airflow variables get | Variable.get() |
List | Admin → Variables | airflow variables list | session.query(Variable) |
Delete | Admin → Variables → × | airflow variables delete | Variable.delete() |
Variable Usage
```python
from airflow.models import Variable

# Get a variable (with default)
table_name = Variable.get("table_name", default_var="default_table")

# Get variable as JSON
config = Variable.get("my_config", deserialize_json=True)
print(f"Database: {config['database']}")

# Set a variable
Variable.set("status", "ready")

# Set JSON variable
Variable.set("complex_config", {"env": "prod", "retry": 3}, serialize_json=True)
```
Error Handling & SLAs
Retry Configuration
Parameter | Description | Default | Example |
---|---|---|---
retries | Number of retries | 0 | retries=3 |
retry_delay | Time between retries | timedelta(seconds=300) | retry_delay=timedelta(minutes=5) |
retry_exponential_backoff | Exponential delay increase | False | retry_exponential_backoff=True |
max_retry_delay | Maximum delay | None | max_retry_delay=timedelta(hours=1) |
execution_timeout | Max execution time | None | execution_timeout=timedelta(hours=2) |
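A hedged sketch of these parameters applied to a single task (the task id is illustrative; my_function and dag are assumed from the earlier example):

```python
from datetime import timedelta

from airflow.operators.python import PythonOperator

resilient_task = PythonOperator(
    task_id='resilient_task',
    python_callable=my_function,            # assumed to be defined above
    retries=3,                              # retry up to three times
    retry_delay=timedelta(minutes=5),       # wait 5 minutes between attempts
    retry_exponential_backoff=True,         # 5, 10, 20 minutes, ...
    max_retry_delay=timedelta(hours=1),     # cap the backoff
    execution_timeout=timedelta(hours=2),   # fail the attempt if it runs longer
    dag=dag,                                # assumed DAG object from earlier
)
```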
Error Callbacks
```python
def on_failure_callback(context):
    task_instance = context['task_instance']
    task_id = task_instance.task_id
    dag_id = task_instance.dag_id
    execution_date = context['execution_date']
    exception = context.get('exception')
    # Send alert, log to monitoring system, etc.
    print(f"Task {task_id} in DAG {dag_id} failed on {execution_date}: {exception}")

task = PythonOperator(
    task_id='my_task',
    python_callable=my_function,
    on_failure_callback=on_failure_callback,
    dag=dag,
)
```
SLA (Service Level Agreement)
```python
# DAG with SLA
# (assumes alert_on_sla_miss and process_data are defined elsewhere in the file)
dag = DAG(
    'sla_dag',
    default_args=default_args,
    schedule_interval='@hourly',
    sla_miss_callback=alert_on_sla_miss,
)

# Task with SLA
task = PythonOperator(
    task_id='time_sensitive_task',
    python_callable=process_data,
    sla=timedelta(minutes=30),  # Must complete within 30 minutes
    dag=dag,
)
```
Sensors & Triggers
Common Sensors
Sensor | Purpose | Example |
---|---|---
FileSensor | Wait for file to appear | FileSensor(task_id='wait_for_file', filepath='/data/file.csv') |
S3KeySensor | Wait for S3 key | S3KeySensor(task_id='wait_for_s3', bucket_key='data/files/file.csv') |
SqlSensor | Wait for SQL query to return results | SqlSensor(task_id='wait_for_data', conn_id='postgres', sql='SELECT...') |
HttpSensor | Wait for HTTP endpoint | HttpSensor(task_id='check_api', http_conn_id='api', endpoint='status') |
ExternalTaskSensor | Wait for another DAG task | ExternalTaskSensor(task_id='wait_for_etl', external_dag_id='etl') |
DateTimeSensor | Wait until specified datetime | DateTimeSensor(task_id='wait_until', target_time=datetime(2023,1,1,12,0)) |
TimeDeltaSensor | Wait for time period | TimeDeltaSensor(task_id='wait_1h', delta=timedelta(hours=1)) |
Sensor Modes
Mode | Behavior | Use Case |
---|---|---
poke (default) | Keeps its worker slot and re-checks in the same process | Short polling intervals, quick checks |
reschedule | Releases the worker slot between checks | Longer intervals, resource efficiency |
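A minimal sketch of a sensor using reschedule mode (the file path, connection id, and intervals are assumptions; dag is assumed from the earlier example):

```python
from datetime import timedelta

from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_file',
    fs_conn_id='fs_default',             # filesystem connection used to resolve paths
    filepath='/data/incoming/file.csv',
    poke_interval=300,                   # check every 5 minutes
    timeout=60 * 60 * 6,                 # give up after 6 hours
    mode='reschedule',                   # free the worker slot between checks
    dag=dag,
)
```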
Deferrable Operators (Airflow 2.2+)
```python
from datetime import timedelta

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger

class DeferrableSleepSensor(BaseSensorOperator):
    def execute(self, context):
        # Hand the wait off to the triggerer (requires the `airflow triggerer` process)
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(seconds=60)),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Called when the trigger fires; the task then completes
        return True
```
Dynamic DAG Generation
Using Factory Functions
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

def create_dag(dag_id, schedule, default_args):
    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    with dag:
        t1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )
        t2 = BashOperator(
            task_id='sleep',
            bash_command='sleep 5',
        )
        t1 >> t2
    return dag

# Create multiple DAGs and register them in the module namespace
for i in range(3):
    dag_id = f'dynamic_dag_{i}'
    globals()[dag_id] = create_dag(
        dag_id=dag_id,
        schedule='@daily',
        default_args={'start_date': datetime(2023, 1, 1)},
    )
```
Using Variables for Configuration
```python
from airflow.models import Variable

# Get configuration from an Airflow Variable
# (assumes dag and process_table are defined elsewhere in this file)
config = Variable.get("etl_config", deserialize_json=True)

# Create tasks dynamically based on the config
tasks = {}
for table in config['tables']:
    tasks[table['name']] = PythonOperator(
        task_id=f"process_{table['name']}",
        python_callable=process_table,
        op_kwargs={'table_name': table['name'], 'columns': table['columns']},
        dag=dag,
    )

# Link dependencies once all tasks exist
for table in config['tables']:
    for dependency in table.get('dependencies', []):
        tasks[dependency] >> tasks[table['name']]
```
Testing DAGs & Tasks
Unit Testing DAGs
```python
import unittest

from airflow.models import DagBag

class TestDagIntegrity(unittest.TestCase):
    def setUp(self):
        self.dagbag = DagBag(dag_folder='dags', include_examples=False)

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            f"DAG import errors: {self.dagbag.import_errors}"
        )

    def test_dag_structure(self):
        dag_id = "my_dag_id"
        dag = self.dagbag.get_dag(dag_id)
        self.assertIsNotNone(dag)
        self.assertEqual(len(dag.tasks), 3)

        # Check specific tasks
        task_ids = [task.task_id for task in dag.tasks]
        self.assertIn("task1", task_ids)
        self.assertIn("task2", task_ids)

        # Check dependencies
        task1 = dag.get_task("task1")
        task2 = dag.get_task("task2")
        self.assertIn(task2, task1.downstream_list)
```
Testing Task Functions
```python
import unittest
from unittest.mock import patch, MagicMock

from my_dag_file import process_data_task

class TestTaskFunctions(unittest.TestCase):
    @patch('my_dag_file.read_data')
    def test_process_data_task(self, mock_read_data):
        # Setup mock
        mock_read_data.return_value = {"key": "value"}

        # Run function
        result = process_data_task()

        # Assertions
        self.assertEqual(result, {"processed_key": "processed_value"})
        mock_read_data.assert_called_once()
```
Command Line Interface (CLI)
DAG Management
Command | Description | Example |
---|---|---
airflow dags list | List all DAGs | airflow dags list |
airflow dags trigger | Trigger a DAG run | airflow dags trigger my_dag |
airflow dags pause | Pause a DAG | airflow dags pause my_dag |
airflow dags unpause | Unpause a DAG | airflow dags unpause my_dag |
airflow dags show | Show DAG details | airflow dags show my_dag |
airflow dags backfill | Run DAG for a date range | airflow dags backfill -s 2023-01-01 -e 2023-01-07 my_dag |
Task Management
Command | Description | Example |
---|---|---
airflow tasks list | List tasks in DAG | airflow tasks list my_dag |
airflow tasks test | Test a task | airflow tasks test my_dag task1 2023-01-01 |
airflow tasks run | Run a task instance | airflow tasks run my_dag task1 2023-01-01 |
airflow tasks clear | Clear task instances | airflow tasks clear my_dag -t task1 |
airflow tasks state | Get task state | airflow tasks state my_dag task1 2023-01-01 |
airflow tasks failed-deps | Get task dependency failures | airflow tasks failed-deps my_dag task1 2023-01-01 |
Airflow Management
Command | Description | Example |
---|---|---
airflow db init | Initialize database | airflow db init |
airflow db upgrade | Upgrade database | airflow db upgrade |
airflow db reset | Reset database | airflow db reset -y |
airflow scheduler | Start scheduler | airflow scheduler |
airflow webserver | Start webserver | airflow webserver -p 8080 |
airflow celery worker | Start Celery worker | airflow celery worker |
airflow config list | List configurations | airflow config list |
airflow info | Show Airflow info | airflow info |
airflow version | Show version | airflow version |
Deployment & Best Practices
Production Deployment Options
Deployment | Pros | Cons | Best For |
---|---|---|---
Self-managed | Complete control | High maintenance | Custom requirements |
Docker Compose | Easy setup, isolated | Limited scalability | Small to medium deployments |
Kubernetes (Helm) | Scalable, resilient | Complex setup | Large deployments |
Managed services (Cloud Composer, MWAA) | Low maintenance | Vendor lock-in | Teams needing reliability |
Security Best Practices
- Store sensitive information in encrypted variables or secrets backends
- Use least privilege principle for connections
- Implement role-based access control (RBAC)
- Audit DAG code changes through version control
- Regularly update Airflow to the latest version
- Use secure connection protocols (SSH, HTTPS)
- Implement network segmentation for Airflow components
Performance Optimization
- Scale executors appropriately for workload
- Use `catchup=False` for DAGs that don't need backfilling
- Optimize sensor usage with `reschedule` mode
- Consider `depends_on_past=False` unless strictly needed
- Set an appropriate `pool` for resource-intensive tasks (see the sketch after this list)
- Use `SubDagOperator` carefully (or avoid it) due to potential bottlenecks
- Monitor and tune database performance
Code Organization
```
airflow_project/
├── dags/                          # DAG definition files
│   ├── etl_dag.py
│   └── reporting_dag.py
├── plugins/                       # Custom plugins
│   ├── __init__.py
│   ├── operators/
│   │   ├── __init__.py
│   │   └── custom_operators.py
│   ├── hooks/
│   │   └── custom_hooks.py
│   └── sensors/
│       └── custom_sensors.py
├── include/                       # Supporting files
│   ├── sql/
│   │   └── queries.sql
│   └── scripts/
│       └── processing.py
├── tests/                         # Test files
│   ├── dags/
│   │   └── test_dag_integrity.py
│   └── operators/
│       └── test_custom_operators.py
└── requirements.txt               # Python dependencies
```
Resources for Further Learning
Official Documentation
- Apache Airflow documentation: https://airflow.apache.org/docs/
Books & Publications
- “Data Pipelines with Apache Airflow” by Bas Harenslak and Julian de Ruiter (Manning)
- “Fundamentals of Data Engineering” (Airflow chapters) by Joe Reis and Matt Housley
Online Courses & Tutorials
- Airflow on Pluralsight, Udemy, Coursera, and DataCamp
- Astronomer Academy Airflow Certification Course
- YouTube tutorials from Astronomer, Cloud providers, and community members
Community Resources
- Apache Airflow GitHub repository: https://github.com/apache/airflow
- Apache Airflow Slack community and the dev/user mailing lists
- Airflow Summit talks and community blog posts
Quick Tips for DAG Development
- Start with simple DAGs and gradually add complexity
- Test tasks individually before combining into a DAG
- Use meaningful task_ids for better debugging
- Monitor task duration and resource usage
- Set explicit dependencies rather than relying on execution order
- Document your DAGs with clear descriptions
- Use tags to organize DAGs in the UI
- Parameterize your DAGs when possible
- Follow a consistent style for DAG and task naming
- Consider using the TaskFlow API for simpler code