Introduction to Apache Camel
Apache Camel is a versatile open-source integration framework that implements Enterprise Integration Patterns (EIPs). It enables you to define routing and mediation rules in various domain-specific languages (DSL), connecting different systems with minimal coding. This cheatsheet provides a comprehensive reference for Apache Camel concepts, components, patterns, and implementation strategies.
Core Concepts
Basic Architecture Components
- Route: Basic building block defining a pipeline of processors
- Endpoint: Connection point to external systems (URI format)
- Processor: Component that transforms or processes messages
- Exchange: Container for messages being processed
- Message: Data being transferred with headers, body, and attachments
- Context: Runtime system containing routes and components
Message Flow and Structure
| Component | Description | Example |
|---|---|---|
| Exchange | Container holding In/Out messages | Exchange exchange = ... |
| Message | Contains headers, body, attachments | Message message = exchange.getIn() |
| Headers | Metadata as key-value pairs | message.getHeader("ContentType") |
| Body | Actual payload/data | message.getBody(String.class) |
| Properties | Exchange-level properties | exchange.getProperty("propertyName") |
Domain-Specific Languages (DSLs)
Java DSL
from("file:inbox")
.filter(xpath("/person/city = 'London'"))
.to("jms:queue:uk")
.otherwise()
.to("jms:queue:others");
XML DSL (Spring/Blueprint)
<route>
<from uri="file:inbox"/>
<choice>
<when>
<xpath>/person/city = 'London'</xpath>
<to uri="jms:queue:uk"/>
</when>
<otherwise>
<to uri="jms:queue:others"/>
</otherwise>
</choice>
</route>
YAML DSL
- route:
from:
uri: "file:inbox"
steps:
- choice:
when:
- xpath: "/person/city = 'London'"
steps:
- to: "jms:queue:uk"
otherwise:
steps:
- to: "jms:queue:others"
Kotlin DSL
from("file:inbox")
.choice()
.`when`(xpath("/person/city = 'London'"))
.to("jms:queue:uk")
.otherwise()
.to("jms:queue:others")
.end()
Common Components
File Component
// Read files
from("file:data/inbox?delete=true")
.to("jms:queue:incomingOrders");
// Write files
from("jms:queue:outgoingOrders")
.to("file:data/outbox?fileName=${header.orderId}.xml");
HTTP Component
// HTTP GET request
from("direct:start")
.to("http://api.example.com/data?bridgeEndpoint=true");
// HTTP POST with headers
from("direct:start")
.setHeader(Exchange.HTTP_METHOD, constant("POST"))
.setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
.to("http://api.example.com/create");
JMS Component
// Read from queue
from("jms:queue:orders")
.to("bean:orderProcessor");
// Publish to topic
from("direct:publishNews")
.to("jms:topic:news");
// Request-reply pattern
from("direct:request")
.to("jms:queue:request?replyTo=response&requestTimeout=10000")
.log("Got response: ${body}");
REST Component
// Define REST service
rest("/api")
.get("/users")
.to("direct:getUsers")
.post("/users")
.type(User.class)
.to("direct:createUser");
// Consume REST service
from("direct:start")
.to("rest:get:users?host=api.example.com");
Database Component
// SQL query
from("direct:getUsers")
.setBody(constant("SELECT * FROM users"))
.to("jdbc:dataSource");
// SQL with parameters
from("direct:getUserById")
.setBody(simple("SELECT * FROM users WHERE id = ${header.id}"))
.to("jdbc:dataSource");
Timer Component
// Simple timer
from("timer:tick?period=5000")
.log("Executed every 5 seconds");
// Cron expression
from("quartz:myTimer?cron=0 0 12 * * ?")
.log("Executed at noon every day");
Enterprise Integration Patterns (EIPs)
Message Transformation
Transform
// Using processor
from("direct:start")
.transform(exchange -> {
String body = exchange.getIn().getBody(String.class);
return body.toUpperCase();
});
// Using expression
from("direct:start")
.transform().simple("Hello, ${body}!");
Content Enricher
from("direct:start")
.enrich("direct:getAdditionalData", (original, additional) -> {
original.getIn().setHeader("enriched", additional.getIn().getBody());
return original;
});
Message Translator
from("direct:start")
.marshal().json()
.to("jms:queue:jsonOrders");
from("jms:queue:jsonOrders")
.unmarshal().json(JsonLibrary.Jackson, Order.class)
.to("bean:orderProcessor");
Message Routing
Content-Based Router
from("direct:start")
.choice()
.when(header("country").isEqualTo("US"))
.to("direct:us")
.when(header("country").isEqualTo("UK"))
.to("direct:uk")
.otherwise()
.to("direct:other")
.end();
Recipient List
from("direct:start")
.recipientList(simple("direct:${header.region}"));
// Or with multiple destinations
from("direct:start")
.setHeader("destinations", constant("jms:queue:A,jms:queue:B,jms:queue:C"))
.recipientList(header("destinations"));
Dynamic Router
from("direct:start")
.dynamicRouter(method(MyRouter.class, "determineDestination"));
// In MyRouter class
public String determineDestination(Exchange exchange) {
String nextDestination = // logic to determine next endpoint
// Return null to end routing
return nextDestination;
}
Splitter
// Split by expression
from("direct:start")
.split(body().tokenize(","))
.to("direct:processItem");
// Split XML with streaming
from("file:inbox")
.split(xpath("//order").streaming())
.to("direct:processOrder");
Aggregator
from("direct:start")
.aggregate(header("correlationId"), new GroupedExchangeAggregationStrategy())
.completionSize(5)
.completionTimeout(60000)
.to("direct:processGroup");
// Custom aggregation strategy
public class MyAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(oldBody + "," + newBody);
return oldExchange;
}
}
Message Construction
Multicast
from("direct:start")
.multicast()
.to("direct:endpoint1", "direct:endpoint2", "direct:endpoint3");
// With parallelProcessing and aggregation
from("direct:start")
.multicast()
.parallelProcessing()
.aggregationStrategy(new MyAggregationStrategy())
.to("direct:endpoint1", "direct:endpoint2");
Composed Message Processor
from("direct:start")
.split(body())
.to("direct:validateItem")
.aggregate(header("orderId"), new MyAggregationStrategy())
.completionSize(5)
.to("direct:processCompletedOrder");
Scatter-Gather
from("direct:start")
.multicast()
.aggregationStrategy(new MyAggregationStrategy())
.parallelProcessing()
.timeout(5000)
.to("direct:supplier1", "direct:supplier2", "direct:supplier3")
.end()
.to("direct:selectBestOffer");
System Management
Wire Tap
from("direct:start")
.wireTap("direct:audit")
.to("direct:processOrder");
Dead Letter Channel
// Global configuration
errorHandler(deadLetterChannel("jms:queue:dead")
.maximumRedeliveries(3)
.redeliveryDelay(1000)
.backOffMultiplier(2)
.useExponentialBackOff());
// Route-specific configuration
from("direct:start")
.errorHandler(deadLetterChannel("direct:error")
.maximumRedeliveries(3))
.to("direct:process");
Circuit Breaker
from("direct:start")
.circuitBreaker()
.resilience4jConfiguration()
.slidingWindowSize(10)
.failureRateThreshold(50)
.waitDurationInOpenState(5000)
.end()
.to("http://might-fail-service")
.onFallback()
.transform().constant("Fallback response")
.end()
.to("direct:process");
Testing Strategies
Unit Testing with CamelTestSupport
public class MyRouteTest extends CamelTestSupport {
@Override
protected RouteBuilder createRouteBuilder() {
return new MyRouteBuilder();
}
@Test
public void testRoute() {
String response = template.requestBody("direct:start", "Hello", String.class);
assertEquals("HELLO", response);
}
@Test
public void testWithMock() {
getMockEndpoint("mock:result").expectedMessageCount(1);
getMockEndpoint("mock:result").expectedBodiesReceived("HELLO");
template.sendBody("direct:start", "Hello");
assertMockEndpointsSatisfied();
}
}
AdviceWith for Route Modification in Tests
@Test
public void testWithAdvice() throws Exception {
AdviceWith.adviceWith(context, "myRouteId", advice -> {
advice.mockEndpointsAndSkip("direct:external");
advice.weaveByToUri("direct:someEndpoint")
.replace().to("mock:replaced");
});
getMockEndpoint("mock:replaced").expectedMessageCount(1);
template.sendBody("direct:start", "test");
assertMockEndpointsSatisfied();
}
Best Practices & Patterns
Error Handling
// Global error handler
errorHandler(defaultErrorHandler()
.maximumRedeliveries(3)
.redeliveryDelay(1000)
.backOffMultiplier(2)
.retryAttemptedLogLevel(LoggingLevel.WARN));
// Exception policies
onException(IOException.class)
.maximumRedeliveries(5)
.handled(true)
.to("direct:ioErrorHandler");
onException(ValidationException.class)
.handled(false)
.to("direct:validationErrorHandler");
Transaction Management
// JMS transacted
from("jms:queue:input?transacted=true")
.transacted()
.to("bean:processOrder")
.to("jms:queue:output");
// Spring transaction manager
from("jms:queue:input")
.transacted("PROPAGATION_REQUIRED")
.to("sql:insert into orders values (${body.id}, ${body.name})")
.to("jms:queue:output");
Performance Optimization
// Parallel processing
from("direct:start")
.split(body()).parallelProcessing().executorService(myThreadPool)
.to("direct:heavyProcessing");
// Stream processing for large files
from("file:inbox")
.split(body().tokenize("\n")).streaming()
.to("direct:processLine");
// Throttling
from("direct:start")
.throttle(100).timePeriodMillis(1000)
.to("direct:throttledEndpoint");
Monitoring and Management
// JMX enabled by default
CamelContext context = new DefaultCamelContext();
context.getManagementStrategy().getManagementAgent().setRegistryPort(1099);
// Adding event notifiers
context.getManagementStrategy().addEventNotifier(new MyEventNotifier());
// Health checks
context.getRegistry().bind("myHealthCheck", new MyHealthCheck());
Configuration Examples
Spring Boot Configuration
application.properties
# Camel core
camel.springboot.name=MyCamelApp
camel.springboot.main-run-controller=true
# Component configurations
camel.component.kafka.brokers=kafka1:9092,kafka2:9092
camel.component.jms.connection-factory=#jmsConnectionFactory
# Stream caching
camel.springboot.stream-caching-enabled=true
camel.springboot.stream-caching-spool-directory=./data/camel/cache
Spring Boot Starter
@SpringBootApplication
@ComponentScan(basePackages = "com.example")
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@Component
class MyRoutes extends RouteBuilder {
@Override
public void configure() throws Exception {
from("timer:hello?period={{timer.period}}")
.log("Hello World");
}
}
Camel Main Configuration
public static void main(String[] args) throws Exception {
Main main = new Main();
main.configure().addRoutesBuilder(new MyRouteBuilder());
main.configure().addPropertyPlaceholder()
.setLocation("classpath:application.properties");
// Configure components
main.bind("myDataSource", createDataSource());
main.configure().addConfiguration(JmsComponent.class.getName())
.setParameter("connectionFactory", createJmsConnectionFactory());
main.run();
}
Advanced Patterns
Content-Based EIP Implementation
// Content-Based Router
from("direct:start")
.choice()
.when(xpath("/order/type = 'A'"))
.to("direct:typeA")
.when(xpath("/order/type = 'B'"))
.to("direct:typeB")
.otherwise()
.to("direct:default")
.end();
// Message Filter
from("direct:start")
.filter(header("valid").isEqualTo(true))
.to("direct:validOrders");
// Dynamic Router
from("direct:start")
.setHeader("slip", method(MyDynamicRouter.class, "computeSlip"))
.routingSlip(header("slip"));
Integration with External Systems
// REST to JMS integration
rest("/api")
.post("/orders")
.consumes("application/json")
.type(Order.class)
.to("direct:processOrder");
from("direct:processOrder")
.log("Received order: ${body}")
.marshal().json(JsonLibrary.Jackson)
.to("jms:queue:orders");
// File to Database integration
from("file:inbox?include=.*\\.csv&move=../processed/${date:now:yyyyMMdd}")
.unmarshal().csv()
.split(body())
.process(new RecordToSqlProcessor())
.to("jdbc:dataSource");
Resources for Further Learning
Official Documentation
- Apache Camel User Manual: https://camel.apache.org/manual/
- Component Reference: https://camel.apache.org/components/
- EIP Patterns: https://camel.apache.org/components/latest/eips/enterprise-integration-patterns.html
Books
- “Camel in Action” by Claus Ibsen and Jonathan Anstey
- “Enterprise Integration Patterns” by Gregor Hohpe and Bobby Woolf
Community Resources
- Camel GitHub: https://github.com/apache/camel
- Camel Mailing Lists: https://camel.apache.org/community/mailing-list/
- Stack Overflow: https://stackoverflow.com/questions/tagged/apache-camel
This comprehensive cheatsheet provides a quick reference for Apache Camel developers, covering essential concepts, components, patterns, and implementation strategies. Use it as a starting point for building robust integration solutions with Apache Camel.
