Reactive Queues usage example
Please read the queues overview before continuing.
Let's assume that we want to call our service asynchronously and retrieve the result in a reactive way. We will use the service already known from other parts of the documentation. The microservice we will call is named sampleMicroservice and its service is named exampleService. Its interface and implementation:
public interface ExampleService {
    Integer exampleMethod(Integer i);
}

@Service("exampleService")
public class ExampleServiceImpl implements ExampleService {
    @Override
    public Integer exampleMethod(Integer i) {
        return 2 * i;
    }
}
How to configure communication interfaces is shown in the communication chapter. Here we will show how to use reactive queues regardless of the client type (a JUnit client or a servlet microservice, Spring Boot by default).
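Before looking at the queue itself, the idea of calling exampleMethod asynchronously and reacting to its result can be sketched with plain JDK classes. This is only an analogy and does not use the JLupin API: CompletableFuture stands in for the queue, the helper ReactiveCallSketch.callAsync and its two callbacks are hypothetical names, and ExampleService is repeated here without the @Service annotation so that the sketch compiles on its own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Stand-in copies of the service from this page, so the sketch compiles on its own.
interface ExampleService {
    Integer exampleMethod(Integer i);
}

class ExampleServiceImpl implements ExampleService {
    @Override
    public Integer exampleMethod(Integer i) {
        return 2 * i;
    }
}

// Hypothetical helper, not part of JLupin: runs the call on another thread
// and hands the outcome to one of two callbacks.
class ReactiveCallSketch {

    static CompletableFuture<Void> callAsync(ExampleService service,
                                             Integer input,
                                             Consumer<Integer> onSuccess,
                                             Consumer<Throwable> onError) {
        return CompletableFuture
                .supplyAsync(() -> service.exampleMethod(input)) // runs on a pool thread
                .handle((result, error) -> {
                    if (error != null) {
                        // The error may be a CompletionException wrapping the real cause.
                        onError.accept(error);
                    } else {
                        onSuccess.accept(result);
                    }
                    return null;
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = callAsync(
                new ExampleServiceImpl(),
                10,
                result -> System.out.println("result: " + result), // prints "result: 20"
                error -> System.out.println("error: " + error.getMessage()));
        // The caller is free to do other work here; join() only keeps the demo alive.
        done.join();
    }
}
```

The calling thread returns from callAsync immediately and the callbacks run later on a pool thread. The reactive function registered on a queue task later on this page plays a similar role.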
Queue configuration
Configuration of the queue named SAMPLE in the file queues.yml of the microservice queueMicroservice:
DELEGATOR:
  howOftenCheckingServerInMillisByDelegator: 5000
  repeatsAmountByDelegator: 4
  timeToWaitBetweenRepeatProbeInMillisByDelegator: 1000
QUEUES:
  SAMPLE:
    innerQueuesAmount: 4
    waitTimeBetweenCheckingTaskReadyToStartInMillis: 500
    storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.queue.impl.memory.JLupinMemoryQueueStorageImpl'
    threadAmount: 128
    maxSendProbeAmount: 2
    maxAcceptExceptionAmount: 1
    exceptionStringToRepeat: 'java.lang.Exception'
    garbageThreadAmount: 4
    howLongTaskStatusWillBeInATransientStateInMillis: 1000
    howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis: 30000
    howLongTaskInputShouldBeOnQueueWithoutResultItInMillis: 60000
    waitTimeBetweenCheckingTaskByGarbageManagerInMillis: 5000
    delaySendProcessTaskToExecuteInMillis: 0
Spring Boot client
Spring Boot supports websockets, which are well suited to showing queues in action. Let's start with the Spring Boot configuration (the microservice name is example):
package com.example.configuration;

import com.jlupin.impl.client.delegator.balance.JLupinQueueLoadBalancerDelegatorImpl;
import com.jlupin.impl.client.util.JLupinClientUtil;
import com.jlupin.impl.client.util.queue.JLupinClientQueueUtil;
import com.jlupin.interfaces.client.delegator.JLupinDelegator;
import com.jlupin.interfaces.common.enums.PortType;
import com.jlupin.interfaces.microservice.partofjlupin.asynchronous.service.queue.JLupinQueueManagerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
@ComponentScan({
        "com.example",
        "com.jlupin.servlet.monitor.configuration"
})
public class ExampleSpringConfiguration {
    @Bean
    public JLupinDelegator getQueueJLupinDelegator() {
        final JLupinDelegator jLupinDelegator = JLupinClientUtil.generateInnerMicroserviceLoadBalancerDelegator(PortType.QUEUE);
        ((JLupinQueueLoadBalancerDelegatorImpl) jLupinDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);
        return jLupinDelegator;
    }

    @Bean
    public JLupinQueueManagerService getJLupinQueueManagerService() {
        return JLupinClientUtil.generateRemote(getQueueJLupinDelegator(), "queueMicroservice", "jLupinQueueManagerService", JLupinQueueManagerService.class);
    }

    @Bean(name = "sampleQueueClientUtil")
    public JLupinClientQueueUtil getSampleQueueClientUtil() {
        return new JLupinClientQueueUtil("SAMPLE", getJLupinQueueManagerService());
    }

    @PostConstruct
    public void start() {
        final JLupinClientQueueUtil sampleQueueClientUtil = getSampleQueueClientUtil();
        sampleQueueClientUtil.start();
    }

    @PreDestroy
    public void stop() {
        final JLupinClientQueueUtil sampleQueueClientUtil = getSampleQueueClientUtil();
        sampleQueueClientUtil.stop();
    }
}
Next, enable and configure websockets. To do so, implement the WebSocketConfigurer interface and annotate the class with @Configuration and @EnableWebSocket. This tells Spring Boot to enable websocket support. You also need to bind your WebSocketHandler implementation to a path (in this example "/process").
package com.example.configuration;

import com.example.websocket.handler.WebSocketHandlerImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfigurerImpl implements WebSocketConfigurer {
    @Autowired
    private WebSocketHandlerImpl webSocketHandler;

    @Override
    public void registerWebSocketHandlers(final WebSocketHandlerRegistry webSocketHandlerRegistry) {
        webSocketHandlerRegistry.addHandler(webSocketHandler, "/process")
                .setAllowedOrigins("*");
    }
}
Now we are going to create the websocket handler. Because we use text communication, you only need to extend the TextWebSocketHandler class. By default it handles opening and closing of the websocket, and it has a convenient method, handleTextMessage, which is called when a new message is received. Notice that a WebSocketSession is passed as an argument. This is because a single handler instance (a singleton) serves multiple open websocket connections, so this argument differs between invocations. We also capture it in a closure to send the response back to the proper client (we want to answer on the same connection that was used to call our service). The implementation is shown below:
package com.example.websocket.handler;

import com.jlupin.impl.client.util.queue.JLupinClientQueueUtil;
import com.jlupin.interfaces.function.JLupinQueueReactiveFunction;
import com.jlupin.interfaces.microservice.partofjlupin.asynchronous.service.queue.JLupinQueueManagerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;

@Component
public class WebSocketHandlerImpl extends TextWebSocketHandler {
    private final Logger logger = LoggerFactory.getLogger(WebSocketHandlerImpl.class);

    @Autowired
    @Qualifier("sampleQueueClientUtil")
    private JLupinClientQueueUtil sampleQueueClientUtil;

    @Override
    protected void handleTextMessage(final WebSocketSession session, final TextMessage message) throws Exception {
        // Put the task on the queue; the call returns a task id without waiting for the result.
        final String taskId = sampleQueueClientUtil.putTaskInput(
                "sampleMicroservice",
                "exampleService",
                "exampleMethod",
                new Object[]{
                        Integer.valueOf(message.getPayload())
                }
        );
        // Register the reactive function; it captures the session of this invocation.
        sampleQueueClientUtil.registerFunctionOnTaskResult(
                taskId,
                new JLupinQueueReactiveFunction() {
                    @Override
                    public void onSuccess(final String taskId, final Object result) {
                        try {
                            session.sendMessage(new TextMessage(((Integer) result).toString()));
                        } catch (IOException e) {
                            logger.error("Error while sending message.", e);
                        }
                    }

                    @Override
                    public void onError(final String taskId, final Throwable throwable) {
                        try {
                            session.sendMessage(new TextMessage(throwable.getMessage()));
                        } catch (IOException e) {
                            logger.error("Error while sending message.", e);
                        }
                    }
                }
        );
    }
}
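The role of the captured session can be shown in isolation with a sketch that uses only the JDK. It assumes nothing about Spring or JLupin: FakeSession is a hypothetical stand-in for WebSocketSession, SharedHandlerSketch plays the singleton handler, and a small thread pool stands in for the threads that deliver queue results.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for WebSocketSession: it only records what was sent to it.
class FakeSession {
    final String id;
    final List<String> sent = new CopyOnWriteArrayList<>();

    FakeSession(String id) {
        this.id = id;
    }

    void sendMessage(String text) {
        sent.add(text);
    }
}

// One shared handler instance serves every connection, like the singleton handler.
class SharedHandlerSketch {
    private final ExecutorService workers; // stand-in for the threads delivering results

    SharedHandlerSketch(ExecutorService workers) {
        this.workers = workers;
    }

    // 'session' is a method parameter, so each invocation's lambda captures its own value.
    CompletableFuture<Void> handleTextMessage(FakeSession session, String payload) {
        return CompletableFuture.runAsync(() -> {
            int result = 2 * Integer.parseInt(payload);
            session.sendMessage(String.valueOf(result)); // replies on the captured session
        }, workers);
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        SharedHandlerSketch handler = new SharedHandlerSketch(workers);
        FakeSession alice = new FakeSession("alice");
        FakeSession bob = new FakeSession("bob");

        CompletableFuture<Void> first = handler.handleTextMessage(alice, "3");
        CompletableFuture<Void> second = handler.handleTextMessage(bob, "5");
        CompletableFuture.allOf(first, second).join();
        workers.shutdown();

        System.out.println(alice.id + " received " + alice.sent); // alice received [6]
        System.out.println(bob.id + " received " + bob.sent);     // bob received [10]
    }
}
```

Because the session travels with the callback, the handler keeps no per-connection state, and each answer reaches the connection that asked for it, whichever worker thread finishes first.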
As you can see, the result is sent to the user once it has been computed, in a thread other than the one processing the handleTextMessage method. Now create a static directory in your resources directory and put a test page (index.html) in it:
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <title>WebSocket Test</title>
</head>
<body>
<h2>WebSocket Test</h2>
<input type="text" id="input"/>
<button onclick="doSend()">Send</button>
<div id="output"></div>
<script language="javascript" type="text/javascript">
    var wsUri = "ws://localhost:8000/example/process";
    var websocket;
    var input;
    var output;

    function init()
    {
        input = document.getElementById("input");
        output = document.getElementById("output");
        testWebSocket();
    }

    function testWebSocket()
    {
        websocket = new WebSocket(wsUri);
        websocket.onopen = function(evt) { onOpen(evt) };
        websocket.onclose = function(evt) { onClose(evt) };
        websocket.onmessage = function(evt) { onMessage(evt) };
        websocket.onerror = function(evt) { onError(evt) };
    }

    function onOpen(evt)
    {
        writeToScreen("CONNECTED");
    }

    function onClose(evt)
    {
        writeToScreen("DISCONNECTED");
    }

    function onMessage(evt)
    {
        writeToScreen('<span style="color: blue;">RESPONSE:</span> ' + evt.data);
    }

    function onError(evt)
    {
        writeToScreen('<span style="color: red;">ERROR:</span> ' + evt.data);
    }

    function doSend()
    {
        writeToScreen("SENT: " + input.value);
        websocket.send(input.value);
        input.value = "";
    }

    function writeToScreen(message)
    {
        var pre = document.createElement("p");
        pre.style.wordWrap = "break-word";
        pre.innerHTML = message;
        output.appendChild(pre);
    }

    window.addEventListener("load", init, false);
</script>
</body>
</html>
Make sure that the name of your servlet microservice matches the one used in the websocket address above (example), then type a number into the input field. Click Send and see what happens. Your request is sent over the websocket to the servlet microservice, which puts a task on the queue; the task is executed by the native microservice. The result is handled by the servlet microservice and returned to the client over the same websocket.
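The test page asks for a number, but nothing stops a user from sending other text. The handler shown earlier converts the payload to an Integer directly, and that conversion throws NumberFormatException for non-numeric input. A minimal sketch of a guard, assuming a hypothetical helper named PayloadParser (not part of Spring or JLupin), could look like this:

```java
import java.util.Optional;

// Hypothetical helper, not part of Spring or JLupin.
class PayloadParser {

    // Returns the parsed number, or an empty Optional when the text is not a valid int.
    static Optional<Integer> parse(String payload) {
        if (payload == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Integer.valueOf(payload.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("21"));    // Optional[21]
        System.out.println(parse(" 7 "));   // Optional[7]
        System.out.println(parse("seven")); // Optional.empty
    }
}
```

In handleTextMessage the empty case could be answered with an error message on the session instead of putting a task on the queue.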
JUnit client
Client implementation (as a JUnit test, which is why external IP addresses are used for JLupinDelegator).
Communication delegator definition
jLupinQueueDelegator = JLupinClientUtil.generateOuterMicroserviceLoadBalancerDelegator(5000, 3, 5000, PortType.QUEUE, 1000, 3000000,
        new JLupinMainServerInZoneConfiguration[]{
                new JLupinMainServerInZoneConfiguration("NODE_1", "127.0.0.1", 9090, 9095, 9096, 9097)
        }
);
((JLupinQueueLoadBalancerDelegatorImpl) jLupinQueueDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);
try {
    jLupinQueueDelegator.start();
} catch (JLupinDelegatorException e) {
    throw new IllegalStateException("an error occurred caused by:", e);
}
Execution - putting task in queue
In the example below we put a task to execute, using the queue interface, on microservice sampleMicroservice, service exampleService, method exampleMethod, with one parameter of type Integer. Everything is executed through the queue named SAMPLE, whose configuration is shown in the section Queue configuration. After putting the task on the queue we do not wait for the result in the same thread; instead we register a reactive (event) function, which is executed in a different thread. For the sake of the example, we put the main thread of the taskTest_1 method to sleep to show that the result is received reactively even though the main thread is asleep.
public class IntegrationTest {
    private JLupinDelegator jLupinQueueDelegator;
    private JLupinQueueManagerService jLupinQueueManagerService;
    private JLupinClientQueueUtil jLupinClientQueueUtil;

    @Before
    public void before() throws Throwable {
        jLupinQueueDelegator = JLupinClientUtil.generateOuterMicroserviceLoadBalancerDelegator(5000, 3, 5000, PortType.QUEUE, 1000, 3000000,
                new JLupinMainServerInZoneConfiguration[]{
                        new JLupinMainServerInZoneConfiguration("NODE_1", "127.0.0.1", 9090, 9095, 9096, 9097)
                }
        );
        jLupinQueueManagerService = JLupinClientUtil.generateRemote(jLupinQueueDelegator, "queueMicroservice", "jLupinQueueManagerService", JLupinQueueManagerService.class);
        jLupinClientQueueUtil = new JLupinClientQueueUtil("SAMPLE", jLupinQueueManagerService);
        ((JLupinQueueLoadBalancerDelegatorImpl) jLupinQueueDelegator).setGetStatusAnalyseAndChooseHighestFromAllEnvironment(true);
        try {
            jLupinQueueDelegator.start();
        } catch (JLupinDelegatorException e) {
            throw new IllegalStateException("an error occurred caused by:", e);
        }
        jLupinClientQueueUtil.start();
    }

    @After
    public void after() throws Throwable {
        // Stop the queue client first, because it communicates through the delegator.
        jLupinClientQueueUtil.stop();
        jLupinQueueDelegator.stop();
    }

    @Test
    public void taskTest_1() throws Throwable {
        final String taskId = jLupinQueueManagerService.putTaskInput(
                "SAMPLE",
                "sampleMicroservice",
                "exampleService",
                "exampleMethod",
                new Object[]{
                        Integer.valueOf(10)
                }
        );
        jLupinClientQueueUtil.registerFunctionOnTaskResult(taskId, new JLupinQueueReactiveFunction() {
            @Override
            public void onSuccess(String taskId, Object result) {
                System.out.println("on success with task id:" + taskId + " and result object:" + result);
            }

            @Override
            public void onError(String taskId, Throwable th) {
                System.out.println("on error with task id:" + taskId + " and exception object:" + th.getMessage());
            }
        });
        // Keep the main thread alive; the reactive function runs in a different thread.
        Thread.sleep(10000);
    }
}
Result
on success with task id:1234509 and result object:20
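The test above keeps the main thread alive with a fixed 10 second sleep. A common JDK alternative is a CountDownLatch that the callback releases, so the test continues as soon as the result arrives and can still time out. The sketch below is an illustration under stated assumptions, not JLupin code: ResultCallback is a hypothetical interface loosely mirroring onSuccess and onError, and a single-thread executor stands in for the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback shape, loosely mirroring the reactive function in the test above.
interface ResultCallback {
    void onSuccess(String taskId, Object result);

    void onError(String taskId, Throwable th);
}

class LatchWaitSketch {

    // Stand-in for the queue: computes 2 * input on another thread, then calls back.
    static void submit(ExecutorService pool, String taskId, int input, ResultCallback callback) {
        pool.submit(() -> {
            Object result;
            try {
                result = Math.multiplyExact(2, input);
            } catch (ArithmeticException e) {
                callback.onError(taskId, e);
                return;
            }
            callback.onSuccess(taskId, result);
        });
    }

    // Waits for the callback with a timeout; returns the result, or null on timeout or error.
    static Object runAndWait(int input, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<Object> received = new AtomicReference<>();
        try {
            submit(pool, "task-1", input, new ResultCallback() {
                @Override
                public void onSuccess(String taskId, Object result) {
                    received.set(result);
                    finished.countDown();
                }

                @Override
                public void onError(String taskId, Throwable th) {
                    finished.countDown();
                }
            });
            // Returns as soon as the callback fires instead of always sleeping.
            finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("result object:" + runAndWait(10, 10000)); // result object:20
    }
}
```

In a JUnit test the value returned by runAndWait could then be checked with an assertion, which the fixed sleep does not allow.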