Introduction to Apache Beam
Apache Beam is an open-source, unified programming model for defining and executing data processing pipelines. It provides a portable API layer that enables batch and streaming data processing with multiple execution engines (called “runners”).
Why Apache Beam Matters:
- Write once, run anywhere (portability across execution engines)
- Unified batch and streaming processing model
- Advanced windowing and triggering capabilities
- Rich ecosystem of connectors and transforms
- Language-agnostic (Java, Python, Go, SQL)
- Engine-independent (Flink, Spark, Dataflow, Samza, etc.)
- Production-ready with built-in fault tolerance
Core Concepts & Building Blocks
Fundamental Components
Component | Description | Role in Pipeline |
---|---|---|
Pipeline | Complete data processing job | Contains entire graph of operations |
PCollection | Immutable, distributed dataset | Input/output for transformations |
PTransform | Operation that transforms data | Processes PCollections |
ParDo | Parallel processing transformation | Processes elements independently |
DoFn | Function applied to each element | Contains element-wise processing logic |
Window | Subdivision of dataset by time | Groups elements for processing |
Trigger | Condition for window output | Controls when results materialize |
Runner | Execution engine | Translates and executes the pipeline |
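The pipeline examples later in this sheet apply a ParDo of a MyProcessingFn that is never defined. A minimal sketch of such a DoFn is shown below; the trim/upper-case logic is only an illustrative assumption.
// A minimal DoFn: the per-element logic that a ParDo runs in parallel
public class MyProcessingFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    // Illustrative logic only: emit a trimmed, upper-cased copy of each element
    out.output(element.trim().toUpperCase());
  }
}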
PCollection Properties
- Immutability: Cannot be modified after creation
- Element Type: Homogeneous elements with a specific type
- Distributed: Elements partitioned across compute resources
- Bounded/Unbounded: Fixed size (batch) or unlimited (streaming)
- Timestamps: Each element has an associated event time
- Windowing: Elements can be grouped into time-based windows
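For example, explicit event-time timestamps can be attached when creating a bounded PCollection. A small sketch, assuming a Pipeline named pipeline already exists; the element values are illustrative.
// Bounded PCollection whose elements carry explicit event-time timestamps
PCollection<String> events = pipeline.apply(Create.timestamped(
    TimestampedValue.of("click", new Instant(1000L)),   // org.joda.time.Instant
    TimestampedValue.of("view", new Instant(2000L))));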
Window Types
Window Type | Description | Use Cases |
---|---|---|
Fixed | Non-overlapping windows of fixed size | Regular interval processing |
Sliding | Overlapping windows of fixed size | Moving averages, trend detection |
Session | Dynamic windows based on activity | User session analysis |
Global | Single window for all data | Batch processing, complete data views |
Custom | User-defined windowing function | Special domain requirements |
Pipeline Construction & Execution
Basic Pipeline Structure (Java)
// Create pipeline
Pipeline pipeline = Pipeline.create(options);
// Apply transforms
PCollection<String> input = pipeline.apply(TextIO.read().from("input.txt"));
PCollection<String> processed = input.apply(ParDo.of(new MyProcessingFn()));
processed.apply(TextIO.write().to("output.txt"));
// Execute pipeline
pipeline.run();
Basic Pipeline Structure (Python)
# Create pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
    # Apply transforms
    input = (pipeline
             | 'Read' >> beam.io.ReadFromText('input.txt'))
    processed = (input
                 | 'Process' >> beam.ParDo(MyProcessingFn()))
    processed | 'Write' >> beam.io.WriteToText('output.txt')
Pipeline Options
// Java
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setProject("my-project-id");
dataflowOptions.setRegion("us-central1");
dataflowOptions.setJobName("my-job");
# Python
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.region = 'us-central1'
google_cloud_options.job_name = 'my-job'
Available Runners
Runner | Engine | Best For |
---|---|---|
DirectRunner | Local (in-process) execution | Development, testing, debugging |
DataflowRunner | Google Cloud Dataflow | Production Google Cloud workloads |
FlinkRunner | Apache Flink | Production on-prem, open-source |
SparkRunner | Apache Spark | Integration with Spark ecosystem |
SamzaRunner | Apache Samza | Samza-based deployments |
JetRunner | Hazelcast Jet | Low-latency processing |
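The runner is selected through pipeline options rather than code changes, either on the command line (e.g. --runner=DataflowRunner) or programmatically. A minimal sketch:
// Select the runner from the command line, e.g. --runner=DataflowRunner
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

// ...or set it programmatically (the runner's artifact must be on the classpath);
// if no runner is specified, DirectRunner is used by default
options.setRunner(org.apache.beam.runners.direct.DirectRunner.class);

Pipeline pipeline = Pipeline.create(options);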
Core Transforms
Element-Wise Transforms
Transform | Description | Example (Java) | Example (Python) |
---|---|---|---|
ParDo | Parallel processing of each element | input.apply(ParDo.of(new MyFn())) | `input \| beam.ParDo(MyFn())` |
Map | 1:1 transform of each element | input.apply(MapElements.into(...).via(...)) | `input \| beam.Map(my_fn)` |
FlatMap | 1:N transform of each element | input.apply(FlatMapElements.into(...).via(...)) | `input \| beam.FlatMap(my_fn)` |
Filter | Keep elements that pass a predicate | input.apply(Filter.by(...)) | `input \| beam.Filter(my_predicate)` |
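For the Java lambda forms in the table above, the output type must be declared with TypeDescriptors. A short sketch; the words collection and the length-3 cutoff are illustrative.
PCollection<String> words = ...;

// Map (1:1): the output type is declared with TypeDescriptors
PCollection<Integer> lengths = words.apply(
    MapElements.into(TypeDescriptors.integers()).via((String w) -> w.length()));

// Filter: keep only elements matching a predicate
PCollection<String> longWords = words.apply(
    Filter.by((String w) -> w.length() > 3));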
Aggregation Transforms
Transform | Description | Example (Java) | Example (Python) |
---|---|---|---|
GroupByKey | Group elements by key | kvPairs.apply(GroupByKey.<K, V>create()) | `kv_pairs \| beam.GroupByKey()` |
Combine | Combine elements using an associative function | input.apply(Combine.globally(Sum.ofIntegers())) | `input \| beam.CombineGlobally(sum)` |
Count | Count elements | input.apply(Count.globally()) | `input \| beam.combiners.Count.Globally()` |
Mean | Calculate average | input.apply(Mean.globally()) | `input \| beam.combiners.Mean.Globally()` |
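The globally() forms above aggregate an entire PCollection; for keyed data, the perKey() counterparts are usually what you want. A short sketch; the scores collection is illustrative.
PCollection<KV<String, Integer>> scores = ...;

// Sum values per key (no explicit GroupByKey needed)
PCollection<KV<String, Integer>> totals = scores.apply(Sum.integersPerKey());

// Count occurrences of each key
PCollection<KV<String, Long>> perKeyCounts = scores.apply(Count.perKey());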
Multi-Collection Transforms
Transform | Description | Example (Java) | Example (Python) |
---|---|---|---|
CoGroupByKey | Join multiple collections on key | KeyedPCollectionTuple.of(...).apply(CoGroupByKey.create()) | `({'A': pcoll1, 'B': pcoll2}) \| beam.CoGroupByKey()` |
Flatten | Merge multiple collections | PCollectionList.of(pcoll1).and(pcoll2).apply(Flatten.pCollections()) | `(pcoll1, pcoll2) \| beam.Flatten()` |
Partition | Split collection into multiple parts | input.apply(Partition.of(3, new PartitionFn())) | `input \| beam.Partition(partition_fn, 3)` |
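A short sketch combining Partition and Flatten; the modulo-based PartitionFn is an illustrative assumption.
PCollection<Integer> numbers = ...;

// Partition: split into 3 collections based on a function of each element
PCollectionList<Integer> parts = numbers.apply(
    Partition.of(3, new Partition.PartitionFn<Integer>() {
      @Override
      public int partitionFor(Integer n, int numPartitions) {
        return n % numPartitions;
      }
    }));

// Flatten: merge the pieces back into a single PCollection
PCollection<Integer> merged = parts.apply(Flatten.pCollections());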
Windowing & Triggering
Windowing (Java)
PCollection<KV<String, Integer>> input = ...;
// Fixed window (every 1 minute)
PCollection<KV<String, Integer>> windowed = input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))));
// Sliding window (5 minute windows, sliding every 1 minute)
PCollection<KV<String, Integer>> sliding = input
.apply(Window.<KV<String, Integer>>into(
SlidingWindows.of(Duration.standardMinutes(5))
.every(Duration.standardMinutes(1))));
// Session window (with 10 minute gap-duration)
PCollection<KV<String, Integer>> sessions = input
.apply(Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(10))));
Windowing (Python)
input = ...
# Fixed window (every 1 minute)
windowed = input | beam.WindowInto(beam.window.FixedWindows(60))
# Sliding window (5 minute windows, sliding every 1 minute)
sliding = input | beam.WindowInto(beam.window.SlidingWindows(300, 60))
# Session window (with 10 minute gap-duration)
sessions = input | beam.WindowInto(beam.window.Sessions(600))
Triggers & Accumulation Mode
// Java: Custom trigger with early, late firings and accumulation mode
PCollection<KV<String, Integer>> triggered = input
.apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30)))
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardHours(1))
.accumulatingFiredPanes());
# Python: Custom trigger with early, late firings and accumulation mode
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark)

triggered = input | beam.WindowInto(
    beam.window.FixedWindows(60),
    trigger=AfterWatermark(
        early=AfterProcessingTime(30),
        late=AfterCount(1)),
    accumulation_mode=AccumulationMode.ACCUMULATING,
    allowed_lateness=3600)
State & Timers
Stateful Processing (Java)
public class StatefulFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {
@StateId("sum")
private final StateSpec<ValueState<Integer>> sumState = StateSpecs.value();
@ProcessElement
public void processElement(
ProcessContext c,
@StateId("sum") ValueState<Integer> sumState) {
Integer current = sumState.read();
if (current == null) {
current = 0;
}
Integer newSum = current + c.element().getValue();
sumState.write(newSum);
c.output(KV.of(c.element().getKey(), newSum));
}
}
Stateful Processing (Python)
from apache_beam.transforms.userstate import CombiningValueStateSpec

class StatefulFn(beam.DoFn):
    SUM_STATE = CombiningValueStateSpec('sum', sum)

    def process(self, element, sum_state=beam.DoFn.StateParam(SUM_STATE)):
        key, value = element
        sum_state.add(value)
        yield key, sum_state.read()
Timers (Java)
public class TimerDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {
  @StateId("buffer")
  private final StateSpec<ValueState<Integer>> bufferState = StateSpecs.value();

  // The key is stored in state so the timer callback can emit it
  @StateId("key")
  private final StateSpec<ValueState<String>> keyState = StateSpecs.value();

  @TimerId("flushTimer")
  private final TimerSpec flushTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("buffer") ValueState<Integer> bufferState,
      @StateId("key") ValueState<String> keyState,
      @TimerId("flushTimer") Timer flushTimer) {
    Integer currentValue = bufferState.read();
    if (currentValue == null) {
      currentValue = 0;
    }
    bufferState.write(currentValue + c.element().getValue());
    keyState.write(c.element().getKey());
    // Flush one minute after the most recent element (the timer is reset on each element)
    flushTimer.offset(Duration.standardMinutes(1)).setRelative();
  }

  @OnTimer("flushTimer")
  public void onFlushTimer(
      OnTimerContext context,
      @StateId("buffer") ValueState<Integer> bufferState,
      @StateId("key") ValueState<String> keyState) {
    Integer value = bufferState.read();
    if (value != null) {
      context.output(KV.of(keyState.read(), value));
      bufferState.clear();
      keyState.clear();
    }
  }
}
Schema & SQL Support
Schema Definition (Java)
// Using a POJO with an inferred schema
@DefaultSchema(JavaFieldSchema.class) // org.apache.beam.sdk.schemas.annotations
public class User {
  public String name;
  public int age;

  public User() {} // Required for schema inference
  public User(String name, int age) {
    this.name = name;
    this.age = age;
  }
}

// With the annotation, Beam infers the schema automatically,
// so schema-aware transforms can be applied directly
PCollection<User> users = ...;

// Use schema transforms (org.apache.beam.sdk.schemas.transforms.Filter)
PCollection<User> adults = users.apply(
    Filter.<User>create().whereFieldName("age", (Integer age) -> age >= 18));

// Alternatively, build a Schema explicitly for Row-based PCollections
Schema userSchema = Schema.builder()
    .addStringField("name")
    .addInt32Field("age")
    .build();
PCollection<Row> userRows = ...;
userRows.setRowSchema(userSchema);
SQL Transforms
// Java
PCollection<Row> users = ...;
PCollection<Row> result = users.apply(
SqlTransform.query(
"SELECT name, age FROM PCOLLECTION WHERE age >= 18 ORDER BY age DESC"));
# Python
from apache_beam.transforms.sql import SqlTransform

users = ...
result = users | SqlTransform(
    "SELECT name, age FROM PCOLLECTION WHERE age >= 18 ORDER BY age DESC")
I/O Transforms
Data Source/Sink | Read Transform (Java) | Write Transform (Java) |
---|---|---|
Text File | TextIO.read().from("path") | TextIO.write().to("path") |
Avro | AvroIO.read(User.class).from("path") | AvroIO.write(User.class).to("path") |
Parquet | ParquetIO.read(schema).from("path") | ParquetIO.write(schema).to("path") |
BigQuery | BigQueryIO.read().from("table") | BigQueryIO.write().to("table") |
Kafka | KafkaIO.read().withBootstrapServers("host") | KafkaIO.write().withBootstrapServers("host") |
JDBC | JdbcIO.read().withDataSourceConfiguration(...) | JdbcIO.write().withDataSourceConfiguration(...) |
MongoDB | MongoDbIO.read().withUri("uri") | MongoDbIO.write().withUri("uri") |
Redis | RedisIO.read().withEndpoint("host", port) | RedisIO.write().withEndpoint("host", port) |
PubSub | PubsubIO.readMessages().fromTopic("topic") | PubsubIO.writeMessages().to("topic") |
Cassandra | CassandraIO.read().withHosts(...).withKeyspace(...) | CassandraIO.write().withHosts(...).withKeyspace(...) |
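As one concrete case from the table, a Kafka source also needs a topic and key/value deserializers before it yields a PCollection. A sketch with placeholder broker and topic names (requires the beam-sdks-java-io-kafka artifact):
PCollection<KV<Long, String>> messages = pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")        // placeholder broker address
        .withTopic("my-topic")                        // placeholder topic name
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata());                          // drop Kafka metadata, keep KV<key, value>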
Common Patterns & Solutions
Joining Data (Java)
PCollection<KV<String, User>> users = ...;
PCollection<KV<String, Order>> orders = ...;
// Tuple tags identify each input in the join result
final TupleTag<User> userTag = new TupleTag<>();
final TupleTag<Order> orderTag = new TupleTag<>();
// Join collections
PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
.of(userTag, users)
.and(orderTag, orders)
.apply(CoGroupByKey.create());
// Process joined data
PCollection<String> results = joined.apply(ParDo.of(
new DoFn<KV<String, CoGbkResult>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String userId = c.element().getKey();
CoGbkResult result = c.element().getValue();
Iterable<User> userList = result.getAll(userTag);
Iterable<Order> orderList = result.getAll(orderTag);
// Process joined data...
User user = Iterables.getOnlyElement(userList, null);
if (user != null) {
for (Order order : orderList) {
c.output(userId + "," + user.name + "," + order.amount);
}
}
}
}));
Joining Data (Python)
users = ... # PCollection of (user_id, user_data)
orders = ... # PCollection of (user_id, order_data)
# Join collections
joined = {'users': users, 'orders': orders} | beam.CoGroupByKey()
# Process joined data
def process_join(element):
    user_id, join_data = element
    user_list = list(join_data['users'])
    order_list = list(join_data['orders'])
    if user_list:
        user = user_list[0]
        for order in order_list:
            yield user_id + "," + user['name'] + "," + str(order['amount'])

results = joined | beam.FlatMap(process_join)
Side Inputs
// Java
PCollection<KV<String, Integer>> mainInput = ...;
PCollection<String> sideInput = ...;
// Use as view
PCollectionView<List<String>> sideInputView =
sideInput.apply(View.asList());
mainInput.apply(ParDo.of(new DoFn<KV<String, Integer>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Integer> element = c.element();
List<String> sideInputList = c.sideInput(sideInputView);
// Use side input data with main input
if (sideInputList.contains(element.getKey())) {
c.output(element.getKey() + ":" + element.getValue());
}
}
}).withSideInputs(sideInputView));
# Python
main_input = ...
side_input = ...
# Create view
side_input_view = beam.pvalue.AsList(side_input)
def process_with_side_input(element, side_input_list):
    key, value = element
    if key in side_input_list:
        yield key + ":" + str(value)

result = main_input | beam.ParDo(
    process_with_side_input, side_input_list=side_input_view)
Streaming WordCount (Complete Example)
// Java streaming word count
public class StreamingWordCount {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.as(StreamingOptions.class).setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines = pipeline
.apply("ReadLines", TextIO.read().from("gs://bucket/input"))
.apply("Window", Window.into(FixedWindows.of(Duration.standardMinutes(1))));
PCollection<KV<String, Long>> counts = lines
.apply("ExtractWords", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("\\s+"))))
.apply("CountWords", Count.perElement());
counts.apply("FormatResults", MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) ->
wordCount.getKey() + ": " + wordCount.getValue()))
.apply("WriteResults", TextIO.write().to("gs://bucket/output"));
pipeline.run();
}
}
# Python streaming word count
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam import window
def run():
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=options) as pipeline:
        counts = (
            pipeline
            | "ReadLines" >> beam.io.ReadFromText("gs://bucket/input")
            | "Window" >> beam.WindowInto(window.FixedWindows(60))
            | "ExtractWords" >> beam.FlatMap(lambda line: line.split())
            | "CountWords" >> beam.combiners.Count.PerElement()
            | "FormatResults" >> beam.Map(lambda word_count:
                f"{word_count[0]}: {word_count[1]}")
            | "WriteResults" >> beam.io.WriteToText("gs://bucket/output")
        )

if __name__ == "__main__":
    run()
Performance Optimization
Best Practices
Area | Best Practice | Explanation |
---|---|---|
Data Size | Minimize data movement | Filter early, project only necessary fields |
Fusion | Be aware of fusion opportunities | Sequential steps can be fused by runner |
Parallelism | Control parallelism explicitly when needed | Use Reshuffle to redistribute work; tune runner options such as worker count |
Coder | Register custom coders | Improves serialization performance |
Combine | Use Combine rather than GroupByKey + processing | More efficient for associative operations (see the sketch after this table) |
Hot Keys | Prevent hot keys | Add artificial key components, use salting |
Windowing | Choose appropriate window size | Too small: overhead; Too large: latency or memory issues |
Side Inputs | Limit size of side inputs | Large side inputs broadcast to all workers |
State | Use built-in state API | More efficient than manual state management |
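A sketch of the Combine-over-GroupByKey practice referenced in the table: a CombineFn lets the runner pre-aggregate values on each worker before the shuffle, whereas GroupByKey ships every value for a key across the network first. The events collection is illustrative.
PCollection<KV<String, Integer>> events = ...;

// Preferred: partial sums are computed on each worker before the shuffle
PCollection<KV<String, Integer>> totals =
    events.apply(Combine.perKey(Sum.ofIntegers()));

// Avoid when possible: every value is shuffled, then summed in a downstream ParDo
// PCollection<KV<String, Iterable<Integer>>> grouped = events.apply(GroupByKey.create());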
Optimizing GroupByKey
// Hot key detection and prevention
PCollection<KV<String, Integer>> input = ...;
// Before grouping, add salt to keys to distribute load
PCollection<KV<String, Integer>> salted = input.apply(ParDo.of(
new DoFn<KV<String, Integer>, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Integer> elem = c.element();
String key = elem.getKey();
// Add random salt (0-9) to key
String saltedKey = key + "-" + (int) (Math.random() * 10);
c.output(KV.of(saltedKey, elem.getValue()));
}
}));
// Group by salted key
PCollection<KV<String, Iterable<Integer>>> grouped =
salted.apply(GroupByKey.create());
// After grouping, remove salt from keys
PCollection<KV<String, Iterable<Integer>>> unsalted =
grouped.apply(ParDo.of(
new DoFn<KV<String, Iterable<Integer>>, KV<String, Iterable<Integer>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Iterable<Integer>> elem = c.element();
String saltedKey = elem.getKey();
// Remove salt from key
String originalKey = saltedKey.substring(0, saltedKey.lastIndexOf("-"));
// Note: each original key can now appear up to 10 times (once per salt value);
// re-combine downstream if a single result per key is required
c.output(KV.of(originalKey, elem.getValue()));
}
}));
Debugging & Monitoring
Logging & Metrics
// Java: Custom metrics
public class MetricsDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MetricsDoFn.class); // org.slf4j

  private final Counter processedElements =
      Metrics.counter(MetricsDoFn.class, "processedElements");
  private final Distribution elementLength =
      Metrics.distribution(MetricsDoFn.class, "elementLength");
  private final Gauge currentTime =
      Metrics.gauge(MetricsDoFn.class, "currentTimeMillis");

  @ProcessElement
  public void processElement(ProcessContext c) {
    String element = c.element();
    // Update metrics
    processedElements.inc();
    elementLength.update(element.length());
    currentTime.set(System.currentTimeMillis());
    // Log elements for debugging
    LOG.info("Processing element: " + element);
    c.output(element.toUpperCase());
  }
}
# Python: Custom metrics
import logging
import time

from apache_beam.metrics.metric import Metrics

class MetricsDoFn(beam.DoFn):
    def __init__(self):
        self.processed_elements = Metrics.counter(self.__class__, 'processed_elements')
        self.element_length = Metrics.distribution(self.__class__, 'element_length')
        self.current_time = Metrics.gauge(self.__class__, 'current_time_millis')

    def process(self, element):
        # Update metrics
        self.processed_elements.inc(1)
        self.element_length.update(len(element))
        self.current_time.set(int(time.time() * 1000))
        # Log elements for debugging
        logging.info(f"Processing element: {element}")
        yield element.upper()
Testing Pipelines
// Java: Testing with PAssert
@Test
public void testWordCount() {
Pipeline p = TestPipeline.create();
PCollection<String> input = p.apply(Create.of("hello world", "hello beam"));
PCollection<KV<String, Long>> counts = input
.apply(FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("\\s+"))))
.apply(Count.perElement());
PAssert.that(counts).containsInAnyOrder(
KV.of("hello", 2L),
KV.of("world", 1L),
KV.of("beam", 1L));
p.run();
}
# Python: Testing with assert_that
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

def test_word_count():
    with TestPipeline() as p:
        input_data = p | beam.Create(['hello world', 'hello beam'])
        counts = (
            input_data
            | beam.FlatMap(lambda line: line.split())
            | beam.combiners.Count.PerElement()
        )
        assert_that(counts, equal_to([
            ('hello', 2),
            ('world', 1),
            ('beam', 1)
        ]))
Resources for Further Learning
Books
- “Streaming Systems” by Tyler Akidau, Slava Chernyak, and Reuven Lax
- “Streaming Architecture” by Ted Dunning and Ellen Friedman
- “Big Data: Principles and best practices of scalable realtime data systems” by Nathan Marz and James Warren