Reactive Channel usage example
Please read the channels overview before continuing.
Assume there is a native microservice that opens a stream on a channel and writes to it, and a client (a JUnit test) that handles the microservice's results with a reactive iterator.
Native microservice (sampleWriterMicroservice) as reactive Writer/Publisher:
Interface:
public interface SampleChannelWriter {
    String openStreamChannelAndStartWriteToIt() throws Throwable;
}
Implementation:
@Service("sampleChannelWriter")
public class SampleChannelWriterImpl implements SampleChannelWriter {
    @Autowired
    private ExecutorService executorService;
    @Autowired
    JLupinClientChannelUtil jLupinClientChannelUtil;

    @Override
    public String openStreamChannelAndStartWriteToIt() throws Throwable {
        // Open an ephemeral stream on the channel; its id is returned to the caller.
        final String streamChannelId = jLupinClientChannelUtil.openStreamChannel();
        // Write in a separate thread so the caller receives the id immediately.
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                for (int i = 0; i < 5; i++) {
                    jLupinClientChannelUtil.putNextElementToStreamChannel(streamChannelId, Integer.valueOf(i));
                    Thread.sleep(1000);
                }
                jLupinClientChannelUtil.closeStreamChannel(streamChannelId);
                return null;
            }
        });
        return streamChannelId;
    }
}
In the implementation above, the Thread.sleep(1000) call only simulates a long computation; you do not need it in your own code.
Beans' configuration:
@Bean(name="jLupinClientChannelUtil")
public JLupinClientChannelUtil getJLupinClientChannelUtil() {
    JLupinChannelManagerService jLupinChannelManagerService = JLupinClientUtil.generateRemote(
            getLupinDelegator(), "channelMicroservice", "jLupinChannelManagerService", JLupinChannelManagerService.class);
    return new JLupinClientChannelUtil("SAMPLE", jLupinChannelManagerService);
}

@Bean(name="executorService")
public ExecutorService getExecutorService() {
    return JLupinClientUtil.getExecutorServiceByNameManagedByJLupin("THREAD_POOL_1");
}
As the implementation shows, the openStreamChannelAndStartWriteToIt method opens a new stream on the desired channel and writes to it in a separate thread. The stream is generated dynamically (it is ephemeral): it is opened, and then closed, only for the time when publishers and subscribers are communicating. In a real application other parts of the system can also write to the stream in the channel; they only need to know the stream id (the streamChannelId variable in this example). The additional thread pool offloads the main thread pool defined in microservice.yml, leaving it free for other services and their methods.
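The open, write in the background, then close sequence described above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: InMemoryChannelSketch and its methods (openStream, put, close, drain) are hypothetical stand-ins and not part of the JLupin API, and the stream lives in local memory rather than in channelMicroservice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a channel; NOT the JLupin API.
class InMemoryChannelSketch {
    // Sentinel object that marks the end of a stream.
    private static final Object CLOSED = new Object();
    private final Map<String, BlockingQueue<Object>> streams = new ConcurrentHashMap<>();

    // Creates an ephemeral stream and returns its id.
    String openStream() {
        String id = UUID.randomUUID().toString();
        streams.put(id, new LinkedBlockingQueue<>());
        return id;
    }

    // Any part of the system that knows the id can write to the stream.
    void put(String id, Object element) {
        streams.get(id).add(element);
    }

    void close(String id) {
        streams.get(id).add(CLOSED);
    }

    // Blocks until the stream is closed, returns what was written, then forgets the stream.
    List<Object> drain(String id) throws InterruptedException {
        BlockingQueue<Object> queue = streams.get(id);
        List<Object> out = new ArrayList<>();
        for (Object o = queue.take(); o != CLOSED; o = queue.take()) {
            out.add(o);
        }
        streams.remove(id);
        return out;
    }

    // Same shape as openStreamChannelAndStartWriteToIt: open, write in a worker thread, use the id.
    static List<Object> demo() {
        InMemoryChannelSketch channel = new InMemoryChannelSketch();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String id = channel.openStream();
        pool.submit(() -> {
            for (int i = 0; i < 5; i++) {
                channel.put(id, i);
            }
            channel.close(id);
        });
        try {
            return channel.drain(id);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2, 3, 4]
    }
}
```

The worker pool plays the role of the additional thread pool: the caller gets the stream id back at once while the writing continues elsewhere.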
Channel configuration
The channel named SAMPLE is configured in the channels.yml file of the channelMicroservice microservice:
CHANNELS:
  SAMPLE:
    garbageThreadAmount: 4
    channelTimeOutInMilliseconds: 60000
    elementTimeOutInMilliseconds: 30000
    waitTimeBetweenCheckingChannelsInMilliseconds: 5000
    storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.streamchannel.impl.memory.JLupinMemoryStreamChannelStorageImpl'
Spring Boot as reactive Reader/Subscriber
Spring Boot 2 includes streaming support through special object types. If a handler returns the Flux type, Spring Boot can stream the result instead of returning the whole result at the end of the computation. Configuration file:
package com.demo.configuration;

import com.demo.service.interfaces.SampleChannelWriter;
import com.jlupin.impl.client.util.JLupinClientUtil;
import com.jlupin.impl.client.util.channel.JLupinClientChannelIterableProducer;
import com.jlupin.interfaces.client.delegator.JLupinDelegator;
import com.jlupin.interfaces.common.enums.PortType;
import com.jlupin.interfaces.microservice.partofjlupin.asynchronous.service.channel.JLupinChannelManagerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.EnableWebFlux;

@Configuration
@ComponentScan({
        "com.demo",
        "com.jlupin.servlet.monitor.configuration"
})
@EnableWebFlux
public class DemoSpringConfiguration {
    @Bean
    public JLupinDelegator getJLRMCJLupinDelegator() {
        return JLupinClientUtil.generateInnerMicroserviceLoadBalancerDelegator(PortType.JLRMC);
    }

    @Bean
    public JLupinChannelManagerService getJLupinChannelManagerService() {
        return JLupinClientUtil.generateRemote(getJLRMCJLupinDelegator(), "channelMicroservice", "jLupinChannelManagerService", JLupinChannelManagerService.class);
    }

    @Bean
    public JLupinClientChannelIterableProducer getJLupinClientChannelIterableProducer() {
        return new JLupinClientChannelIterableProducer(getJLupinChannelManagerService());
    }

    @Bean(name = "sampleChannelWriter")
    public SampleChannelWriter getSampleChannelWriter() {
        return JLupinClientUtil.generateRemote(getJLRMCJLupinDelegator(), "sampleWriterMicroservice", SampleChannelWriter.class);
    }
}
Spring Boot controller:
package com.demo.controller;

import com.demo.service.interfaces.SampleChannelWriter;
import com.jlupin.impl.client.util.channel.JLupinClientChannelIterableProducer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.util.Iterator;

@RestController
public class ReactiveInfoController {
    @Autowired
    private SampleChannelWriter sampleChannelWriter;
    @Autowired
    private JLupinClientChannelIterableProducer jLupinClientChannelIterableProducer;

    @GetMapping("/list")
    public Flux<Integer> list() throws Throwable {
        // Ask the writer microservice to open a stream, then iterate over it as elements arrive.
        final String streamId = sampleChannelWriter.openStreamChannelAndStartWriteToIt();
        final Iterable iterable = jLupinClientChannelIterableProducer.produceChannelIterable("SAMPLE", streamId);
        return Flux.from(new Publisher<Integer>() {
            @Override
            public void subscribe(final Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new MySubscription(subscriber, iterable.iterator()));
            }

            class MySubscription implements Subscription {
                private final Subscriber<? super Integer> subscriber;
                private final Iterator<Integer> iterator;
                private boolean canceled;

                public MySubscription(final Subscriber<? super Integer> subscriber, final Iterator<Integer> iterator) {
                    this.subscriber = subscriber;
                    this.iterator = iterator;
                    this.canceled = false;
                }

                @Override
                public void request(final long l) {
                    // Emit at most l elements; stop early on cancel or end of stream.
                    for (long i = 0; i < l && !canceled; ++i) {
                        if (iterator.hasNext()) {
                            final Integer res = iterator.next();
                            subscriber.onNext(res);
                        } else {
                            // Mark the subscription finished so a later request() does not signal onComplete again.
                            canceled = true;
                            subscriber.onComplete();
                            break;
                        }
                    }
                }

                @Override
                public void cancel() {
                    this.canceled = true;
                }
            }
        });
    }
}
Spring Boot uses WebFlux, which relies on its own Subscriber type (not the standard Java type from java.util.concurrent), so JLupin does not support returning WebFlux's Subscriber. The reason is that JLupin does not want to depend on other libraries. You must create the publisher yourself, as shown in the example above. In a future release the standard Java Publisher and Subscriber will be available for clients.
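For comparison, the same iterator-to-publisher adaptation can be written against the standard java.util.concurrent.Flow interfaces (Java 9 and later). The sketch below is not JLupin API and says nothing about how a future release will look; IteratorFlowPublisher and collect are hypothetical names, and the code assumes a single-threaded subscriber.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical adapter: exposes any Iterable through the standard Flow.Publisher interface.
class IteratorFlowPublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;

    IteratorFlowPublisher(Iterable<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Iterator<T> iterator = source.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done; // set after cancel, completion, or error

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // Emit at most n elements, completing when the iterator is exhausted.
                for (long i = 0; i < n && !done; i++) {
                    if (iterator.hasNext()) {
                        subscriber.onNext(iterator.next());
                    } else {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Usage helper: subscribes with unbounded demand and collects every element.
    static List<Integer> collect(Iterable<Integer> source) {
        List<Integer> received = new ArrayList<>();
        new IteratorFlowPublisher<>(source).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { /* nothing left to do */ }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of(1, 2, 3))); // prints [1, 2, 3]
    }
}
```

In the controller, the Iterable passed in would be the one returned by produceChannelIterable; here a fixed list stands in for it.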
The Iterable returns results as they are sent on the other side of the channel, so you see the results one by one, as soon as they are ready. Without streaming, your request is executed as shown in Figure 1.
With streaming, on the other hand, you receive results as they are processed (see Figure 2).
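The "results as they are ready" behaviour comes down to an iterator whose hasNext() blocks until the other side sends an element or closes the stream. A minimal sketch of that idea follows, assuming a local BlockingQueue in place of the real channel; BlockingStreamIterator and its END marker are hypothetical and do not reflect how JLupin implements its Iterable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical iterator that waits for each element instead of for the whole result.
class BlockingStreamIterator implements Iterator<Object> {
    static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue;
    private Object next; // element fetched by hasNext() and not yet returned

    BlockingStreamIterator(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // blocks until the producer sends something
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                next = END; // treat interruption as end of stream
            }
        }
        return next != END;
    }

    @Override
    public Object next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Object result = next;
        next = null;
        return result;
    }

    // A producer thread emits three results with pauses; the consumer handles each on arrival.
    static List<Object> demo() {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    queue.add(i);
                    Thread.sleep(100); // simulated computation between results
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                queue.add(END);
            }
        });
        producer.start();
        List<Object> received = new ArrayList<>();
        Iterator<Object> iterator = new BlockingStreamIterator(queue);
        while (iterator.hasNext()) {
            received.add(iterator.next());
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2]
    }
}
```

The consumer loop has the same shape as the while (iterator.hasNext()) loop used by a client of the channel: each pass waits only for the next element, not for the producer to finish.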
JUnit Client as reactive Reader/Subscriber
Client implementation (written as a JUnit test, which is why external IP addresses are used for JLupinDelegator).
public class IntegrationTest {
    private JLupinDelegator jLupinDelegator;

    @Before
    public void before() throws Throwable {
        jLupinDelegator = JLupinClientUtil.generateOuterMicroserviceLoadBalancerDelegator(5000, 3, 5000, PortType.JLRMC, 1000, 3000000,
                new JLupinMainServerInZoneConfiguration[]{
                        new JLupinMainServerInZoneConfiguration("NODE_1", "127.0.0.1", 9090, 9095, 9096, 9097)
                }
        );
        try {
            jLupinDelegator.start();
        } catch (JLupinDelegatorException e) {
            throw new IllegalStateException("an error occurred caused by:", e);
        }
    }

    private JLupinChannelManagerService getJLupinChannelManagerService() {
        return JLupinClientUtil.generateRemote(jLupinDelegator, "channelMicroservice", "jLupinChannelManagerService", JLupinChannelManagerService.class);
    }

    private SampleChannelWriter getSampleChannelWriter() {
        return JLupinClientUtil.generateRemote(jLupinDelegator, "sampleWriterMicroservice", "sampleChannelWriter", SampleChannelWriter.class);
    }

    private JLupinClientChannelIterableProducer getJLupinClientChannelIterableProducer() {
        return new JLupinClientChannelIterableProducer(getJLupinChannelManagerService());
    }

    @Test
    public void taskTest() throws Throwable {
        final SampleChannelWriter sampleChannelWriter = getSampleChannelWriter();
        final String streamId = sampleChannelWriter.openStreamChannelAndStartWriteToIt();
        final Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                Iterable iterable = getJLupinClientChannelIterableProducer().produceChannelIterable("SAMPLE", streamId);
                Iterator iterator = iterable.iterator();
                while (iterator.hasNext()) {
                    Object o = iterator.next();
                    System.out.println("received object:" + o);
                }
            }
        });
        t.start();
        t.join();
    }
}