By kswaughs | Monday, May 18, 2020

Spring Webflux Subscriber Example

This post explains how to subscribe to the flux of events with and without lambdas.

Flux publisher code is explained in the below link, that publishes multiplication table of given number starting from 0 to 10.

Spring Webflux publisher example

Example 1: Spring Webflux Subscriber with Lambdas
package com.kswaughs.webflux;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

public class SubscriberExampleWithLambdas {
    
    private static Logger log = LoggerFactory.getLogger("SubscriberExampleWithLambdas");

    public static void main(String[] args) {
        
        ProducerExample producer = new ProducerExample();
        
        Flux<String> flux = producer.produceTableOf(8);
    
        String name = "Netflix";
        
        flux.subscribe(
                
            data -> { // on Next
                log.info("{} -> data received : {}", name, data); 
                }, 
            error -> { // on Error
                log.info("{} -> error received : {}",name, error);
                }, 
            ()-> { // on Completion
                log.info("{} completed", name); 
                }, 
            subscription -> { // on Subscribe   
                log.info("{} Susbcription requested", name);
                subscription.request(15);
            });
        }
}

Output

08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix Susbcription requested
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 0 = 0
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 1 = 8
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 2 = 16
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 3 = 24
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 4 = 32
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 5 = 40
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 6 = 48
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 7 = 56
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 8 = 64
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 9 = 72
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 10 = 80
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix completed

Example 2: Spring Webflux Subscriber without using Lambdas
package com.kswaughs.webflux;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

public class SubscriberExample {
    
    private static Logger log = LoggerFactory.getLogger("SubscriberExample");

    public static void main(String[] args) {
        
        ProducerExample producer = new ProducerExample();
        
        Flux<String> flux = producer.produceTableOf(10);
        
        flux.subscribe(
                
            new Subscriber<String>() {
                
                String name = "Amazon";

                @Override
                public void onComplete() {
                    log.info("{} completed", name);    
                }

                @Override
                public void onError(Throwable error) {
                    log.info("{} -> error received : {}",name, error);    }

                @Override
                public void onNext(String data) {
                    log.info("{} -> data received : {}", name, data);    
                }

                @Override
                public void onSubscribe(Subscription subscription) {
                    log.info("{} Susbcription requested", name);
                    subscription.request(7);
                }
            }
        );
    }
}

Output

08:51:50.152 [main] INFO SubscriberExample - Amazon Susbcription requested
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 0 = 0
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 1 = 10
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 2 = 20
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 3 = 30
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 4 = 40
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 5 = 50
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 6 = 60

In the second example, as subscription requested for 7 elements, flux producer stopped publishing after 7 events. As all events are not completed, so onComplete() method is not executed.

Recommend this on