By kswaughs | Thursday, March 10, 2016

How to call multiple methods in parallel with different return types.

In a multi-thread programming applications development, Java concurrency API provides an ExecutorService interface that represents an asynchronous execution mechanism which is capable of executing multiple tasks in the background. This mechanism separates the task creation and its execution. With an executor, we only have to implement the Runnable objects and send them to the executor. ExecutorService takes care of their execution, instantiation, and running with necessary threads.

In this example, I am not only explaining how to use this ExecutorService, But also how to design your java classes like tasks, data models that will be used in real time applications.

The requirement is to write a processor that takes userId as an input and call two different back-end methods to get personal and employee information simultaneously. Read those response objects and build an user data object and send it as an output.

Step 1 : Create below data model classes

PersonalData - To hold user personal data returned by personal back-end method

PersonalData.java
package com.kswaughs.concurrency.beans;

public class PersonalData {

    private String userId;
    
    private String userName;
    
    private String homeAddress;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getHomeAddress() {
        return homeAddress;
    }

    public void setHomeAddress(String homeAddress) {
        this.homeAddress = homeAddress;
    }
}

EmployeeData - To hold user employee data returned by employee back-end method

EmployeeData.java
package com.kswaughs.concurrency.beans;

public class EmployeeData {

    private String userId;
    
    private String designation;
    
    private String officeAddress;

    public String getUserId() {
        return userId;
    }
    
    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getDesignation() {
        return designation;
    }

    public void setDesignation(String designation) {
        this.designation = designation;
    }

    public String getOfficeAddress() {
        return officeAddress;
    }

    public void setOfficeAddress(String officeAddress) {
        this.officeAddress = officeAddress;
    }
}

UserData - To hold user data that includes personal and employee data returned by our application

UserData.java
package com.kswaughs.concurrency.beans;

public class UserData {
    
    private String userId;
    
    private String userName;
    
    private String homeAddress;

    private String designation;
    
    private String officeAddress;
    
    public UserData(String userId) {
        this.userId = userId;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getHomeAddress() {
        return homeAddress;
    }

    public void setHomeAddress(String homeAddress) {
        this.homeAddress = homeAddress;
    }

    public String getDesignation() {
        return designation;
    }

    public void setDesignation(String designation) {
        this.designation = designation;
    }

    public String getOfficeAddress() {
        return officeAddress;
    }

    public void setOfficeAddress(String officeAddress) {
        this.officeAddress = officeAddress;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("UserData [userId=");
        builder.append(userId);
        builder.append(", userName=");
        builder.append(userName);
        builder.append(", homeAddress=");
        builder.append(homeAddress);
        builder.append(", designation=");
        builder.append(designation);
        builder.append(", officeAddress=");
        builder.append(officeAddress);
        builder.append("]");
        return builder.toString();
    }

}

TaskResult - To hold the output returned by each back-end method and is used by ExecutorService

TaskResult.java
package com.kswaughs.concurrency.beans;

public class TaskResult {

    private String taskName;
    
    private Object result;
    
    public TaskResult(String taskName) {
        this.taskName = taskName;
    }

    public String getTaskName() {
        return taskName;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }    

}

Step 2 : Create below task classes to process the request.

PersonalDataTask - This class implements Callable interface and is responsible to get personal data from a back-end method and builds PersonalData object return it in TaskResult.

PersonalDataTask.java
package com.kswaughs.concurrency.tasks;

import java.util.concurrent.Callable;

import com.kswaughs.concurrency.beans.PersonalData;
import com.kswaughs.concurrency.beans.TaskResult;

public class PersonalDataTask implements Callable<TaskResult> {

    private String id;
    
    public PersonalDataTask(String id) {
        this.id = id;
    }
    
    @Override
    public TaskResult call() throws Exception {
        
        System.out.println("PROCESSING PERSONAL_DATA : calling back-end method");
        
        PersonalData data = getPersonalData(id);
        
        TaskResult result = new TaskResult("PERSONAL_DATA");
        result.setResult(data);
        
        Thread.sleep(1000); // waiting for a second
        
        System.out.println("PROCESSING PERSONAL_DATA : received data successfully");

        return result;
    }
    
    
    private PersonalData getPersonalData(String id) {
        
        PersonalData data = new PersonalData();
        data.setUserId(id);
        data.setUserName("kswaughs");
        data.setHomeAddress("Banjara Hills-Hyderabad");
        
        return data;
        
    }

}

EmployeeDataTask - This class implements Callable interface and is responsible to get employee data from a back-end method and builds EmployeeData object return it in TaskResult.

EmployeeDataTask.java
package com.kswaughs.concurrency.tasks;

import java.util.concurrent.Callable;

import com.kswaughs.concurrency.beans.EmployeeData;
import com.kswaughs.concurrency.beans.TaskResult;

public class EmployeeDataTask implements Callable<TaskResult> {

