ActiveMQ Ultimate Cheatsheet: Master Message Brokering

Introduction: What is ActiveMQ and Why It Matters

Apache ActiveMQ is an open-source, multi-protocol message broker written in Java that implements the Java Message Service (JMS) specification. It serves as middleware that facilitates asynchronous communication and loose coupling between distributed applications and services. ActiveMQ enables reliable message exchange between different components of a system, supporting various messaging patterns including point-to-point, publish/subscribe, and request/reply. This message-oriented middleware is crucial for building scalable, resilient, and high-performance distributed systems by decoupling producers from consumers, handling peak loads, and ensuring message delivery even during component failures.

Core Concepts of ActiveMQ

Concept | Description
Message Broker | Central server that receives, stores, and forwards messages between applications
Destination | Addressing entity where messages are sent (queue or topic)
Queue | Point-to-point destination where each message is delivered to exactly one consumer
Topic | Publish/subscribe destination where each message is delivered to all active subscribers
Producer | Application component that sends messages to destinations
Consumer | Application component that receives messages from destinations
Message | Data unit exchanged between applications (headers, properties, payload)
Connection | Network connection between client and broker
Session | Single-threaded context for producing and consuming messages
Persistence | Storage mechanism ensuring messages survive broker restarts
Acknowledgment | Confirmation mechanism for message processing
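
The difference between queue and topic delivery can be sketched without a broker. The class below is a hypothetical in-memory model (not ActiveMQ code) and assumes round-robin dispatch for the queue, which is only one of the ways a broker may choose a consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory model of queue vs. topic delivery; not ActiveMQ code. */
final class DeliveryModel {

    /** Queue semantics: each message goes to exactly one consumer (round-robin assumed). */
    static List<List<String>> deliverToQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic semantics: every active subscriber receives its own copy of each message. */
    static List<List<String>> deliverToTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        System.out.println("queue: " + deliverToQueue(messages, 2)); // [[m1, m3], [m2]]
        System.out.println("topic: " + deliverToTopic(messages, 2)); // [[m1, m2, m3], [m1, m2, m3]]
    }
}
```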

Setting Up ActiveMQ: Step-by-Step

  1. Download and install ActiveMQ
    • Download latest release from ActiveMQ website
    • Extract archive to desired location
    • Set ACTIVEMQ_HOME environment variable (optional but recommended)
  2. Start the broker
    • Navigate to $ACTIVEMQ_HOME/bin directory
    • Run ./activemq start (Linux/Mac) or activemq.bat start (Windows)
    • Default web console: http://localhost:8161/admin (user: admin, password: admin)
  3. Configure basic settings
    • Edit $ACTIVEMQ_HOME/conf/activemq.xml for broker configuration
    • Modify $ACTIVEMQ_HOME/conf/jetty.xml for web console settings
    • Adjust $ACTIVEMQ_HOME/conf/users.properties and groups.properties for security
  4. Set up persistence
    • Choose storage mechanism (KahaDB, JDBC, etc.)
    • Configure in activemq.xml using <persistenceAdapter> element
    • Example KahaDB configuration:
       
      xml
      <persistenceAdapter>
          <kahaDB directory="${activemq.data}/kahadb"/>
      </persistenceAdapter>
  5. Define destinations
    • Create queues and topics via configuration or dynamically at runtime
    • Configuration example:
       
      xml
      <destinations>
          <queue physicalName="ORDERS.QUEUE"/>
          <topic physicalName="PRICE.UPDATES"/>
      </destinations>

Connection Methods and Protocols

JMS API (Java)

 
java
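// Imports for ActiveMQ 5.x (ActiveMQ 6.x uses jakarta.jms instead of javax.jms)
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

// Connection factory setup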
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();

// Session and destination
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("EXAMPLE.QUEUE");

// Producer example
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Hello ActiveMQ");
producer.send(message);

// Consumer example
MessageConsumer consumer = session.createConsumer(destination);
Message received = consumer.receive(1000); // returns null if nothing arrives within 1 second
if (received instanceof TextMessage) {
    System.out.println(((TextMessage) received).getText());
}

// Cleanup
consumer.close();
session.close();
connection.close();

AMQP 1.0

 
java
// Using Qpid JMS client
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672");
Connection connection = factory.createConnection("username", "password");
// Rest similar to JMS example

STOMP

 
javascript
// JavaScript example with stomp.js
var client = Stomp.client("ws://localhost:61614/stomp");
client.connect("username", "password", function() {
  client.subscribe("/queue/example", function(message) {
    console.log("Received: " + message.body);
  });
  
  client.send("/queue/example", {}, "Hello STOMP");
});

MQTT

 
java
// Using Eclipse Paho client
MqttClient client = new MqttClient("tcp://localhost:1883", "ClientID");
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("username");
options.setPassword("password".toCharArray());
client.connect(options);

// Subscribe first so this client receives the message published below
client.subscribe("topic/example", (topic, message) -> {
    System.out.println("Received: " + new String(message.getPayload()));
});

// Publish (QoS 1, not retained)
client.publish("topic/example", "Hello MQTT".getBytes(), 1, false);

Message Types and Structures

JMS Message Types

  • TextMessage: String payload
  • BytesMessage: Stream of uninterpreted bytes
  • MapMessage: Name-value pairs
  • ObjectMessage: Serialized Java object
  • StreamMessage: Primitive Java types in stream
  • Message: Headers and properties only (no body)

Message Components

 
Message = Headers + Properties + Body

Headers (Standard JMS):
- JMSDestination
- JMSDeliveryMode (PERSISTENT/NON_PERSISTENT)
- JMSExpiration
- JMSPriority (0-9)
- JMSMessageID
- JMSTimestamp
- JMSCorrelationID
- JMSReplyTo
- JMSType
- JMSRedelivered

Properties:
- JMS defined (JMSXxxxx)
- Provider-specific
- Application-specific

Key Configuration Options by Category

Transport Connectors

 
xml
<transportConnectors>
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
  <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
  <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883"/>
  <transportConnector name="ws" uri="ws://0.0.0.0:61614"/>
</transportConnectors>

Memory Settings

 
xml
<systemUsage>
  <systemUsage>
    <memoryUsage>
      <memoryUsage percentOfJvmHeap="70"/>
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="100 gb"/>
    </storeUsage>
    <tempUsage>
      <tempUsage limit="50 gb"/>
    </tempUsage>
  </systemUsage>
</systemUsage>
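
To get a feel for what these limits mean in bytes, the arithmetic can be sketched in plain Java. UsageLimits is a hypothetical helper, and the parser assumes binary multiples (1 kb = 1024 bytes), which should be checked against the broker version in use.

```java
import java.util.Locale;

/** Hypothetical helper that turns usage settings into byte counts; not ActiveMQ code. */
final class UsageLimits {

    /** Bytes available for messages when memoryUsage is given as percentOfJvmHeap. */
    static long percentOfHeap(long maxHeapBytes, int percent) {
        return maxHeapBytes * percent / 100;
    }

    /** Parses limits such as "100 gb" or "64 mb"; binary multiples are an assumption. */
    static long parseLimit(String text) {
        String value = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        if (value.endsWith("kb")) {
            multiplier = 1024L;
        } else if (value.endsWith("mb")) {
            multiplier = 1024L * 1024;
        } else if (value.endsWith("gb")) {
            multiplier = 1024L * 1024 * 1024;
        }
        if (multiplier != 1L) {
            value = value.substring(0, value.length() - 2).trim();
        }
        return Long.parseLong(value) * multiplier;
    }

    public static void main(String[] args) {
        long heap = Runtime.getRuntime().maxMemory();
        System.out.println("70% of this JVM's heap: " + percentOfHeap(heap, 70) + " bytes");
        System.out.println("storeUsage 100 gb: " + parseLimit("100 gb") + " bytes");
    }
}
```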

Security Settings

 
xml
<plugins>
  <simpleAuthenticationPlugin>
    <users>
      <authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
      <authenticationUser username="publisher" password="publisher" groups="publishers"/>
      <authenticationUser username="consumer" password="consumer" groups="consumers"/>
    </users>
  </simpleAuthenticationPlugin>
  
  <authorizationPlugin>
    <map>
      <authorizationMap>
        <authorizationEntries>
          <authorizationEntry queue=">" read="consumers" write="publishers" admin="admins"/>
          <authorizationEntry topic=">" read="consumers" write="publishers" admin="admins"/>
        </authorizationEntries>
      </authorizationMap>
    </map>
  </authorizationPlugin>
</plugins>

Destination Policies

 
xml
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
        <deadLetterStrategy>
          <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
        </deadLetterStrategy>
      </policyEntry>
      <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
        <pendingSubscriberPolicy>
          <vmCursor/>
        </pendingSubscriberPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
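
The ">" used in policy and authorization entries is a destination wildcard: names are split on ".", "*" matches one segment, and ">" matches everything to its right. A minimal sketch of that matching rule follows. DestinationWildcards is a hypothetical class, and it assumes ">" needs at least one remaining segment; that edge case should be verified against the broker.

```java
/** Hypothetical sketch of destination wildcard matching; not the broker's implementation. */
final class DestinationWildcards {

    static boolean matches(String pattern, String destination) {
        String[] patternParts = pattern.split("\\.");
        String[] nameParts = destination.split("\\.");
        for (int i = 0; i < patternParts.length; i++) {
            if (patternParts[i].equals(">")) {
                // Assumption: ">" must cover at least one remaining segment.
                return i < nameParts.length;
            }
            if (i >= nameParts.length) {
                return false; // pattern is longer than the destination name
            }
            if (!patternParts[i].equals("*") && !patternParts[i].equals(nameParts[i])) {
                return false; // literal segment differs
            }
        }
        // Without a trailing ">", both must have the same number of segments.
        return patternParts.length == nameParts.length;
    }

    public static void main(String[] args) {
        System.out.println(matches(">", "ORDERS.QUEUE"));           // true
        System.out.println(matches("PRICE.*", "PRICE.UPDATES"));    // true
        System.out.println(matches("PRICE.*", "PRICE.STOCK.IBM"));  // false
        System.out.println(matches("PRICE.>", "PRICE.STOCK.IBM"));  // true
    }
}
```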

Comparing Deployment Architectures

Architecture | Description | Best For | Considerations
Standalone Broker | Single broker instance | Small applications, development | Single point of failure
Master-Slave | Hot standby replica | Production with HA needs | Failover support needed in clients
Network of Brokers | Multiple connected brokers | Distributed systems, high throughput | Complex configuration, network overhead
Broker Cluster | Multiple brokers acting as one | Load balancing, scalability | Client connection routing
Embedded Broker | Broker within application | Self-contained applications | Limited management features
Docker/Kubernetes | Containerized deployment | Cloud-native applications | Resource allocation, persistence challenges
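
Client-side failover for a master-slave pair is usually expressed through the failover: transport URI. The sketch below only assembles such a URI. FailoverUri and the broker host names are hypothetical, and randomize is one of several failover options.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that assembles a failover transport URI for the client. */
final class FailoverUri {

    static String build(List<String> brokerUris, boolean randomize) {
        // randomize=false makes the client try the brokers in the listed order.
        return "failover:(" + String.join(",", brokerUris) + ")?randomize=" + randomize;
    }

    public static void main(String[] args) {
        // broker-a and broker-b are placeholder host names.
        String uri = build(Arrays.asList("tcp://broker-a:61616", "tcp://broker-b:61616"), false);
        System.out.println(uri);
    }
}
```

The resulting string is what would be passed to the connection factory in place of a single tcp:// URI.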

Common Messaging Patterns with ActiveMQ

Point-to-Point (Queue)

 
java
// Producer
Queue queue = session.createQueue("ORDERS.QUEUE");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Order #12345");
producer.send(message);

// Consumer
MessageConsumer consumer = session.createConsumer(queue);
TextMessage received = (TextMessage) consumer.receive(); // blocks until message arrives

Publish/Subscribe (Topic)

 
java
// Publisher
Topic topic = session.createTopic("PRICE.UPDATES");
MessageProducer producer = session.createProducer(topic);
TextMessage message = session.createTextMessage("AAPL:150.75");
producer.send(message);

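// Subscriber (must be subscribed before the message is published;
// non-durable subscribers do not receive messages sent earlier)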
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(message -> {
    try {
        System.out.println("Received: " + ((TextMessage)message).getText());
    } catch (JMSException e) {
        e.printStackTrace();
    }
});

Request/Reply

 
java
// Requestor
Queue requestQueue = session.createQueue("REQUESTS");
Queue replyQueue = session.createTemporaryQueue();

MessageProducer producer = session.createProducer(requestQueue);
MessageConsumer consumer = session.createConsumer(replyQueue);

TextMessage request = session.createTextMessage("GetQuote:IBM");
request.setJMSReplyTo(replyQueue);
String correlationId = UUID.randomUUID().toString();
request.setJMSCorrelationID(correlationId);
producer.send(request);
Message reply = consumer.receive(5000); // wait up to 5 seconds; null if no reply arrives
// reply.getJMSCorrelationID() should equal correlationId

// Responder (normally a separate application with its own connection and session)
MessageConsumer requestConsumer = session.createConsumer(requestQueue);
requestConsumer.setMessageListener(requestMessage -> {
    try {
        TextMessage txtMsg = (TextMessage) requestMessage;
        Destination replyDestination = txtMsg.getJMSReplyTo();
        String requestCorrelationId = txtMsg.getJMSCorrelationID();
        
        MessageProducer replyProducer = session.createProducer(replyDestination);
        TextMessage replyMessage = session.createTextMessage("IBM:145.80");
        replyMessage.setJMSCorrelationID(requestCorrelationId);
        replyProducer.send(replyMessage);
    } catch (JMSException e) {
        e.printStackTrace();
    }
});
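
When many requests are in flight at once, the requestor has to match each reply to its request by correlation ID. A minimal sketch of that bookkeeping, independent of JMS, is shown here. ReplyCorrelator is a hypothetical class; in a real client, onReply would be called from the reply consumer's listener.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical bookkeeping that matches replies to pending requests by correlation ID. */
final class ReplyCorrelator {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers interest in a reply before the request is sent. */
    CompletableFuture<String> expect(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /** Completes the matching request; returns false when no request is waiting. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown or already answered
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        CompletableFuture<String> quote = correlator.expect("req-1");
        correlator.onReply("req-1", "IBM:145.80");
        System.out.println(quote.join()); // IBM:145.80
    }
}
```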

Durable Subscription

 
java
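// Creating durable subscription
// (set the client ID immediately after creating the connection, before any other use of it)
connection.setClientID("ClientXYZ");
Topic topic = session.createTopic("NEWS.UPDATES");
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubscriptionName");
consumer.close(); // the broker keeps storing messages for the subscription while it is offline

// Later reconnecting to the same subscription (same client ID and subscription name)
MessageConsumer reconnectedConsumer = session.createDurableSubscriber(topic, "SubscriptionName");

// Removing subscription when no longer needed (the consumer must be closed first)
reconnectedConsumer.close();
session.unsubscribe("SubscriptionName");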

Message Grouping

 
java
// Producer with grouping
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Order item for customer 1001");
message.setStringProperty("JMSXGroupID", "Customer-1001");
producer.send(message);
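
The effect of JMSXGroupID is that all messages of one group go to the same consumer while other groups can be spread across the remaining consumers. The sketch below models that sticky assignment in memory. GroupAssigner is hypothetical, and assigning new groups round-robin is an assumption made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of sticky group-to-consumer assignment; not broker code. */
final class GroupAssigner {
    private final Map<String, Integer> owners = new HashMap<>();
    private final int consumerCount;
    private int next = 0;

    GroupAssigner(int consumerCount) {
        this.consumerCount = consumerCount;
    }

    /** Returns the consumer index for a group, choosing one the first time the group is seen. */
    int consumerFor(String groupId) {
        Integer owner = owners.get(groupId);
        if (owner == null) {
            owner = next;
            next = (next + 1) % consumerCount; // round-robin for new groups (assumption)
            owners.put(groupId, owner);
        }
        return owner;
    }

    public static void main(String[] args) {
        GroupAssigner assigner = new GroupAssigner(2);
        System.out.println(assigner.consumerFor("Customer-1001")); // 0
        System.out.println(assigner.consumerFor("Customer-1002")); // 1
        System.out.println(assigner.consumerFor("Customer-1001")); // 0 again: same group, same consumer
    }
}
```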

Common Challenges and Solutions

Challenge: Message Loss

Solutions:

  • Use persistent delivery mode: producer.setDeliveryMode(DeliveryMode.PERSISTENT)
  • Configure proper acknowledgment mode: Session.CLIENT_ACKNOWLEDGE and call message.acknowledge()
  • Implement dead letter queues: Configure <deadLetterStrategy> in policy entries
  • Use transactions: session = connection.createSession(true, Session.SESSION_TRANSACTED) and session.commit()
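
What a transacted session buys you can be sketched in a few lines: sends stay invisible until commit, and rollback discards them. TransactedBuffer is a hypothetical in-memory model of the producer side only, not the JMS API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of producer-side transaction semantics; not the JMS API. */
final class TransactedBuffer {
    private final List<String> uncommitted = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    /** Buffers a message; consumers cannot see it yet. */
    void send(String message) {
        uncommitted.add(message);
    }

    /** Makes all buffered messages visible as one unit. */
    void commit() {
        delivered.addAll(uncommitted);
        uncommitted.clear();
    }

    /** Discards everything sent since the last commit. */
    void rollback() {
        uncommitted.clear();
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        TransactedBuffer tx = new TransactedBuffer();
        tx.send("order-1");
        tx.rollback();
        tx.send("order-2");
        tx.commit();
        System.out.println(tx.delivered()); // [order-2]
    }
}
```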

Challenge: Performance Bottlenecks

Solutions:

  • Use non-persistent delivery for non-critical messages: DeliveryMode.NON_PERSISTENT
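  • Implement producer flow control: <policyEntry queue=">" producerFlowControl="true"/>
  • Tune prefetch values: set jms.prefetchPolicy.queuePrefetch=10 on the connection URI (tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10) or append ?consumer.prefetchSize=10 to the destination name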
  • Batch messages using transactions
  • Configure appropriate memory settings
  • Use async consumers: consumer.setMessageListener(message -> { ... })

Challenge: High Memory Usage

Solutions:

  • Configure memory limits: <memoryUsage limit="64 mb"/>
  • Implement producer flow control
  • Set appropriate destination policies
  • Use a file-based cursor for topics so pending messages spill to disk instead of staying in memory: <pendingSubscriberPolicy><fileCursor/></pendingSubscriberPolicy>
  • Implement message expiration: producer.setTimeToLive(60000) (60 seconds)
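
Message expiration comes down to simple timestamp arithmetic: the expiration time is the send time plus the time to live, and 0 means the message never expires. A minimal sketch of that rule follows. Expiry is a hypothetical helper, not the broker's code.

```java
/** Hypothetical helper showing time-to-live arithmetic; not broker code. */
final class Expiry {

    /** Expiration timestamp in epoch milliseconds; 0 means the message never expires. */
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis <= 0 ? 0L : sendTimeMillis + timeToLiveMillis;
    }

    /** A message is expired once the current time has passed a non-zero expiration. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0L && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiresAt = expirationFor(sentAt, 60000); // 60-second time to live
        System.out.println("expired now? " + isExpired(expiresAt, sentAt));               // false
        System.out.println("expired after 61 s? " + isExpired(expiresAt, sentAt + 61000)); // true
    }
}
```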

Challenge: Network of Brokers Issues

Solutions:

  • Avoid duplicate message delivery: <networkConnector duplex="false" decreaseNetworkConsumerPriority="true">
  • Control message flow: <networkConnector prefetchSize="1">
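  • Prevent cycles: limit how many brokers a message may traverse with <networkConnector networkTTL="2">
  • Filter messages: nest <dynamicallyIncludedDestinations><queue physicalName="queue.A"/><topic physicalName="topic.B"/></dynamicallyIncludedDestinations> inside <networkConnector>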

Challenge: Slow Consumers

Solutions:

  • Implement expiry policies: <policyEntry expireMessagesPeriod="10000">
  • Use cursor-based pending message stores: <fileCursor/>
  • Configure advisory notifications: <policyEntry advisoryForSlowConsumers="true">
  • Increase consumer threads in connection pool

Monitoring and Management

JMX Monitoring

 
xml
<!-- Enable JMX in activemq.xml -->
<broker useJmx="true">
  <managementContext>
    <managementContext createConnector="true" connectorPort="1099"/>
  </managementContext>
</broker>

<!-- Connect with JConsole: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi -->

Key Metrics to Monitor

  • Queue Depth: Number of pending messages
  • Producer/Consumer Count: Number of active producers and consumers per destination
  • Memory Usage: Percentage of memory limit used
  • Store Usage: Disk space used for persistence
  • Connection Count: Total active connections
  • Enqueue/Dequeue Rates: Message throughput
  • Expired/DLQ Message Count: Failed message delivery indicators

Command Line Management

 
bash
# List queues
activemq query -QQueue=*

# Browse the messages on a queue
activemq browse ORDERS.QUEUE

# Purge queue
activemq purge ORDERS.QUEUE

# Send message
activemq producer --message "Test message" --destination queue://TEST.QUEUE

# Consume messages
activemq consumer --destination queue://TEST.QUEUE

Advanced Features and Configurations

Message Redelivery Policy

 
java
// Client-side configuration
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000); // 1 second
policy.setBackOffMultiplier(2.0); // exponential backoff
policy.setMaximumRedeliveries(6); // try 6 times before DLQ
policy.setUseExponentialBackOff(true);
factory.setRedeliveryPolicy(policy);
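
With these settings the delay doubles on every attempt. The sketch below computes the resulting schedule. RedeliverySchedule is a hypothetical helper and a simplified model: it ignores options such as a maximum delay cap or collision avoidance.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of an exponential backoff schedule. */
final class RedeliverySchedule {

    /** Delay before each redelivery attempt, in milliseconds. */
    static List<Long> delays(long initialDelayMillis, double multiplier, int maxRedeliveries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            delays.add(delay);
            delay = (long) (delay * multiplier); // grow the delay for the next attempt
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same values as the policy above: 1 s initial delay, multiplier 2.0, 6 redeliveries.
        System.out.println(delays(1000, 2.0, 6)); // [1000, 2000, 4000, 8000, 16000, 32000]
    }
}
```

Under this model, a message that keeps failing waits 63 seconds in total before it is sent to the dead letter queue.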

Message Selectors

 
java
// Consumer with selector
String selector = "JMSPriority > 4 AND region = 'EMEA'";
MessageConsumer consumer = session.createConsumer(destination, selector);

// Setting filterable values
TextMessage message = session.createTextMessage("High priority order");
message.setStringProperty("region", "EMEA");
// JMSPriority is a header assigned at send time, so set it through the producer
producer.send(message, DeliveryMode.PERSISTENT, 7, 0); // priority 7, no expiration
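
It can help to read a selector as an ordinary predicate that the broker evaluates per message. The sketch below is a hand-written plain-Java equivalent of the selector above. SelectorSketch is hypothetical and is not a selector parser.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java equivalent of: JMSPriority > 4 AND region = 'EMEA'. */
final class SelectorSketch {

    static boolean matches(int jmsPriority, Map<String, Object> properties) {
        Object region = properties.get("region");
        // A missing property makes the comparison "unknown", so the message is not selected.
        return jmsPriority > 4 && "EMEA".equals(region);
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("region", "EMEA");
        System.out.println(matches(7, properties));      // true
        System.out.println(matches(4, properties));      // false: priority too low
        System.out.println(matches(7, new HashMap<>())); // false: region not set
    }
}
```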

Composite Destinations

 
xml
<!-- In activemq.xml, nested inside destinationInterceptors / virtualDestinationInterceptor / virtualDestinations -->
<compositeTopic name="COMPOSITE.TOPIC" forwardOnly="false">
  <forwardTo>
    <queue physicalName="QUEUE.A"/>
    <topic physicalName="TOPIC.B"/>
    <topic physicalName="TOPIC.C"/>
  </forwardTo>
</compositeTopic>

Virtual Topics

 
xml
<!-- Producer sends to topic -->
<topic physicalName="VirtualTopic.Orders"/>

<!-- Consumers read from queue with consumer-specific name -->
<queue physicalName="Consumer.App1.VirtualTopic.Orders"/>
<queue physicalName="Consumer.App2.VirtualTopic.Orders"/>
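
The naming convention shown above is all that ties a consumer queue to its virtual topic. A small sketch of that mapping follows. VirtualTopicNames is a hypothetical helper and assumes the default VirtualTopic. and Consumer. prefixes.

```java
/** Hypothetical helper for the default virtual topic naming convention. */
final class VirtualTopicNames {

    /** Queue a consumer application reads from to receive its copy of the topic's messages. */
    static String consumerQueue(String consumerName, String virtualTopic) {
        if (!virtualTopic.startsWith("VirtualTopic.")) {
            throw new IllegalArgumentException("not a virtual topic name: " + virtualTopic);
        }
        return "Consumer." + consumerName + "." + virtualTopic;
    }

    public static void main(String[] args) {
        System.out.println(consumerQueue("App1", "VirtualTopic.Orders")); // Consumer.App1.VirtualTopic.Orders
        System.out.println(consumerQueue("App2", "VirtualTopic.Orders")); // Consumer.App2.VirtualTopic.Orders
    }
}
```

Each application gets its own queue, so several instances of App1 can share the load while App2 still receives every message.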

Master-Slave Configuration (Shared File System)

 
xml
<!-- Master and Slave use the same data directory -->
<persistenceAdapter>
  <kahaDB directory="/shared/activemq/data"/>
</persistenceAdapter>

Resources for Further Learning

Official Documentation

  • Apache ActiveMQ User Manual
  • ActiveMQ in Action (book)
  • ActiveMQ GitHub Repository
  • ActiveMQ Mailing Lists

Monitoring Tools

  • Hawtio Web Console
  • ActiveMQ Web Console
  • JConsole with JMX
  • Commercial tools (Dynatrace, AppDynamics, etc.)

Integration Examples

  • Spring JMS integration
  • Camel-ActiveMQ integration
  • Microservice patterns with ActiveMQ
  • Cloud deployment scenarios

Remember: While ActiveMQ is powerful and flexible, the best practice is to start with simple configurations and add complexity only as needed. Always test thoroughly under expected production loads before deploying to production.
