Asynchronous subsystem execution

The API shipped with the JLupin Platform libraries lets a business-solution developer call subsystems in a simple way. It also allows other code fragments to be executed asynchronously, which shortens the execution time of the main thread.

Figure 1. Asynchronous Subsystem Execution Overview.

Consider the following scenario:

Code running in the main thread has to send four requests, one to each of the subsystems A, B, C and D.
The requests last approximately:

A – 1 second
B – 1 second
C – 2 seconds
D – 500 milliseconds   

Executing the above-mentioned subsystems synchronously takes approximately 4.5 seconds (1 + 1 + 2 + 0.5). If we execute the subsystems asynchronously, we can reduce the total time to that of the longest-lasting request - in this case subsystem C, 2 seconds. The most important assumption: the requests to the subsystems are independent of one another. The results can be analyzed after the requests have been executed. Consider the following hypothetical example of executing requests to the above-mentioned subsystems.

Service interface:

/**
 * Contract of the example service that demonstrates the three ways of calling
 * the subsystems A, B, C and D.
 */
public interface SubSystemInvokingExample {

    /**
     * Calls the subsystems one after another.
     *
     * @return the time the calls took; the example implementation reports whole seconds
     * @throws Throwable if any subsystem call fails
     */
    int synchronousInvoking() throws Throwable;

    /**
     * Calls the subsystems asynchronously and collects their results.
     *
     * @return the time the calls took; the example implementation reports whole seconds
     * @throws Throwable if the invocation fails
     */
    int asynchronousInvoking() throws Throwable;

    /**
     * Calls the subsystems asynchronously without collecting any result.
     *
     * @throws Throwable if the invocation fails
     */
    void asynchronousInvokingWithoutResult() throws Throwable;
}

Implementation:

public class SubSystemInvokingExampleImpl implements SubSystemInvokingExample {

    protected final JLupinConcurrentTaskExecutor jLupinConcurrentTaskExecutor;
    protected static final int THREAD_POOL_SIZE  = 64;
    private JLupinLogger jLupinLogger;

    public SubSystemInvokingExampleImpl() {
        jLupinConcurrentTaskExecutor = new JLupinConcurrentTaskExecutor(THREAD_POOL_SIZE);
        jLupinLogger = JLupinRuntimeContainer.getInstance().getJLupinLogger();
    }

    public void destroy() {
        jLupinConcurrentTaskExecutor.shutdownThreadPool();
    }

    @Override
    public int synchronousInvoking() throws Throwable {
            long startTime = System.currentTimeMillis();

            callASubSystem();
            callBSubSystem(1,2);
            callCSubSystem();
            callDSubSystem(2);

            long stopTime = System.currentTimeMillis() - startTime;

            return (int)(stopTime / 1000);
    }

    @Override
    public int asynchronousInvoking() throws Throwable {
        long startTime = System.currentTimeMillis();

        JLupinBlockingMapTaskCallable callAsynchronousATask = new JLupinBlockingMapTaskCallable("key_A", 2000, null,jLupinLogger ) {
            @Override
            public Integer task(Object[] args) throws Throwable {
                return callASubSystem();
            }
        };

        JLupinBlockingMapTaskCallable callAsynchronousBTask = new JLupinBlockingMapTaskCallable("key_B", 2000, new Object[] {1,2},jLupinLogger ) {
            @Override
            public Integer task(Object[] args) throws Throwable {
                int a = (Integer)args[0];
                int b = (Integer)args[1];

                return callBSubSystem(a,b);

            }
        };

        JLupinBlockingMapTaskCallable callAsynchronousCTask = new JLupinBlockingMapTaskCallable("key_C", 2000, null,jLupinLogger ) {
            @Override
            public Long task(Object[] args) throws Throwable {
                return callCSubSystem();
            }
        };

        JLupinBlockingMapTaskCallable callAsynchronousDTask = new JLupinBlockingMapTaskCallable("key_D", 2000, new Object[] {2},jLupinLogger ) {
            @Override
            public Boolean task(Object[] args) throws Throwable {
                int a = (Integer)args[0];
                return callDSubSystem(a);
            }
        };

        jLupinConcurrentTaskExecutor.execute(callAsynchronousATask,callAsynchronousBTask,callAsynchronousCTask,callAsynchronousDTask);

        try {
            Integer i = callAsynchronousATask.getResultObject("key_A");
            Integer j = callAsynchronousBTask.getResultObject("key_B");
            Long    z = callAsynchronousCTask.getResultObject("key_C");
            Boolean b = callAsynchronousDTask.getResultObject("key_D");

            jLupinLogger.info("received object:%d,%d, %d,%s", new Object[] {i,j,z,b});
        } catch (JLupinTaskCallableException e) {
                jLupinLogger.log(1, LogMode.ERROR, "an error occurred during wait for reading all input stream data caused by:", e);
        }

        long stopTime = System.currentTimeMillis() - startTime;
        return (int)(stopTime / 1000);
    }

