By kswaughs | Friday, May 13, 2016

Camel Splitter Aggregator Example

What is a Splitter Aggregator Pattern ?

A Message is divided into small independent pieces and processed individually and then grouped all pieces and processed together.

In this example, We will create an order with two different types of items and will see how these items are processed individually to get price of each item and calculating the total price of all items of this order using aggregation strategy. We will see how the following business components involved in developing this application.

1. Item Processor component

2. Order Items Aggregator

3. Java DSL Router

4. Camel context Configuration

5. Data models

6. Test the Application 

Step 1 : Item Processor component

ItemSvc.java
package com.kswaughs.split;

import com.kswaughs.split.beans.Item;

public class ItemSvc {
    
    public Item processBook(Item item) throws InterruptedException {
        
        System.out.println("handle book Item:" +item);
        item.setPrice(30);
        
        System.out.println("book Item processed");
        
        return item;
    }

    public Item processPhone(Item item) throws InterruptedException {
        
        System.out.println("handle phone Item:" +item);
        item.setPrice(500);
        
        System.out.println("phone Item processed");
        
        return item;
    }
}

Step 2 : Order Items Aggregator

OrderItemStrategy.java
package com.kswaughs.split;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

import com.kswaughs.split.beans.Item;
import com.kswaughs.split.beans.Order;

public class OrderItemStrategy implements AggregationStrategy {
    
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        
           if (oldExchange == null) {
               
               Item newItem= newExchange.getIn().getBody(Item.class);
               System.out.println("Aggregate first item: " + newItem);
               
               Order currentOrder = new Order();
               currentOrder.setId("ORD"+System.currentTimeMillis());
               List<Item> currentItems = new ArrayList<Item>();
   
               currentItems.add(newItem);
               currentOrder.setItems(currentItems);
               currentOrder.setTotalPrice(newItem.getPrice());
               
               newExchange.getIn().setBody(currentOrder);
               
                // the first time we aggregate we only have the new exchange,
                // so we just return it
                return newExchange;
            }
           
            Order order = oldExchange.getIn().getBody(Order.class);
            Item newItem= newExchange.getIn().getBody(Item.class);
     
            System.out.println("Aggregate old items: " + order);
            System.out.println("Aggregate new item: " + newItem);
            
            order.getItems().add(newItem);
           
            double totalPrice = order.getTotalPrice() + newItem.getPrice();
            order.setTotalPrice(totalPrice);

            // return old as this is the one that has all the orders gathered until now
            return oldExchange;
    }

}

Step 3 : Java DSL Router

OrderRouter.java
package com.kswaughs.split;

import org.apache.camel.builder.RouteBuilder;

public class OrderRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        
        from("direct:processOrder")
            .split(body().method("getItems"), new OrderItemStrategy())
            // each splitted message is send to this bean to process it
            .to("direct:processItem")
         .end();
    
        
        from("direct:processItem")
            .choice()
                .when(body().method("getType").isEqualTo("Book"))
                    .to("bean:itemService?method=processBook").
                otherwise()
                    .to("bean:itemService?method=processPhone");
    }

}

Step 4 : Camel context Configuration

camel-context.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:cxf="http://camel.apache.org/schema/cxf"
    xmlns:jaxrs="http://cxf.apache.org/jaxrs" 
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/cxf
        http://camel.apache.org/schema/cxf/camel-cxf.xsd
        http://cxf.apache.org/jaxrs
        http://cxf.apache.org/schemas/jaxrs.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context.xsd">


    <bean id="orderRouter" class="com.kswaughs.split.OrderRouter" />
    <bean id="itemService" class="com.kswaughs.split.ItemSvc" />
    <camelContext id="orderCtx" xmlns="http://camel.apache.org/schema/spring">
        <routeBuilder ref="orderRouter" />
    </camelContext>
    
</beans>

Step 5 : Data models

Data model of Item

Item.java
package com.kswaughs.split.beans;

public class Item {

    public Item(String id, String name, String type) {
        this.id = id;
        this.name = name;
        this.type = type;
    }

    private String id;

    private String name;
    
    private String type;

    private double price;

    public String getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
    
    public String getType() {
        return type;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Item [id=");
        builder.append(id);
        builder.append(", name=");
        builder.append(name);
        builder.append(", type=");
        builder.append(type);
        builder.append(", price=");
        builder.append(price);
        builder.append("]");
        return builder.toString();
    }
    
}

Data model of Order

Order.java
package com.kswaughs.split.beans;

import java.util.List;

public class Order {
    
    private String id;
    
    private List items;
    
    private double totalPrice;
        
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public List getItems() {
        return items;
    }

    public void setItems(List items) {
        this.items = items;
    }    

    public double getTotalPrice() {
        return totalPrice;
    }

    public void setTotalPrice(double totalPrice) {
        this.totalPrice = totalPrice;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Order [id=");
        builder.append(id);
        builder.append(", items=");
        builder.append(items);
        builder.append(", totalPrice=");
        builder.append(totalPrice);
        builder.append("]");
        return builder.toString();
    }

}

Step 6 : Start the camel context and test your application

OrderApp.java
package com.kswaughs.split;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.kswaughs.split.beans.Item;
import com.kswaughs.split.beans.Order;

public class OrderApp {
    
    public static void main(String[] args) {
        
    
        try {
            ApplicationContext springCtx = new ClassPathXmlApplicationContext(
                    "camel-context.xml");

            CamelContext context = springCtx.getBean("orderCtx",
                    CamelContext.class);
            
            context.start();
                
            ProducerTemplate producerTemplate = context.createProducerTemplate();
            
            List<Item> items = new ArrayList<Item>();
            items.add(new Item("1", "Camel in Action book", "Book"));
            items.add(new Item("2", "Apple IPhone8", "Phone"));
            
            Order myOrder = new Order();
            myOrder.setItems(items);
                        
            Order respOrder = producerTemplate.requestBody(
                    "direct:processOrder", myOrder, Order.class);
            
            System.out.println("resp order:"+respOrder);

            context.stop();
            
        } catch (Exception e) {
            
            e.printStackTrace();
        
        } 

    }

}

Console Output

 
handle book Item:Item [id=1, name=Camel in Action book, type=Book, price=0.0]
book Item processed
Aggregate first item: Item [id=1, name=Camel in Action book, type=Book, price=30.0]
handle phone Item:Item [id=2, name=Apple IPhone8, type=Phone, price=0.0]
phone Item processed
Aggregate old items: Order [id=ORD1463125872691, items=[Item [id=1, name=Camel in Action book, type=Book, price=30.0]], totalPrice=30.0]
Aggregate new item: Item [id=2, name=Apple IPhone8, type=Phone, price=500.0]

resp order:Order [id=ORD1463125872691, items=[Item [id=1, name=Camel in Action book, type=Book, price=30.0], Item [id=2, name=Apple IPhone8, type=Phone, price=500.0]], totalPrice=530.0]

Maven dependencies

pom.xml
<properties>
    <camelspring.version>2.16.0</camelspring.version>  
    <spring.version>4.1.6.RELEASE</spring.version>
</properties>

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>${camelspring.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-cxf</artifactId>
    <version>${camelspring.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring</artifactId>
    <version>${camelspring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>

Recommend this on


No comments:

Post a Comment