The Ultimate Apache Airflow Cheatsheet: A Comprehensive Guide for Data Engineers

Introduction

Apache Airflow is an open-source platform designed to programmatically author, schedule, and monitor workflows. It allows you to define complex directed acyclic graphs (DAGs) of tasks and their dependencies, execute them on a schedule, and distribute task execution across worker processes. Airflow provides a rich user interface, robust monitoring capabilities, and integration with various data systems, making it the go-to solution for orchestrating complex data pipelines and ETL processes.

Core Concepts of Apache Airflow

Basic Components

  • DAG (Directed Acyclic Graph): A collection of tasks with directed dependencies
  • Operator: A template for a predefined task (e.g., PythonOperator, BashOperator)
  • Task/Task Instance: A parameterized instance of an operator
  • Workflow: The combination of DAGs, operators, and schedules
  • Scheduler: The component that triggers scheduled workflows
  • Executor: Determines how tasks are executed (e.g., in parallel, on different machines)
  • Worker: The process that executes the task
  • Web Server: Provides the Airflow UI for monitoring and administration
  • Metadata Database: Stores state information and execution history

Airflow Architecture

Component | Function | Key Characteristics
Scheduler | Monitors and triggers tasks | Heart of Airflow; determines what to run and when
Web Server | Provides the UI | Flask-based; shows DAGs, task status, logs
Metadata DB | Stores workflow state | Usually PostgreSQL, MySQL, or SQLite
Executor | Executes tasks | Different implementations for different environments
DAG Directory | Stores DAG files | Python files that define workflows

Executors

Executor | Best For | Characteristics
SequentialExecutor | Development, testing | Single process, no parallelism, SQLite compatible
LocalExecutor | Small deployments | Parallelism on a single machine, requires PostgreSQL/MySQL
CeleryExecutor | Production, scalability | Distributed tasks using Celery, horizontal scaling
KubernetesExecutor | Container-based environments | Dynamic pod per task, good isolation
DaskExecutor | Data science workflows | Leverages Dask distributed computing

DAG Definition & Configuration

Basic DAG Structure

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# Default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email': ['email@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG definition
dag = DAG(
    'my_example_dag',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval='0 0 * * *',  # Daily at midnight
    catchup=False,
    tags=['example'],
)

# Task definitions
def my_function():
    return "Hello from Airflow!"

task1 = PythonOperator(
    task_id='my_task',
    python_callable=my_function,
    dag=dag,
)

task2 = PythonOperator(
    task_id='my_second_task',
    python_callable=my_function,
    dag=dag,
)

# Task dependencies
task1 >> task2

Common DAG Parameters

Parameter | Description | Example
dag_id | Unique identifier for the DAG | 'etl_workflow'
schedule_interval | When to run the DAG | '0 0 * * *' or '@daily'
start_date | First execution date | datetime(2023, 1, 1)
end_date | Date after which the DAG stops running | datetime(2023, 12, 31)
catchup | Whether to backfill missed runs | False (default is True)
default_args | Default parameters for tasks | Dictionary of parameters
tags | Labels for filtering in the UI | ['production', 'data_warehouse']
max_active_runs | Maximum concurrent DAG runs | 1 (default depends on config)
concurrency | Maximum task instances running concurrently across the DAG's active runs | 16 (default depends on config)
description | Human-readable description | 'ETL workflow for user data'
params | Runtime configuration parameters | {'table': 'users'}
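
A minimal sketch combining several of these parameters; the DAG id, tag, and param values are illustrative:

from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id='users_etl',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,                  # at most one concurrent DAG run
    tags=['example', 'users'],
    params={'table': 'users'},          # available in templates as {{ params.table }}
    description='ETL workflow for user data',
)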

Schedule Interval Options

Expression | Description | Equivalent Cron
@once | Schedule once and only once | N/A
@hourly | Run once an hour at the beginning of the hour | 0 * * * *
@daily | Run once a day at midnight | 0 0 * * *
@weekly | Run once a week at midnight on Sunday | 0 0 * * 0
@monthly | Run once a month at midnight on the first day of the month | 0 0 1 * *
@yearly | Run once a year at midnight on January 1 | 0 0 1 1 *
None | Run only when triggered manually | N/A
Cron expression | Custom schedule using cron syntax | */10 * * * * (every 10 minutes)
timedelta | Time interval between executions | timedelta(hours=1)
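
For example, a timedelta schedule runs the DAG at a fixed interval measured from the start date rather than on cron boundaries (the DAG id is illustrative):

from datetime import datetime, timedelta
from airflow import DAG

dag = DAG(
    dag_id='every_six_hours',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),  # one run every 6 hours
    catchup=False,
)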

Task Definition & Operators

Common Operators

Operator | Purpose | Example
BashOperator | Execute bash commands | BashOperator(task_id='print_date', bash_command='date')
PythonOperator | Execute Python functions | PythonOperator(task_id='process', python_callable=process_func)
EmptyOperator | Dummy operator for grouping | EmptyOperator(task_id='start')
PostgresOperator | Execute SQL against Postgres | PostgresOperator(task_id='query', sql='SELECT * FROM table')
MySqlOperator | Execute SQL against MySQL | MySqlOperator(task_id='query', sql='SELECT * FROM table')
SqliteOperator | Execute SQL against SQLite | SqliteOperator(task_id='query', sql='SELECT * FROM table')
DockerOperator | Run a command in a Docker container | DockerOperator(task_id='docker_task', image='python:3.9')
SSHOperator | Execute commands on a remote host | SSHOperator(task_id='ssh_task', ssh_conn_id='ssh_default', command='ls')
EmailOperator | Send an email | EmailOperator(task_id='email', to='user@example.com', subject='Alert')
SimpleHttpOperator | Send an HTTP request | SimpleHttpOperator(task_id='get_data', endpoint='api/data')
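
A short sketch chaining two of the core operators above (the task ids and callable are illustrative; dag is assumed to be defined as in the earlier example):

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def process_func():
    print("processing")

print_date = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

process = PythonOperator(
    task_id='process',
    python_callable=process_func,
    dag=dag,
)

print_date >> process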

Specialized Operators

Category | Notable Operators
Cloud Provider | S3ToRedshiftOperator, GoogleCloudStorageToBigQueryOperator, AzureDataFactoryRunPipelineOperator
Database | SqlSensor, MongoToS3Operator, SnowflakeOperator, PrestoOperator
File Transfer | FTPFileTransmitOperator, SFTPOperator, LocalFilesystemToS3Operator
Big Data | SparkSubmitOperator, HiveOperator, PigOperator, PrestoOperator
Messaging | PubSubPublishOperator, SlackAPIPostOperator, TelegramOperator
Containers | KubernetesPodOperator, DockerOperator, ECSOperator

Task Dependencies

Syntax | Description | Example
>> | Set downstream (left side runs first) | task1 >> task2 (task1 runs before task2)
<< | Set upstream (right side runs first) | task2 << task1 (equivalent to the above)
List syntax | Multiple dependencies | task1 >> [task2, task3] >> task4
set_upstream() | Set task dependency | task2.set_upstream(task1)
set_downstream() | Set task dependency | task1.set_downstream(task2)
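
The same patterns in code, assuming task1 through task5 are operators defined elsewhere in the DAG:

# Bitshift syntax: task1 runs before task2
task1 >> task2

# Equivalent, written from the downstream side
task2 << task1

# Fan-out / fan-in with lists
task1 >> [task2, task3] >> task4

# Method syntax, same effect as task4 >> task5
task5.set_upstream(task4)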

XComs: Cross-Communication Between Tasks

# Task that pushes a value to XCom
def push_value(**context):
    value = "some_value"
    context['task_instance'].xcom_push(key='my_key', value=value)
    
push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_value,
    dag=dag,
)

# Task that pulls a value from XCom
def pull_value(**context):
    value = context['task_instance'].xcom_pull(task_ids='push_task', key='my_key')
    print(f"Retrieved value: {value}")
    
pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_value,
    dag=dag,
)

push_task >> pull_task

TaskFlow API (Airflow 2.0+)

Basic TaskFlow Example

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['example']
)
def taskflow_etl():
    
    @task
    def extract():
        data = {"1001": 301, "1002": 302}
        return data
    
    @task
    def transform(order_data):
        transformed = {k: v * 2 for k, v in order_data.items()}
        return transformed
    
    @task
    def load(transformed_data):
        print(f"Transformed data: {transformed_data}")
    
    order_data = extract()
    transformed_data = transform(order_data)
    load(transformed_data)

taskflow_etl_dag = taskflow_etl()

TaskFlow vs. Traditional Syntax

Feature | Traditional | TaskFlow API
Task Definition | Create operator instances | Use Python decorators
Dependencies | Explicit using >> or << | Implicit through function calls
XComs | Manual push/pull | Automatic through return values
Context | Access via **kwargs (provide_context=True in Airflow 1.x) | Available by default
Multiple Outputs | Use multiple XComs | Return a dictionary
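
For multiple outputs, a TaskFlow task can split a returned dictionary into separate XCom entries. A minimal sketch (function and key names are illustrative; place it inside a @dag-decorated function as in the example above):

from airflow.decorators import task

@task(multiple_outputs=True)
def extract_user():
    # Each key becomes its own XCom entry
    return {"user_id": 1001, "user_name": "alice"}

@task
def greet(user_name):
    print(f"Hello, {user_name}!")

user = extract_user()
greet(user["user_name"])   # pulls only the 'user_name' XCom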

Connection & Variable Management

Connections

Action | Web UI | CLI | Code
Create | Admin → Connections → + | airflow connections add | Connection model via ORM session, or AIRFLOW_CONN_* env var
Use | N/A | N/A | BaseHook.get_connection('conn_id')
List | Admin → Connections | airflow connections list | BaseHook.get_connections()
Test | Admin → Connections → Test | N/A | Custom code with try/except
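
Retrieving a connection in code, assuming a connection with Conn Id postgres_dwh has been configured:

from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection('postgres_dwh')
print(conn.host, conn.port, conn.login)
print(conn.extra_dejson)   # the 'Extra' JSON field parsed into a dict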

Connection Structure

Field | Description | Example
Conn Id | Unique identifier | postgres_dwh
Conn Type | Type of connection | postgres
Host | Server hostname/IP | dwh.example.com
Schema | Database schema | public
Login | Username | airflow
Password | Password | ********
Port | Connection port | 5432
Extra | JSON with additional params | {"sslmode": "require"}

Variables

Action | Web UI | CLI | Code
Create | Admin → Variables → + | airflow variables set | Variable.set()
Get | Admin → Variables | airflow variables get | Variable.get()
List | Admin → Variables | airflow variables list | Variable.get_all()
Delete | Admin → Variables → × | airflow variables delete | Variable.delete()

Variable Usage

from airflow.models import Variable

# Get a variable (with default)
table_name = Variable.get("table_name", default_var="default_table")

# Get variable as JSON
config = Variable.get("my_config", deserialize_json=True)
print(f"Database: {config['database']}")

# Set a variable
Variable.set("status", "ready")

# Set JSON variable
Variable.set("complex_config", {"env": "prod", "retry": 3}, serialize_json=True)

Error Handling & SLAs

Retry Configuration

Parameter | Description | Default | Example
retries | Number of retries | 0 | retries=3
retry_delay | Time between retries | timedelta(seconds=300) | retry_delay=timedelta(minutes=5)
retry_exponential_backoff | Exponentially increase the delay between retries | False | retry_exponential_backoff=True
max_retry_delay | Maximum delay between retries | None | max_retry_delay=timedelta(hours=1)
execution_timeout | Maximum task execution time | None | execution_timeout=timedelta(hours=2)
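
Putting these together on a single task (dag and my_function are assumed to be defined as in the earlier examples):

from datetime import timedelta
from airflow.operators.python import PythonOperator

resilient_task = PythonOperator(
    task_id='resilient_task',
    python_callable=my_function,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
    execution_timeout=timedelta(hours=2),
    dag=dag,
)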

Error Callbacks

def on_failure_callback(context):
    task_instance = context['task_instance']
    task_id = task_instance.task_id
    dag_id = task_instance.dag_id
    execution_date = context['execution_date']
    exception = context.get('exception')
    
    # Send alert, log to monitoring system, etc.
    print(f"Task {task_id} in DAG {dag_id} failed on {execution_date}: {exception}")

task = PythonOperator(
    task_id='my_task',
    python_callable=my_function,
    on_failure_callback=on_failure_callback,
    dag=dag,
)

SLA (Service Level Agreement)

# DAG with SLA
dag = DAG(
    'sla_dag',
    default_args=default_args,
    schedule_interval='@hourly',
    sla_miss_callback=alert_on_sla_miss,
)

# Task with SLA
task = PythonOperator(
    task_id='time_sensitive_task',
    python_callable=process_data,
    sla=timedelta(minutes=30),  # Must complete within 30 minutes
    dag=dag,
)

Sensors & Triggers

Common Sensors

Sensor | Purpose | Example
FileSensor | Wait for a file to appear | FileSensor(task_id='wait_for_file', filepath='/data/file.csv')
S3KeySensor | Wait for an S3 key | S3KeySensor(task_id='wait_for_s3', bucket_key='data/files/file.csv')
SqlSensor | Wait for a SQL query to return results | SqlSensor(task_id='wait_for_data', conn_id='postgres', sql='SELECT...')
HttpSensor | Wait for an HTTP endpoint | HttpSensor(task_id='check_api', http_conn_id='api', endpoint='status')
ExternalTaskSensor | Wait for a task in another DAG | ExternalTaskSensor(task_id='wait_for_etl', external_dag_id='etl')
DateTimeSensor | Wait until a specified datetime | DateTimeSensor(task_id='wait_until', target_time=datetime(2023, 1, 1, 12, 0))
TimeDeltaSensor | Wait for a time period | TimeDeltaSensor(task_id='wait_1h', delta=timedelta(hours=1))

Sensor Modes

Mode | Behavior | Use Case
poke (default) | Checks repeatedly in the same process, holding the worker slot | Short polling intervals, quick checks
reschedule | Releases the worker slot between checks | Longer intervals, resource efficiency
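
A sensor sketch using reschedule mode so the worker slot is freed between checks (the filepath and connection id are illustrative; dag is assumed to be defined as earlier):

from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/file.csv',
    fs_conn_id='fs_default',    # filesystem connection (fs_default is the default)
    poke_interval=300,          # check every 5 minutes
    timeout=6 * 60 * 60,        # give up after 6 hours
    mode='reschedule',          # release the worker slot between pokes
    dag=dag,
)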

Deferrable Operators (Airflow 2.2+)

from datetime import timedelta
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger

class DeferrableSleepSensor(BaseSensorOperator):
    def execute(self, context):
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(seconds=60)),
            method_name="execute_complete"
        )
    
    def execute_complete(self, context, event=None):
        return True

Dynamic DAG Generation

Using Factory Functions

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

def create_dag(dag_id, schedule, default_args):
    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    
    with dag:
        t1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )
        
        t2 = BashOperator(
            task_id='sleep',
            bash_command='sleep 5',
        )
        
        t1 >> t2
    
    return dag

# Create multiple DAGs
for i in range(3):
    dag_id = f'dynamic_dag_{i}'
    globals()[dag_id] = create_dag(
        dag_id=dag_id,
        schedule='@daily',
        default_args={'start_date': datetime(2023, 1, 1)}
    )

Using Variables for Configuration

from airflow.models import Variable

# Get configuration from Airflow Variable
config = Variable.get("etl_config", deserialize_json=True)

# Create tasks dynamically based on the config, keeping references in a dict
tasks = {}
for table in config['tables']:
    tasks[table['name']] = PythonOperator(
        task_id=f"process_{table['name']}",
        python_callable=process_table,
        op_kwargs={'table_name': table['name'], 'columns': table['columns']},
        dag=dag,
    )

# Link each task to its upstream dependencies
for table in config['tables']:
    for dependency in table.get('dependencies', []):
        tasks[dependency] >> tasks[table['name']]

Testing DAGs & Tasks

Unit Testing DAGs

import unittest
from airflow.models import DagBag

class TestDagIntegrity(unittest.TestCase):
    def setUp(self):
        self.dagbag = DagBag(dag_folder='dags', include_examples=False)
        
    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            f"DAG import errors: {self.dagbag.import_errors}"
        )
        
    def test_dag_structure(self):
        dag_id = "my_dag_id"
        dag = self.dagbag.get_dag(dag_id)
        self.assertIsNotNone(dag)
        self.assertEqual(len(dag.tasks), 3)
        
        # Check specific tasks
        task_ids = [task.task_id for task in dag.tasks]
        self.assertIn("task1", task_ids)
        self.assertIn("task2", task_ids)
        
        # Check dependencies
        task1 = dag.get_task("task1")
        task2 = dag.get_task("task2")
        self.assertIn(task2, task1.downstream_list)

Testing Task Functions

import unittest
from unittest.mock import patch, MagicMock
from my_dag_file import process_data_task

class TestTaskFunctions(unittest.TestCase):
    @patch('my_dag_file.read_data')
    def test_process_data_task(self, mock_read_data):
        # Setup mock
        mock_read_data.return_value = {"key": "value"}
        
        # Run function
        result = process_data_task()
        
        # Assertions
        self.assertEqual(result, {"processed_key": "processed_value"})
        mock_read_data.assert_called_once()

Command Line Interface (CLI)

DAG Management

Command | Description | Example
airflow dags list | List all DAGs | airflow dags list
airflow dags trigger | Trigger a DAG run | airflow dags trigger my_dag
airflow dags pause | Pause a DAG | airflow dags pause my_dag
airflow dags unpause | Unpause a DAG | airflow dags unpause my_dag
airflow dags show | Show DAG details | airflow dags show my_dag
airflow dags backfill | Run a DAG for a date range | airflow dags backfill -s 2023-01-01 -e 2023-01-07 my_dag

Task Management

Command | Description | Example
airflow tasks list | List tasks in a DAG | airflow tasks list my_dag
airflow tasks test | Test a task without recording state | airflow tasks test my_dag task1 2023-01-01
airflow tasks run | Run a task instance | airflow tasks run my_dag task1 2023-01-01
airflow tasks clear | Clear task instances | airflow tasks clear my_dag -t task1
airflow tasks state | Get the state of a task instance | airflow tasks state my_dag task1 2023-01-01
airflow tasks failed-deps | Show unmet task dependencies | airflow tasks failed-deps my_dag task1 2023-01-01

Airflow Management

Command | Description | Example
airflow db init | Initialize the metadata database | airflow db init
airflow db upgrade | Upgrade the metadata database | airflow db upgrade
airflow db reset | Reset the metadata database | airflow db reset -y
airflow scheduler | Start the scheduler | airflow scheduler
airflow webserver | Start the webserver | airflow webserver -p 8080
airflow celery worker | Start a Celery worker | airflow celery worker
airflow config list | List configuration options | airflow config list
airflow info | Show Airflow environment info | airflow info
airflow version | Show the Airflow version | airflow version

Deployment & Best Practices

Production Deployment Options

Deployment | Pros | Cons | Best For
Self-managed | Complete control | High maintenance | Custom requirements
Docker Compose | Easy setup, isolated | Limited scalability | Small to medium deployments
Kubernetes (Helm) | Scalable, resilient | Complex setup | Large deployments
Managed services (Cloud Composer, MWAA) | Low maintenance | Vendor lock-in | Teams needing reliability

Security Best Practices

  • Store sensitive information in encrypted variables or secrets backends
  • Use least privilege principle for connections
  • Implement role-based access control (RBAC)
  • Audit DAG code changes through version control
  • Regularly update Airflow to the latest version
  • Use secure connection protocols (SSH, HTTPS)
  • Implement network segmentation for Airflow components

Performance Optimization

  • Scale executors appropriately for workload
  • Use catchup=False for DAGs that don’t need backfilling
  • Optimize sensor usage with reschedule mode
  • Consider depends_on_past=False unless strictly needed
  • Assign resource-intensive tasks to an appropriate pool (see the sketch after this list)
  • Use SubDagOperator carefully (or avoid) due to potential bottlenecks
  • Monitor and tune database performance
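
As referenced above, pools cap how many slots a group of tasks can occupy at once. A minimal sketch, assuming a pool named heavy_io has already been created under Admin → Pools (the pool name, callable, and task id are illustrative; dag is defined as earlier):

from airflow.operators.python import PythonOperator

heavy_task = PythonOperator(
    task_id='load_large_table',
    python_callable=load_large_table,   # user-defined callable (illustrative)
    pool='heavy_io',                    # limits concurrent slots for tasks in this pool
    priority_weight=10,                 # higher-priority scheduling within the pool
    dag=dag,
)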

Code Organization

airflow_project/
├── dags/                    # DAG definition files
│   ├── etl_dag.py
│   └── reporting_dag.py
├── plugins/                 # Custom plugins
│   ├── __init__.py
│   ├── operators/
│   │   ├── __init__.py
│   │   └── custom_operators.py
│   ├── hooks/
│   │   └── custom_hooks.py
│   └── sensors/
│       └── custom_sensors.py
├── include/                 # Supporting files
│   ├── sql/
│   │   └── queries.sql
│   └── scripts/
│       └── processing.py
├── tests/                   # Test files
│   ├── dags/
│   │   └── test_dag_integrity.py
│   └── operators/
│       └── test_custom_operators.py
└── requirements.txt         # Python dependencies

Resources for Further Learning

Official Documentation

  • Apache Airflow documentation: https://airflow.apache.org/docs/

Books & Publications

  • “Data Pipelines with Apache Airflow” by Bas Harenslak and Julian de Ruiter (Manning Publications)
  • “Fundamentals of Data Engineering” (Airflow chapters) by Joe Reis and Matt Housley

Online Courses & Tutorials

  • Airflow on Pluralsight, Udemy, Coursera, and DataCamp
  • Astronomer Academy Airflow Certification Course
  • YouTube tutorials from Astronomer, Cloud providers, and community members

Community Resources

  • Apache Airflow Slack workspace and mailing lists
  • GitHub repository and issue tracker: https://github.com/apache/airflow
  • Stack Overflow questions tagged apache-airflow

Quick Tips for DAG Development

  • Start with simple DAGs and gradually add complexity
  • Test tasks individually before combining into a DAG
  • Use meaningful task_ids for better debugging
  • Monitor task duration and resource usage
  • Set explicit dependencies rather than relying on execution order
  • Document your DAGs with clear descriptions
  • Use tags to organize DAGs in the UI
  • Parameterize your DAGs when possible
  • Follow a consistent style for DAG and task naming
  • Consider using the TaskFlow API for simpler code