    @Override
    public void asynchronousInvokingWithoutResult() throws Throwable {
        long startTime = System.currentTimeMillis();

        JLupinNonBlockingTaskCallable callAsynchronousATask = new JLupinNonBlockingTaskCallable(null,jLupinLogger ) {
            @Override
            public Integer task(Object[] args) throws Throwable {
                return callASubSystem();

            }
        };

        JLupinNonBlockingTaskCallable callAsynchronousBTask = new JLupinNonBlockingTaskCallable(new Object[] {1,2},jLupinLogger ) {
            @Override
            public Integer task(Object[] args) throws Throwable {

                int a = (Integer)args[0];
                int b = (Integer)args[1];

                return callBSubSystem(a,b);

            }
        };

        JLupinNonBlockingTaskCallable callAsynchronousCTask = new JLupinNonBlockingTaskCallable(null,jLupinLogger ) {
            @Override
            public Long task(Object[] args) throws Throwable {
                return callCSubSystem();
            }
        };

        JLupinNonBlockingTaskCallable callAsynchronousDTask = new JLupinNonBlockingTaskCallable(new Object[] {2},jLupinLogger ) {
            @Override
            public Boolean task(Object[] args) throws Throwable {
                int a = (Integer)args[0];
                return callDSubSystem(a);

            }
        };
        jLupinConcurrentTaskExecutor.execute(callAsynchronousATask,callAsynchronousBTask,callAsynchronousCTask,callAsynchronousDTask);

        long stopTime = System.currentTimeMillis() - startTime;
        int resultTime = (int)(stopTime / 1000);
        jLupinLogger.info("result time:" + resultTime);
    }

    private int callASubSystem() throws Throwable{
        // emulate invoking subsystem...
        Thread.sleep(1000);
        return 1;
    }

    private int callBSubSystem(int a, int b) throws Throwable{
        // emulate invoking subsystem...
        Thread.sleep(1000);
        return 2;
    }

    private long callCSubSystem() throws Throwable{
        // emulate invoking subsystem...
        Thread.sleep(2000);
        return 3L;
    }

    private boolean callDSubSystem(int a) throws Throwable{
        // emulate invoking subsystem...
        Thread.sleep(500);
        if (a> 0) {
            return true;
        }
        else {
            return false;
        }
    }
}

Explanation of the above mentioned example:

The private methods 'callASubSystem', 'callBSubSystem', 'callCSubSystem' and 'callDSubSystem' in the above example emulate the execution of subsystems A, B, C and D - the calling thread sleeps for the emulated response time. Every subsystem has exactly the response time mentioned at the beginning of the chapter. The methods return objects of different types only to add variety to the example.

The method:  

public int synchronousInvoking();

calls subsystems in a synchronous manner in order one after another which causes the response time to add up. The response time is approximately 4.5 seconds.

The method:  

public int asynchronousInvoking();

calls the subsystems in an asynchronous manner with the help of the JLupinConcurrentTaskExecutor mechanism.

In this particular example each task to be executed is a JLupinBlockingMapTaskCallable object, which makes it possible to retrieve the result of a given task by its key. Observe the following construction:

