By kswaughs | Thursday, June 11, 2020

Spring WebFlux Mono Junit Test Example

This example shows how to write a JUnit test for a method that returns a Mono.

StepVerifier, from the reactor-test artifact (groupId io.projectreactor), is used to test reactive components.

Below is the service class that returns a Mono.

BookService.java
package com.kswaughs.webflux;

import com.kswaughs.webflux.model.Book;

import reactor.core.publisher.Mono;

public class BookService {
    
    public Mono<Book> getBookById(int bookId) {
        
        if(bookId == 0) {
            return Mono.error(new Exception("Invalid BookId"));
        }
        
        if(bookId == 100) {
            
            return Mono.just(buildBook(bookId));
        }
        
        return Mono.empty();
    }
    
    private Book buildBook(int bookId) {
        
        Book book = new Book();
        
        book.setBookId(bookId);
        book.setAuthor("John Grisham");
        book.setTitle("A Painted House");

        return book;
    }
}


Below is the JUnit test class that tests the Mono using StepVerifier.

SpringWebFluxMonoExampleTest.java
package com.kswaughs.webflux;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

import reactor.test.StepVerifier;

public class SpringWebFluxMonoExampleTest {

    private BookService bookService = new BookService();

    @Test
    public void whenBookExists() {

        StepVerifier.create(bookService.getBookById(100))
            .assertNext(book -> {

                assertEquals(Integer.valueOf(100), book.getBookId());
                assertEquals("John Grisham", book.getAuthor());
                assertEquals("A Painted House", book.getTitle());
                
        }).verifyComplete();
    }
    
    @Test
    public void whenBookNotExists() {

        StepVerifier.create(bookService.getBookById(56))
            .verifyComplete();
    }
    
    @Test
    public void whenBookIdIsInvalid() {

        StepVerifier.create(bookService.getBookById(0))
        
          /** case 1: To validate only Exception className  **/
            //.expectError(Exception.class)
        
          /** case 2: To validate only exception message **/    
            //.expectErrorMessage("Invalid BookId")
        
          /** case 3: To Validate complete exception object **/    
            .expectErrorSatisfies(thr -> {
                assertTrue(thr instanceof Exception);
                assertEquals("Invalid BookId", thr.getMessage());
            })
            .verify();
    }
}

pom.xml
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

Useful Links

Spring WebFlux Junit Test Example



By kswaughs | Tuesday, June 9, 2020

Spring WebFlux Junit Test Example

This example shows how to write a JUnit test for a method that returns a Flux of Order objects.

StepVerifier, from the reactor-test artifact (groupId io.projectreactor), is used to test reactive components.

Below is the service class that returns a Flux of three Order objects.

OrderService.java
package com.kswaughs.webflux;

import java.util.ArrayList;
import java.util.List;

import com.kswaughs.webflux.model.Order;

import reactor.core.publisher.Flux;

public class OrderService {
    
   public Flux<Order> getOrders() {
       
       List<Order> orders = new ArrayList<>();
       
       orders.add(buildOrder(345, 30, "DELIVERED"));
       orders.add(buildOrder(654, 50, "IN-TRANSIT"));
       orders.add(buildOrder(999, 12, "DELIVERED"));
       
       // return flux of orders..
       return Flux.fromIterable(orders);
   }

   private Order buildOrder(int orderId, double amount, String status) {
       
       Order order = new Order();
       order.setOrderId(orderId);
       order.setAmount(amount);
       order.setStatus(status);
       
       return order;
   }
}

Below is the JUnit test class that tests the Flux of objects using StepVerifier.

SpringWebFluxExampleTest.java
package com.kswaughs.webflux;

import static org.junit.Assert.assertEquals;

import org.junit.Test;

import com.kswaughs.webflux.model.Order;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class SpringWebFluxExampleTest {
    
    private OrderService orderService = new OrderService();

    @Test
    public void testGetOrders() {
        
        Flux<Order> orders = orderService.getOrders();
        
        StepVerifier.create(orders)
            // getOrders() emits three orders;
            // assert the fields of the first order only.
            .assertNext(order -> {
            
                assertEquals(Integer.valueOf(345), Integer.valueOf(order.getOrderId()));
                assertEquals(Double.valueOf(30), Double.valueOf(order.getAmount()));
                assertEquals("DELIVERED", order.getStatus());
            })
            // Expect the remaining two orders without inspecting them.
            .expectNextCount(2)
            .verifyComplete();
    }
}
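
If every order needs to be checked rather than only the first, one option is to chain one matcher per element. The following is a minimal sketch, not the original test from this post: OrderFluxVerification is a hypothetical class name, the nested Order class is a hypothetical stand-in for com.kswaughs.webflux.model.Order (which is not shown here), and getOrders() simply mirrors the three orders built by OrderService.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class OrderFluxVerification {

    // Hypothetical stand-in for the Order model class, which the post does not show.
    static final class Order {
        final int orderId;
        final double amount;
        final String status;

        Order(int orderId, double amount, String status) {
            this.orderId = orderId;
            this.amount = amount;
            this.status = status;
        }
    }

    // Mirrors OrderService.getOrders(): the same three orders, in the same sequence.
    static Flux<Order> getOrders() {
        return Flux.just(
            new Order(345, 30, "DELIVERED"),
            new Order(654, 50, "IN-TRANSIT"),
            new Order(999, 12, "DELIVERED"));
    }

    // Each expectNextMatches consumes exactly one element, in emission order.
    static void verifyEachOrder() {
        StepVerifier.create(getOrders())
            .expectNextMatches(o -> o.orderId == 345 && "DELIVERED".equals(o.status))
            .expectNextMatches(o -> o.orderId == 654 && "IN-TRANSIT".equals(o.status))
            .expectNextMatches(o -> o.orderId == 999 && "DELIVERED".equals(o.status))
            .verifyComplete();
    }

    public static void main(String[] args) {
        // Throws an AssertionError if any expectation fails.
        verifyEachOrder();
    }
}
```

expectNextMatches reports only that a predicate failed, while assertNext lets JUnit assertions produce a more detailed message, so which one to use is a matter of how much diagnostic detail the test needs.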

pom.xml
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

Useful Links

Spring WebFlux Mono Junit Test Example



By kswaughs | Monday, May 18, 2020

Spring Webflux Subscriber Example

This post explains how to subscribe to a Flux of events, with and without lambdas.

The Flux publisher code, which publishes the multiplication table of a given number from 0 to 10, is explained in the post linked below.

Spring Webflux publisher example

Example 1: Spring Webflux Subscriber with Lambdas
package com.kswaughs.webflux;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

public class SubscriberExampleWithLambdas {
    
    private static Logger log = LoggerFactory.getLogger("SubscriberExampleWithLambdas");

    public static void main(String[] args) {
        
        PublisherExample producer = new PublisherExample();
        
        Flux<String> flux = producer.produceTableOf(8);
    
        String name = "Netflix";
        
        flux.subscribe(
                
            data -> { // on Next
                log.info("{} -> data received : {}", name, data); 
                }, 
            error -> { // on Error
                // Pass the Throwable as the last argument so SLF4J logs its stack trace.
                log.error("{} -> error received", name, error);
                }, 
            ()-> { // on Completion
                log.info("{} completed", name); 
                }, 
            subscription -> { // on Subscribe   
                log.info("{} Subscription requested", name);
                subscription.request(15);
            });
    }
}

Output

08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix Subscription requested
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 0 = 0
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 1 = 8
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 2 = 16
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 3 = 24
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 4 = 32
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 5 = 40
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 6 = 48
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 7 = 56
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 8 = 64
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 9 = 72
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 10 = 80
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix completed

Example 2: Spring Webflux Subscriber without using Lambdas
package com.kswaughs.webflux;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

public class SubscriberExample {
    
    private static Logger log = LoggerFactory.getLogger("SubscriberExample");

    public static void main(String[] args) {
        
        PublisherExample producer = new PublisherExample();
        
        Flux<String> flux = producer.produceTableOf(10);
        
        flux.subscribe(
                
            new Subscriber<String>() {
                
                String name = "Amazon";

                @Override
                public void onComplete() {
                    log.info("{} completed", name);    
                }

                @Override
                public void onError(Throwable error) {
                    // Pass the Throwable as the last argument so SLF4J logs its stack trace.
                    log.error("{} -> error received", name, error);
                }

                @Override
                public void onNext(String data) {
                    log.info("{} -> data received : {}", name, data);    
                }

                @Override
                public void onSubscribe(Subscription subscription) {
                    log.info("{} Subscription requested", name);
                    subscription.request(7);
                }
            }
        );
    }
}

Output

08:51:50.152 [main] INFO SubscriberExample - Amazon Subscription requested
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 0 = 0
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 1 = 10
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 2 = 20
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 3 = 30
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 4 = 40
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 5 = 50
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 6 = 60

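In the second example, the subscriber requested only 7 elements, so the Flux stopped publishing after 7 events. Because the sequence was never emitted in full, the onComplete() method was not called. In the first example, the request for 15 elements covered all 11 events, so the completion signal was delivered.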
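
The effect of the requested count can be reproduced without Reactor. The sketch below swaps in the JDK's own java.util.concurrent.Flow interfaces (Java 9 and later), which mirror the org.reactivestreams interfaces used in Example 2. Everything in it is hypothetical and written only for illustration: DemandSketch, tableOf and collect are not part of the examples above, and tableOf is a simplified, single-threaded stand-in for the Flux publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandSketch {

    // Hypothetical stand-in for the Flux publisher: emits "num x i = product"
    // for i = 0..10, but never more items than the subscriber has requested.
    static Flow.Publisher<String> tableOf(int num) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;             // next multiplier to emit
            private long demand = 0;          // requested but not yet delivered
            private boolean done = false;     // completed, failed or cancelled
            private boolean emitting = false; // guards against re-entrant request()

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // Cap at Long.MAX_VALUE instead of overflowing.
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    return; // the loop already running picks up the extra demand
                }
                emitting = true;
                while (demand > 0 && next <= 10 && !done) {
                    demand--;
                    int i = next++;
                    subscriber.onNext(num + " x " + i + " = " + num * i);
                }
                emitting = false;
                if (next > 10 && !done) {
                    done = true;
                    subscriber.onComplete(); // only reached once all 11 items are out
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes, requests n items once, and records every signal received.
    static List<String> collect(int num, long n) {
        List<String> signals = new ArrayList<>();
        tableOf(num).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(n); }
            @Override public void onNext(String item) { signals.add(item); }
            @Override public void onError(Throwable t) { signals.add("error: " + t.getMessage()); }
            @Override public void onComplete() { signals.add("completed"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(collect(10, 7));  // 7 items, no completion signal
        System.out.println(collect(8, 15));  // 11 items followed by "completed"
    }
}
```

With a request of 7, the recorded list ends at "10 x 6 = 60" and contains no "completed" entry, which matches the output of Example 2. With a request of 15, all 11 lines are delivered and "completed" follows, as in Example 1.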



By kswaughs | Tuesday, May 12, 2020

Spring Webflux Publisher Example

This post explains how to generate a Flux of events programmatically.

In this example, we publish the multiplication table of a given number, from 0 to 10, and the subscriber prints each event to the console.

PublisherExample.java
package com.kswaughs.webflux;

import reactor.core.publisher.Flux;

public class PublisherExample {

    public Flux<String> produceTableOf(int num) {

        return Flux.generate(
            () -> 0, // Supplier of Initial State
            (state, sink) -> {

                if (state > 10) {
                    sink.complete();
                    return state;
                }

                String calc = num + " x " + state + " = " + num * state;
                sink.next(calc);
                return state + 1;
            }, 
            state -> {
                System.out.println("Final state:" + state);
            });
    }

    public static void main(String[] args) {

        PublisherExample example = new PublisherExample();

        System.out.println("*** Table of 5 ****");
        example.produceTableOf(5).subscribe(System.out::println);

        System.out.println("\n*** Table of 3 ****");
        example.produceTableOf(3).subscribe(System.out::println);
    }
}

Output

*** Table of 5 ****
5 x 0 = 0
5 x 1 = 5
5 x 2 = 10
5 x 3 = 15
5 x 4 = 20
5 x 5 = 25
5 x 6 = 30
5 x 7 = 35
5 x 8 = 40
5 x 9 = 45
5 x 10 = 50
Final state:11

*** Table of 3 ****
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
Final state:11
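
The publisher can also be checked with StepVerifier instead of reading the console output. The sketch below is only an illustration under stated assumptions: TableOfVerification is a hypothetical class name, reactor-test is assumed to be on the classpath, and produceTableOf repeats the generator logic above without the final-state callback so that the block is self-contained.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class TableOfVerification {

    // Same generator logic as PublisherExample.produceTableOf,
    // minus the state consumer that prints the final state.
    static Flux<String> produceTableOf(int num) {
        return Flux.generate(
            () -> 0, // initial state
            (state, sink) -> {
                if (state > 10) {
                    sink.complete();
                    return state;
                }
                sink.next(num + " x " + state + " = " + num * state);
                return state + 1;
            });
    }

    // Checks the first and last lines explicitly and counts the nine in between.
    static void verifyTableOfFive() {
        StepVerifier.create(produceTableOf(5))
            .expectNext("5 x 0 = 0")
            .expectNextCount(9)
            .expectNext("5 x 10 = 50")
            .verifyComplete();
    }

    public static void main(String[] args) {
        // Throws an AssertionError if any expectation fails.
        verifyTableOfFive();
    }
}
```

Checking only the boundaries keeps the test short while still catching an off-by-one in the stop condition, since the table must contain exactly 11 lines before the completion signal.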
