This post explains how to subscribe to a Flux of events, both with and without lambdas.
The Flux publisher code is explained in the post linked below. It publishes the multiplication table of a given number, from 0 to 10.
Spring Webflux publisher example
Example 1: Spring Webflux Subscriber with Lambdas
package com.kswaughs.webflux;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

public class SubscriberExampleWithLambdas {

    private static Logger log = LoggerFactory.getLogger("SubscriberExampleWithLambdas");

    public static void main(String[] args) {

        ProducerExample producer = new ProducerExample();

        Flux<String> flux = producer.produceTableOf(8);

        String name = "Netflix";

        flux.subscribe(
            data -> { // on Next
                log.info("{} -> data received : {}", name, data);
            },
            error -> { // on Error
                log.info("{} -> error received : {}", name, error);
            },
            () -> { // on Completion
                log.info("{} completed", name);
            },
            subscription -> { // on Subscribe
                log.info("{} Susbcription requested", name);
                subscription.request(15);
            });
    }
}
Output
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix Susbcription requested
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 0 = 0
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 1 = 8
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 2 = 16
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 3 = 24
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 4 = 32
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 5 = 40
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 6 = 48
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 7 = 56
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 8 = 64
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 9 = 72
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix -> data received : 8 x 10 = 80
08:41:24.242 [main] INFO SubscriberExampleWithLambdas - Netflix completed
Example 2: Spring Webflux Subscriber without using Lambdas
package com.kswaughs.webflux;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

public class SubscriberExample {

    private static Logger log = LoggerFactory.getLogger("SubscriberExample");

    public static void main(String[] args) {

        ProducerExample producer = new ProducerExample();

        Flux<String> flux = producer.produceTableOf(10);

        flux.subscribe(
            new Subscriber<String>() {

                String name = "Amazon";

                @Override
                public void onComplete() {
                    log.info("{} completed", name);
                }

                @Override
                public void onError(Throwable error) {
                    log.info("{} -> error received : {}", name, error);
                }

                @Override
                public void onNext(String data) {
                    log.info("{} -> data received : {}", name, data);
                }

                @Override
                public void onSubscribe(Subscription subscription) {
                    log.info("{} Susbcription requested", name);
                    subscription.request(7);
                }
            }
        );
    }
}
Output
08:51:50.152 [main] INFO SubscriberExample - Amazon Susbcription requested
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 0 = 0
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 1 = 10
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 2 = 20
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 3 = 30
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 4 = 40
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 5 = 50
08:51:50.152 [main] INFO SubscriberExample - Amazon -> data received : 10 x 6 = 60
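In the second example, the subscriber requested only 7 elements, so the Flux stopped emitting after 7 events. The table has 11 entries (0 to 10), so the remaining events were never delivered and the onComplete() method was not invoked. In the first example the subscriber requested 15 elements, which covers the whole table, so the completion signal was received.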
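The relationship between requested demand and completion can be reproduced without any dependencies. The sketch below is not the Reactor implementation: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones, and a hypothetical TablePublisher that stands in for ProducerExample. The class and method names are assumptions made for illustration, and the publisher is deliberately simplified (it is synchronous and does not validate non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandSketch {

    /** Hypothetical stand-in for ProducerExample: emits "n x i = n*i" for i = 0..10. */
    static final class TablePublisher implements Flow.Publisher<String> {
        private final int number;

        TablePublisher(int number) {
            this.number = number;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit at most n items: never more than the subscriber asked for.
                    for (long sent = 0; sent < n && next <= 10 && !done; sent++) {
                        int i = next++;
                        subscriber.onNext(number + " x " + i + " = " + (number * i));
                    }
                    // Completion is signalled only after the last item was delivered.
                    if (next > 10 && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with the given demand and returns every signal observed, in order. */
    public static List<String> run(int number, long demand) {
        List<String> events = new ArrayList<>();
        new TablePublisher(number).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(demand);
            }

            @Override
            public void onNext(String item) {
                events.add(item);
            }

            @Override
            public void onError(Throwable error) {
                events.add("error: " + error);
            }

            @Override
            public void onComplete() {
                events.add("completed");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 7));  // 7 items, no "completed" entry
        System.out.println(run(8, 15));  // all 11 items followed by "completed"
    }
}
```

With a demand of 7, the list holds the rows for 0 through 6 and nothing else. With a demand of 15, it holds all 11 rows and ends with "completed", matching the two outputs shown above. A subscriber that wants the rest of the table after an initial request(7) would have to keep the Subscription and call request again.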