Introduction
Apache Storm is a distributed, fault-tolerant, real-time computation system designed to process large volumes of data with low latency. Storm was created at BackType, open-sourced after Twitter acquired BackType, and is now a top-level Apache project. It is particularly valuable for real-time analytics, continuous computation, distributed RPC, and ETL (Extract, Transform, Load) operations.
Unlike batch processing systems such as Hadoop MapReduce, which run jobs over data that has already been collected, Storm processes data as it arrives. This makes it well suited to applications that need immediate insights from streaming data. Its distributed design lets it scale horizontally across clusters of machines, providing both high throughput and fault tolerance.
Core Concepts
| Concept | Description |
|---|---|
| Topology | The overall processing logic defined as a directed acyclic graph (DAG) of spouts and bolts |
| Stream | An unbounded sequence of tuples that is processed in parallel across the cluster |
| Tuple | The basic data unit in Storm, essentially a named list of values |
| Spout | A source of streams; reads data from external sources and emits tuples into the topology |
| Bolt | Processes input streams and produces output streams; implements the processing logic |
| Task | An instance of a spout or bolt; multiple tasks can run for one component |
| Worker | A JVM process that executes a subset of a topology |
| Executor | A thread within a worker process that runs one or more tasks |
| Stream Grouping | Defines how stream tuples are distributed from one component to another |
Storm Architecture
Cluster Components
| Component | Description |
|---|---|
| Nimbus | Master node that distributes code, assigns tasks, and monitors for failures |
| Supervisor | Worker node that starts/stops worker processes as directed by Nimbus |
| ZooKeeper | Coordinates the cluster and stores cluster state and statistics |
| UI | Web interface for monitoring and managing topologies |
Process Hierarchy
Cluster → Topologies → Workers → Executors → Tasks
Setting Up Storm
Installation Prerequisites
- Java 8+ installed
- Python 2.6.6+
- ZooKeeper cluster set up
Basic Installation Steps
# Download Storm
wget https://downloads.apache.org/storm/apache-storm-x.y.z/apache-storm-x.y.z.tar.gz
# Extract
tar -xzf apache-storm-x.y.z.tar.gz
cd apache-storm-x.y.z
# Configure Storm (edit conf/storm.yaml)
Minimal Configuration (storm.yaml)
storm.zookeeper.servers:
    - "zk1.example.com"
    - "zk2.example.com"
    - "zk3.example.com"
storm.zookeeper.port: 2181
storm.local.dir: "/mnt/storm"
nimbus.seeds: ["nimbus1.example.com", "nimbus2.example.com"]
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
Building Storm Topologies
Maven Dependency
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
</dependency>
Basic Topology Structure (Java)
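// Required imports (Storm 2.x package names)
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

// Create the builder first; every component is registered on it
TopologyBuilder builder = new TopologyBuilder();
// Define a spout with a parallelism hint of 2 executors
IRichSpout spout = new RandomSentenceSpout();
builder.setSpout("spout", spout, 2);
// Define a bolt that receives sentences from the spout, randomly distributed
IRichBolt bolt = new SplitSentenceBolt();
builder.setBolt("split", bolt, 4).shuffleGrouping("spout");
// Define another bolt; fields grouping sends every occurrence of a word to the same task
builder.setBolt("count", new WordCountBolt(), 4).fieldsGrouping("split", new Fields("word"));
// Create the topology
StormTopology topology = builder.createTopology();
// Submit the topology to the cluster (throws checked exceptions on failure)
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopology("word-count", conf, topology);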
Simplified Topology Example (Java)
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Add the spout, with a parallelism hint of 5 executors
        builder.setSpout("randomSentence", new RandomSentenceSpout(), 5);
        // Add the bolt, subscribing to the spout's output stream, with a parallelism hint of 8
        builder.setBolt("splitSentence", new SplitSentenceBolt(), 8)
                .shuffleGrouping("randomSentence");
        // Add another bolt, subscribing to the splitSentence bolt's output, grouped by word
        builder.setBolt("wordCount", new WordCountBolt(), 12)
                .fieldsGrouping("splitSentence", new Fields("word"));

        Config conf = new Config();
        conf.setNumWorkers(3);

        if (args != null && args.length > 0) {
            // Remote cluster submission; args[0] is the topology name
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // In-process local cluster for testing; closed automatically after 10 seconds
            try (LocalCluster cluster = new LocalCluster()) {
                cluster.submitTopology("test", conf, builder.createTopology());
                Thread.sleep(10000);
            }
        }
    }
}
Spout Implementation
Basic Spout (Java)
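import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class RandomSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random;
    private final String[] sentences = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // Throttle emission; Storm calls nextTuple() repeatedly
        Utils.sleep(100);
        String sentence = sentences[random.nextInt(sentences.length)];
        // Emitted without a message ID, so Storm does not track or replay it;
        // use collector.emit(values, msgId) for reliable processing
        collector.emit(new Values(sentence));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}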
Bolt Implementation
Basic Bolt (Java)
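import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            // Anchor each word to the input tuple so it joins the tuple tree
            collector.emit(tuple, new Values(word));
        }
        // Ack the input only after all derived tuples have been emitted
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}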
Word Count Bolt (Java)
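import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
    // Per-task state: correct because fieldsGrouping routes each word to one task
    private final Map<String, Integer> counts = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        int count = counts.getOrDefault(word, 0) + 1;
        counts.put(word, count);
        // Anchored emit keeps the (word, count) tuple in the reliability tree
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}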
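To see what the spout and the two bolts compute without starting a cluster, the pipeline can be traced in plain Java. This is a simplified, single-threaded sketch, not Storm code. The class name `LocalWordCountTrace` is hypothetical, the sentence list stands in for `RandomSentenceSpout`, and the loops mimic the emit logic of `SplitSentenceBolt` and `WordCountBolt` running as one task each.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded trace of the spout -> split -> count pipeline.
// It mirrors the bolts' emit logic but uses no Storm classes.
class LocalWordCountTrace {

    // Mimics SplitSentenceBolt.execute: one output "tuple" per word
    static List<String> split(String sentence) {
        return Arrays.asList(sentence.split(" "));
    }

    // Mimics WordCountBolt: keeps a running count and "emits" word=count pairs
    static List<String> countAll(List<String> sentences, Map<String, Integer> counts) {
        List<String> emitted = new ArrayList<>();
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                int count = counts.getOrDefault(word, 0) + 1;
                counts.put(word, count);
                emitted.add(word + "=" + count);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> emitted = countAll(
                Arrays.asList("the cow jumped over the moon", "snow white and the seven dwarfs"),
                counts);
        System.out.println(emitted);
        System.out.println("the -> " + counts.get("the"));
    }
}
```

The single shared `counts` map is only correct because there is one counting task here. In a real topology with several `WordCountBolt` tasks, it is the fields grouping on `word` that keeps each task's map consistent.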
Stream Groupings
| Grouping Type | Description | Use Case |
|---|---|---|
| Shuffle Grouping | Tuples randomly distributed across bolt tasks | Load balancing |
| Fields Grouping | Tuples grouped by specified field values | Aggregating counts by key |
| All Grouping | Replicates tuples to all bolt tasks | Broadcasting signals |
| Global Grouping | All tuples go to a single task (lowest ID) | Global operations |
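| Direct Grouping | The emitting component chooses the receiving task (the stream must be declared direct and tuples sent with emitDirect) | Custom routing logic |
| Local or Shuffle Grouping | Like shuffle, but sends only to target tasks in the same worker process when any exist | Reducing network traffic |
| Partial Key Grouping | Like fields grouping, but each key is load-balanced between two target tasks | Skewed key distributions |
| None Grouping | Distribution does not matter; currently equivalent to shuffle grouping | When grouping is irrelevant |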
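Fields grouping and partial key grouping both route by key, but they differ in how many tasks a single key can reach. The plain-Java sketch below models the difference under simplifying assumptions:

- Fields grouping is modeled as hash of the key modulo the task count.
- Partial key grouping is modeled as sending each tuple to the less-loaded of two candidate tasks chosen by two different hashes (the "power of two choices" idea).

Storm's real hash functions differ, and `GroupingModel` and its methods are hypothetical names.

```java
import java.util.Arrays;

// Simplified models of two key-based stream groupings (not Storm's own code).
class GroupingModel {

    // Fields grouping: the same key always maps to the same task index
    static int fieldsGrouping(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // A second, independent string hash (multiplier 131, seed 17), used only
    // to derive the alternative candidate task for partial key grouping
    static int secondHash(String key) {
        int h = 17;
        for (int i = 0; i < key.length(); i++) {
            h = h * 131 + key.charAt(i);
        }
        return h;
    }

    // Partial key grouping: each key has two candidate tasks; send the tuple to
    // whichever candidate has received fewer tuples so far, then record it
    static int partialKeyGrouping(String key, long[] tuplesPerTask) {
        int numTasks = tuplesPerTask.length;
        int first = Math.floorMod(key.hashCode(), numTasks);
        int second = Math.floorMod(secondHash(key), numTasks);
        int chosen = tuplesPerTask[first] <= tuplesPerTask[second] ? first : second;
        tuplesPerTask[chosen]++;
        return chosen;
    }

    public static void main(String[] args) {
        int numTasks = 4;
        long[] fieldsLoad = new long[numTasks];
        long[] partialLoad = new long[numTasks];
        // A skewed stream: 90% of tuples carry the same "hot" key
        for (int i = 0; i < 1000; i++) {
            String key = (i % 10 == 0) ? "rare-" + i : "hot";
            fieldsLoad[fieldsGrouping(key, numTasks)]++;
            partialKeyGrouping(key, partialLoad);
        }
        System.out.println("fields grouping load:      " + Arrays.toString(fieldsLoad));
        System.out.println("partial key grouping load: " + Arrays.toString(partialLoad));
    }
}
```

With fields grouping, every "hot" tuple lands on one task. With partial key grouping, the hot key is split across two tasks, so a downstream step has to merge the two partial results for that key.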
Reliability & Tuple Processing
Reliability Mechanisms
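- Acking System: acker tasks track the tree of tuples derived from each spout tuple and notify the spout once the whole tree has been processed
- Timeout Mechanism: a spout tuple whose tree is not fully acked within topology.message.timeout.secs is considered failed
- At-Least-Once Processing: when a tuple tree fails or times out, Storm calls the spout's fail() method so the spout can replay the tuple; this works only for tuples emitted with a message ID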
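Storm's documentation on guaranteeing message processing explains how the acker tracks a whole tuple tree with a single 64-bit value. Every tuple ID is XORed into that value once when the tuple is created and once when it is acked, so the value returns to zero when every tuple in the tree has been acked (barring an astronomically unlikely coincidence with random 64-bit IDs). The sketch below models only that bookkeeping in plain Java. `XorAckerModel` and its methods are hypothetical names, not Storm's acker API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Simplified model of XOR-based tuple-tree tracking (hypothetical names).
class XorAckerModel {
    // root (spout tuple) id -> XOR of every tuple id created or acked in its tree
    private final Map<Long, Long> ackVals = new HashMap<>();

    // The spout emits a root tuple: its id is XORed in once on creation
    void init(long rootId) {
        ackVals.put(rootId, rootId);
    }

    // A bolt acks one input tuple and reports the ids of the anchored tuples it
    // emitted: the input id is XORed in a second time (cancelling it), and each
    // child id is XORed in for the first time
    void ack(long rootId, long inputId, long... childIds) {
        long val = ackVals.get(rootId) ^ inputId;
        for (long child : childIds) {
            val ^= child;
        }
        ackVals.put(rootId, val);
    }

    // The tree is fully processed once the XOR returns to zero
    boolean isComplete(long rootId) {
        return ackVals.get(rootId) == 0L;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        XorAckerModel acker = new XorAckerModel();
        long sentence = random.nextLong(); // tuple emitted by the spout
        long word1 = random.nextLong();    // anchored tuples emitted by the split bolt
        long word2 = random.nextLong();
        acker.init(sentence);
        acker.ack(sentence, sentence, word1, word2); // split bolt acks the sentence
        System.out.println("after split: complete=" + acker.isComplete(sentence));
        acker.ack(sentence, word1);                  // count bolt acks each word
        acker.ack(sentence, word2);
        System.out.println("after counts: complete=" + acker.isComplete(sentence));
    }
}
```

The appeal of this design is constant memory per spout tuple: the acker stores one number regardless of how large the tree grows.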
Tuple Acknowledgement
// In a bolt's execute method, call exactly one of these for each input tuple:
collector.ack(tuple);   // Mark successful processing
collector.fail(tuple);  // Mark failed processing; the spout's fail() is invoked so it can replay
Reliability Configurations
# In storm.yaml or Config object
topology.max.spout.pending: 1000 # Max tuples pending per spout task
topology.message.timeout.secs: 30 # Tuple timeout
topology.acker.executors: 20       # Number of acker executors
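Two of these settings work together on the spout side:

- topology.max.spout.pending caps how many tuples a spout task may have in flight (emitted but not yet acked or failed).
- A tuple whose tree is not completed within topology.message.timeout.secs is handed to the spout's fail().

The plain-Java model below mimics that bookkeeping. `PendingTupleModel` and its methods are hypothetical and only loosely mirror `nextTuple()`, `ack()` and `fail()`. In real Storm, the framework itself stops calling `nextTuple()` when the cap is reached, and a spout must emit with a message ID for ack/fail to be called at all.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of spout-side bookkeeping: a cap on in-flight tuples
// (the role of topology.max.spout.pending) plus replay of failed tuples.
class PendingTupleModel {
    private final int maxPending;
    private final Deque<String> source = new ArrayDeque<>();   // not yet emitted
    private final Map<Long, String> pending = new HashMap<>(); // msgId -> payload
    private long nextMsgId = 0;

    PendingTupleModel(int maxPending, String... messages) {
        this.maxPending = maxPending;
        for (String m : messages) {
            source.addLast(m);
        }
    }

    // Like nextTuple(): emit only while fewer than maxPending tuples are in flight.
    // Returns the message id used, or -1 if nothing was emitted.
    long nextTuple() {
        if (pending.size() >= maxPending || source.isEmpty()) {
            return -1;
        }
        long msgId = nextMsgId++;
        pending.put(msgId, source.pollFirst());
        return msgId;
    }

    // Like ack(msgId): the tuple tree completed, so forget the payload
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // Like fail(msgId): failed or timed out, so queue the payload for replay first
    void fail(long msgId) {
        String payload = pending.remove(msgId);
        if (payload != null) {
            source.addFirst(payload);
        }
    }

    int inFlight() { return pending.size(); }

    int waiting() { return source.size(); }

    public static void main(String[] args) {
        PendingTupleModel spout = new PendingTupleModel(2, "m1", "m2", "m3");
        long first = spout.nextTuple();
        long second = spout.nextTuple();
        System.out.println("third emit blocked: " + (spout.nextTuple() == -1));
        spout.fail(first); // e.g. timed out after topology.message.timeout.secs
        spout.ack(second);
        System.out.println("in flight=" + spout.inFlight() + ", waiting=" + spout.waiting());
    }
}
```

Keeping replay data in memory, as here, is lost if the worker dies. Spouts over durable sources (such as Kafka) typically replay by re-reading from the source instead.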
Storm CLI Commands
# Submit topology
storm jar topology.jar org.example.WordCountTopology wordcount-topology
# List running topologies
storm list
# Kill topology
storm kill topology-name [-w wait-time-secs]
# Deactivate topology (pause processing)
storm deactivate topology-name
# Reactivate topology (resume processing)
storm activate topology-name
# Rebalance topology (change worker count and/or a component's executor count)
storm rebalance topology-name [-w wait-time-secs] [-n num-workers] [-e component=num-executors]
# Monitor throughput of a running topology
storm monitor topology-name
Configuration Options
Key Configuration Parameters
| Parameter | Description | Default |
|---|---|---|
| topology.workers | Number of worker processes | 1 |
| topology.acker.executors | Number of acker executors | null (one per worker) |
| topology.max.spout.pending | Max in-flight (un-acked) tuples per spout task | null (unbounded) |
| topology.message.timeout.secs | Tuple tree processing timeout, in seconds | 30 |
| topology.debug | Log every emitted tuple | false |
| storm.local.dir | Directory for local storage | $STORM_HOME/storm-local |
| supervisor.slots.ports | Ports available for worker processes | [6700, 6701, 6702, 6703] |
| worker.childopts | JVM options for workers | "-Xmx%HEAP-MEM%m" plus GC logging and heap-dump flags |
| nimbus.childopts | JVM options for Nimbus | "-Xmx1024m" |
| supervisor.childopts | JVM options for Supervisor | "-Xmx256m" |
Setting Configurations
// Programmatically in Java
Config conf = new Config();
conf.setNumWorkers(3);
conf.setMaxSpoutPending(5000);
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
Performance Tuning
Worker & Executor Configuration
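- Allocate more executors (threads) to bottleneck components, such as bolts whose capacity in the Storm UI approaches 1.0
- Size the number of workers to the supervisor slots available in the cluster
- In most cases, favor more executors over more workers: tuples sent between workers must be serialized and transferred over the network, while tuples within a worker are not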
Memory Tuning
# In storm.yaml or as JVM options
worker.childopts: "-Xmx4g" # Set worker JVM heap size
Parallelism Levels
| Level | Description | Setting Method |
|---|---|---|
| Workers | Number of processes | Config.setNumWorkers() |
| Executors | Number of threads | Parallelism hint in setSpout/setBolt |
| Tasks | Number of instances | setNumTasks() on the declarer returned by setSpout/setBolt |
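The three levels nest. Storm's documentation notes that a component's task count stays fixed for the lifetime of a topology, while its executor count can later be changed with `storm rebalance`. Setting `setNumTasks()` above the parallelism hint therefore leaves headroom to scale out. The model below shows how a fixed set of tasks might be spread over executors. It is a simplification rather than Storm's scheduler code, and `TaskLayoutModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Storm's scheduler code): a component's fixed set of
// tasks is split into contiguous, as-even-as-possible ranges, one per executor.
class TaskLayoutModel {

    static List<List<Integer>> assignTasks(int numTasks, int numExecutors) {
        if (numTasks <= 0 || numExecutors <= 0) {
            throw new IllegalArgumentException("tasks and executors must be positive");
        }
        // An executor needs at least one task, so executors never outnumber tasks
        int executors = Math.min(numExecutors, numTasks);
        int base = numTasks / executors;   // tasks every executor receives
        int extra = numTasks % executors;  // the first `extra` executors get one more
        List<List<Integer>> layout = new ArrayList<>();
        int nextTask = 1;                  // task ids numbered from 1 for readability
        for (int e = 0; e < executors; e++) {
            int size = base + (e < extra ? 1 : 0);
            List<Integer> tasks = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                tasks.add(nextTask++);
            }
            layout.add(tasks);
        }
        return layout;
    }

    public static void main(String[] args) {
        // Like setBolt("split", bolt, 2).setNumTasks(4): 2 executors share 4 tasks
        System.out.println(assignTasks(4, 2));
        // After rebalancing "split" to 4 executors: one task per executor
        System.out.println(assignTasks(4, 4));
    }
}
```

With 4 tasks, rebalancing can raise the executor count to 4 but not beyond, because an extra executor would have no task to run.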
Rebalancing Example
# Increase 'split-bolt' to 20 executors and the worker count to 5 (executors can be raised only up to the component's task count)
storm rebalance mytopology -n 5 -e split-bolt=20
Common Challenges & Solutions
| Challenge | Solution |
|---|---|
| Tuple Timeout | Increase topology.message.timeout.secs; check for bottlenecks |
| Out of Memory | Increase worker heap size; reduce topology.max.spout.pending |
| Data Skew | Use Partial Key Grouping; partition keys differently (for example, split hot keys) |
| Slow Processing | Increase parallelism of bottleneck components |
| Lost ZooKeeper Connection | Check ZooKeeper health; increase session timeout |
| Tuple Replay Storms | Cap in-flight tuples with topology.max.spout.pending; fix the errors that cause tuples to fail |
Integration with Other Systems
Kafka Integration
<!-- Maven dependency -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>2.4.0</version>
</dependency>
// Kafka Spout configuration (bootstrapServers is a placeholder String such as "broker1:9092")
KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
        // The Pattern is a regular expression: "topic-.*" matches every topic starting with "topic-"
        .builder(bootstrapServers, Pattern.compile("topic-.*"))
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "storm-kafka-group")
        .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
        .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
        .build();
builder.setSpout("kafka-spout", new KafkaSpout<>(kafkaConfig), 4);
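`KafkaSpoutConfig.builder` takes a `java.util.regex.Pattern` for topic subscription, so the pattern is a regular expression, not a shell glob. The stdlib-only check below uses made-up topic names and assumes the whole topic name must match (`Matcher.matches()`). It shows why a glob-style pattern such as `topic-*` is a common mistake: `*` repeats the preceding `-`, so it matches `topic` but not `topic-orders`, whereas `topic-.*` matches every topic that starts with `topic-`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Checks which (made-up) topic names a subscription pattern would select,
// assuming a full match of the whole topic name.
class TopicPatternCheck {

    static List<String> matching(Pattern pattern, List<String> topics) {
        List<String> selected = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                selected.add(topic);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("topic", "topic-orders", "topic-clicks", "other-topic");
        System.out.println("topic-*  -> " + matching(Pattern.compile("topic-*"), topics));
        System.out.println("topic-.* -> " + matching(Pattern.compile("topic-.*"), topics));
    }
}
```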
HDFS Integration
<!-- Maven dependency -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>2.4.0</version>
</dependency>
// HDFS Bolt configuration
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:9000")
        .withFileNameFormat(new DefaultFileNameFormat()
                .withPath("/storm/data/")
                .withPrefix("data")
                .withExtension(".txt"))
        .withRecordFormat(new DelimitedRecordFormat()
                .withFieldDelimiter(","))
        .withRotationPolicy(new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES))
        .withSyncPolicy(new CountSyncPolicy(1000));
builder.setBolt("hdfs-bolt", bolt, 4).shuffleGrouping("previous-bolt");
Monitoring & Debugging
UI Dashboard
Storm UI provides information on:
- Topology summary
- Spout/bolt statistics
- Executors and component details
- Topology visualization
- Worker logs
JMX Metrics
# In storm.yaml
storm.daemon.metrics.reporter.plugins:
- "org.apache.storm.metrics2.reporters.JmxStormReporter"
Log Access Settings
# In storm.yaml: users and groups allowed to view topology logs through the logviewer
logs.users:
- "user1"
logs.groups:
- "group1"
Best Practices
Design
- Keep topologies simple and focused on a single purpose
- Filter and project data as early in the topology as possible to reduce downstream traffic
- Be mindful of state in bolts (use windowing or state management)
- Use micro-batching for higher throughput when possible
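Windowing keeps bolt state bounded: instead of a counts map that grows forever, state is kept per window and dropped once the window is emitted. Storm ships windowing support (`BaseWindowedBolt`, with tumbling and sliding windows), which should be preferred in practice. The plain-Java sketch below only illustrates a tumbling count window. It assumes in-order, non-negative timestamps, and `TumblingWindowCounter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tumbling-window counter: events are bucketed into fixed,
// non-overlapping windows by timestamp, and each window's state is dropped
// after it is emitted.
class TumblingWindowCounter {
    private final long windowMillis;
    private long currentWindowStart = -1;
    private final Map<String, Integer> counts = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    TumblingWindowCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Assumes events arrive in timestamp order (no late-data handling)
    void onEvent(long timestampMillis, String word) {
        long windowStart = timestampMillis - (timestampMillis % windowMillis);
        if (currentWindowStart != -1 && windowStart != currentWindowStart) {
            flush(); // the previous window is complete
        }
        currentWindowStart = windowStart;
        counts.merge(word, 1, Integer::sum);
    }

    // Emit one summary for the finished window, then drop its state
    void flush() {
        if (!counts.isEmpty()) {
            emitted.add(currentWindowStart + ":" + counts);
            counts.clear();
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TumblingWindowCounter counter = new TumblingWindowCounter(1000);
        counter.onEvent(100, "storm");
        counter.onEvent(900, "storm");
        counter.onEvent(950, "kafka");
        counter.onEvent(1200, "storm"); // starts the window beginning at 1000
        counter.flush();                // close the last window
        System.out.println(counter.emitted());
    }
}
```

Emitting one summary per window, rather than one tuple per input, is also a simple form of the micro-batching mentioned above.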
Development
- Test locally with LocalCluster before deploying
- Implement proper error handling in spouts and bolts
- Use Storm’s metrics API to expose custom metrics
- Log appropriately for debugging (but avoid excessive logging)
Operations
- Monitor with Storm UI and external monitoring tools
- Start with modest parallelism and scale based on metrics
- Rely on Storm's built-in backpressure and cap in-flight tuples with topology.max.spout.pending
- Use reliable sources and sinks for critical applications
- Create topologies that can be updated without losing state
Performance
- Use fields grouping judiciously (can cause skew)
- Batch tuple emissions when possible
- Prefer in-memory computations over external lookups
- Consider using Trident when you need exactly-once state updates