// Task for subsystem B. Constructor arguments, in order:
//   "key_B"            - key under which the task's result is later retrieved
//   2000               - maximum time the reading thread is blocked awaiting completion
//                        (presumably milliseconds - TODO confirm against the API)
//   new Object[] {1,2} - arguments made available to task(Object[] args) by index
//   jLupinLogger       - logger
JLupinBlockingMapTaskCallable callAsynchronousBTask = new JLupinBlockingMapTaskCallable("key_B", 2000, new Object[] {1,2},jLupinLogger ) {
            @Override
            public Integer task(Object[] args) throws Throwable {

                // args[0] and args[1] are the values 1 and 2 passed to the constructor above.
                int a = (Integer)args[0];
                int b = (Integer)args[1];

                return callBSubSystem(a,b);

            }
};

The first constructor parameter is the key by which we later refer to the task's result. The second one is the maximum time for which a thread will be blocked while awaiting the task's completion - it should be longer than the longest estimated (worst-case) response time of the subsystem. The third parameter is an array of arguments that are available in the method 'task(Object[] args)' under the respective indices, as shown in the example above. The fourth one is the logger, which is already known.

The execution of the task is performed by the following line:

    jLupinConcurrentTaskExecutor.execute(callAsynchronousATask,callAsynchronousBTask,callAsynchronousCTask,callAsynchronousDTask)

The JLupinConcurrentTaskExecutor class is the main mechanism that executes tasks asynchronously. It should be a singleton within a given class or even within the whole system - it is important that task keys are not mixed up within the class or system that makes use of this mechanism. The 'execute' method of JLupinConcurrentTaskExecutor accepts a single task object or an array of tasks. Tasks can also be executed without a key - that is, without queue polling and without awaiting the results. To run such tasks, use the JLupinNonBlockingTaskCallable task class.

Constructor:

    public  JLupinNonBlockingTaskCallable(Object[] args, JLupinLogger jLupinLogger)

A task created with this constructor is executed asynchronously, without awaiting its result.
Example - method

    public void asynchronousInvokingWithoutResult() throws Throwable;

    /**
     * Hands one non-blocking task per subsystem (A, B, C, D) to the executor and does not
     * read any result. The logged time is taken right after execute(...) returns.
     *
     * @throws Throwable if the invocation fails
     */
    @Override
    public void asynchronousInvokingWithoutResult() throws Throwable {

        long startTime = System.currentTimeMillis();

        // Subsystem A takes no arguments, so null is passed as the argument array.
        JLupinNonBlockingTaskCallable callAsynchronousATask = new JLupinNonBlockingTaskCallable(null,jLupinLogger ) {
            @Override
            public Integer task(Object[] args) throws Throwable {
                return callASubSystem();

            }
        };

        // The values 1 and 2 are read back inside task() as args[0] and args[1].
        JLupinNonBlockingTaskCallable callAsynchronousBTask = new JLupinNonBlockingTaskCallable(new Object[] {1,2},jLupinLogger ) {
            @Override
            public Integer task(Object[] args) throws Throwable {

                int a = (Integer)args[0];
                int b = (Integer)args[1];

                return callBSubSystem(a,b);

            }
        };

        // Subsystem C takes no arguments either.
        JLupinNonBlockingTaskCallable callAsynchronousCTask = new JLupinNonBlockingTaskCallable(null,jLupinLogger ) {
            @Override
            public Long task(Object[] args) throws Throwable {
                return callCSubSystem();
            }
        };

        // The single value 2 is read back inside task() as args[0].
        JLupinNonBlockingTaskCallable callAsynchronousDTask = new JLupinNonBlockingTaskCallable(new Object[] {2},jLupinLogger ) {
            @Override
            public Boolean task(Object[] args) throws Throwable {
                int a = (Integer)args[0];
                return callDSubSystem(a);

            }
        };
        // All four tasks are handed to the executor in one call; no result is read afterwards.
        jLupinConcurrentTaskExecutor.execute(callAsynchronousATask,callAsynchronousBTask,callAsynchronousCTask,callAsynchronousDTask);

        // Despite its name, stopTime holds the elapsed milliseconds; it is logged in whole seconds.
        long stopTime = System.currentTimeMillis() - startTime;
        int resultTime = (int)(stopTime / 1000);
        jLupinLogger.info("result time:" + resultTime);
    }