Overview
JLupin provides reactive queues with an option of automated retrieving results - of course in microservice architecture. Queues are defined on specific native microservices (queue microservices). The number of queues defined on the particular queue microservice as well as the number of queue microservices depends of your needs, processing characteristic, security rules and infrastructure resources. In that way you can build the queue subsystem in microservice architecture, where queues are separated by running its own JVMs. This way your queues will be more resistant to failures, outages, overloads, etc.
We recommend to define one queue per one queue microservice if the capacity of the infrastructure allows you to do it.
The most important thing is that JLupin Reactive Queues are the additional layer to your microservices that you don't need to implement, integrate or refer to in any way. They transparently covers your code providing high efficient reactive, asynchronous, distributed reactive bus.
Architecture
Messages processing
Queues are provided on queue microservices, which are native microservices (so it has a standard configuration.yml
file) with additional configuration in queues.yml
file, where queues definitions are provided:
Every queue consists of innerQueuesAmount
number of inner queues. At any given moment one inner queue is active for writing, into which QUEUE entry point (provided by a native microservice) puts asynchronous requests (messages) from clients. The rest of inner queues are opened in read mode for messages processing (routing and executing on the target microservices by delegators).
After waitTimeBetweenCheckingTaskReadyToStartInMillis
period of time (in milliseconds) Queue Manager opens the next inner queue for writing (closed for message processing), while the previous one is opened in read mode and messages processing is started. Messages are processed by threadAmount
number of threads, no sooner than delaySendProcessTaskToExecuteInMillis
period of time (in milliseconds). On each thread the message handler - delegator, tries to send a message to the target microservice in maxSendProbeAmount
number of attempts, if it's not succeed the message are discarded.
Queues are also capable of receiving and storing responses to previously sent messages to microservices. They wait howLongTaskInputShouldBeOnQueueWithoutResultItInMillis
milliseconds of time waiting for the response, if the time is exceeded, messages are discarded. if the result comes with en exception (identified through regular expression by exceptionStringToRepeat
string) the associated request will be repeated maxAcceptExceptionAmount
times.After receiving the answer, it waits for howLongTaskResultShouldBeOnQueueWithoutDownloadItInMillis
until it is retrieved by the client from the queue.
Reactive data flows
Lets assume that web application WebApp1
makes an asynchronous request to the application microservice app_A
using SIMLPLE
queue located on queues_1
queue microservive. It uses reactive communication schema, as show on the following diagram:
The whole process has the following stages:
- The web application
WebApp1
sends a asynchronous request to a service located on theapp_A
. It usesSIMPLE
queues onqueues_1
and registers callback function for the response. The communication is done through QUEUE entry points on Main Server. - The
WebApp1
receives confirmation fromqueues_1
of accepting the request for processing by theSIMPLE
queue (through Main Server), after which the associated threat with the request is released.WebApp1
threats are not engaged in the response handling until it's ready on a queue. The response handling byWebApp1
is described in the next points (7,8). - The
SIMPLE
queue handles the request and through the delegatorrepeatsAmountByDelegator
times tries to find the available instance ofapp_A
microservice that should process the request. If it not succeeded the queue delegator waitstimeToWaitBetweenRepeatProbeInMillisByDelegator
period of time to try again. If it is succeeded the delegator sends the request to the target microserive using QUEUE entry point on Main Server. - The
app_A
microservice sends confirmation toqueues_1
microservice of accepting the request for processing (the communication between a queue and a target application is also asynchronous). - After the requests is processed the delegator located on QUEUE entry point of the target application microservice tries
repeatsAmount
times to find available instance ofSIMPLE
queue anqueues_1
microservice and send the response. If it not succeeded the queue delegator waitstimeToWaitBetweenRepeatProbeInMillis
period of time to try again. If it is succeeded the delegator sends the response to the queue using QUEUE entry point on Main Server. These parameters are defined in the target application microservice configuration file, see this chapter to get more information. - The
queues_1
microservice confirms successful reception of the response. - If a callback function has been registered during sending the request by
WebApp1
microservice the process of sending back the response to the client is performed. It engages a new threat from the thread pool, which executes the callback function. The response waitshowLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis
period of time to be sent to the client. After this time, the response are discarded by the garbage collection process. - The confirmation of receiving the response is sent by the client (
WebApp1
) toqueues_1
microservice.
Of course, these reactive flow works fine in case where are many nodes with different set of components - in distributed architecture. For example where web applications, queues and applications are located on different set of nodes (at least two) as shown on the picture below:
If would like to try it in practice take a look at developer guide.
Configuration
Configuration files are provided with queue microservices. If consists of two files:
configuration.yml
This is a standard configuration file for native microservices, where some of the parameters have been adjusted to characteristic of their functions in the environment (queuing) and should be always considered in the process of system design:
- amount of memory (HEAP)
- number of threads
SERVERS:
JLRMC: #JLupin Remote Method Calls Fast Protocol:
readTimeout: 480000
isWaitForFinishExecuteAllRequests: true
waitToShutdownThreadsOnStop: 60000
backlog: 0
receiveBufferSize: 0
isReuseAddress: false
threadPoolSize: 128
isLogPeriodicOnDebug: true
isDestroyThreadOnTimeout: false
threadExecutingTimeOut: 240000
TRANSMISSION:
readTimeout: 480000
isWaitForFinishExecuteAllRequests: false
waitToShutdownThreadsOnStop: 60000
backlog: 0
receiveBufferSize: 0
isReuseAddress: false
threadPoolSize: 8
isLogPeriodicOnDebug: true
isDestroyThreadOnTimeout: false
threadExecutingTimeOut: 3600000
QUEUE:
readTimeout: 480000
isWaitForFinishExecuteAllRequests: true
waitToShutdownThreadsOnStop: 60000
backlog: 256
receiveBufferSize: 256
isReuseAddress: false
threadPoolSize: 128
isLogPeriodicOnDebug: true
isDestroyThreadOnTimeout: false
threadExecutingTimeOut: 240000
ENTRY_POINTS:
QUEUE:
threadAmount: 128
howOftenCheckingServerInMillis: 5000
repeatsAmount: 4
timeToWaitBetweenRepeatProbeInMillis: 1000
PROPERTIES:
#jvmOptions1: '-Xms128M -Xmx256M -agentlib:jdwp=transport=dt_socket,address=12998,server=y,suspend=n'
jvmOptions1: '-Xms64M -Xmx128M' #jvmOptions_2 - default the same as jvmOptions_1
#jvmOptions2: '-Xms128M -Xmx256M'
externalPort: '8000'
version: '1.0.0-RC1'
switchDelayTime: 0
connectionSocketTimeoutInMillis: 1000
readTimeoutInMillis: 30000
isKeepAlive: false
isOOBInline: false
isTcpNoDelay: false
isReuseAddress: false
sendBufferSize: 0
receiveBufferSize: 0
soLinger: 0
trafficClass: 0
#javaExecutablePath: 'c:\\jvm\\bin\\java.exe'
#additionalClassPath: 'c:\\temp\\*'
isStartOnMainServerInitialize: true
priorityStartOnMainServerInitialize: 2
waitForProcessInitResponseTimeInMillis: 900000
waitForProcessStartResponseTimeInMillis: 900000
waitForProcessDestroyResponseTimeInMillis: 30000
isAllFilesToJVMAppClassLoader: false
isArchiveOnStart: false
startLogMode: INFO
isInitErrorCauseWithNetworkInformation: true
checkAvailableScript: 'function isAvailable(checkResponseTimeInMillis, jrmcActiveThreads, jrmcMaxThreads,
queueActiveThreads, queueMaxThreads, servletActiveThreads, servletMaxThreads,
jvmMaxMemoryInBytes, jvmTotalMemoryInBytes, jvmFreeMemoryInBytes,
jvmProcessCpuLoadInPercentage, userAvailableFlag) {
var isAvailableByUser = Boolean(userAvailableFlag);
if(checkResponseTimeInMillis > 20000 || !isAvailableByUser) {
return false;
}
return true;
}'
APPLICATION:
applicationContainerProducerClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.jlupin.configuration.JLupinApplicationContainerProducerImpl'
INITIALIZING_LOGGER:
#directoryPath: '/logs/server'
#fileName: 'file_name'
fileExtension: 'log'
fileSizeInMB: 20
maxFiles: 10
MEMORY_ERRORS:
isRestartOnError: true
howManyTimes: 4
percentageGrowth: 25
isHeapDump: true
queues.yml
This file contains queues definitions and their parameters.
Section 'DELEGATOR'
The appropriate part of the configuration file:
DELEGATOR:
howOftenCheckingServerInMillisByDelegator: 5000
repeatsAmountByDelegator: 4
timeToWaitBetweenRepeatProbeInMillisByDelegator: 1000
Description:
Parameter | Description |
---|---|
howOftenCheckingServerInMillisByDelegator |
The loop period (expressed in milliseconds) in which the queue delegator checks if there are available microserives ready to process asynchronous requests stored on a queue. |
repeatsAmountByDelegator |
The number of attempts made by the delegator to deliver the request to a microservice (during each attempt, the delegator checks the entire environment available to him in order to find an available microservices) |
timeToWaitBetweenRepeatProbeInMillisByDelegator |
The time period (expressed in milliseconds) at which the queue delegator waits between successive attempts to execute results. |
Section 'QUEUES'
Each entry in this section is a definition of a queues. The name of the entry is the name of a queue that should be used during asynchronous invocations. Each queues has the following set of parameters:
The appropriate part of the configuration file:
QUEUES:
SAMPLE:
innerQueuesAmount: 4
waitTimeBetweenCheckingTaskReadyToStartInMillis: 500
storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.queue.impl.memory.JLupinMemoryQueueStorageImpl'
threadAmount: 128
maxSendProbeAmount: 2
maxAcceptExceptionAmount: 1
exceptionStringToRepeat: 'java.lang.Exception'
garbageThreadAmount: 4
howLongTaskStatusWillBeInATransientStateInMillis: 1000
howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis: 30000
howLongTaskInputShouldBeOnQueueWithoutResultItInMillis: 60000
waitTimeBetweenCheckingTaskByGarbageManagerInMillis: 5000
delaySendProcessTaskToExecuteInMillis: 0
Description:
Parameter | Description |
---|---|
innerQueuesAmount |
The number of inner queues that a queue consists of. |
waitTimeBetweenCheckingTaskReadyToStartInMillis |
The time in milliseconds, after which the inner queue is switched from writing mode to reading mode. |
storageClassName |
The implementation class of storage, where requestes and results are located (default is in-memory) |
threadAmount |
The number of threads assigned to each inner queues for handling asynchronous requests. |
maxSendProbeAmount |
The number of attempts that a thread processes the request. When maxSendProbeAmount is exceeded, the request is removed from the queue. |
maxAcceptExceptionAmount |
The additional attempts in case of catching exceptionStringToRepeat exception (the whole stack trace is checked with regular expression) |
exceptionStringToRepeat |
The string that identifies exceptions while processing responses on a queue (important if maxAcceptExceptionAmount > 0). |
garbageThreadAmount |
The number of threads assigned to queue garbage collection process (cleaning queues). |
howLongTaskStatusWillBeInATransientStateInMillis |
The maximum amount of time that a request or response in inconsistent state is kept by a queue. After this time a request or resposnse that is still in inconsistent state is discarded by the queue and removed by the garbage collector. |
howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis |
The maximum time in milliseconds that results are kept in a queue without downloading them by the clients. |
howLongTaskInputShouldBeOnQueueWithoutResultItInMillis |
The maximum time in milliseconds that requests are kept in a queue without getting results from microservices. |
waitTimeBetweenCheckingTaskByGarbageManagerInMillis |
The time period (expressed in milliseconds) at which the queue garbage collector waits between checking tasks iterations. |
delaySendProcessTaskToExecuteInMillis |
The minimum delay time in milliseconds between a request comes in and the start of its processing. |