Apache Storm: The Comprehensive Cheat Sheet

Introduction

Apache Storm is a distributed, fault-tolerant, real-time computation system designed to process vast amounts of data with low latency. Initially developed at BackType, which was later acquired by Twitter, Storm was open-sourced and is now a top-level Apache project. It’s particularly valuable for real-time analytics, continuous computation, distributed RPC, and ETL (Extract, Transform, Load) operations.

Unlike batch processing systems such as Hadoop, Storm processes data as it arrives, making it ideal for applications requiring immediate insights from streaming data. Its distributed nature allows it to scale horizontally across clusters of machines, providing both high throughput and fault tolerance.

Core Concepts

| Concept | Description |
|---|---|
| Topology | The overall processing logic defined as a directed acyclic graph (DAG) of spouts and bolts |
| Stream | An unbounded sequence of tuples that is processed in parallel across the cluster |
| Tuple | The basic data unit in Storm; essentially a named list of values |
| Spout | A source of streams; reads data from external sources and emits tuples into the topology |
| Bolt | Processes input streams and produces output streams; implements the processing logic |
| Task | An instance of a spout or bolt; multiple tasks can run for one component |
| Worker | A JVM process that executes a subset of a topology |
| Executor | A thread within a worker process that runs one or more tasks |
| Stream Grouping | Defines how stream tuples are distributed from one component to another |

Storm Architecture

Cluster Components

| Component | Description |
|---|---|
| Nimbus | Master node that distributes code, assigns tasks, and monitors for failures |
| Supervisor | Worker node that starts/stops worker processes as directed by Nimbus |
| ZooKeeper | Coordinates the cluster and stores cluster state and statistics |
| UI | Web interface for monitoring and managing topologies |

Process Hierarchy

Cluster → Topologies → Workers → Executors → Tasks
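
Each level of this hierarchy has its own knob. A minimal sketch, using the word-count components implemented later in this sheet (the numbers are illustrative):

// Workers: JVM processes for the topology (set on the Config)
Config conf = new Config();
conf.setNumWorkers(2);

// Executors: threads (the parallelism hint); Tasks: component instances
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 2);
builder.setBolt("split", new SplitSentenceBolt(), 4)   // 4 executors...
       .setNumTasks(8)                                 // ...running 8 tasks
       .shuffleGrouping("spout");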

Setting Up Storm

Installation Prerequisites

  • Java 8+ installed
  • Python 2.7.x or 3.x
  • ZooKeeper cluster set up

Basic Installation Steps

# Download Storm
wget https://downloads.apache.org/storm/apache-storm-x.y.z/apache-storm-x.y.z.tar.gz

# Extract
tar -xzf apache-storm-x.y.z.tar.gz
cd apache-storm-x.y.z

# Configure Storm (edit conf/storm.yaml)

Minimal Configuration (storm.yaml)

storm.zookeeper.servers:
  - "zk1.example.com"
  - "zk2.example.com"
  - "zk3.example.com"

storm.zookeeper.port: 2181
storm.local.dir: "/mnt/storm"
nimbus.seeds: ["nimbus1.example.com", "nimbus2.example.com"]
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
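
With storm.yaml configured, start the daemons. In production each daemon runs on its designated node; the commands below are illustrative and assume you are in the Storm installation directory:

# Start the master daemon (Nimbus node)
bin/storm nimbus &

# Start a worker daemon (each Supervisor node)
bin/storm supervisor &

# Start the web UI (default port 8080)
bin/storm ui &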

Building Storm Topologies

Maven Dependency

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
</dependency>

Basic Topology Structure (Java)

// Create the topology builder first
TopologyBuilder builder = new TopologyBuilder();

// Define a spout with a parallelism hint of 2
builder.setSpout("spout", new RandomSentenceSpout(), 2);

// Define a bolt subscribing to the spout's stream
builder.setBolt("split", new SplitSentenceBolt(), 4).shuffleGrouping("spout");

// Define another bolt receiving data from the previous one
builder.setBolt("count", new WordCountBolt(), 4).fieldsGrouping("split", new Fields("word"));

// Create the topology
StormTopology topology = builder.createTopology();

// Submit the topology
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopology("word-count", conf, topology);

Simplified Topology Example (Java)

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        
        // Add the spout, with a parallelism hint of 5 executors
        builder.setSpout("randomSentence", new RandomSentenceSpout(), 5);
        
        // Add the bolt, subscribing to the spout's output stream and with a parallelism hint of 8
        builder.setBolt("splitSentence", new SplitSentenceBolt(), 8)
               .shuffleGrouping("randomSentence");
        
        // Add another bolt, subscribing to the SplitSentence bolt's output
        builder.setBolt("wordCount", new WordCountBolt(), 12)
               .fieldsGrouping("splitSentence", new Fields("word"));
        
        Config conf = new Config();
        conf.setNumWorkers(3);
        
        if (args != null && args.length > 0) {
            // Remote cluster submission
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local cluster for testing
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}

Spout Implementation

Basic Spout (Java)

public class RandomSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random;
    private String[] sentences = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String sentence = sentences[random.nextInt(sentences.length)];
        collector.emit(new Values(sentence));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

Bolt Implementation

Basic Bolt (Java)

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(tuple, new Values(word));  // anchor output to the input tuple
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Word Count Bolt (Java)

public class WordCountBolt extends BaseRichBolt {
    private Map<String, Integer> counts = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = counts.getOrDefault(word, 0) + 1;
        counts.put(word, count);
        collector.emit(tuple, new Values(word, count));  // anchored emit
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

Stream Groupings

| Grouping Type | Description | Use Case |
|---|---|---|
| Shuffle Grouping | Tuples randomly distributed across bolt tasks | Load balancing |
| Fields Grouping | Tuples grouped by specified field values | Aggregating counts by key |
| All Grouping | Replicates tuples to all bolt tasks | Broadcasting signals |
| Global Grouping | All tuples go to a single task (lowest ID) | Global operations |
| Direct Grouping | Source decides which task receives each tuple | Custom routing logic |
| Local or Shuffle Grouping | Like shuffle, but prefers tasks in the same worker process | Reducing network traffic |
| Partial Key Grouping | Load-balanced variant of fields grouping | Better distribution under key skew |
| None Grouping | Currently equivalent to shuffle grouping | Legacy purposes |
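
Groupings are declared on the object returned by setBolt(). A few illustrative declarations (SignalBolt and MergeBolt are hypothetical placeholder classes):

// Shuffle: random, even distribution
builder.setBolt("split", new SplitSentenceBolt(), 4).shuffleGrouping("spout");

// Fields: the same "word" value always reaches the same task
builder.setBolt("count", new WordCountBolt(), 4).fieldsGrouping("split", new Fields("word"));

// All: every task gets a copy (e.g. a control signal)
builder.setBolt("signal", new SignalBolt(), 4).allGrouping("control-spout");

// Global: everything funnels into the single lowest-ID task
builder.setBolt("merge", new MergeBolt(), 1).globalGrouping("count");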

Reliability & Tuple Processing

Reliability Mechanisms

  • Acking System: Storm tracks tuple trees and ensures all tuples are processed
  • Timeout Mechanism: Tuples not acknowledged within a configurable timeout are considered failed
  • Guaranteed Message Processing: Tuples are replayed on failure
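
Note that the guarantee is end-to-end only if the spout attaches a message ID when emitting (the bolt examples above already anchor their output tuples). A sketch of a reliable spout emit, where msgId is an assumed unique identifier the spout can use to replay the tuple:

// Emitting with a message ID enrolls the tuple in the acking system;
// Storm later calls ack(msgId) or fail(msgId) back on this spout
collector.emit(new Values(sentence), msgId);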

Tuple Acknowledgement

// In a bolt's execute method:
collector.ack(tuple);   // Mark successful processing
collector.fail(tuple);  // Mark failed processing

Reliability Configurations

# In storm.yaml or Config object
topology.max.spout.pending: 1000  # Max tuples pending per spout task
topology.message.timeout.secs: 30  # Tuple timeout
topology.acker.executors: 20  # Number of acker tasks

Storm CLI Commands

# Submit topology
storm jar topology.jar org.example.WordCountTopology wordcount-topology

# List running topologies
storm list

# Kill topology
storm kill topology-name [-w wait-time-secs]

# Deactivate topology (pause processing)
storm deactivate topology-name

# Reactivate topology (resume processing)
storm activate topology-name

# Rebalance topology (change worker count and component parallelism)
storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]

# Monitor a running topology's throughput
storm monitor topology-name

Configuration Options

Key Configuration Parameters

| Parameter | Description | Default |
|---|---|---|
| topology.workers | Number of worker processes | 1 |
| topology.acker.executors | Number of acker tasks | null (one per worker) |
| topology.max.spout.pending | Max in-flight tuples per spout task | null (unbounded) |
| topology.message.timeout.secs | Tuple processing timeout (seconds) | 30 |
| topology.debug | Enable debug logging | false |
| storm.local.dir | Directory for local storage | $STORM_HOME/storm-local |
| supervisor.slots.ports | Worker ports available | [6700, 6701, 6702, 6703] |
| worker.childopts | JVM options for workers | "" |
| nimbus.childopts | JVM options for Nimbus | "" |
| supervisor.childopts | JVM options for Supervisor | "" |

Setting Configurations

// Programmatically in Java
Config conf = new Config();
conf.setNumWorkers(3);
conf.setMaxSpoutPending(5000);
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
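
Settings can also be overridden per component through addConfiguration() on the declarer returned by setSpout()/setBolt(). A sketch, enabling debug logging for one bolt only:

// Per-component override: debug output for just this bolt
builder.setBolt("split", new SplitSentenceBolt(), 4)
       .addConfiguration(Config.TOPOLOGY_DEBUG, true)
       .shuffleGrouping("spout");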

Performance Tuning

Worker & Executor Configuration

  • Allocate more executors (threads) for bottleneck components
  • Allocate reasonable number of workers based on cluster size
  • Favor more executors over more workers for most cases

Memory Tuning

# In storm.yaml or as JVM options
worker.childopts: "-Xmx4g"  # Set worker JVM heap size

Parallelism Levels

| Level | Description | Setting Method |
|---|---|---|
| Workers | Number of JVM processes | Config.setNumWorkers() |
| Executors | Number of threads | Parallelism hint in setSpout()/setBolt() |
| Tasks | Number of component instances | setNumTasks() on the bolt/spout declarer |

Rebalancing Example

# Increase parallelism of 'split-bolt' to 20 executors and worker count to 5
storm rebalance mytopology -n 5 -e split-bolt=20

Common Challenges & Solutions

| Challenge | Solution |
|---|---|
| Tuple timeout | Increase topology.message.timeout.secs; check for bottlenecks |
| Out of memory | Increase worker heap size; reduce topology.max.spout.pending |
| Data skew | Use partial key grouping; partition keys differently |
| Slow processing | Increase parallelism of bottleneck components |
| Lost ZooKeeper connection | Check ZooKeeper health; increase session timeout |
| Tuple replay storms | Implement backpressure; improve error handling |

Integration with Other Systems

Kafka Integration

<!-- Maven dependency -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>2.4.0</version>
</dependency>

// Kafka Spout configuration
KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
    .builder(bootstrapServers, Pattern.compile("topic-.*"))
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "storm-kafka-group")
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    .build();

builder.setSpout("kafka-spout", new KafkaSpout<>(kafkaConfig), 4);

HDFS Integration

<!-- Maven dependency -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>2.4.0</version>
</dependency>

// HDFS Bolt configuration
HdfsBolt bolt = new HdfsBolt()
    .withFsUrl("hdfs://localhost:9000")
    .withFileNameFormat(new DefaultFileNameFormat()
        .withPath("/storm/data/")
        .withPrefix("data")
        .withExtension(".txt"))
    .withRecordFormat(new DelimitedRecordFormat()
        .withFieldDelimiter(","))
    .withRotationPolicy(new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES))
    .withSyncPolicy(new CountSyncPolicy(1000));

builder.setBolt("hdfs-bolt", bolt, 4).shuffleGrouping("previous-bolt");

Monitoring & Debugging

UI Dashboard

Storm UI provides information on:

  • Topology summary
  • Spout/bolt statistics
  • Executors and component details
  • Topology visualization
  • Worker logs

JMX Metrics

# In storm.yaml (metrics2 reporter configuration)
storm.metrics.reporters:
  - class: "org.apache.storm.metrics2.reporters.JmxStormReporter"
    daemons:
      - "nimbus"
      - "supervisor"
      - "worker"

Log Settings

# In storm.yaml: users/groups allowed to view worker logs through the UI
logs.users:
  - "user1"
logs.groups:
  - "group1"

Best Practices

  1. Design

    • Keep topologies simple and focused on a single purpose
    • Process data as close to the source as possible
    • Be mindful of state in bolts (use windowing or state management)
    • Use micro-batching for higher throughput when possible
  2. Development

    • Test locally with LocalCluster before deploying
    • Implement proper error handling in spouts and bolts
    • Use Storm’s metrics API to expose custom metrics
    • Log appropriately for debugging (but avoid excessive logging)
  3. Operations

    • Monitor with Storm UI and external monitoring tools
    • Start with modest parallelism and scale based on metrics
    • Implement backpressure mechanisms
    • Use reliable sources and sinks for critical applications
    • Create topologies that can be updated without losing state
  4. Performance

    • Use fields grouping judiciously (can cause skew)
    • Batch tuple emissions when possible
    • Prefer in-memory computations over external lookups
    • Consider using Trident for exactly-once processing (see the sketch below)
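
As a sketch of that last point, the canonical Trident word count; Split is a user-defined function, and MemoryMapState is an in-memory state suitable for testing only:

// Trident topology: exactly-once state updates via persistentAggregate
TridentTopology topology = new TridentTopology();
topology.newStream("sentences", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

// The user-defined split function
public static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for (String word : tuple.getString(0).split(" ")) {
            collector.emit(new Values(word));
        }
    }
}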
