In multithreaded application development, the Java concurrency API provides the ExecutorService interface, an asynchronous execution mechanism that can run multiple tasks in the background. It separates task creation from task execution: we only have to implement Runnable (or Callable) objects and submit them to the executor. ExecutorService then takes care of creating and managing the threads needed to run them.
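As a minimal sketch of that separation (not part of the application built below), the following hands a few Runnable tasks to a fixed pool and waits for them to finish. The class name ExecutorBasics and the counter task are made up for illustration, and the lambda-style syntax assumes Java 8 or later.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorBasics {

    // Hands taskCount trivial Runnable tasks to a two-thread pool and
    // returns how many of them ran to completion.
    static int runTasks(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                // We only describe the work; the executor chooses the thread.
                Runnable task = completed::incrementAndGet;
                executor.execute(task);
            }
            // Stop accepting new tasks; already queued tasks still run.
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            // Has no effect if the pool has already terminated.
            executor.shutdownNow();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runTasks(5));
    }
}
```

The calling code never creates or starts a Thread itself; it only decides what should run and how large the pool is.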
In this example, I explain not only how to use ExecutorService, but also how to design the Java classes, such as tasks and data models, that you would use in real-world applications.
The requirement is to write a processor that takes a userId as input and calls two different back-end methods simultaneously to get personal and employee information. It then reads the two response objects, builds a user data object from them, and returns it as the output.
Step 1 : Create the data model classes below.
PersonalData - Holds the user's personal data returned by the personal back-end method.
package com.kswaughs.concurrency.beans;

public class PersonalData {

    private String userId;
    private String userName;
    private String homeAddress;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getHomeAddress() {
        return homeAddress;
    }

    public void setHomeAddress(String homeAddress) {
        this.homeAddress = homeAddress;
    }
}
EmployeeData - Holds the user's employee data returned by the employee back-end method.
package com.kswaughs.concurrency.beans;

public class EmployeeData {

    private String userId;
    private String designation;
    private String officeAddress;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getDesignation() {
        return designation;
    }

    public void setDesignation(String designation) {
        this.designation = designation;
    }

    public String getOfficeAddress() {
        return officeAddress;
    }

    public void setOfficeAddress(String officeAddress) {
        this.officeAddress = officeAddress;
    }
}
UserData - Holds the combined personal and employee data that our application returns.
package com.kswaughs.concurrency.beans;

public class UserData {

    private String userId;
    private String userName;
    private String homeAddress;
    private String designation;
    private String officeAddress;

    public UserData(String userId) {
        this.userId = userId;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getHomeAddress() {
        return homeAddress;
    }

    public void setHomeAddress(String homeAddress) {
        this.homeAddress = homeAddress;
    }

    public String getDesignation() {
        return designation;
    }

    public void setDesignation(String designation) {
        this.designation = designation;
    }

    public String getOfficeAddress() {
        return officeAddress;
    }

    public void setOfficeAddress(String officeAddress) {
        this.officeAddress = officeAddress;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("UserData [userId=");
        builder.append(userId);
        builder.append(", userName=");
        builder.append(userName);
        builder.append(", homeAddress=");
        builder.append(homeAddress);
        builder.append(", designation=");
        builder.append(designation);
        builder.append(", officeAddress=");
        builder.append(officeAddress);
        builder.append("]");
        return builder.toString();
    }
}
TaskResult - Holds the output returned by each back-end method. This is the type that the tasks hand back through ExecutorService.
package com.kswaughs.concurrency.beans;

public class TaskResult {

    private String taskName;
    private Object result;

    public TaskResult(String taskName) {
        this.taskName = taskName;
    }

    public String getTaskName() {
        return taskName;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}
Step 2 : Create the task classes below to process the request.
PersonalDataTask - This class implements the Callable interface. It gets personal data from a back-end method, builds a PersonalData object, and returns it in a TaskResult.
package com.kswaughs.concurrency.tasks;

import java.util.concurrent.Callable;

import com.kswaughs.concurrency.beans.PersonalData;
import com.kswaughs.concurrency.beans.TaskResult;

public class PersonalDataTask implements Callable<TaskResult> {

    private final String id;

    public PersonalDataTask(String id) {
        this.id = id;
    }

    @Override
    public TaskResult call() throws Exception {

        System.out.println("PROCESSING PERSONAL_DATA : calling back-end method");

        PersonalData data = getPersonalData(id);

        // The task name lets the processor tell the two results apart
        TaskResult result = new TaskResult("PERSONAL_DATA");
        result.setResult(data);

        Thread.sleep(1000); // simulate one second of back-end latency

        System.out.println("PROCESSING PERSONAL_DATA : received data successfully");

        return result;
    }

    // Stands in for the real back-end call and returns fixed sample data
    private PersonalData getPersonalData(String id) {

        PersonalData data = new PersonalData();
        data.setUserId(id);
        data.setUserName("kswaughs");
        data.setHomeAddress("Banjara Hills-Hyderabad");

        return data;
    }
}
EmployeeDataTask - This class implements the Callable interface. It gets employee data from a back-end method, builds an EmployeeData object, and returns it in a TaskResult.
package com.kswaughs.concurrency.tasks;

import java.util.concurrent.Callable;

import com.kswaughs.concurrency.beans.EmployeeData;
import com.kswaughs.concurrency.beans.TaskResult;

public class EmployeeDataTask implements Callable<TaskResult> {

    private final String id;

    public EmployeeDataTask(String id) {
        this.id = id;
    }

    @Override
    public TaskResult call() throws Exception {

        System.out.println("PROCESSING EMPLOYEE_DATA : calling back-end method");

        EmployeeData data = getEmployeeData(id);

        // The task name lets the processor tell the two results apart
        TaskResult result = new TaskResult("EMPLOYEE_DATA");
        result.setResult(data);

        Thread.sleep(1000); // simulate one second of back-end latency

        System.out.println("PROCESSING EMPLOYEE_DATA : received data successfully");

        return result;
    }

    // Stands in for the real back-end call and returns fixed sample data
    private EmployeeData getEmployeeData(String id) {

        EmployeeData data = new EmployeeData();
        data.setUserId(id);
        data.setDesignation("Manager");
        data.setOfficeAddress("Madhapur-Hyderabad");

        return data;
    }
}
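Both task classes rest on the same idea: a Callable returns a value, and the executor hands back a Future from which that value is read later. The sketch below isolates that idea outside the application. The names TypedCallableSketch, nameLookup and designationLookup are hypothetical stand-ins that return plain String values rather than PersonalData or EmployeeData, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TypedCallableSketch {

    // Hypothetical stand-ins for the two back-end calls.
    static Callable<String> nameLookup(String userId) {
        return () -> "name-of-" + userId;
    }

    static Callable<String> designationLookup(String userId) {
        return () -> "designation-of-" + userId;
    }

    // Submits both lookups, then joins their results once both are available.
    static String describe(String userId) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<String> name = executor.submit(nameLookup(userId));
            Future<String> designation = executor.submit(designationLookup(userId));
            // get() blocks until the corresponding task has finished.
            return name.get() + ", " + designation.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // The task's own exception is available as the cause.
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "name-of-251514, designation-of-251514"
        System.out.println(describe("251514"));
    }
}
```

Because each Future carries the task's return type, the caller here needs no cast. The TaskResult wrapper used in this article trades that for the ability to pass differently typed tasks to a single invokeAll call.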
Step 3 : Create a main processor class.
UserDataProcessor - This class takes a userId as input and runs the two tasks above in parallel using ExecutorService. After getting the response from each back-end method, it aggregates the two responses into a single response object.
package com.kswaughs.concurrency;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.kswaughs.concurrency.beans.EmployeeData;
import com.kswaughs.concurrency.beans.PersonalData;
import com.kswaughs.concurrency.beans.TaskResult;
import com.kswaughs.concurrency.beans.UserData;
import com.kswaughs.concurrency.tasks.EmployeeDataTask;
import com.kswaughs.concurrency.tasks.PersonalDataTask;

public class UserDataProcessor {

    public UserData getUserData(String userId) throws InterruptedException, ExecutionException {

        UserData userData = new UserData(userId);

        // One thread per back-end call
        ExecutorService svc = Executors.newFixedThreadPool(2);

        try {
            Set<Callable<TaskResult>> callables = new HashSet<Callable<TaskResult>>();
            callables.add(new PersonalDataTask(userId));
            callables.add(new EmployeeDataTask(userId));

            System.out.println("PROCESSING USER_DATA : Calling tasks parallely");

            // invokeAll blocks until every task has completed
            List<Future<TaskResult>> futures = svc.invokeAll(callables);

            for (Future<TaskResult> future : futures) {
                processResult(userData, future);
            }
        } finally {
            // Release the pool threads even if a task fails
            svc.shutdown();
        }

        System.out.println("PROCESSING USER_DATA : Finished processing the tasks");

        return userData;
    }

    // Copies one task's result into userData, based on the task name
    private void processResult(UserData userData, Future<TaskResult> future)
            throws InterruptedException, ExecutionException {

        TaskResult result = future.get();

        if ("PERSONAL_DATA".equals(result.getTaskName())) {
            buildPersonalData(userData, result);
        } else {
            buildEmployeeData(userData, result);
        }
    }

    private void buildPersonalData(UserData userData, TaskResult result) {

        PersonalData personalData = (PersonalData) result.getResult();

        userData.setUserName(personalData.getUserName());
        userData.setHomeAddress(personalData.getHomeAddress());
    }

    private void buildEmployeeData(UserData userData, TaskResult result) {

        EmployeeData empData = (EmployeeData) result.getResult();

        userData.setDesignation(empData.getDesignation());
        userData.setOfficeAddress(empData.getOfficeAddress());
    }
}
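Note that invokeAll, as called in the processor, blocks until every task has finished, so one slow back-end call holds up the whole response. ExecutorService also has an invokeAll overload that takes a timeout and cancels any task still running when it expires. The following is only a sketch of that overload, not a change to the processor: the class InvokeAllTimeoutSketch, the sleepy helper and the timings are made up for illustration, and Java 8 or later is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutSketch {

    // Hypothetical task: sleeps for the given time, then returns its name.
    static Callable<String> sleepy(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Runs a fast and a slow task, waiting at most timeoutMillis for both.
    // Returns one comma-separated status per task, in submission order.
    static String runWithTimeout(long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<String> statuses = new ArrayList<>();
        try {
            List<Callable<String>> tasks =
                    Arrays.asList(sleepy("FAST", 10), sleepy("SLOW", 5000));
            // Tasks still running when the timeout expires are cancelled.
            List<Future<String>> futures =
                    executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (Future<String> future : futures) {
                try {
                    statuses.add(future.get());
                } catch (CancellationException e) {
                    statuses.add("CANCELLED");
                } catch (ExecutionException e) {
                    statuses.add("FAILED");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
        return String.join(",", statuses);
    }

    public static void main(String[] args) {
        // prints "FAST,CANCELLED"
        System.out.println(runWithTimeout(1000));
    }
}
```

With this overload the caller decides what a cancelled task means for the response, for example returning the data that did arrive and leaving the remaining fields empty.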
Step 4 : Testing
package com.kswaughs.concurrency;

import java.util.concurrent.ExecutionException;

import com.kswaughs.concurrency.beans.UserData;

public class UserApplication {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        UserDataProcessor processor = new UserDataProcessor();

        UserData data = processor.getUserData("251514");

        System.out.println(data);
    }
}
Console Output
PROCESSING USER_DATA : Calling tasks parallely
PROCESSING PERSONAL_DATA : calling back-end method
PROCESSING EMPLOYEE_DATA : calling back-end method
PROCESSING PERSONAL_DATA : received data successfully
PROCESSING EMPLOYEE_DATA : received data successfully
PROCESSING USER_DATA : Finished processing the tasks
UserData [userId=251514, userName=kswaughs, homeAddress=Banjara Hills-Hyderabad, designation=Manager, officeAddress=Madhapur-Hyderabad]
Very Good Example with detailed explanation.
Exactly what I was looking for, thanks so much!
I have an issue: it gets stuck if one of the results is not completed, and it waits until that job completes. Is there any solution?