Reactive Queues usage example

Please read the queues overview before continuing.

Let's assume that we want to call our service asynchronously and retrieve the result in a reactive way. We will use the service already known from other parts of the documentation. The microservice we will be calling is named sampleMicroservice, and it exposes a service named exampleService. Its interface and implementation:

public interface ExampleService {
    Integer exampleMethod(Integer i);
}
@Service("exampleService")
public class ExampleServiceImpl implements ExampleService {
    @Override
    public Integer exampleMethod(Integer i) {
        return 2 * i;
    }
}
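
Before wiring the service to a queue, it can help to confirm what it computes when called directly. The sketch below repeats the interface and implementation without the Spring @Service annotation so that it runs on plain Java. The wrapper class ExampleServiceSketch and its main method are assumptions added for illustration only.

```java
// Plain-Java sketch: the wrapper class and main method are illustrative assumptions.
class ExampleServiceSketch {
    interface ExampleService {
        Integer exampleMethod(Integer i);
    }

    // Same logic as ExampleServiceImpl, minus the Spring @Service annotation.
    static class ExampleServiceImpl implements ExampleService {
        @Override
        public Integer exampleMethod(Integer i) {
            return 2 * i;
        }
    }

    public static void main(String[] args) {
        ExampleService service = new ExampleServiceImpl();
        System.out.println(service.exampleMethod(10)); // prints 20
    }
}
```

This is the same value the queue-based clients later in this chapter should receive for the input 10.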

How to configure communication interfaces is shown in the communication chapter. Here we will show how to use reactive queues regardless of the client type (a JUnit client or a servlet microservice, Spring Boot by default).

Queue configuration

Configuration of the queue named SAMPLE, in the file queues.yml of the microservice queueMicroservice:

DELEGATOR:
    howOftenCheckingServerInMillisByDelegator: 5000
    repeatsAmountByDelegator: 4
    timeToWaitBetweenRepeatProbeInMillisByDelegator: 1000
QUEUES:
  SAMPLE:
    innerQueuesAmount: 4
    waitTimeBetweenCheckingTaskReadyToStartInMillis: 500
    storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.queue.impl.memory.JLupinMemoryQueueStorageImpl'
    threadAmount: 128
    maxSendProbeAmount: 2
    maxAcceptExceptionAmount: 1
    exceptionStringToRepeat: 'java.lang.Exception'
    garbageThreadAmount: 4
    howLongTaskStatusWillBeInATransientStateInMillis: 1000
    howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis: 30000
    howLongTaskInputShouldBeOnQueueWithoutResultItInMillis: 60000
    waitTimeBetweenCheckingTaskByGarbageManagerInMillis: 5000
    delaySendProcessTaskToExecuteInMillis: 0

Spring Boot client

Spring Boot supports websockets, which are a good fit for showing queues in action. Let's start with the Spring Boot configuration (the microservice name is example):

package com.example.configuration;

import com.jlupin.impl.client.delegator.balance.JLupinQueueLoadBalancerDelegatorImpl;
import com.jlupin.impl.client.util.JLupinClientUtil;
import com.jlupin.impl.client.util.queue.JLupinClientQueueUtil;
import com.jlupin.interfaces.client.delegator.JLupinDelegator;
import com.jlupin.interfaces.common.enums.PortType;
import com.jlupin.interfaces.microservice.partofjlupin.asynchronous.service.queue.JLupinQueueManagerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
@ComponentScan({
        "com.example",
        "com.jlupin.servlet.monitor.configuration"
})
public class ExampleSpringConfiguration {
    @Bean
    public JLupinDelegator getQueueJLupinDelegator() {
        final JLupinDelegator jLupinDelegator = JLupinClientUtil.generateInnerMicroserviceLoadBalancerDelegator(PortType.QUEUE);
        ((JLupinQueueLoadBalancerDelegatorImpl) jLupinDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);
        return jLupinDelegator;
    }

    @Bean
    public JLupinQueueManagerService getJLupinQueueManagerService() {
        return JLupinClientUtil.generateRemote(getQueueJLupinDelegator(), "queueMicroservice", "jLupinQueueManagerService", JLupinQueueManagerService.class);
    }

    @Bean(name = "sampleQueueClientUtil")
    public JLupinClientQueueUtil getSampleQueueClientUtil() {
        return new JLupinClientQueueUtil("SAMPLE", getJLupinQueueManagerService());
    }

    @PostConstruct
    public void start() {
        final JLupinClientQueueUtil sampleQueueClientUtil = getSampleQueueClientUtil();
        sampleQueueClientUtil.start();
    }

    @PreDestroy
    public void stop() {
        final JLupinClientQueueUtil sampleQueueClientUtil = getSampleQueueClientUtil();
        sampleQueueClientUtil.stop();
    }
}

Next, enable and configure web sockets. To do so, implement the WebSocketConfigurer interface and annotate the class with @Configuration and @EnableWebSocket. This tells Spring Boot to enable web socket support. You also need to bind your WebSocketHandler implementation to a path (in this example, "/process").

package com.example.configuration;

import com.example.websocket.handler.WebSocketHandlerImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfigurerImpl implements WebSocketConfigurer {
    @Autowired
    private WebSocketHandlerImpl webSocketHandler;

    @Override
    public void registerWebSocketHandlers(final WebSocketHandlerRegistry webSocketHandlerRegistry) {
        webSocketHandlerRegistry.addHandler(webSocketHandler, "/process")
                .setAllowedOrigins("*");
    }
}

Now we are going to create the web socket handler. Because we use text communication, it is enough to extend the TextWebSocketHandler class. By default it handles opening and closing of the web socket and provides a convenient method, handleTextMessage, which is called when a new message is received. Notice that a WebSocketSession is passed as an argument. This is because a single handler instance serves all open web socket connections, so the argument can differ between invocations. We also capture it in a closure to send the response back to the proper client (we want to answer on the same connection that was used to call our service). The implementation is shown below:

package com.example.websocket.handler;

import com.jlupin.impl.client.util.queue.JLupinClientQueueUtil;
import com.jlupin.interfaces.function.JLupinQueueReactiveFunction;
import com.jlupin.interfaces.microservice.partofjlupin.asynchronous.service.queue.JLupinQueueManagerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;

@Component
public class WebSocketHandlerImpl extends TextWebSocketHandler {
    private final Logger logger = LoggerFactory.getLogger(WebSocketHandlerImpl.class);

    @Autowired
    @Qualifier("sampleQueueClientUtil")
    private JLupinClientQueueUtil sampleQueueClientUtil;

    @Override
    protected void handleTextMessage(final WebSocketSession session, final TextMessage message) throws Exception {
        final String taskId = sampleQueueClientUtil.putTaskInput(
                "sampleMicroservice",
                "exampleService",
                "exampleMethod",
                new Object[]{
                        Integer.valueOf(message.getPayload())
                }
        );

        sampleQueueClientUtil.registerFunctionOnTaskResult(
                taskId,
                new JLupinQueueReactiveFunction() {
                    @Override
                    public void onSuccess(final String taskId, final Object result) {
                        try {
                            session.sendMessage(new TextMessage(((Integer) result).toString()));
                        } catch (IOException e) {
                            logger.error("Error while sending message.", e);
                        }
                    }

                    @Override
                    public void onError(final String taskId, final Throwable throwable) {
                        try {
                            session.sendMessage(new TextMessage(throwable.getMessage()));
                        } catch (IOException e) {
                            logger.error("Error while sending message.", e);
                        }
                    }
                }
        );
    }
}
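
The way the callback captures the session can be tried out without Spring or JLupin. The following is a minimal sketch under stated assumptions: FakeSession and SingletonHandler are hypothetical stand-ins for WebSocketSession and the handler bean, and a CompletableFuture on a small thread pool plays the role of the queue. It only illustrates that one shared handler can answer each caller on its own connection.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-alone sketch; FakeSession and SingletonHandler are hypothetical stand-ins,
// not Spring or JLupin types.
class SessionClosureSketch {
    static class FakeSession {
        final String id;
        final List<String> sent = new CopyOnWriteArrayList<>();

        FakeSession(String id) { this.id = id; }

        void sendMessage(String text) { sent.add(text); }
    }

    static class SingletonHandler {
        private final ExecutorService workers = Executors.newFixedThreadPool(2);

        // Called once per message; 'session' differs between connections.
        CompletableFuture<Void> handleTextMessage(final FakeSession session, final String payload) {
            return CompletableFuture
                    .supplyAsync(() -> 2 * Integer.parseInt(payload), workers)
                    // The lambda captures this call's session, so the reply
                    // goes back on the connection that sent the request.
                    .thenAccept(result -> session.sendMessage(String.valueOf(result)));
        }

        void shutdown() { workers.shutdown(); }
    }

    public static void main(String[] args) {
        SingletonHandler handler = new SingletonHandler();
        FakeSession a = new FakeSession("A");
        FakeSession b = new FakeSession("B");
        CompletableFuture<Void> fa = handler.handleTextMessage(a, "10");
        CompletableFuture<Void> fb = handler.handleTextMessage(b, "21");
        CompletableFuture.allOf(fa, fb).join(); // wait for both replies
        System.out.println(a.id + " received " + a.sent); // A received [20]
        System.out.println(b.id + " received " + b.sent); // B received [42]
        handler.shutdown();
    }
}
```

Each session ends up with exactly one message, its own result, even though both requests went through the same handler instance and were computed on worker threads.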

As you can see, the result is sent to the user once it has been computed, in a thread other than the one processing the handleTextMessage method. Now create a directory named static in your resources directory and put a test file (index.html) in it:

<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>WebSocket Test</title>
</head>
<body>
  <h2>WebSocket Test</h2>

  <input type="text" id="input"/>
  <button onclick="doSend()">Send</button>
  <div id="output"></div>

  <script language="javascript" type="text/javascript">
    var wsUri = "ws://localhost:8000/example/process";
    var input;
    var output;

    function init()
    {
      input = document.getElementById("input");
      output = document.getElementById("output");
      testWebSocket();
    }

    function testWebSocket()
    {
      websocket = new WebSocket(wsUri);
      websocket.onopen = function(evt) { onOpen(evt) };
      websocket.onclose = function(evt) { onClose(evt) };
      websocket.onmessage = function(evt) { onMessage(evt) };
      websocket.onerror = function(evt) { onError(evt) };
    }

    function onOpen(evt)
    {
      writeToScreen("CONNECTED");
    }

    function onClose(evt)
    {
      writeToScreen("DISCONNECTED");
    }

    function onMessage(evt)
    {
      writeToScreen('<span style="color: blue;">RESPONSE:</span> ' + evt.data);
    }

    function onError(evt)
    {
      writeToScreen('<span style="color: red;">ERROR:</span> ' + evt.data);
    }

    function doSend()
    {
      writeToScreen("SENT: " + input.value);
      websocket.send(input.value);
      input.value = "";
    }

    function writeToScreen(message)
    {
      var pre = document.createElement("p");
      pre.style.wordWrap = "break-word";
      pre.innerHTML = message;
      output.appendChild(pre);
    }

    window.addEventListener("load", init, false);
  </script>
</body>
</html>

Make sure that your servlet microservice has the proper name (the URL in wsUri assumes example), then open the page and type a number. Click Send and see what happens. Your request is sent over the websocket to the servlet microservice, which puts a task on the queue; the task is executed by the native microservice. The result is handled by the servlet microservice and returned to the client over the same websocket.
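
The handler converts the message payload to an Integer, so typing anything that is not a number makes the conversion throw NumberFormatException before a task is put on the queue. Below is a minimal sketch of a helper that checks the payload first. PayloadParserSketch and parsePayload are hypothetical names introduced here for illustration and are not part of the example above.

```java
import java.util.Optional;

// Hypothetical helper, not part of the handler shown in this chapter.
class PayloadParserSketch {
    // Returns the parsed number, or empty when the text is not a valid Integer.
    static Optional<Integer> parsePayload(String payload) {
        if (payload == null) {
            return Optional.empty();
        }
        try {
            // trim() tolerates surrounding whitespace, which Integer.valueOf rejects.
            return Optional.of(Integer.valueOf(payload.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parsePayload(" 21 ")); // Optional[21]
        System.out.println(parsePayload("abc"));  // Optional.empty
    }
}
```

In handleTextMessage, an empty result could be answered with an error text on the session instead of putting a task on the queue.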

JUnit client

The client is implemented as a JUnit test, which is why external IP addresses are used for the JLupinDelegator.

Communication delegator definition

jLupinQueueDelegator = JLupinClientUtil.generateOuterMicroserviceLoadBalancerDelegator(5000, 3, 5000, PortType.QUEUE, 1000, 3000000,
        new JLupinMainServerInZoneConfiguration[]{
                new JLupinMainServerInZoneConfiguration("NODE_1", "127.0.0.1", 9090, 9095, 9096, 9097)
        }
);

((JLupinQueueLoadBalancerDelegatorImpl) jLupinQueueDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);

try {
    jLupinQueueDelegator.start();
} catch (JLupinDelegatorException e) {
    throw new IllegalStateException("an error occurred caused by:", e);
}

Execution - putting task in queue

In the example below we put a task for execution, using the queue interface, on microservice sampleMicroservice, service exampleService, method exampleMethod, with one parameter of type Integer. Everything is executed through the queue named SAMPLE, whose configuration is shown in the section Queue configuration. After putting the task in the queue we do not wait for the result in the same thread. Instead we register a reactive (event) function, which will be executed in a different thread. Purely for demonstration, the main thread of the taskTest_1 method then sleeps, to show that the result is received reactively even while the main thread is asleep.

public class IntegrationTest {
    private JLupinDelegator jLupinQueueDelegator;
    private JLupinQueueManagerService jLupinQueueManagerService;
    private JLupinClientQueueUtil jLupinClientQueueUtil;

    @Before
    public void before() throws Throwable {
        jLupinQueueDelegator = JLupinClientUtil.generateOuterMicroserviceLoadBalancerDelegator(5000, 3, 5000, PortType.QUEUE, 1000, 3000000,
                new JLupinMainServerInZoneConfiguration[]{
                        new JLupinMainServerInZoneConfiguration("NODE_1", "127.0.0.1", 9090, 9095, 9096, 9097)
                }
        );
        jLupinQueueManagerService = JLupinClientUtil.generateRemote(jLupinQueueDelegator, "queueMicroservice", "jLupinQueueManagerService" , JLupinQueueManagerService.class);
        jLupinClientQueueUtil = new JLupinClientQueueUtil("SAMPLE", jLupinQueueManagerService);

        ((JLupinQueueLoadBalancerDelegatorImpl) jLupinQueueDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);

        try {
            jLupinQueueDelegator.start();
        } catch (JLupinDelegatorException e) {
            throw new IllegalStateException("an error occurred caused by:", e);
        }
        jLupinClientQueueUtil.start();
    }

    @After
    public void after() throws Throwable {
        // Stop in reverse order of start: the queue client first, then the delegator it relies on.
        jLupinClientQueueUtil.stop();
        jLupinQueueDelegator.stop();
    }

    @Test
    public void taskTest_1() throws Throwable {
        final String taskId = jLupinQueueManagerService.putTaskInput(
            "SAMPLE",
            "sampleMicroservice",
            "exampleService",
            "exampleMethod",
            new Object[]{
                Integer.valueOf(10)
            }
        );

        jLupinClientQueueUtil.registerFunctionOnTaskResult(taskId, new JLupinQueueReactiveFunction() {
            @Override
            public void onSuccess(String taskId, Object result) {
                System.out.println("on success with task id:" + taskId + " and result object:" + result);
            }

            @Override
            public void onError(String taskId, Throwable th) {
                System.out.println("on error with task id:" + taskId + " and exception object:" + th.getMessage());
            }
        });

        // Keep the test alive long enough for the callback to fire on another thread.
        Thread.sleep(10000);
    }
}

Result

on success with task id: 1234509 and result object: 20
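
The fixed ten-second sleep keeps the example simple, but a test can also wait only as long as needed. The sketch below shows one possible approach with a CountDownLatch that the callback releases. It is not JLupin code: ReactiveFunction and FakeQueue are hypothetical stand-ins that mimic the two-method callback shape used above, and a single worker thread plays the role of the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: ReactiveFunction and FakeQueue are hypothetical stand-ins,
// not JLupin classes.
class LatchWaitSketch {
    interface ReactiveFunction {
        void onSuccess(String taskId, Object result);
        void onError(String taskId, Throwable th);
    }

    static class FakeQueue {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        // Runs the doubling task on another thread and reports through the callback.
        void submit(final String taskId, final Integer input, final ReactiveFunction fn) {
            worker.execute(() -> {
                Object result;
                try {
                    result = 2 * input; // throws NullPointerException for a null input
                } catch (RuntimeException e) {
                    fn.onError(taskId, e);
                    return;
                }
                fn.onSuccess(taskId, result);
            });
        }

        void stop() { worker.shutdown(); }
    }

    // Submits one task and blocks until a callback fires or the timeout passes.
    static Object runAndAwait(Integer input, long timeoutMillis) {
        final FakeQueue queue = new FakeQueue();
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<Object> outcome = new AtomicReference<>();
        try {
            queue.submit("task-1", input, new ReactiveFunction() {
                @Override
                public void onSuccess(String taskId, Object result) {
                    outcome.set(result);
                    done.countDown();
                }

                @Override
                public void onError(String taskId, Throwable th) {
                    outcome.set(th);
                    done.countDown();
                }
            });
            // Returns as soon as a callback fires instead of sleeping a fixed time.
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("no result within " + timeoutMillis + " ms");
            }
            return outcome.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            queue.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("result object: " + runAndAwait(10, 5000)); // result object: 20
    }
}
```

In a real test the same latch could be released from the onSuccess and onError methods of the registered reactive function, with the timeout acting as the upper bound that the sleep provides today.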