Overview

JLupin provides reactive queues, with optional automated retrieval of results, implemented in microservice architecture. Queues are defined on dedicated native microservices (queue microservices). The number of queues defined on a particular queue microservice, as well as the number of queue microservices, depends on your needs, processing characteristics, security rules and infrastructure resources. In this way you can build a queue subsystem in microservice architecture, where queues are separated by running in their own JVMs. This makes your queues more resistant to failures, outages, overloads, etc.

Figure 1. JLupin Reactive Queues Overview.

We recommend defining one queue per queue microservice if the capacity of the infrastructure allows it.

The most important thing is that JLupin Reactive Queues are an additional layer to your microservices that you don't need to implement, integrate or refer to in any way. They transparently cover your code, providing a highly efficient, asynchronous, distributed reactive bus.


Architecture

Messages processing

Queues are provided on queue microservices, which are native microservices (so each has a standard configuration.yml file) with additional configuration in a queues.yml file, where queue definitions are provided:

Figure 2. JLupin Reactive Queues Architecture.

Every queue consists of innerQueuesAmount inner queues. At any given moment one inner queue is active for writing, into which the QUEUE entry point (provided by a native microservice) puts asynchronous requests (messages) from clients. The rest of the inner queues are open in read mode for message processing (routing and execution on the target microservices by delegators).

Figure 3. JLupin Reactive Queues - initial queue filling.

After waitTimeBetweenCheckingTaskReadyToStartInMillis milliseconds the Queue Manager opens the next inner queue for writing (closing it for message processing), while the previous one is switched to read mode and message processing starts. Messages are processed by threadAmount threads, no sooner than delaySendProcessTaskToExecuteInMillis milliseconds after they arrive. On each thread the message handler (the delegator) tries to send a message to the target microservice in up to maxSendProbeAmount attempts; if none of them succeeds, the message is discarded.

Figure 4. JLupin Reactive Queues - initial queue filling
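
An illustrative fragment of a queue definition in queues.yml, mapping the behaviour described above to the parameters involved (the values are the ones used in the reference configuration later in this chapter):

QUEUES:
  SAMPLE:
    innerQueuesAmount: 4                                  # one inner queue open for writing, the rest being processed
    waitTimeBetweenCheckingTaskReadyToStartInMillis: 500  # switch the active inner queue every 500 ms
    threadAmount: 128                                     # processing threads per inner queue
    delaySendProcessTaskToExecuteInMillis: 0              # no additional delay before processing starts
    maxSendProbeAmount: 2                                 # discard a message after 2 failed delivery attempts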

Queues are also capable of receiving and storing responses to messages previously sent to microservices. A queue waits howLongTaskInputShouldBeOnQueueWithoutResultItInMillis milliseconds for the response; if this time is exceeded, the message is discarded. If the result comes with an exception (identified through a regular expression by the exceptionStringToRepeat string) the associated request will be repeated up to maxAcceptExceptionAmount times. After receiving the answer, the queue keeps it for howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis milliseconds until it is retrieved by the client.
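
The response handling side can be sketched with the corresponding parameters (again, values as in the reference configuration below):

QUEUES:
  SAMPLE:
    howLongTaskInputShouldBeOnQueueWithoutResultItInMillis: 60000        # discard a request if no result arrives within 60 s
    exceptionStringToRepeat: 'java.lang.Exception'                       # regular expression matched against the result's stack trace
    maxAcceptExceptionAmount: 1                                          # repeat the request once if the expression matches
    howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis: 30000  # discard a result not downloaded by the client within 30 s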

Reactive data flows

Let's assume that the web application WebApp1 makes an asynchronous request to the application microservice app_A using the SIMPLE queue located on the queues_1 queue microservice. It uses the reactive communication schema, as shown in the following diagram:

Figure 5. JLupin Reactive Queues - data flows

The whole process has the following stages:

  1. The web application WebApp1 sends an asynchronous request to a service located on app_A. It uses the SIMPLE queue on queues_1 and registers a callback function for the response. The communication is done through QUEUE entry points on the Main Server.
  2. WebApp1 receives a confirmation from queues_1 that the request has been accepted for processing by the SIMPLE queue (through the Main Server), after which the thread associated with the request is released. WebApp1 threads are not engaged in response handling until the response is ready on a queue. The response handling by WebApp1 is described in points 7 and 8.
  3. The SIMPLE queue handles the request and, through the delegator, tries repeatsAmountByDelegator times to find an available instance of the app_A microservice that should process the request. If it does not succeed, the queue delegator waits timeToWaitBetweenRepeatProbeInMillisByDelegator milliseconds before trying again. If it succeeds, the delegator sends the request to the target microservice using the QUEUE entry point on the Main Server.
  4. The app_A microservice sends a confirmation to the queues_1 microservice that the request has been accepted for processing (the communication between a queue and a target application is also asynchronous).
  5. After the request is processed, the delegator located on the QUEUE entry point of the target application microservice tries repeatsAmount times to find an available instance of the SIMPLE queue on the queues_1 microservice and sends the response. If it does not succeed, the delegator waits timeToWaitBetweenRepeatProbeInMillis milliseconds before trying again. If it succeeds, the delegator sends the response to the queue using the QUEUE entry point on the Main Server. These parameters are defined in the target application microservice configuration file (see the configuration sketch after this list); see this chapter to get more information.
  6. The queues_1 microservice confirms successful reception of the response.
  7. If a callback function has been registered by the WebApp1 microservice while sending the request, the response is sent back to the client. This engages a new thread from the thread pool, which executes the callback function. The response waits howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis milliseconds to be sent to the client. After this time, the response is discarded by the garbage collection process.
  8. The confirmation of receiving the response is sent by the client (WebApp1) to the queues_1 microservice.
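
As mentioned in stages 3 and 5, the retry behaviour is configured on both sides of the flow. An illustrative sketch (values are the defaults shown in this chapter): the DELEGATOR section of queues.yml on the queues_1 microservice controls request delivery to app_A, while the ENTRY_POINTS QUEUE section of configuration.yml on the app_A microservice controls delivery of the response back to the queue:

# queues.yml on the queues_1 microservice (request delivery, stage 3)
DELEGATOR:
    repeatsAmountByDelegator: 4                            # attempts to find an available app_A instance
    timeToWaitBetweenRepeatProbeInMillisByDelegator: 1000  # wait 1 s between attempts

# configuration.yml on the app_A microservice (response delivery, stage 5)
ENTRY_POINTS:
  QUEUE:
     repeatsAmount: 4                                      # attempts to find an available instance of the SIMPLE queue
     timeToWaitBetweenRepeatProbeInMillis: 1000            # wait 1 s between attempts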

Of course, this reactive flow also works across many nodes with different sets of components, i.e. in a distributed architecture, for example where web applications, queues and applications are located on different sets of nodes (at least two), as shown in the picture below:

Figure 6. JLupin Reactive Queues - data flows in distributed configuration

If you would like to try it in practice, take a look at the developer guide.


Configuration

Configuration files are provided with queue microservices. The configuration consists of two files:

configuration.yml

This is a standard configuration file for native microservices, where some of the parameters have been adjusted to the characteristics of its function in the environment (queuing) and should always be considered in the process of system design:

  • amount of memory (HEAP)
  • number of threads
SERVERS:
  JLRMC: #JLupin Remote Method Calls Fast Protocol:
    readTimeout: 480000
    isWaitForFinishExecuteAllRequests: true
    waitToShutdownThreadsOnStop: 60000
    backlog: 0
    receiveBufferSize: 0
    isReuseAddress: false
    threadPoolSize: 128
    isLogPeriodicOnDebug: true
    isDestroyThreadOnTimeout: false
    threadExecutingTimeOut: 240000
  TRANSMISSION:
    readTimeout: 480000
    isWaitForFinishExecuteAllRequests: false
    waitToShutdownThreadsOnStop: 60000
    backlog: 0
    receiveBufferSize: 0
    isReuseAddress: false
    threadPoolSize: 8
    isLogPeriodicOnDebug: true
    isDestroyThreadOnTimeout: false
    threadExecutingTimeOut: 3600000
  QUEUE:
     readTimeout: 480000
     isWaitForFinishExecuteAllRequests: true
     waitToShutdownThreadsOnStop: 60000
     backlog: 256
     receiveBufferSize: 256
     isReuseAddress: false
     threadPoolSize: 128
     isLogPeriodicOnDebug: true
     isDestroyThreadOnTimeout: false
     threadExecutingTimeOut: 240000
ENTRY_POINTS:
  QUEUE:
     threadAmount: 128
     howOftenCheckingServerInMillis: 5000
     repeatsAmount: 4
     timeToWaitBetweenRepeatProbeInMillis: 1000
PROPERTIES:
  #jvmOptions1: '-Xms128M -Xmx256M -agentlib:jdwp=transport=dt_socket,address=12998,server=y,suspend=n'
  jvmOptions1: '-Xms64M -Xmx128M' #jvmOptions_2 - default the same as jvmOptions_1
  #jvmOptions2: '-Xms128M -Xmx256M'
  externalPort: '8000'
  version: '1.0.0-RC1'
  switchDelayTime: 0
  connectionSocketTimeoutInMillis: 1000
  readTimeoutInMillis: 30000
  isKeepAlive: false
  isOOBInline: false
  isTcpNoDelay: false
  isReuseAddress: false
  sendBufferSize: 0
  receiveBufferSize: 0
  soLinger: 0
  trafficClass: 0
  #javaExecutablePath: 'c:\\jvm\\bin\\java.exe'
  #additionalClassPath: 'c:\\temp\\*'
  isStartOnMainServerInitialize: true
  priorityStartOnMainServerInitialize: 2
  waitForProcessInitResponseTimeInMillis: 900000
  waitForProcessStartResponseTimeInMillis: 900000
  waitForProcessDestroyResponseTimeInMillis: 30000
  isAllFilesToJVMAppClassLoader: false
  isArchiveOnStart: false
  startLogMode: INFO
  isInitErrorCauseWithNetworkInformation: true
  checkAvailableScript: 'function isAvailable(checkResponseTimeInMillis, jrmcActiveThreads, jrmcMaxThreads,
                                              queueActiveThreads, queueMaxThreads, servletActiveThreads, servletMaxThreads,
                                              jvmMaxMemoryInBytes, jvmTotalMemoryInBytes, jvmFreeMemoryInBytes,
                                              jvmProcessCpuLoadInPercentage, userAvailableFlag) {

                          var isAvailableByUser = Boolean(userAvailableFlag);
                            if(checkResponseTimeInMillis > 20000 || !isAvailableByUser) {
                               return false;
                            }
                            return true;
                         }'
APPLICATION:
  applicationContainerProducerClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.jlupin.configuration.JLupinApplicationContainerProducerImpl'
INITIALIZING_LOGGER:
  #directoryPath: '/logs/server'
  #fileName: 'file_name'
  fileExtension: 'log'
  fileSizeInMB: 20
  maxFiles: 10
MEMORY_ERRORS:
  isRestartOnError: true
  howManyTimes: 4
  percentageGrowth: 25
  isHeapDump: true
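
In particular, the memory and thread settings mentioned above are controlled mainly by the following keys (an illustrative extract of the file above; the actual values depend on your sizing):

PROPERTIES:
  jvmOptions1: '-Xms64M -Xmx128M'   # amount of memory (HEAP) available to the queue microservice's JVM
SERVERS:
  QUEUE:
     threadPoolSize: 128            # thread pool of the QUEUE server
ENTRY_POINTS:
  QUEUE:
     threadAmount: 128              # threads handling asynchronous requests on the QUEUE entry point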

queues.yml

This file contains queue definitions and their parameters.

Section 'DELEGATOR'

The appropriate part of the configuration file:

DELEGATOR:
    howOftenCheckingServerInMillisByDelegator: 5000
    repeatsAmountByDelegator: 4
    timeToWaitBetweenRepeatProbeInMillisByDelegator: 1000

Description:

Parameter Description
howOftenCheckingServerInMillisByDelegator The loop period (expressed in milliseconds) in which the queue delegator checks if there are available microservices ready to process asynchronous requests stored on a queue.
repeatsAmountByDelegator The number of attempts made by the delegator to deliver a request to a microservice (during each attempt, the delegator checks the entire environment available to it in order to find an available microservice).
timeToWaitBetweenRepeatProbeInMillisByDelegator The time period (expressed in milliseconds) that the queue delegator waits between successive delivery attempts.

Section 'QUEUES'

Each entry in this section is a definition of a queue. The name of the entry is the name of the queue that should be used during asynchronous invocations. Each queue has the following set of parameters (an example defining two queues is sketched after the parameter descriptions below):

The appropriate part of the configuration file:

QUEUES:
  SAMPLE:
    innerQueuesAmount: 4
    waitTimeBetweenCheckingTaskReadyToStartInMillis: 500
    storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.queue.impl.memory.JLupinMemoryQueueStorageImpl'
    threadAmount: 128
    maxSendProbeAmount: 2
    maxAcceptExceptionAmount: 1
    exceptionStringToRepeat: 'java.lang.Exception'
    garbageThreadAmount: 4
    howLongTaskStatusWillBeInATransientStateInMillis: 1000
    howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis: 30000
    howLongTaskInputShouldBeOnQueueWithoutResultItInMillis: 60000
    waitTimeBetweenCheckingTaskByGarbageManagerInMillis: 5000
    delaySendProcessTaskToExecuteInMillis: 0

Description:

Parameter Description
innerQueuesAmount The number of inner queues that a queue consists of.
waitTimeBetweenCheckingTaskReadyToStartInMillis The time in milliseconds after which the inner queue is switched from writing mode to reading mode.
storageClassName The implementation class of the storage where requests and results are located (default is in-memory).
threadAmount The number of threads assigned to each inner queue for handling asynchronous requests.
maxSendProbeAmount The number of attempts a thread makes to deliver a request. When maxSendProbeAmount is exceeded, the request is removed from the queue.
maxAcceptExceptionAmount The number of additional attempts made in case an exception matching exceptionStringToRepeat is caught (the whole stack trace is checked against the regular expression).
exceptionStringToRepeat The string (regular expression) that identifies exceptions while processing responses on a queue (important if maxAcceptExceptionAmount > 0).
garbageThreadAmount The number of threads assigned to the queue garbage collection process (cleaning queues).
howLongTaskStatusWillBeInATransientStateInMillis The maximum amount of time that a request or response in an inconsistent state is kept by a queue. After this time a request or response that is still in an inconsistent state is discarded by the queue and removed by the garbage collector.
howLongTaskResultShouldBeOnQueueWithoutDownloadingItInMillis The maximum time in milliseconds that results are kept in a queue without downloading them by the clients.
howLongTaskInputShouldBeOnQueueWithoutResultItInMillis The maximum time in milliseconds that requests are kept in a queue without getting results from microservices.
waitTimeBetweenCheckingTaskByGarbageManagerInMillis The time period (expressed in milliseconds) that the queue garbage collector waits between task-checking iterations.
delaySendProcessTaskToExecuteInMillis The minimum delay in milliseconds between the arrival of a request and the start of its processing.
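
For illustration only, a queues.yml defining two queues on the same queue microservice might look as follows (the queue names ORDERS and NOTIFICATIONS are hypothetical; each entry repeats the full set of parameters described above):

QUEUES:
  ORDERS:           # hypothetical queue name used by clients in asynchronous invocations
    innerQueuesAmount: 4
    threadAmount: 128
    storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.queue.impl.memory.JLupinMemoryQueueStorageImpl'
    # ... remaining parameters as in the SAMPLE definition above
  NOTIFICATIONS:    # a second, independently configured queue on the same queue microservice
    innerQueuesAmount: 2
    threadAmount: 32
    storageClassName: 'com.jlupin.impl.microservice.partofjlupin.asynchronous.storage.queue.impl.memory.JLupinMemoryQueueStorageImpl'
    # ... remaining parameters as in the SAMPLE definition above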