Introduction: What is ActiveMQ and Why It Matters
Apache ActiveMQ is an open-source, multi-protocol message broker written in Java that implements the Java Message Service (JMS) specification. It serves as middleware that facilitates asynchronous communication and loose coupling between distributed applications and services. ActiveMQ enables reliable message exchange between different components of a system, supporting various messaging patterns including point-to-point, publish/subscribe, and request/reply. This message-oriented middleware is crucial for building scalable, resilient, and high-performance distributed systems by decoupling producers from consumers, handling peak loads, and ensuring message delivery even during component failures.
Core Concepts of ActiveMQ
Concept | Description |
---|---|
Message Broker | Central server that receives, stores, and forwards messages between applications |
Destination | Addressing entity where messages are sent (queue or topic) |
Queue | Point-to-point destination where each message is delivered to exactly one consumer |
Topic | Publish/subscribe destination where each message is delivered to all active subscribers |
Producer | Application component that sends messages to destinations |
Consumer | Application component that receives messages from destinations |
Message | Data unit exchanged between applications (headers, properties, payload) |
Connection | Network connection between client and broker |
Session | Single-threaded context for producing and consuming messages |
Persistence | Storage mechanism ensuring messages survive broker restarts |
Acknowledgment | Confirmation mechanism for message processing |
Setting Up ActiveMQ: Step-by-Step
- Download and install ActiveMQ
- Download latest release from ActiveMQ website
- Extract archive to desired location
- Set
ACTIVEMQ_HOME
environment variable (optional but recommended)
- Start the broker
- Navigate to
$ACTIVEMQ_HOME/bin
directory - Run
./activemq start
(Linux/Mac) oractivemq.bat start
(Windows) - Default web console: http://localhost:8161/admin (user: admin, password: admin)
- Navigate to
- Configure basic settings
- Edit
$ACTIVEMQ_HOME/conf/activemq.xml
for broker configuration - Modify
$ACTIVEMQ_HOME/conf/jetty.xml
for web console settings - Adjust
$ACTIVEMQ_HOME/conf/users.properties
andgroups.properties
for security
- Edit
- Set up persistence
- Choose storage mechanism (KahaDB, JDBC, etc.)
- Configure in activemq.xml using
<persistenceAdapter>
element - Example KahaDB configuration:xml
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
- Define destinations
- Create queues and topics via configuration or dynamically at runtime
- Configuration example:xml
<destinations> <queue physicalName="ORDERS.QUEUE"/> <topic physicalName="PRICE.UPDATES"/> </destinations>
Connection Methods and Protocols
JMS API (Java)
// Connection factory setup
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
// Session and destination
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("EXAMPLE.QUEUE");
// Producer example
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Hello ActiveMQ");
producer.send(message);
// Consumer example
MessageConsumer consumer = session.createConsumer(destination);
TextMessage received = (TextMessage) consumer.receive(1000);
System.out.println(received.getText());
// Cleanup
consumer.close();
session.close();
connection.close();
AMQP 1.0
// Using Qpid JMS client
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672");
Connection connection = factory.createConnection("username", "password");
// Rest similar to JMS example
STOMP
// JavaScript example with stomp.js
var client = Stomp.client("ws://localhost:61614/stomp");
client.connect("username", "password", function() {
client.subscribe("/queue/example", function(message) {
console.log("Received: " + message.body);
});
client.send("/queue/example", {}, "Hello STOMP");
});
MQTT
// Using Eclipse Paho client
MqttClient client = new MqttClient("tcp://localhost:1883", "ClientID");
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("username");
options.setPassword("password".toCharArray());
client.connect(options);
// Publish
client.publish("topic/example", "Hello MQTT".getBytes(), 1, false);
// Subscribe
client.subscribe("topic/example", (topic, message) -> {
System.out.println("Received: " + new String(message.getPayload()));
});
Message Types and Structures
JMS Message Types
- TextMessage: String payload
- BytesMessage: Stream of uninterpreted bytes
- MapMessage: Name-value pairs
- ObjectMessage: Serialized Java object
- StreamMessage: Primitive Java types in stream
- Message: Headers and properties only (no body)
Message Components
Message = Headers + Properties + Body
Headers (Standard JMS):
- JMSDestination
- JMSDeliveryMode (PERSISTENT/NON_PERSISTENT)
- JMSExpiration
- JMSPriority (0-9)
- JMSMessageID
- JMSTimestamp
- JMSCorrelationID
- JMSReplyTo
- JMSType
- JMSRedelivered
Properties:
- JMS defined (JMSXxxxx)
- Provider-specific
- Application-specific
Key Configuration Options by Category
Transport Connectors
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614"/>
</transportConnectors>
Memory Settings
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
Security Settings
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
<authenticationUser username="publisher" password="publisher" groups="publishers"/>
<authenticationUser username="consumer" password="consumer" groups="consumers"/>
</users>
</simpleAuthenticationPlugin>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="consumers" write="publishers" admin="admins"/>
<authorizationEntry topic=">" read="consumers" write="publishers" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
Destination Policies
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor/>
</pendingSubscriberPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
Comparing Deployment Architectures
Architecture | Description | Best For | Considerations |
---|---|---|---|
Standalone Broker | Single broker instance | Small applications, development | Single point of failure |
Master-Slave | Hot standby replica | Production with HA needs | Failover support needed in clients |
Network of Brokers | Multiple connected brokers | Distributed systems, high throughput | Complex configuration, network overhead |
Broker Cluster | Multiple brokers acting as one | Load balancing, scalability | Client connection routing |
Embedded Broker | Broker within application | Self-contained applications | Limited management features |
Docker/Kubernetes | Containerized deployment | Cloud-native applications | Resource allocation, persistence challenges |
Common Messaging Patterns with ActiveMQ
Point-to-Point (Queue)
// Producer
Queue queue = session.createQueue("ORDERS.QUEUE");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Order #12345");
producer.send(message);
// Consumer
MessageConsumer consumer = session.createConsumer(queue);
TextMessage received = (TextMessage) consumer.receive(); // blocks until message arrives
Publish/Subscribe (Topic)
// Publisher
Topic topic = session.createTopic("PRICE.UPDATES");
MessageProducer producer = session.createProducer(topic);
TextMessage message = session.createTextMessage("AAPL:150.75");
producer.send(message);
// Subscriber
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(message -> {
try {
System.out.println("Received: " + ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
Request/Reply
// Requestor
Queue requestQueue = session.createQueue("REQUESTS");
Queue replyQueue = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(requestQueue);
MessageConsumer consumer = session.createConsumer(replyQueue);
TextMessage request = session.createTextMessage("GetQuote:IBM");
request.setJMSReplyTo(replyQueue);
String correlationId = UUID.randomUUID().toString();
request.setJMSCorrelationID(correlationId);
producer.send(request);
// Responder
MessageConsumer requestConsumer = session.createConsumer(requestQueue);
requestConsumer.setMessageListener(requestMessage -> {
try {
TextMessage txtMsg = (TextMessage) requestMessage;
Destination replyDestination = txtMsg.getJMSReplyTo();
String correlationId = txtMsg.getJMSCorrelationID();
MessageProducer replyProducer = session.createProducer(replyDestination);
TextMessage replyMessage = session.createTextMessage("IBM:145.80");
replyMessage.setJMSCorrelationID(correlationId);
replyProducer.send(replyMessage);
} catch (JMSException e) {
e.printStackTrace();
}
});
Durable Subscription
// Creating durable subscription
connection.setClientID("ClientXYZ");
Topic topic = session.createTopic("NEWS.UPDATES");
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubscriptionName");
// Later reconnecting to the same subscription
MessageConsumer reconnectedConsumer = session.createDurableSubscriber(topic, "SubscriptionName");
// Removing subscription when no longer needed
session.unsubscribe("SubscriptionName");
Message Grouping
// Producer with grouping
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Order item for customer 1001");
message.setStringProperty("JMSXGroupID", "Customer-1001");
producer.send(message);
Common Challenges and Solutions
Challenge: Message Loss
Solutions:
- Use persistent delivery mode:
producer.setDeliveryMode(DeliveryMode.PERSISTENT)
- Configure proper acknowledgment mode:
Session.CLIENT_ACKNOWLEDGE
and callmessage.acknowledge()
- Implement dead letter queues: Configure
<deadLetterStrategy>
in policy entries - Use transactions:
session = connection.createSession(true, Session.SESSION_TRANSACTED)
andsession.commit()
Challenge: Performance Bottlenecks
Solutions:
- Use non-persistent delivery for non-critical messages:
DeliveryMode.NON_PERSISTENT
- Implement producer flow control:
<policyEntry producerFlowControl="true"
- Tune prefetch values:
consumer.setPrefetchSize(10)
- Batch messages using transactions
- Configure appropriate memory settings
- Use async consumers:
consumer.setMessageListener(message -> { ... })
Challenge: High Memory Usage
Solutions:
- Configure memory limits:
<memoryUsage limit="64 mb"/>
- Implement producer flow control
- Set appropriate destination policies
- Use cursor-based message stores for topics:
<pendingSubscriberPolicy><vmCursor/></pendingSubscriberPolicy>
- Implement message expiration:
producer.setTimeToLive(60000)
(60 seconds)
Challenge: Network of Brokers Issues
Solutions:
- Avoid duplicate message delivery:
<networkConnector duplex="false" decreaseNetworkConsumerPriority="true">
- Control message flow:
<networkConnector prefetchSize="1">
- Prevent cycles:
<networkConnector conduitSubscriptions="false">
- Filter messages:
<networkConnector dynamicallyIncludedDestinations="queue.A,topic.B">
Challenge: Slow Consumers
Solutions:
- Implement expiry policies:
<policyEntry expireMessagesPeriod="10000">
- Use cursor-based pending message stores:
<fileCursor/>
- Configure advisor notifications:
<policyEntry advisoryForSlowConsumers="true">
- Increase consumer threads in connection pool
Monitoring and Management
JMX Monitoring
// Enable JMX in activemq.xml
<broker useJmx="true">
<managementContext>
<managementContext createConnector="true" connectorPort="1099"/>
</managementContext>
</broker>
// Connect with JConsole: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
Key Metrics to Monitor
- Queue Depth: Number of pending messages
- Producer/Consumer Count: Active connection counts
- Memory Usage: Percentage of memory limit used
- Store Usage: Disk space used for persistence
- Connection Count: Total active connections
- Enqueue/Dequeue Rates: Message throughput
- Expired/DLQ Message Count: Failed message delivery indicators
Command Line Management
# List queues
activemq query -QQueue=*
# View queue statistics
activemq browse ORDERS.QUEUE
# Purge queue
activemq purge ORDERS.QUEUE
# Send message
activemq producer --message "Test message" --destination queue://TEST.QUEUE
# Consume messages
activemq consumer --destination queue://TEST.QUEUE
Advanced Features and Configurations
Message Redelivery Policy
// Client-side configuration
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000); // 1 second
policy.setBackOffMultiplier(2.0); // exponential backoff
policy.setMaximumRedeliveries(6); // try 6 times before DLQ
policy.setUseExponentialBackOff(true);
factory.setRedeliveryPolicy(policy);
Message Selectors
// Consumer with selector
String selector = "JMSPriority > 4 AND region = 'EMEA'";
MessageConsumer consumer = session.createConsumer(destination, selector);
// Setting filterable properties
TextMessage message = session.createTextMessage("High priority order");
message.setIntProperty("JMSPriority", 7);
message.setStringProperty("region", "EMEA");
producer.send(message);
Composite Destinations
<!-- In activemq.xml -->
<compositeTopic name="COMPOSITE.TOPIC" forwardOnly="false">
<forwardTo>
<queue physicalName="QUEUE.A"/>
<topic physicalName="TOPIC.B"/>
<topic physicalName="TOPIC.C"/>
</forwardTo>
</compositeTopic>
Virtual Topics
<!-- Producer sends to topic -->
<topic physicalName="VirtualTopic.Orders"/>
<!-- Consumers read from queue with consumer-specific name -->
<queue physicalName="Consumer.App1.VirtualTopic.Orders"/>
<queue physicalName="Consumer.App2.VirtualTopic.Orders"/>
Master-Slave Configuration (Shared File System)
<!-- Master and Slave use the same data directory -->
<persistenceAdapter>
<kahaDB directory="/shared/activemq/data"/>
</persistenceAdapter>
Resources for Further Learning
Official Documentation
- Apache ActiveMQ User Manual
- ActiveMQ in Action (book)
- ActiveMQ GitHub Repository
- ActiveMQ Mailing Lists
Monitoring Tools
- Hawtio Web Console
- ActiveMQ Web Console
- JConsole with JMX
- Commercial tools (Dynatrace, AppDynamics, etc.)
Integration Examples
- Spring JMS integration
- Camel-ActiveMQ integration
- Microservice patterns with ActiveMQ
- Cloud deployment scenarios
Remember: While ActiveMQ is powerful and flexible, the best practice is to start with simple configurations and add complexity only as needed. Always test thoroughly under expected production loads before deploying to production.