    private String id;
    
    public EmployeeDataTask(String id) {
        this.id = id;
    }
    
    @Override
    public TaskResult call() throws Exception {
 
        System.out.println("PROCESSING EMPLOYEE_DATA : calling back-end method");
        
        EmployeeData data = getEmployeeData(id);
        
        TaskResult result = new TaskResult("EMPLOYEE_DATA");
        result.setResult(data);
        
        Thread.sleep(1000); // waiting for a second
        
        System.out.println("PROCESSING EMPLOYEE_DATA : received data successfully");
        
        return result;
    }
    
    
    private EmployeeData getEmployeeData(String id) {
        
        EmployeeData data = new EmployeeData();
        data.setUserId(id);
        data.setDesignation("Manager");
        data.setOfficeAddress("Madhapur-Hyderabad");
        
        return data;
        
    }
    
}

Step 3 : Create a main processor class.

UserDataProcessor - This class takes userid as input and process the above two functionalities in parallel using ExecutorService. After getting the response from each back-end methods, aggregate the two responses into one single response object.

UserDataProcessor.java
package com.kswaughs.concurrency;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.kswaughs.concurrency.beans.EmployeeData;
import com.kswaughs.concurrency.beans.PersonalData;
import com.kswaughs.concurrency.beans.TaskResult;
import com.kswaughs.concurrency.beans.UserData;
import com.kswaughs.concurrency.tasks.EmployeeDataTask;
import com.kswaughs.concurrency.tasks.PersonalDataTask;

public class UserDataProcessor {

    public UserData getUserData(String userId) 
        throws InterruptedException, ExecutionException {
        
        UserData userData = new UserData(userId);
        
        ExecutorService svc = Executors.newFixedThreadPool(2);
        
        Set<Callable<TaskResult>> callables = new HashSet<Callable<TaskResult>>();
        
        callables.add(new PersonalDataTask(userId));
        callables.add(new EmployeeDataTask(userId));
        
        System.out.println("PROCESSING USER_DATA : Calling tasks parallely");
        
        List<Future<TaskResult>> futures = svc.invokeAll(callables);

        for(Future<TaskResult> future : futures){
            
            processResult(userData, future);
        }
        
        svc.shutdown();
        
        System.out.println("PROCESSING USER_DATA : Finished processing the tasks");
        
        return userData;
        
    }

    private void processResult(UserData userData, Future<TaskResult> future)
            throws InterruptedException, ExecutionException {
        
        TaskResult result = future.get();
        
        if("PERSONAL_DATA".equals(result.getTaskName())) {
            
            buildPersonalData(userData, result);
        
        } else {
            
            buildEmployeeData(userData, result);
        }
    }

    private void buildPersonalData(UserData userData, TaskResult result) {
        
        PersonalData personalData = (PersonalData) result.getResult();
        
        userData.setUserName(personalData.getUserName());
        userData.setHomeAddress(personalData.getHomeAddress());
        
    }
    
    private void buildEmployeeData(UserData userData, TaskResult result) {
        
        EmployeeData empData = (EmployeeData) result.getResult();
        
        userData.setDesignation(empData.getDesignation());
        userData.setOfficeAddress(empData.getOfficeAddress());
        
    }
    
}

Step 4 : Testing

UserApplication.java
package com.kswaughs.concurrency;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.kswaughs.concurrency.beans.UserData;

public class UserApplication {

    public static void main(String[] args) 
        throws InterruptedException, ExecutionException {
        
        UserDataProcessor processor = new UserDataProcessor();
        
        UserData data = processor.getUserData("251514");
        
        System.out.println(data);
    }
}

Console Output

PROCESSING USER_DATA : Calling tasks parallely
PROCESSING PERSONAL_DATA : calling back-end method
PROCESSING EMPLOYEE_DATA : calling back-end method
PROCESSING PERSONAL_DATA : received data successfully
PROCESSING EMPLOYEE_DATA : received data successfully
PROCESSING USER_DATA : Finished processing the tasks
UserData [userId=251514, userName=kswaughs, homeAddress=Banjara Hills-Hyderabad, designation=Manager, officeAddress=Madhapur-Hyderabad]

Recommend this on


9 comments:

  1. Very Good Example with detailed explanation.

    ReplyDelete
  2. Exactly what I was looking for, thanks so much!

    ReplyDelete
  3. Exactly what I was looking for, thanks so much!

    ReplyDelete
  4. I have an issue its stuck if one of the result is not completed and wait until its job complete, is any solution

    ReplyDelete