Complete Apache Beam Cheat Sheet: Concepts, Transforms & Best Practices

Introduction to Apache Beam

Apache Beam is an open-source, unified programming model for defining and executing data processing pipelines. It provides a portable API layer that enables batch and streaming data processing with multiple execution engines (called “runners”).

Why Apache Beam Matters:

  • Write once, run anywhere (portability across execution engines)
  • Unified batch and streaming processing model
  • Advanced windowing and triggering capabilities
  • Rich ecosystem of connectors and transforms
  • Language-agnostic (Java, Python, Go, SQL)
  • Engine-independent (Flink, Spark, Dataflow, Samza, etc.)
  • Production-ready with built-in fault tolerance

Core Concepts & Building Blocks

Fundamental Components

| Component | Description | Role in Pipeline |
| --- | --- | --- |
| Pipeline | Complete data processing job | Contains the entire graph of operations |
| PCollection | Immutable, distributed dataset | Input/output for transformations |
| PTransform | Operation that transforms data | Processes PCollections |
| ParDo | Parallel processing transformation | Processes elements independently |
| DoFn | Function applied to each element | Contains element-wise processing logic |
| Window | Subdivision of dataset by time | Groups elements for processing |
| Trigger | Condition for window output | Controls when results materialize |
| Runner | Execution engine | Translates and executes the pipeline |
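
To make the relationships concrete, here is a minimal Python sketch (the name SplitWordsFn and the sample strings are illustrative): the Pipeline holds the whole graph, Create and ParDo are PTransforms, the DoFn carries the element-wise logic, and every step produces a new immutable PCollection.

import apache_beam as beam

# DoFn: the element-wise processing logic applied by ParDo
class SplitWordsFn(beam.DoFn):
    def process(self, element):
        for word in element.split():
            yield word

# Pipeline: the complete job graph
with beam.Pipeline() as p:
    words = (
        p
        | beam.Create(['hello beam', 'unified model'])  # PCollection of lines
        | beam.ParDo(SplitWordsFn()))                    # new PCollection of words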

PCollection Properties

  • Immutability: Cannot be modified after creation
  • Element Type: Homogeneous elements with a specific type
  • Distributed: Elements partitioned across compute resources
  • Bounded/Unbounded: Fixed size (batch) or unlimited (streaming)
  • Timestamps: Each element has an associated event time
  • Windowing: Elements can be grouped into time-based windows
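
The timestamp and windowing properties mostly matter for streaming, but you can attach event-time timestamps yourself on any source. A minimal Python sketch, assuming the input tuples carry an epoch-seconds timestamp in their second field:

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

with beam.Pipeline() as p:
    events = (
        p
        | beam.Create([('login', 1700000000), ('click', 1700000030)])
        # Attach each element's event-time timestamp so later windowing can group by time
        | beam.Map(lambda e: TimestampedValue(e[0], e[1])))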

Window Types

| Window Type | Description | Use Cases |
| --- | --- | --- |
| Fixed | Non-overlapping windows of fixed size | Regular interval processing |
| Sliding | Overlapping windows of fixed size | Moving averages, trend detection |
| Session | Dynamic windows based on activity | User session analysis |
| Global | Single window for all data | Batch processing, complete data views |
| Custom | User-defined windowing function | Special domain requirements |

Pipeline Construction & Execution

Basic Pipeline Structure (Java)

// Create pipeline
Pipeline pipeline = Pipeline.create(options);

// Apply transforms
PCollection<String> input = pipeline.apply(TextIO.read().from("input.txt"));
PCollection<String> processed = input.apply(ParDo.of(new MyProcessingFn()));
processed.apply(TextIO.write().to("output.txt"));

// Execute pipeline
pipeline.run();

Basic Pipeline Structure (Python)

# Create pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
    
    # Apply transforms
    input = (pipeline 
             | 'Read' >> beam.io.ReadFromText('input.txt'))
    processed = (input 
                 | 'Process' >> beam.ParDo(MyProcessingFn()))
    processed | 'Write' >> beam.io.WriteToText('output.txt')

Pipeline Options

// Java
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setProject("my-project-id");
dataflowOptions.setRegion("us-central1");
dataflowOptions.setJobName("my-job");

# Python
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.region = 'us-central1'
google_cloud_options.job_name = 'my-job'

Available Runners

| Runner | Engine | Best For |
| --- | --- | --- |
| DirectRunner | Local JVM | Development, testing, debugging |
| DataflowRunner | Google Cloud Dataflow | Production Google Cloud workloads |
| FlinkRunner | Apache Flink | Production on-prem, open-source deployments |
| SparkRunner | Apache Spark | Integration with the Spark ecosystem |
| SamzaRunner | Apache Samza | Samza-based deployments |
| JetRunner | Hazelcast Jet | Low-latency processing |
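
Because the runner is just another pipeline option, the same code can target any of these engines. A small Python sketch of switching runners via flags (the flag values are placeholders; Dataflow additionally needs the Google Cloud options shown earlier):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Swap '--runner=DirectRunner' for '--runner=FlinkRunner', '--runner=SparkRunner',
# or '--runner=DataflowRunner' without changing the pipeline code itself.
options = PipelineOptions(['--runner=DirectRunner'])

with beam.Pipeline(options=options) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(print)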

Core Transforms

Element-Wise Transforms

| Transform | Description | Example (Java) | Example (Python) |
| --- | --- | --- | --- |
| ParDo | Parallel processing of each element | `input.apply(ParDo.of(new MyFn()))` | `input \| beam.ParDo(MyFn())` |
| Map | 1:1 transform of each element | `input.apply(MapElements.into(...).via(...))` | `input \| beam.Map(fn)` |
| FlatMap | 1:N transform of each element | `input.apply(FlatMapElements.into(...).via(...))` | `input \| beam.FlatMap(fn)` |
| Filter | Keep elements that pass a predicate | `input.apply(Filter.by(...))` | `input \| beam.Filter(fn)` |
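
A short Python sketch chaining the element-wise transforms above (the sample data is illustrative):

import apache_beam as beam

with beam.Pipeline() as p:
    results = (
        p
        | beam.Create(['alpha beta', 'gamma'])
        | beam.FlatMap(lambda line: line.split())   # 1:N - each line becomes many words
        | beam.Map(lambda word: word.upper())       # 1:1 - transform each word
        | beam.Filter(lambda word: len(word) > 4))  # keep only the longer words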

Aggregation Transforms

| Transform | Description | Example (Java) | Example (Python) |
| --- | --- | --- | --- |
| GroupByKey | Group elements by key | `kvPairs.apply(GroupByKey.<K, V>create())` | `kv_pairs \| beam.GroupByKey()` |
| Combine | Combine elements using an associative function | `input.apply(Combine.globally(Sum.ofIntegers()))` | `input \| beam.CombineGlobally(sum)` |
| Count | Count elements | `input.apply(Count.globally())` | `input \| beam.combiners.Count.Globally()` |
| Mean | Calculate average | `input.apply(Mean.globally())` | `input \| beam.combiners.Mean.Globally()` |
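
For keyed data the per-key variants are the usual workhorses; a minimal Python sketch (the sample scores are illustrative):

import apache_beam as beam

with beam.Pipeline() as p:
    scores = p | beam.Create([('a', 1), ('a', 3), ('b', 2)])

    sums = scores | beam.CombinePerKey(sum)            # ('a', 4), ('b', 2)
    counts = scores | beam.combiners.Count.PerKey()    # ('a', 2), ('b', 1)
    means = scores | beam.combiners.Mean.PerKey()      # ('a', 2.0), ('b', 2.0)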

Multi-Collection Transforms

| Transform | Description | Example (Java) | Example (Python) |
| --- | --- | --- | --- |
| CoGroupByKey | Join multiple collections on key | `KeyedPCollectionTuple.of(...).apply(CoGroupByKey.create())` | `{'A': pcoll1, 'B': pcoll2} \| beam.CoGroupByKey()` |
| Flatten | Merge multiple collections | `PCollectionList.of(pcoll1).and(pcoll2).apply(Flatten.pCollections())` | `(pcoll1, pcoll2) \| beam.Flatten()` |
| Partition | Split collection into multiple parts | `input.apply(Partition.of(3, new PartitionFn()))` | `input \| beam.Partition(partition_fn, 3)` |
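
A Python sketch of Flatten and Partition together (the even/odd partitioning function is a toy example):

import apache_beam as beam

with beam.Pipeline() as p:
    first = p | 'CreateFirst' >> beam.Create([1, 2])
    second = p | 'CreateSecond' >> beam.Create([3, 4])

    # Flatten: merge several PCollections with the same element type
    merged = (first, second) | beam.Flatten()

    # Partition: split one PCollection into a fixed number of outputs
    parts = merged | beam.Partition(
        lambda n, num_partitions: n % num_partitions, 2)
    evens, odds = parts[0], parts[1]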

Windowing & Triggering

Windowing (Java)

PCollection<KV<String, Integer>> input = ...;

// Fixed window (every 1 minute)
PCollection<KV<String, Integer>> windowed = input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))));

// Sliding window (5 minute windows, sliding every 1 minute)
PCollection<KV<String, Integer>> sliding = input
    .apply(Window.<KV<String, Integer>>into(
        SlidingWindows.of(Duration.standardMinutes(5))
                     .every(Duration.standardMinutes(1))));

// Session window (with 10 minute gap-duration)
PCollection<KV<String, Integer>> sessions = input
    .apply(Window.<KV<String, Integer>>into(
        Sessions.withGapDuration(Duration.standardMinutes(10))));

Windowing (Python)

input = ...

# Fixed window (every 1 minute)
windowed = input | beam.WindowInto(beam.window.FixedWindows(60))

# Sliding window (5 minute windows, sliding every 1 minute)
sliding = input | beam.WindowInto(beam.window.SlidingWindows(300, 60))

# Session window (with 10 minute gap-duration)
sessions = input | beam.WindowInto(beam.window.Sessions(600))

Triggers & Accumulation Mode

// Java: Custom trigger with early, late firings and accumulation mode
PCollection<KV<String, Integer>> triggered = input
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardHours(1))
        .accumulatingFiredPanes());

# Python: Custom trigger with early and late firings and accumulation mode
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark)

triggered = input | beam.WindowInto(
    beam.window.FixedWindows(60),
    trigger=AfterWatermark(
        early=AfterProcessingTime(30),
        late=AfterCount(1)
    ),
    accumulation_mode=AccumulationMode.ACCUMULATING,
    allowed_lateness=3600  # seconds
)

State & Timers

Stateful Processing (Java)

public class StatefulFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {
  @StateId("sum")
  private final StateSpec<ValueState<Integer>> sumState = StateSpecs.value();
  
  @ProcessElement
  public void processElement(
      ProcessContext c, 
      @StateId("sum") ValueState<Integer> sumState) {
    Integer current = sumState.read();
    if (current == null) {
      current = 0;
    }
    Integer newSum = current + c.element().getValue();
    sumState.write(newSum);
    c.output(KV.of(c.element().getKey(), newSum));
  }
}

Stateful Processing (Python)

from apache_beam.transforms.userstate import CombiningValueStateSpec

class StatefulFn(beam.DoFn):
  SUM_STATE = CombiningValueStateSpec('sum', sum)

  def process(self, element, sum_state=beam.DoFn.StateParam(SUM_STATE)):
    key, value = element
    sum_state.add(value)
    yield key, sum_state.read()

Timers (Java)

public class TimerDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {
  @StateId("buffer")
  private final StateSpec<ValueState<Integer>> bufferState = StateSpecs.value();
  
  @TimerId("flushTimer")
  private final TimerSpec flushTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
  
  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("buffer") ValueState<Integer> bufferState,
      @TimerId("flushTimer") Timer flushTimer) {
    
    Integer currentValue = bufferState.read();
    Integer newValue = c.element().getValue();
    if (currentValue == null) {
      currentValue = 0;
    }
    bufferState.write(currentValue + newValue);
    
    // Set timer to flush state after 1 minute
    flushTimer.offset(Duration.standardMinutes(1)).setRelative();
  }
  
  @OnTimer("flushTimer")
  public void onFlushTimer(
      OnTimerContext context,
      @StateId("buffer") ValueState<Integer> bufferState) {
    
    Integer value = bufferState.read();
    if (value != null) {
      context.output(KV.of(context.key(), value));
      bufferState.clear();
    }
  }
}
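
The Python state and timer API follows the same shape. Below is a rough sketch of the buffering pattern above, with two deliberate differences that are assumptions of this example rather than a translation of the Java code: it uses an event-time (watermark) timer that fires at the end of the element's window instead of a processing-time delay, and it buffers values in a BagState.

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

class BufferAndFlushFn(beam.DoFn):
  BUFFER = BagStateSpec('buffer', VarIntCoder())
  FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)  # event-time timer

  def process(self, element,
              window=beam.DoFn.WindowParam,
              buffer=beam.DoFn.StateParam(BUFFER),
              flush=beam.DoFn.TimerParam(FLUSH)):
    key, value = element
    buffer.add(value)
    # Fire once the watermark passes the end of the current window
    flush.set(window.end)

  @on_timer(FLUSH)
  def on_flush(self,
               key=beam.DoFn.KeyParam,
               buffer=beam.DoFn.StateParam(BUFFER)):
    yield key, sum(buffer.read())
    buffer.clear()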

Schema & SQL Support

Schema Definition (Java)

// Using a POJO with an inferred schema
@DefaultSchema(JavaFieldSchema.class)
public class User {
  public String name;
  public int age;
  
  public User() {}  // No-arg constructor required for schema inference
  
  public User(String name, int age) {
    this.name = name;
    this.age = age;
  }
}

// Beam infers the schema (name: STRING, age: INT32) from the annotated class
PCollection<User> users = ...;

// Use schema-aware transforms (org.apache.beam.sdk.schemas.transforms.Filter)
PCollection<User> adults = users.apply(
    Filter.<User>create().whereFieldName("age", (Integer age) -> age >= 18));
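
In the Python SDK, schemas are typically declared as typed NamedTuples registered with the row coder; a minimal sketch (the User fields and sample values are illustrative):

import typing

import apache_beam as beam

class User(typing.NamedTuple):
    name: str
    age: int

# Register the row coder so Beam treats User as a schema'd type
beam.coders.registry.register_coder(User, beam.coders.RowCoder)

with beam.Pipeline() as p:
    users = p | beam.Create(
        [User('Ada', 36), User('Linus', 12)]).with_output_types(User)
    adults = users | beam.Filter(lambda u: u.age >= 18)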

SQL Transforms

// Java
PCollection<Row> users = ...;
PCollection<Row> result = users.apply(
    SqlTransform.query(
        "SELECT name, age FROM PCOLLECTION WHERE age >= 18"));

# Python
from apache_beam.transforms.sql import SqlTransform

users = ...
result = users | SqlTransform(
    "SELECT name, age FROM PCOLLECTION WHERE age >= 18")

I/O Transforms

| Data Source/Sink | Read Transform (Java) | Write Transform (Java) |
| --- | --- | --- |
| Text File | `TextIO.read().from("path")` | `TextIO.write().to("path")` |
| Avro | `AvroIO.read(User.class).from("path")` | `AvroIO.write(User.class).to("path")` |
| Parquet | `ParquetIO.read(schema).from("path")` | `FileIO.write().via(ParquetIO.sink(schema)).to("path")` |
| BigQuery | `BigQueryIO.readTableRows().from("table")` | `BigQueryIO.writeTableRows().to("table")` |
| Kafka | `KafkaIO.read().withBootstrapServers("host")` | `KafkaIO.write().withBootstrapServers("host")` |
| JDBC | `JdbcIO.read().withDataSourceConfiguration(...)` | `JdbcIO.write().withDataSourceConfiguration(...)` |
| MongoDB | `MongoDbIO.read().withUri("uri")` | `MongoDbIO.write().withUri("uri")` |
| Redis | `RedisIO.read().withEndpoint("host", port)` | `RedisIO.write().withEndpoint("host", port)` |
| PubSub | `PubsubIO.readMessages().fromTopic("topic")` | `PubsubIO.writeMessages().to("topic")` |
| Cassandra | `CassandraIO.read().withHosts(...).withKeyspace(...)` | `CassandraIO.write().withHosts(...).withKeyspace(...)` |
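
The Python SDK has equivalents for most of these; a few common ones as a sketch (paths, topics, and table names are placeholders):

import apache_beam as beam

with beam.Pipeline() as p:
    # Text files
    lines = p | 'ReadText' >> beam.io.ReadFromText('gs://bucket/input*.txt')
    lines | 'WriteText' >> beam.io.WriteToText(
        'gs://bucket/output', file_name_suffix='.txt')

    # BigQuery
    rows = p | 'ReadBQ' >> beam.io.ReadFromBigQuery(
        table='my-project:my_dataset.my_table')

    # Pub/Sub (unbounded; requires a streaming pipeline)
    # messages = p | beam.io.ReadFromPubSub(
    #     topic='projects/my-project/topics/my-topic')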

Common Patterns & Solutions

Joining Data (Java)

PCollection<KV<String, User>> users = ...;
PCollection<KV<String, Order>> orders = ...;

// Tuple tags identify each input in the join result
final TupleTag<User> userTag = new TupleTag<>();
final TupleTag<Order> orderTag = new TupleTag<>();

// Join collections
PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
    .of(userTag, users)
    .and(orderTag, orders)
    .apply(CoGroupByKey.create());

// Process joined data
PCollection<String> results = joined.apply(ParDo.of(
    new DoFn<KV<String, CoGbkResult>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String userId = c.element().getKey();
        CoGbkResult result = c.element().getValue();
        Iterable<User> userList = result.getAll(userTag);
        Iterable<Order> orderList = result.getAll(orderTag);
        
        // Process joined data...
        User user = Iterables.getOnlyElement(userList, null);
        if (user != null) {
          for (Order order : orderList) {
            c.output(userId + "," + user.name + "," + order.amount);
          }
        }
      }
    }));

Joining Data (Python)

users = ...  # PCollection of (user_id, user_data)
orders = ... # PCollection of (user_id, order_data)

# Join collections
joined = {'users': users, 'orders': orders} | beam.CoGroupByKey()

# Process joined data
def process_join(element):
    user_id, join_data = element
    user_list = list(join_data['users'])
    order_list = list(join_data['orders'])
    
    if user_list:
        user = user_list[0]
        for order in order_list:
            yield user_id + "," + user['name'] + "," + str(order['amount'])

results = joined | beam.FlatMap(process_join)

Side Inputs

// Java
PCollection<KV<String, Integer>> mainInput = ...;
PCollection<String> sideInput = ...;

// Use as view
PCollectionView<List<String>> sideInputView = 
    sideInput.apply(View.asList());

mainInput.apply(ParDo.of(new DoFn<KV<String, Integer>, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String, Integer> element = c.element();
    List<String> sideInputList = c.sideInput(sideInputView);
    
    // Use side input data with main input
    if (sideInputList.contains(element.getKey())) {
      c.output(element.getKey() + ":" + element.getValue());
    }
  }
}).withSideInputs(sideInputView));

# Python
main_input = ...
side_input = ...

# Create view
side_input_view = beam.pvalue.AsList(side_input)

def process_with_side_input(element, side_input_list):
    key, value = element
    if key in side_input_list:
        yield key + ":" + str(value)

result = main_input | beam.ParDo(
    process_with_side_input, side_input_list=side_input_view)

Streaming WordCount (Complete Example)

// Java streaming word count
public class StreamingWordCount {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.as(StreamingOptions.class).setStreaming(true);
    
    Pipeline pipeline = Pipeline.create(options);
    
    PCollection<String> lines = pipeline
        .apply("ReadLines", TextIO.read().from("gs://bucket/input"))
        .apply("Window", Window.into(FixedWindows.of(Duration.standardMinutes(1))));
        
    PCollection<KV<String, Long>> counts = lines
        .apply("ExtractWords", FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("\\s+"))))
        .apply("CountWords", Count.perElement());
    
    counts.apply("FormatResults", MapElements
        .into(TypeDescriptors.strings())
        .via((KV<String, Long> wordCount) -> 
            wordCount.getKey() + ": " + wordCount.getValue()))
        .apply("WriteResults", TextIO.write().to("gs://bucket/output"));
    
    pipeline.run();
  }
}

# Python streaming word count
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam import window

def run():
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    
    with beam.Pipeline(options=options) as pipeline:
        counts = (
            pipeline
            | "ReadLines" >> beam.io.ReadFromText("gs://bucket/input")
            | "Window" >> beam.WindowInto(window.FixedWindows(60))
            | "ExtractWords" >> beam.FlatMap(lambda line: line.split())
            | "CountWords" >> beam.combiners.Count.PerElement()
            | "FormatResults" >> beam.Map(lambda word_count: 
                f"{word_count[0]}: {word_count[1]}")
            | "WriteResults" >> beam.io.WriteToText("gs://bucket/output")
        )
        
if __name__ == "__main__":
    run()

Performance Optimization

Best Practices

| Area | Best Practice | Explanation |
| --- | --- | --- |
| Data Size | Minimize data movement | Filter early, project only the necessary fields |
| Fusion | Be aware of fusion | Runners fuse sequential steps; break fusion (e.g., with a Reshuffle) when it limits parallelism |
| Parallelism | Control parallelism explicitly when needed | Use Reshuffle and runner-specific worker/parallelism settings |
| Coder | Register efficient custom coders | Improves serialization performance |
| Combine | Prefer Combine over GroupByKey + manual aggregation | Combiner lifting makes associative operations far cheaper |
| Hot Keys | Prevent hot keys | Salt keys or use hot-key fanout to spread the load |
| Windowing | Choose an appropriate window size | Too small: overhead; too large: latency or memory pressure |
| Side Inputs | Limit the size of side inputs | Large side inputs are broadcast to every worker |
| State | Use the built-in state API | More efficient than manual state management |
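
The Combine recommendation is worth seeing side by side: CombinePerKey lets the runner pre-aggregate on each worker before the shuffle (combiner lifting), while GroupByKey ships every raw value across the network first. A minimal Python sketch:

import apache_beam as beam

with beam.Pipeline() as p:
    scores = p | beam.Create([('a', 1), ('a', 3), ('b', 2)])

    # Preferred: partial sums are computed before the shuffle
    sums = scores | 'CombineSum' >> beam.CombinePerKey(sum)

    # Works, but shuffles every raw value before summing
    sums_slow = (scores
                 | 'Group' >> beam.GroupByKey()
                 | 'SumValues' >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))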

Optimizing GroupByKey

// Hot key detection and prevention
PCollection<KV<String, Integer>> input = ...;

// Before grouping, add salt to keys to distribute load
PCollection<KV<String, Integer>> salted = input.apply(ParDo.of(
    new DoFn<KV<String, Integer>, KV<String, Integer>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        KV<String, Integer> elem = c.element();
        String key = elem.getKey();
        // Add random salt (0-9) to key
        String saltedKey = key + "-" + (int) (Math.random() * 10);
        c.output(KV.of(saltedKey, elem.getValue()));
      }
    }));

// Group by salted key
PCollection<KV<String, Iterable<Integer>>> grouped = 
    salted.apply(GroupByKey.create());

// After grouping, remove salt from keys
PCollection<KV<String, Iterable<Integer>>> unsalted = 
    grouped.apply(ParDo.of(
        new DoFn<KV<String, Iterable<Integer>>, KV<String, Iterable<Integer>>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<String, Iterable<Integer>> elem = c.element();
            String saltedKey = elem.getKey();
            // Remove salt from key
            String originalKey = saltedKey.substring(0, saltedKey.lastIndexOf("-"));
            c.output(KV.of(originalKey, elem.getValue()));
          }
        }));
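
Note that after removing the salt you still need a second aggregation per original key to merge the per-salt groups. When the aggregation is an associative Combine, the built-in hot-key fanout does both stages for you (Java: Combine.perKey(...).withHotKeyFanout(n)); a Python sketch with illustrative data:

import apache_beam as beam

with beam.Pipeline() as p:
    scores = p | beam.Create([('hot', 1)] * 1000 + [('cold', 1)])

    # Each hot key is spread over 10 intermediate keys, partially combined,
    # then re-combined back to the original key automatically.
    sums = scores | beam.CombinePerKey(sum).with_hot_key_fanout(10)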

Debugging & Monitoring

Logging & Metrics

// Java: Custom metrics
public class MetricsDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MetricsDoFn.class);

  private final Counter processedElements = 
      Metrics.counter(MetricsDoFn.class, "processedElements");
  private final Distribution elementLength = 
      Metrics.distribution(MetricsDoFn.class, "elementLength");
  private final Gauge currentTime = 
      Metrics.gauge(MetricsDoFn.class, "currentTimeMillis");
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    String element = c.element();
    
    // Update metrics
    processedElements.inc();
    elementLength.update(element.length());
    currentTime.set(System.currentTimeMillis());
    
    // Log elements for debugging
    LOG.info("Processing element: " + element);
    
    c.output(element.toUpperCase());
  }
}

# Python: Custom metrics
import logging
import time

import apache_beam as beam
from apache_beam.metrics import Metrics

class MetricsDoFn(beam.DoFn):
  def __init__(self):
    self.processed_elements = Metrics.counter(self.__class__, 'processed_elements')
    self.element_length = Metrics.distribution(self.__class__, 'element_length')
    self.current_time = Metrics.gauge(self.__class__, 'current_time_millis')
    
  def process(self, element):
    # Update metrics
    self.processed_elements.inc(1)
    self.element_length.update(len(element))
    self.current_time.set(int(time.time() * 1000))
    
    # Log elements for debugging
    logging.info(f"Processing element: {element}")
    
    yield element.upper()
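
Once the job has run (or while a streaming job is running), the same metrics can be queried from the PipelineResult; a minimal Python sketch reusing the MetricsDoFn above:

import apache_beam as beam
from apache_beam.metrics.metric import MetricsFilter

pipeline = beam.Pipeline()
_ = pipeline | beam.Create(['a', 'bb', 'ccc']) | beam.ParDo(MetricsDoFn())

result = pipeline.run()
result.wait_until_finish()

# Look up the counter defined in MetricsDoFn by name
counters = result.metrics().query(
    MetricsFilter().with_name('processed_elements'))['counters']
for counter in counters:
    print(counter.key.metric.name, counter.committed)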

Testing Pipelines

// Java: Testing with PAssert
@Test
public void testWordCount() {
  Pipeline p = TestPipeline.create();
  
  PCollection<String> input = p.apply(Create.of("hello world", "hello beam"));
  PCollection<KV<String, Long>> counts = input
      .apply(FlatMapElements.into(TypeDescriptors.strings())
          .via((String line) -> Arrays.asList(line.split("\\s+"))))
      .apply(Count.perElement());
  
  PAssert.that(counts).containsInAnyOrder(
      KV.of("hello", 2L),
      KV.of("world", 1L),
      KV.of("beam", 1L));
  
  p.run();
}

# Python: Testing with assert_that
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

def test_word_count():
  with TestPipeline() as p:
    input_data = p | beam.Create(['hello world', 'hello beam'])
    counts = (
        input_data
        | beam.FlatMap(lambda line: line.split())
        | beam.combiners.Count.PerElement()
    )
    
    assert_that(counts, equal_to([
        ('hello', 2),
        ('world', 1),
        ('beam', 1)
    ]))

Resources for Further Learning

Official Documentation

  • Apache Beam documentation: https://beam.apache.org/documentation/
  • Beam Programming Guide: https://beam.apache.org/documentation/programming-guide/

Tutorials & Examples

Books

  • “Streaming Systems” by Tyler Akidau, Slava Chernyak, and Reuven Lax
  • “Streaming Architecture” by Ted Dunning and Ellen Friedman
  • “Big Data: Principles and Best Practices of Scalable Realtime Data Systems” by Nathan Marz and James Warren

Communities

  • Apache Beam user mailing list: user@beam.apache.org
  • Apache Beam dev mailing list: dev@beam.apache.org
  • Stack Overflow: questions tagged apache-beam

Conferences & Presentations
