By kswaughs | Monday, March 21, 2016

Spring Boot Batch Job Example

Spring Boot provides many utility classes for running batch jobs and scheduling them at regular intervals. This lets developers focus on writing business logic, which saves time and effort.

The Spring Batch framework offers two processing styles:

  • TaskletStep Oriented
  • Chunk Oriented

A TaskletStep is used when a step only needs to read or only needs to write data items.

Chunk-oriented processing is used when data items must be both read and written.

In this example, I will explain how to configure batch jobs with the Chunk Oriented Processing Model using Spring Boot.

The Chunk Oriented Processing Model involves three components:

1. ItemReader - An input operation that provides the data.
  It reads the data that will be processed.

2. ItemProcessor - A processing operation used for item transformation.
  It processes an input object and transforms it into an output object.

3. ItemWriter - An output operation that writes the data transformed by the ItemProcessor.
  The data items can be written to a database, memory, or an output stream.

One item is read from an ItemReader, handed to an ItemProcessor, and transformed into an output object. Once the number of items read equals the commit interval, the entire chunk is written out via the ItemWriter, and then the transaction is committed.
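
The flow described above can be sketched in plain Java. This is a simplified illustration that follows the description, not Spring Batch's internal implementation: the class ChunkLoopSketch, its run method, and the use of an Iterator and a Function in place of ItemReader and ItemProcessor are assumptions made for the example, and the transaction commit is represented only by a comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of the chunk loop; none of these names are Spring Batch APIs.
final class ChunkLoopSketch {

    // Reads items one at a time, processes each, and "writes" a chunk
    // whenever the number of buffered items reaches the commit interval.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int commitInterval) {
        List<List<O>> writtenChunks = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next())); // read one item, process it
            if (chunk.size() == commitInterval) {
                writtenChunks.add(chunk);              // write the whole chunk; a commit would follow here
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            writtenChunks.add(chunk);                  // final chunk may be smaller than the interval
        }
        return writtenChunks;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<List<String>> chunks = run(input.iterator(), n -> "item-" + n, 3);
        System.out.println(chunks);
    }
}
```

With seven input items and a commit interval of 3, the writer step in this sketch is reached three times: twice with a full chunk and once with the single remaining item.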

In this sample application, we will read the input data from a CSV file, transform each record into a REST service request, and call a back-end REST service with three data items at a time.

A batch job involves a Job Configuration and a Step Configuration.

  • Step Configuration defines the reader, processor, writer and chunk size.
  • Job Configuration defines the incrementer, the listener and the flow containing the Step defined above.

Step 1: Create a batch configuration class as below.

BatchConfiguration
package com.kswaughs.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import com.kswaughs.beans.Order;
import com.kswaughs.beans.SvcReq;
import com.kswaughs.order.OrderSvcInvoker;
import com.kswaughs.processor.JobCompletionNotificationListener;
import com.kswaughs.processor.OrderItemProcessor;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job processOrderJob() {
        return jobBuilderFactory.get("processOrderJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(orderStep())
                .end()
                .build();
    }
    
    @Bean
    public Step orderStep() {
        return stepBuilderFactory.get("orderStep")
                .<Order, SvcReq> chunk(3)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
    
    @Bean
    public FlatFileItemReader<Order> reader() {
        FlatFileItemReader<Order> reader = new FlatFileItemReader<Order>();
        reader.setResource(new ClassPathResource("PhoneData.csv"));
        reader.setLineMapper(new DefaultLineMapper<Order>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "orderID", "orderName" });
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Order>() {{
                setTargetType(Order.class);
            }});
        }});
        return reader;
    }

    @Bean
    public OrderItemProcessor processor() {
        return new OrderItemProcessor();
    }
    
    @Bean
    public ItemWriter<SvcReq> writer() {
        return new OrderSvcInvoker();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionNotificationListener();
    }
    
}

Step 2: The class below is the processor that transforms each Order into a REST service request.

OrderItemProcessor
package com.kswaughs.processor;

import org.springframework.batch.item.ItemProcessor;

import com.kswaughs.beans.Order;
import com.kswaughs.beans.SvcReq;

public class OrderItemProcessor implements ItemProcessor<Order, SvcReq> {
    
    @Override
    public SvcReq process(final Order order) throws Exception {
      
        final SvcReq svcReq = new SvcReq();
        
        svcReq.setId(order.getOrderID());
        svcReq.setName(order.getOrderName().toUpperCase());

        System.out.println("Converting (" + order + ") into (" + svcReq + ")");

        return svcReq;
    }

}

Step 3: The class below is the ItemWriter that calls the REST service.

OrderSvcInvoker
package com.kswaughs.order;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import com.kswaughs.beans.SvcReq;
import com.kswaughs.beans.SvcResp;

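public class OrderSvcInvoker implements ItemWriter<SvcReq> {

    // Back-end REST endpoint that receives each order request.
    private static final String ORDER_URL = "http://localhost:8080/phone/order";

    // RestTemplate is thread-safe once constructed, so one instance is reused for all calls.
    private final RestTemplate restTemplate = new RestTemplate();

    @Override
    public void write(List<? extends SvcReq> svcReqs) throws Exception {

        // Called once per chunk; posts each request in the chunk to the service.
        for (SvcReq svcReq : svcReqs) {

            ResponseEntity<SvcResp> respEntity = restTemplate
                .postForEntity(ORDER_URL, svcReq, SvcResp.class);

            SvcResp resp = respEntity.getBody();

            System.out.println("calling web service:" + resp);
        }

        System.out.println("Processed items:" + svcReqs.size());
    }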

}
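
The writer above posts to http://localhost:8080/phone/order, which must be served by a separate application; without it the job fails with a connection error. For local testing, a stand-in can be sketched with the HTTP server bundled in the JDK. Everything in this block is an assumption for illustration and not part of the original project: the class StubOrderService, its methods, the regex-based JSON handling (adequate only for a flat request such as {"id":"101","name":"APPLE IPHONE"}), and the reply format, which is modelled on the SvcResp fields.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the back-end order service, for local testing only.
final class StubOrderService {

    // Extracts a string field from a flat JSON object; not a general JSON parser.
    static String field(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : "";
    }

    // Builds a JSON reply carrying the two SvcResp properties: id and message.
    static String reply(String requestJson) {
        String id = field(requestJson, "id");
        String name = field(requestJson, "name");
        return "{\"id\":\"" + id + "\",\"message\":\"" + name + " Processed successfully\"}";
    }

    // Starts the stub on the given port and serves the /phone/order path.
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/phone/order", StubOrderService::handle);
        server.start();
        return server;
    }

    private static void handle(HttpExchange exchange) throws IOException {
        // Read the full request body.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (InputStream in = exchange.getRequestBody()) {
            byte[] block = new byte[1024];
            int n;
            while ((n = in.read(block)) != -1) {
                buffer.write(block, 0, n);
            }
        }
        String request = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        byte[] response = reply(request).getBytes(StandardCharsets.UTF_8);

        // Answer with JSON so the client can map the body onto SvcResp.
        exchange.getResponseHeaders().set("Content-Type", "application/json");
        exchange.sendResponseHeaders(200, response.length);
        try (OutputStream out = exchange.getResponseBody()) {
            out.write(response);
        }
    }

    public static void main(String[] args) throws IOException {
        start(8080);
        System.out.println("Stub order service listening on port 8080");
    }
}
```

Start this stub before launching the batch job. It assumes the RestTemplate in the writer sends and accepts JSON, which depends on a JSON message converter such as Jackson being on the classpath.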

Step 4: The class below is the listener that is executed when the job finishes.

JobCompletionNotificationListener
package com.kswaughs.processor;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("BATCH JOB FINISHED SUCCESSFULLY");
        }
    }
}

Step 5: Below are the value bean classes for the item, the REST request and the REST response.

Order
package com.kswaughs.beans;

public class Order {
    
    private String orderID;
    private String orderName;

    public String getOrderID() {
        return orderID;
    }

    public void setOrderID(String orderID) {
        this.orderID = orderID;
    }

    public String getOrderName() {
        return orderName;
    }

    public void setOrderName(String orderName) {
        this.orderName = orderName;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Order [orderID=");
        builder.append(orderID);
        builder.append(", orderName=");
        builder.append(orderName);
        builder.append("]");
        return builder.toString();
    }

}

SvcReq
package com.kswaughs.beans;

public class SvcReq {
    
    private String id;
    
    private String name;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("SvcReq [id=");
        builder.append(id);
        builder.append(", name=");
        builder.append(name);
        builder.append("]");
        return builder.toString();
    }
}

SvcResp
package com.kswaughs.beans;

public class SvcResp {
    
    private String id;
    
    private String message;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("SvcResp [id=");
        builder.append(id);
        builder.append(", message=");
        builder.append(message);
        builder.append("]");
        return builder.toString();
    }
}

Main class

BootApp
package com.kswaughs.config;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan({ "com.kswaughs" })
public class BootApp {
    
    public static void main(String[] args) {
        
        SpringApplication.run(new Object[] { BootApp.class }, args);

    }

}

Input CSV file: PhoneData.csv

101,Apple IPhone
102,Samsung Galaxy Y
103,Moto E
104,Moto X
105,Yuphoria
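
To see how these lines relate to the field names orderID and orderName given to the tokenizer in Step 1, here is a rough plain-Java sketch. The class CsvLineMappingSketch and its toFields method are hypothetical and only mimic, in a simplified way, what DelimitedLineTokenizer and BeanWrapperFieldSetMapper do in the real job; quoted values and other CSV edge cases are ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; in the real job Spring Batch's tokenizer
// and field-set mapper perform this work.
final class CsvLineMappingSketch {

    // Field names in column order, matching setNames(...) in BatchConfiguration.
    static final String[] NAMES = { "orderID", "orderName" };

    // Splits one comma-delimited line and pairs each value with its field name.
    static Map<String, String> toFields(String line) {
        String[] values = line.split(",", -1);
        if (values.length != NAMES.length) {
            throw new IllegalArgumentException("Expected " + NAMES.length + " columns: " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], values[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] lines = {
            "101,Apple IPhone", "102,Samsung Galaxy Y", "103,Moto E", "104,Moto X", "105,Yuphoria"
        };
        for (String line : lines) {
            System.out.println(toFields(line));
        }
    }
}
```

The field names matter because BeanWrapperFieldSetMapper populates the Order bean through the matching setters, setOrderID and setOrderName.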

Output:

2016-03-16 13:58:53.324  INFO 12876 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=processOrderJob]] launched with the following parameters: [{run.id=1}]
2016-03-16 13:58:53.343  INFO 12876 --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [orderStep]
Converting (Order [orderID=101, orderName=Apple IPhone]) into (SvcReq [id=101, name=APPLE IPHONE])
Converting (Order [orderID=102, orderName=Samsung Galaxy Y]) into (SvcReq [id=102, name=SAMSUNG GALAXY Y])
Converting (Order [orderID=103, orderName=Moto E]) into (SvcReq [id=103, name=MOTO E])
calling web service:SvcResp [id=101, message=APPLE IPHONE Processed successfully]
calling web service:SvcResp [id=102, message=SAMSUNG GALAXY Y Processed successfully]
calling web service:SvcResp [id=103, message=MOTO E Processed successfully]
Processed items:3
Converting (Order [orderID=104, orderName=Moto X]) into (SvcReq [id=104, name=MOTO X])
Converting (Order [orderID=105, orderName=Yuphoria]) into (SvcReq [id=105, name=YUPHORIA])
calling web service:SvcResp [id=104, message=MOTO X Processed successfully]
calling web service:SvcResp [id=105, message=YUPHORIA Processed successfully]
Processed items:2
BATCH JOB FINISHED SUCCESSFULLY
2016-03-16 13:58:53.786  INFO 12876 --- [main] o.s.b.c.l.support.SimpleJobLauncher: Job: [FlowJob: [name=processOrderJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]

Maven POM

<dependencies>
    <!-- Spring framework related jars -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>1.3.0.RELEASE</version>
    </dependency>
    <!-- Batch related jars -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
        <version>1.2.7.RELEASE</version>
    </dependency>
    <!-- Rest service call related jars -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-rest</artifactId>
        <version>1.3.0.RELEASE</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

14 comments:

  1. Hi kswaughs,
    Is there a GitHub project for this, so that we can see your project structure?

    Thanks

    Replies
    1. Currently, our code is not committed to GitHub. It is set up in my local Eclipse as a simple Maven project. Below is the project structure.

      proj_name/src/main/java [ Java files ]
      proj_name/src/main/resources [ properties & csv files ]
      proj_name/pom.xml [ maven pom ]

      The package name of each class is already declared in the Java classes.

    2. Sorry, but I cannot follow your project, so I get errors again and again.
      I think it would be nice if you put this project on GitHub so that everyone can see and test it easily.
      Thanks

    3. Could you please provide a GitHub link to download the code?

      Regards,
      Savani

    4. Could you please share the application.properties and pom files?

  2. Hi Kswaughs ,
    I created your project in Eclipse, but it can't post data to localhost:8080/phone/Order. The connection is refused because it can't find the path.

  3. Hi Vikram,

    This example is about a batch job that runs as a stand-alone application. This post focuses purely on batch job components and their functionality. The item writer is one such component, used here as a REST consumer. The back-end REST service URL localhost:8080/phone/Order that is called here is an independent component, and it must be running on some server as a REST service provider.
    There are many examples on this website of developing a REST service with different frameworks. You can refer to them.

  4. Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.
