The Ultimate Apache Camel Cheatsheet: Integration Patterns & Development Guide

Introduction to Apache Camel

Apache Camel is an open-source integration framework that implements Enterprise Integration Patterns (EIPs). You define routing and mediation rules in one of several domain-specific languages (DSLs) to connect systems with little glue code. This cheatsheet is a quick reference for Camel concepts, components, patterns, and implementation strategies.

Core Concepts

Basic Architecture Components

  • Route: A pipeline of processing steps that starts at a consumer endpoint (from)
  • Endpoint: Connection point to an external system, addressed by a URI
  • Processor: Unit of logic that transforms or otherwise handles an Exchange
  • Exchange: Container for the message, its properties, and any exception as it moves through a route
  • Message: The data being transferred: headers, body, and attachments
  • Context: The CamelContext, the runtime that holds routes, components, and the registry

Message Flow and Structure

Component  | Description                         | Example
Exchange   | Container holding In/Out messages   | Exchange exchange = ...
Message    | Contains headers, body, attachments | Message message = exchange.getIn()
Headers    | Metadata as key-value pairs         | message.getHeader("ContentType")
Body       | Actual payload/data                 | message.getBody(String.class)
Properties | Exchange-level properties           | exchange.getProperty("propertyName")

Domain-Specific Languages (DSLs)

Java DSL

from("file:inbox")
    .choice()
        .when(xpath("/person/city = 'London'"))
            .to("jms:queue:uk")
        .otherwise()
            .to("jms:queue:others")
    .end();

XML DSL (Spring/Blueprint)

<route>
  <from uri="file:inbox"/>
  <choice>
    <when>
      <xpath>/person/city = 'London'</xpath>
      <to uri="jms:queue:uk"/>
    </when>
    <otherwise>
      <to uri="jms:queue:others"/>
    </otherwise>
  </choice>
</route>

YAML DSL

- route:
    from:
      uri: "file:inbox"
      steps:
        - choice:
            when:
              - xpath: "/person/city = 'London'"
                steps:
                  - to: "jms:queue:uk"
            otherwise:
              steps:
                - to: "jms:queue:others"

Kotlin DSL

from("file:inbox")
  .choice()
    .`when`(xpath("/person/city = 'London'"))
      .to("jms:queue:uk")
    .otherwise()
      .to("jms:queue:others")
  .end()

Common Components

File Component

// Read files
from("file:data/inbox?delete=true")
    .to("jms:queue:incomingOrders");

// Write files
from("jms:queue:outgoingOrders")
    .to("file:data/outbox?fileName=${header.orderId}.xml");

HTTP Component

// HTTP GET request
from("direct:start")
    .to("http://api.example.com/data?bridgeEndpoint=true");

// HTTP POST with headers
from("direct:start")
    .setHeader(Exchange.HTTP_METHOD, constant("POST"))
    .setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
    .to("http://api.example.com/create");

JMS Component

// Read from queue
from("jms:queue:orders")
    .to("bean:orderProcessor");

// Publish to topic
from("direct:publishNews")
    .to("jms:topic:news");

// Request-reply pattern (InOut makes the producer wait for the reply)
from("direct:request")
    .to(ExchangePattern.InOut, "jms:queue:request?replyTo=response&requestTimeout=10000")
    .log("Got response: ${body}");

REST Component

// Define REST service
rest("/api")
    .get("/users")
        .to("direct:getUsers")
    .post("/users")
        .type(User.class)
        .to("direct:createUser");

// Consume REST service
from("direct:start")
    .to("rest:get:users?host=api.example.com");

Database Component

// SQL query
from("direct:getUsers")
    .setBody(constant("SELECT * FROM users"))
    .to("jdbc:dataSource");

// SQL with named parameters (bound from headers rather than concatenated into the SQL)
from("direct:getUserById")
    .setBody(constant("SELECT * FROM users WHERE id = :?id"))
    .to("jdbc:dataSource?useHeadersAsParameters=true");

Timer Component

// Simple timer
from("timer:tick?period=5000")
    .log("Executed every 5 seconds");

// Cron expression (spaces are written as + inside the endpoint URI)
from("quartz:myTimer?cron=0+0+12+*+*+?")
    .log("Executed at noon every day");

Enterprise Integration Patterns (EIPs)

Message Transformation

Transform

// Using processor
from("direct:start")
    .process(exchange -> {
        String body = exchange.getIn().getBody(String.class);
        exchange.getIn().setBody(body.toUpperCase());
    });

// Using expression
from("direct:start")
    .transform().simple("Hello, ${body}!");

Content Enricher

from("direct:start")
    .enrich("direct:getAdditionalData", (original, additional) -> {
        original.getIn().setHeader("enriched", additional.getIn().getBody());
        return original;
    });

Message Translator

from("direct:start")
    .marshal().json()
    .to("jms:queue:jsonOrders");

from("jms:queue:jsonOrders")
    .unmarshal().json(JsonLibrary.Jackson, Order.class)
    .to("bean:orderProcessor");

Message Routing

Content-Based Router

from("direct:start")
    .choice()
        .when(header("country").isEqualTo("US"))
            .to("direct:us")
        .when(header("country").isEqualTo("UK"))
            .to("direct:uk")
        .otherwise()
            .to("direct:other")
    .end();
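
The selection rule behind choice()/when()/otherwise() can be modelled in plain Java: conditions are tested top to bottom, the first match wins, and otherwise is the fallback. This is only a sketch of the semantics, not Camel's implementation; the class ContentBasedRouterSketch and its methods are hypothetical names, and headers are simplified to a Map<String, String>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of a content-based router (hypothetical; not a Camel class).
final class ContentBasedRouterSketch {
    private static final class Rule {
        final Predicate<Map<String, String>> condition;
        final String endpoint;

        Rule(Predicate<Map<String, String>> condition, String endpoint) {
            this.condition = condition;
            this.endpoint = endpoint;
        }
    }

    // Rules are kept in declaration order, like successive when() clauses.
    private final List<Rule> rules = new ArrayList<>();
    private final String otherwise;

    ContentBasedRouterSketch(String otherwise) {
        this.otherwise = otherwise;
    }

    ContentBasedRouterSketch when(Predicate<Map<String, String>> condition, String endpoint) {
        rules.add(new Rule(condition, endpoint));
        return this;
    }

    // Returns the endpoint of the first matching rule, or the otherwise endpoint.
    String route(Map<String, String> headers) {
        for (Rule rule : rules) {
            if (rule.condition.test(headers)) {
                return rule.endpoint;
            }
        }
        return otherwise;
    }

    // Mirrors the country-based route above.
    static ContentBasedRouterSketch byCountry() {
        return new ContentBasedRouterSketch("direct:other")
            .when(h -> "US".equals(h.get("country")), "direct:us")
            .when(h -> "UK".equals(h.get("country")), "direct:uk");
    }
}
```

Because only the first match is used, overlapping conditions should be ordered from most to least specific.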

Recipient List

from("direct:start")
    .recipientList(simple("direct:${header.region}"));

// Or with multiple destinations
from("direct:start")
    .setHeader("destinations", constant("jms:queue:A,jms:queue:B,jms:queue:C"))
    .recipientList(header("destinations"));
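
As a rough illustration of the second form, the header value is treated as a delimited list of endpoint URIs and each entry receives a copy of the message. The sketch below models only the list-expansion step in plain Java; RecipientListSketch is a hypothetical helper, and the trimming and empty-entry handling are assumptions of the sketch rather than documented Camel behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java model of expanding a recipient-list header (hypothetical; not a Camel class).
final class RecipientListSketch {
    // Splits a delimited header value into endpoint URIs, trimming whitespace
    // and skipping empty entries.
    static List<String> recipients(String headerValue, String delimiter) {
        List<String> uris = new ArrayList<>();
        if (headerValue == null) {
            return uris;
        }
        for (String part : headerValue.split(Pattern.quote(delimiter))) {
            String uri = part.trim();
            if (!uri.isEmpty()) {
                uris.add(uri);
            }
        }
        return uris;
    }
}
```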

Dynamic Router

from("direct:start")
    .dynamicRouter(method(MyRouter.class, "determineDestination"));

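// In MyRouter class: invoked again after every hop until it returns null
public String determineDestination(Exchange exchange) {
    // Illustrative logic: count hops in an exchange property ("hops" is a made-up name)
    int hops = exchange.getProperty("hops", 0, Integer.class);
    exchange.setProperty("hops", hops + 1);
    // Route once to a placeholder endpoint, then return null to end routing
    return hops == 0 ? "direct:next" : null;
}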
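
The reason the router method must eventually return null is easier to see when the control loop is written out. The following plain-Java sketch models that loop under simplifying assumptions: DynamicRouterSketch is a hypothetical class, the router is a function of the endpoints visited so far, and maxHops is a safety guard invented for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the dynamic router control loop (hypothetical; not a Camel class).
final class DynamicRouterSketch {
    // Asks the router for the next endpoint, "sends" to it, and repeats
    // until the router returns null or the maxHops guard is reached.
    static List<String> run(Function<List<String>, String> router, int maxHops) {
        List<String> visited = new ArrayList<>();
        while (visited.size() < maxHops) {
            String next = router.apply(visited);
            if (next == null) {
                break; // null ends routing
            }
            visited.add(next);
        }
        return visited;
    }
}
```

A router that never returns null would keep the message circulating indefinitely, which is why the sketch adds a hop limit.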

Splitter

// Split by expression
from("direct:start")
    .split(body().tokenize(","))
    .to("direct:processItem");

// Split XML with streaming (tokenizeXML reads <order> elements one at a time)
from("file:inbox")
    .split().tokenizeXML("order").streaming()
    .to("direct:processOrder");

Aggregator

from("direct:start")
    .aggregate(header("correlationId"), new GroupedExchangeAggregationStrategy())
    .completionSize(5)
    .completionTimeout(60000)
    .to("direct:processGroup");

// Custom aggregation strategy
public class MyAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + "," + newBody);
        return oldExchange;
    }
}
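
To make the interplay of correlation key, aggregation strategy, and completionSize concrete, here is a small plain-Java model. It is a sketch under stated assumptions, not Camel's aggregator: AggregatorSketch is a hypothetical class, bodies are plain strings, and completion by timeout is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of aggregate(correlation, strategy).completionSize(n)
// (hypothetical; not a Camel class).
final class AggregatorSketch {
    private final int completionSize;
    private final BinaryOperator<String> strategy;
    private final Map<String, String> pending = new HashMap<>();
    private final Map<String, Integer> counts = new HashMap<>();

    AggregatorSketch(int completionSize, BinaryOperator<String> strategy) {
        this.completionSize = completionSize;
        this.strategy = strategy;
    }

    // Adds one message to its group. Returns the aggregated body when the
    // group reaches completionSize, otherwise null.
    String add(String correlationId, String body) {
        String merged = pending.containsKey(correlationId)
            ? strategy.apply(pending.get(correlationId), body)
            : body; // first message of a group: nothing to merge with yet
        int count = counts.merge(correlationId, 1, Integer::sum);
        if (count >= completionSize) {
            pending.remove(correlationId);
            counts.remove(correlationId);
            return merged;
        }
        pending.put(correlationId, merged);
        return null;
    }
}
```

With a strategy of (a, b) -> a + "," + b and a completion size of 3, messages "a", "b", "c" sharing one correlation ID are released together as "a,b,c", while messages with other IDs wait in their own groups.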

Message Construction

Multicast

from("direct:start")
    .multicast()
    .to("direct:endpoint1", "direct:endpoint2", "direct:endpoint3");

// With parallelProcessing and aggregation
from("direct:start")
    .multicast()
    .parallelProcessing()
    .aggregationStrategy(new MyAggregationStrategy())
    .to("direct:endpoint1", "direct:endpoint2");

Composed Message Processor

from("direct:start")
    .split(body())
    .to("direct:validateItem")
    .aggregate(header("orderId"), new MyAggregationStrategy())
    .completionSize(5)
    .to("direct:processCompletedOrder");

Scatter-Gather

from("direct:start")
    .multicast()
    .aggregationStrategy(new MyAggregationStrategy())
    .parallelProcessing()
    .timeout(5000)
    .to("direct:supplier1", "direct:supplier2", "direct:supplier3")
    .end()
    .to("direct:selectBestOffer");
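
The scatter and gather phases can be sketched with standard Java concurrency: every supplier is asked in parallel, then the replies are reduced to one result. This is an illustrative model only; ScatterGatherSketch and bestOffer are hypothetical names, "best" is assumed to mean the lowest quote, and the timeout from the route above is not modelled.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Plain-Java model of scatter-gather (hypothetical; not a Camel class).
final class ScatterGatherSketch {
    // Scatter: start one asynchronous request per supplier.
    // Gather: wait for all replies and keep the lowest quote.
    static double bestOffer(List<Supplier<Double>> suppliers) {
        List<CompletableFuture<Double>> futures = suppliers.stream()
            .map(s -> CompletableFuture.supplyAsync(s))
            .collect(Collectors.toList());
        return futures.stream()
            .map(CompletableFuture::join)
            .min(Double::compare)
            .orElseThrow(() -> new IllegalArgumentException("no suppliers"));
    }
}
```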

System Management

Wire Tap

from("direct:start")
    .wireTap("direct:audit")
    .to("direct:processOrder");

Dead Letter Channel

// Global configuration
errorHandler(deadLetterChannel("jms:queue:dead")
    .maximumRedeliveries(3)
    .redeliveryDelay(1000)
    .backOffMultiplier(2)
    .useExponentialBackOff());

// Route-specific configuration
from("direct:start")
    .errorHandler(deadLetterChannel("direct:error")
        .maximumRedeliveries(3))
    .to("direct:process");
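
For the global configuration above, it helps to see which delays exponential backoff produces. The sketch below assumes the first redelivery waits redeliveryDelay and each later one multiplies the previous delay by backOffMultiplier, up to a cap. RedeliveryDelaySketch and the maximumRedeliveryDelay parameter are illustrative names for this model, not a copy of Camel's RedeliveryPolicy.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of an exponential backoff schedule (hypothetical; not a Camel class).
final class RedeliveryDelaySketch {
    // Returns the delay in milliseconds before each redelivery attempt.
    static List<Long> delays(int maximumRedeliveries, long redeliveryDelay,
                             double backOffMultiplier, long maximumRedeliveryDelay) {
        List<Long> result = new ArrayList<>();
        long delay = redeliveryDelay;
        for (int attempt = 1; attempt <= maximumRedeliveries; attempt++) {
            result.add(Math.min(delay, maximumRedeliveryDelay)); // cap each delay
            delay = Math.round(delay * backOffMultiplier);       // grow for the next attempt
        }
        return result;
    }
}
```

Under these assumptions, 3 redeliveries with a 1000 ms delay and a multiplier of 2 wait 1000, 2000, and 4000 ms before the message goes to the dead letter endpoint.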

Circuit Breaker

from("direct:start")
    .circuitBreaker()
        .resilience4jConfiguration()
            .slidingWindowSize(10)
            .failureRateThreshold(50)
            .waitDurationInOpenState(5000)
        .end()
        .to("http://might-fail-service")
    .onFallback()
        .transform().constant("Fallback response")
    .end()
    .to("direct:process");
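
How slidingWindowSize and failureRateThreshold interact can be shown with a simplified count-based model. This is a sketch, not Resilience4j: CircuitBreakerSketch is a hypothetical class, the failure rate is only evaluated once the window is full, and the half-open state driven by waitDurationInOpenState is omitted, so the breaker stays open once tripped.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Simplified count-based circuit breaker (hypothetical; not Resilience4j or Camel).
final class CircuitBreakerSketch {
    private final int slidingWindowSize;
    private final double failureRateThreshold; // percentage, e.g. 50
    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private boolean open;

    CircuitBreakerSketch(int slidingWindowSize, double failureRateThreshold) {
        this.slidingWindowSize = slidingWindowSize;
        this.failureRateThreshold = failureRateThreshold;
    }

    // Runs the action unless the breaker is open; failures and open state yield the fallback.
    String call(Supplier<String> action, String fallback) {
        if (open) {
            return fallback; // short-circuit: the protected call is skipped
        }
        try {
            String result = action.get();
            record(false);
            return result;
        } catch (RuntimeException e) {
            record(true);
            return fallback;
        }
    }

    boolean isOpen() {
        return open;
    }

    // Keeps the last slidingWindowSize outcomes and trips the breaker when
    // the failure rate over a full window reaches the threshold.
    private void record(boolean failed) {
        window.addLast(failed);
        if (window.size() > slidingWindowSize) {
            window.removeFirst();
        }
        if (window.size() == slidingWindowSize) {
            long failures = window.stream().filter(f -> f).count();
            open = 100.0 * failures / slidingWindowSize >= failureRateThreshold;
        }
    }
}
```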

Testing Strategies

Unit Testing with CamelTestSupport

public class MyRouteTest extends CamelTestSupport {
    
    @Override
    protected RouteBuilder createRouteBuilder() {
        return new MyRouteBuilder();
    }
    
    @Test
    public void testRoute() {
        String response = template.requestBody("direct:start", "Hello", String.class);
        assertEquals("HELLO", response);
    }
    
    @Test
    public void testWithMock() throws Exception {
        getMockEndpoint("mock:result").expectedMessageCount(1);
        getMockEndpoint("mock:result").expectedBodiesReceived("HELLO");
        
        template.sendBody("direct:start", "Hello");
        
        assertMockEndpointsSatisfied();
    }
}

AdviceWith for Route Modification in Tests

@Test
public void testWithAdvice() throws Exception {
    AdviceWith.adviceWith(context, "myRouteId", advice -> {
        advice.mockEndpointsAndSkip("direct:external");
        
        advice.weaveByToUri("direct:someEndpoint")
            .replace().to("mock:replaced");
    });
    
    getMockEndpoint("mock:replaced").expectedMessageCount(1);
    
    template.sendBody("direct:start", "test");
    
    assertMockEndpointsSatisfied();
}

Best Practices & Patterns

Error Handling

// Global error handler (backOffMultiplier only applies when exponential backoff is enabled)
errorHandler(defaultErrorHandler()
    .maximumRedeliveries(3)
    .redeliveryDelay(1000)
    .backOffMultiplier(2)
    .useExponentialBackOff()
    .retryAttemptedLogLevel(LoggingLevel.WARN));

// Exception policies
onException(IOException.class)
    .maximumRedeliveries(5)
    .handled(true)
    .to("direct:ioErrorHandler");

onException(ValidationException.class)
    .handled(false)
    .to("direct:validationErrorHandler");

Transaction Management

// JMS transacted
from("jms:queue:input?transacted=true")
    .transacted()
    .to("bean:processOrder")
    .to("jms:queue:output");

// Spring transaction manager (SQL parameters use the :#${...} placeholder syntax)
from("jms:queue:input")
    .transacted("PROPAGATION_REQUIRED")
    .to("sql:insert into orders values (:#${body.id}, :#${body.name})")
    .to("jms:queue:output");

Performance Optimization

// Parallel processing
from("direct:start")
    .split(body()).parallelProcessing().executorService(myThreadPool)
    .to("direct:heavyProcessing");

// Stream processing for large files
from("file:inbox")
    .split(body().tokenize("\n")).streaming()
    .to("direct:processLine");

// Throttling
from("direct:start")
    .throttle(100).timePeriodMillis(1000)
    .to("direct:throttledEndpoint");

Monitoring and Management

// JMX enabled by default
CamelContext context = new DefaultCamelContext();
context.getManagementStrategy().getManagementAgent().setRegistryPort(1099);

// Adding event notifiers
context.getManagementStrategy().addEventNotifier(new MyEventNotifier());

// Health checks
context.getRegistry().bind("myHealthCheck", new MyHealthCheck());

Configuration Examples

Spring Boot Configuration

application.properties

# Camel core
camel.springboot.name=MyCamelApp
camel.springboot.main-run-controller=true

# Component configurations
camel.component.kafka.brokers=kafka1:9092,kafka2:9092
camel.component.jms.connection-factory=#jmsConnectionFactory

# Stream caching
camel.springboot.stream-caching-enabled=true
camel.springboot.stream-caching-spool-directory=./data/camel/cache

Spring Boot Starter

@SpringBootApplication
@ComponentScan(basePackages = "com.example")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

@Component
class MyRoutes extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("timer:hello?period={{timer.period}}")
            .log("Hello World");
    }
}

Camel Main Configuration

public static void main(String[] args) throws Exception {
    Main main = new Main();
    main.configure().addRoutesBuilder(new MyRouteBuilder());
    main.setPropertyPlaceholderLocations("classpath:application.properties");
    
    // Configure components (createDataSource and createJmsConnectionFactory are application-defined helpers)
    main.bind("myDataSource", createDataSource());
    main.bind("jms", JmsComponent.jmsComponent(createJmsConnectionFactory()));
    
    main.run();
}

Advanced Patterns

Content-Based EIP Implementation

// Content-Based Router
from("direct:start")
    .choice()
        .when(xpath("/order/type = 'A'"))
            .to("direct:typeA")
        .when(xpath("/order/type = 'B'"))
            .to("direct:typeB")
        .otherwise()
            .to("direct:default")
    .end();

// Message Filter
from("direct:start")
    .filter(header("valid").isEqualTo(true))
    .to("direct:validOrders");

// Routing Slip (the list of endpoints is computed once, up front)
from("direct:start")
    .setHeader("slip", method(MyDynamicRouter.class, "computeSlip"))
    .routingSlip(header("slip"));

Integration with External Systems

// REST to JMS integration
rest("/api")
    .post("/orders")
        .consumes("application/json")
        .type(Order.class)
        .to("direct:processOrder");

from("direct:processOrder")
    .log("Received order: ${body}")
    .marshal().json(JsonLibrary.Jackson)
    .to("jms:queue:orders");

// File to Database integration
from("file:inbox?include=.*\\.csv&move=../processed/${date:now:yyyyMMdd}/${file:name}")
    .unmarshal().csv()
    .split(body())
    .process(new RecordToSqlProcessor())
    .to("jdbc:dataSource");

Resources for Further Learning

Books

  • “Camel in Action” by Claus Ibsen and Jonathan Anstey
  • “Enterprise Integration Patterns” by Gregor Hohpe and Bobby Woolf

This comprehensive cheatsheet provides a quick reference for Apache Camel developers, covering essential concepts, components, patterns, and implementation strategies. Use it as a starting point for building robust integration solutions with Apache Camel.
