Reactive Channel usage example

Assume there is a native microservice that opens a stream in a channel and writes to it, and a client (for example a JUnit test) that handles the microservice's results with a reactive iterator.

Native microservice (sampleWriterMicroservice) as reactive Writer/Publisher:

Interface:

public interface SampleChannelWriter {
    String openStreamChannelAndStartWriteToIt() throws Throwable;
}

Implementation:

@Service("sampleChannelWriter")
public class SampleChannelWriterImpl implements SampleChannelWriter {

    @Autowired
    private ExecutorService executorService;

    @Autowired
    private JLupinClientChannelUtil jLupinClientChannelUtil;

    @Override
    public String openStreamChannelAndStartWriteToIt() throws Throwable {
        String streamChannelId = jLupinClientChannelUtil.openStreamChannel();

        executorService.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                for(int i = 0 ; i < 5; i ++) {
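                    // Integer.valueOf replaces the deprecated Integer constructor
                    jLupinClientChannelUtil.putNextElementToStreamChannel(streamChannelId, Integer.valueOf(i));
                    // sleep is a static method, so it is called on Thread directly
                    Thread.sleep(1000);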
                }
                jLupinClientChannelUtil.closeStreamChannel(streamChannelId);
                return null;
            }
        });
        return streamChannelId;
    }
}

In the implementation above, the sleep(1000) call only simulates a long computation; it is not needed in your own code.

Bean configuration:

@Bean(name="jLupinClientChannelUtil")
public JLupinClientChannelUtil getJLupinClientChannelUtil() {
    JLupinChannelManagerService jLupinChannelManagerService = JLupinClientUtil.generateRemote(
        getLupinDelegator(), "channelMicroservice", "jLupinChannelManagerService", JLupinChannelManagerService.class);

    return new JLupinClientChannelUtil("SAMPLE", jLupinChannelManagerService);
}

@Bean(name="executorService")
public ExecutorService getExecutorService() {
    return JLupinClientUtil.getExecutorServiceByNameManagedByJLupin("THREAD_POOL_1");
}

As the implementation shows, the method openStreamChannelAndStartWriteToIt opens a new stream in the desired channel and writes to it in a separate thread. The stream is generated dynamically (it is ephemeral): it is opened, and then closed, only for the time during which publishers and subscribers communicate. In a real application, other parts of the system can also write to the stream in the channel; they only need to know the stream id (the variable streamChannelId in this example). The additional thread pool takes this work off the main thread pool defined in microservice.yml, which stays available for other services and their methods.
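The idea of an ephemeral stream that is addressed only by its id can be sketched in plain Java. This is a minimal in-memory model, not the JLupin implementation: the class EphemeralStreamSketch, its methods and the END marker are hypothetical names introduced only for illustration, and a real channel works across processes rather than inside one JVM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a channel; NOT the JLupin API.
class EphemeralStreamSketch {
    private static final Object END = new Object(); // marks a closed stream
    private final Map<String, BlockingQueue<Object>> streams = new ConcurrentHashMap<>();

    // Creates a new stream and returns the id that writers and readers share.
    String openStream() {
        String id = UUID.randomUUID().toString();
        streams.put(id, new LinkedBlockingQueue<>());
        return id;
    }

    // Any part of the system that knows the id can write to the stream.
    void put(String id, Object element) {
        streams.get(id).add(element);
    }

    // Closing appends the end marker so that a reader knows when to stop.
    void close(String id) {
        streams.get(id).add(END);
    }

    // Blocks until the stream is closed, then forgets the stream.
    List<Object> drain(String id) throws InterruptedException {
        BlockingQueue<Object> queue = streams.get(id);
        List<Object> out = new ArrayList<>();
        for (Object o = queue.take(); o != END; o = queue.take()) {
            out.add(o);
        }
        streams.remove(id);
        return out;
    }

    static List<Object> demo() {
        EphemeralStreamSketch channel = new EphemeralStreamSketch();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            String id = channel.openStream();
            // The writer runs in a separate thread, as in the service above.
            pool.submit(() -> {
                for (int i = 0; i < 5; i++) {
                    channel.put(id, i);
                }
                channel.close(id);
            });
            return channel.drain(id);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2, 3, 4]
    }
}
```

The method that opens the stream returns immediately with the id, while the writing continues in the background; this is the same division of work as in openStreamChannelAndStartWriteToIt.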

Channel configuration

Configuration of the channel named SAMPLE, in the file channels.yml of the microservice channelMicroservice:

CHANNELS:
  SAMPLE:
    garbageThreadAmount: 4
    channelTimeOutInMilliseconds: 60000
    elementTimeOutInMilliseconds: 30000
    waitTimeBetweenCheckingChannelsInMilliseconds: 5000
    storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.streamchannel.impl.memory.JLupinMemoryStreamChannelStorageImpl'

Spring Boot as reactive Reader/Subscriber

Spring Boot 2 includes streaming support through special object types. If a method returns the Flux type, Spring Boot can stream the result instead of returning the whole result at the end of the computation. Configuration file:

package com.demo.configuration;

import com.demo.service.interfaces.SampleChannelWriter;
import com.jlupin.impl.client.util.JLupinClientUtil;
import com.jlupin.impl.client.util.channel.JLupinClientChannelIterableProducer;
import com.jlupin.interfaces.client.delegator.JLupinDelegator;
import com.jlupin.interfaces.common.enums.PortType;
import com.jlupin.interfaces.microservice.partofjlupin.asynchronous.service.channel.JLupinChannelManagerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.EnableWebFlux;


@Configuration
@ComponentScan({
        "com.demo",
        "com.jlupin.servlet.monitor.configuration"
})
@EnableWebFlux
public class DemoSpringConfiguration {
    @Bean
    public JLupinDelegator getJLRMCJLupinDelegator() {
        return JLupinClientUtil.generateInnerMicroserviceLoadBalancerDelegator(PortType.JLRMC);
    }

    @Bean
    public JLupinChannelManagerService getJLupinChannelManagerService() {
        return JLupinClientUtil.generateRemote(getJLRMCJLupinDelegator(), "channelMicroservice", "jLupinChannelManagerService", JLupinChannelManagerService.class);
    }

    @Bean
    public JLupinClientChannelIterableProducer getJLupinClientChannelIterableProducer() {
        return new JLupinClientChannelIterableProducer(getJLupinChannelManagerService());
    }

    @Bean(name = "sampleChannelWriter")
    public SampleChannelWriter getSampleChannelWriter() {
        return JLupinClientUtil.generateRemote(getJLRMCJLupinDelegator(), "sampleWriterMicroservice", SampleChannelWriter.class);
    }
}

Spring Boot controller:

package com.demo.controller;

import com.demo.service.interfaces.SampleChannelWriter;
import com.jlupin.impl.client.util.channel.JLupinClientChannelIterableProducer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.util.Iterator;

@RestController
public class ReactiveInfoController {
    @Autowired
    private SampleChannelWriter sampleChannelWriter;
    @Autowired
    private JLupinClientChannelIterableProducer jLupinClientChannelIterableProducer;

    @GetMapping("/list")
    public Flux<Integer> list() throws Throwable {
        final String streamId = sampleChannelWriter.openStreamChannelAndStartWriteToIt();

        final Iterable iterable = jLupinClientChannelIterableProducer.produceChannelIterable("SAMPLE", streamId);

        return Flux.from(new Publisher<Integer>() {
            @Override
            public void subscribe(final Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new MySubscription(subscriber, iterable.iterator()));
            }

            class MySubscription implements Subscription {
                private final Subscriber<? super Integer> subscriber;
                private final Iterator<Integer> iterator;
                // volatile: cancel() may be called from a different thread than request()
                private volatile boolean canceled;

                public MySubscription(final Subscriber<? super Integer> subscriber, final Iterator<Integer> iterator) {
                    this.subscriber = subscriber;
                    this.iterator = iterator;
                    this.canceled = false;
                }

                @Override
                public void request(final long l) {
                    for (long i = 0; i < l && !canceled; ++i) {
                        if (iterator.hasNext()) {
                            final Integer res = iterator.next();
                            subscriber.onNext(res);
                        } else {
                            // Stop further signals so that onComplete is sent at most once
                            canceled = true;
                            subscriber.onComplete();
                            break;
                        }
                    }
                }

                @Override
                public void cancel() {
                    this.canceled = true;
                }
            }
        });
    }
}

Spring Boot uses WebFlux, which relies on its own Subscriber type (org.reactivestreams.Subscriber, not the standard Java type from java.util.concurrent), so JLupin does not support returning WebFlux's Subscriber. The reason is that JLupin avoids depending on other libraries. You must create the publisher yourself, as shown in the example above. In a future release, the standard Java Publisher and Subscriber will be available to clients.
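For comparison, the same hand-written publisher can be expressed with the standard java.util.concurrent.Flow types (Java 9 and later). This is only a sketch of the pattern and not a JLupin API: the class IterablePublisherSketch is a hypothetical name, and the source here is an ordinary list instead of a channel Iterable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical helper: exposes any Iterable as a JDK Flow.Publisher.
class IterablePublisherSketch<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;

    IterablePublisherSketch(Iterable<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Iterator<T> iterator = source.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            // true after completion, error or cancel; no signals are sent afterwards
            private volatile boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    // Non-positive demand is signalled as an error
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // Emit at most n elements, stopping early on cancel or end of data
                for (long i = 0; i < n && !done; i++) {
                    if (iterator.hasNext()) {
                        subscriber.onNext(iterator.next());
                    } else {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Records the signals a subscriber receives, in order.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        new IterablePublisherSketch<>(List.of(1, 2, 3)).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                events.add("next:" + item);
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("error");
            }

            @Override
            public void onComplete() {
                events.add("complete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next:1, next:2, next:3, complete]
    }
}
```

The sketch emits elements on the thread that calls request, which keeps it short. It does not guard against a subscriber that calls request again from inside onNext, so treat it as an illustration rather than a complete publisher.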

The Iterable returns results as they are sent from the other side of the channel, so you see them one by one, as soon as each is ready. Without the streaming option, your request is executed as shown in Figure 1.

Figure 1. Call of the example microservice without streaming.

If you use streaming, on the other hand, you receive results as they are processed (see Figure 2).

Figure 2. Call of example microservice with streaming.
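The behaviour of an iterator that hands over results as they arrive can be illustrated with a blocking queue. This is a sketch under assumptions, not how JLupin implements its channel Iterable: the class BlockingIteratorSketch and the END marker are hypothetical, and the 50 ms sleep only simulates computation time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical iterator whose hasNext() waits for the writer.
class BlockingIteratorSketch implements Iterator<Object> {
    static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue;
    private Object next; // element fetched by hasNext() but not yet returned

    BlockingIteratorSketch(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // blocks until the writer produces something
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                next = END; // treat interruption as the end of the stream
            }
        }
        return next != END;
    }

    @Override
    public Object next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Object result = next;
        next = null;
        return result;
    }

    static List<Object> demo() {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    Thread.sleep(50); // simulated computation
                    queue.add(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                queue.add(END); // always signal the end so the reader can stop
            }
        });
        writer.start();

        // The reader receives each element as soon as the writer has produced it.
        List<Object> received = new ArrayList<>();
        Iterator<Object> iterator = new BlockingIteratorSketch(queue);
        while (iterator.hasNext()) {
            received.add(iterator.next());
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2]
    }
}
```

The reader loop has the same shape as a loop over a finished collection; the difference is that each call to hasNext waits for the next element instead of waiting for the whole result.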

JUnit Client as reactive Reader/Subscriber

Client implementation (written as a JUnit test, which is why external IP addresses are used for the JLupinDelegator):

public class IntegrationTest {
    private JLupinDelegator jLupinDelegator;

    @Before
    public void before() throws Throwable {
        jLupinDelegator = JLupinClientUtil.generateOuterMicroserviceLoadBalancerDelegator(5000, 3, 5000, PortType.JLRMC, 1000, 3000000,
                new JLupinMainServerInZoneConfiguration[]{
                        new JLupinMainServerInZoneConfiguration("NODE_1", "127.0.0.1", 9090, 9095, 9096, 9097)
                }
        );

        try {
            jLupinDelegator.start();
        } catch (JLupinDelegatorException e) {
            throw new IllegalStateException("an error occurred caused by:", e);
        }
    }

    private JLupinChannelManagerService getJLupinChannelManagerService() {
        return JLupinClientUtil.generateRemote(jLupinDelegator, "channelMicroservice", "jLupinChannelManagerService" , JLupinChannelManagerService.class);
    }

    private SampleChannelWriter getSampleChannelWriter() {
        return JLupinClientUtil.generateRemote(jLupinDelegator, "sampleWriterMicroservice", "sampleChannelWriter" , SampleChannelWriter.class);
    }

    private JLupinClientChannelIterableProducer getJLupinClientChannelIterableProducer() {
        return new JLupinClientChannelIterableProducer(getJLupinChannelManagerService());
    }

    @Test
    public void taskTest() throws Throwable {
        final SampleChannelWriter sampleChannelWriter = getSampleChannelWriter();
        final String streamId = sampleChannelWriter.openStreamChannelAndStartWriteToIt();

        final Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                Iterable iterable = getJLupinClientChannelIterableProducer().produceChannelIterable("SAMPLE", streamId);
                Iterator iterator = iterable.iterator();
                while (iterator.hasNext()) {
                    Object o = iterator.next();
                    System.out.println("received object:" + o);
                }
            }
        });

        t.start();
        t.join();
    }
}