Tuesday, July 12, 2011

High-volume Batch Processing in Java

In this article I want to share my experience with batch processing using the java.util.concurrent package, which provides good thread pool and thread management capabilities.

Background: I want to briefly explain how the team I was part of used the java.util.concurrent package on an older version of Java. One of the requirements at my client's site was integration with third-party vendors and with other enterprise applications within the organization, in both online and offline modes.

The popular choice for online integration is Java web services, and for offline integration it is file exchange. Each type of implementation posed its own challenges. I had the chance to work on offline integration through file exchange, in different file formats (DAT and XML files). The technology stack was JEE on JDK 1.4.

The client already had an enterprise solution that processed files using WebLogic Integration (WLI), which provides efficient and highly scalable tools such as a file processor, a data transformer, and asynchronous processes. Even with this solution in place, there were other challenges: multiple domains had to be set up to cope with business needs, different processes had to be scheduled (daily, weekly, monthly), and supporting the WLI application required expert knowledge, which in turn proved expensive.

At this juncture, the enterprise application architect gave direction to move away from WLI for batch processing, and the choice was to develop a framework in JEE using the Spring Framework.

Let’s take a step back and look at the basics of batch processing and the options that were available with JDK 1.4. The requirements for the batch framework included (but were not limited to) the following:

  • The system should start batch processing periodically using a scheduling mechanism.
  • The system should also provide the ability to kick off a process manually and asynchronously.
  • The system should be highly scalable and efficient in handling large volumes of data, where complex logic and calculations may be involved.
  • The system should support both database-driven processing (where the end goal is to generate file(s)) and the consumption and processing of multiple incoming files.
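The first two requirements (scheduled starts and a manual, asynchronous kick-off) map naturally onto the scheduling support in java.util.concurrent on Java 5 and later. The sketch below is only an illustration: the original framework kicked jobs off through a JSP, JMS and an MDB, and the class name BatchLauncher and the job modelled as a plain Runnable are assumptions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical launcher that runs one batch job on a schedule or on demand. */
final class BatchLauncher {
    // A single thread means scheduled and manual runs never overlap
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable job;

    BatchLauncher(Runnable job) {
        this.job = job;
    }

    /** Runs the job periodically, e.g. a period of 24 hours for a daily batch. */
    ScheduledFuture<?> schedule(long initialDelay, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(job, initialDelay, period, unit);
    }

    /** Kicks the job off manually; the caller returns immediately. */
    Future<?> runNow() {
        return scheduler.submit(job);
    }

    void shutdown() {
        scheduler.shutdown();
    }
}
```

Using one scheduler thread is a deliberate choice in this sketch: a manual run submitted while a scheduled run is in progress simply waits its turn instead of processing the same records concurrently.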

We then analyzed the options available in the JEE world and the pros and cons of each. At the time of this analysis, the following options were available:

· Spring Batch: Spring Batch did not support JDK 1.4; it required an upgrade to a newer JDK, which was a constraint.

· J2EE technologies such as EJB, which provide highly scalable enterprise stack solutions. However, EJB has limitations for batch processing; for example, creating and managing threads inside EJBs is discouraged. EJB containers strictly control the number of threads (execute threads are assigned through a dispatch policy) and database connections. Resources such as DB connections and JTA transactions can be held only for a limited time before they time out, so batch processing has to run within the constraints imposed by the EJB container.

· Multi-threaded processing using JMS technologies such as message-driven beans (MDBs). These may not be efficient, and the design can end up with multiple MDBs, which makes error conditions very difficult to handle.

After analyzing these and several other options, the team looked into the basics of batch processing (the rules of thumb of batch processing):

  1. Basic processing control* (start and stop): A batch process can spawn multiple threads across multiple servers, and once those threads are running it is very difficult to stop them. To support stopping, we used two techniques before threads were created. The first was a batch control table holding a job name with its start and end times; the component responsible for creating threads checks this table for the job's end time. The second was using backport-util-concurrent to notify the thread pool manager to pause all threads (a pausable thread pool manager implementation). In production, we never actually needed this capability.

  2. Job partitioning*: Processing a large amount of data with threads can lead to database locking problems and to connections being unavailable for processing, which chokes the batch run. The job needs to be thoroughly partitioned and queued to avoid DB locking and resource-unavailability issues. Partitioning can follow different strategies; for database-driven processing we used a "process and mark" strategy, initially based on Oracle row numbers. Row numbers turned out to be a challenge, because large volumes of data kept flowing into the database while the batch was running, which shifted the row numbers (a moving target). We ended up using sysids, with a start_id and end_id for each run.

  3. Parallel processing and distribution*: The key to parallel processing is chunking the partitioned job into small units of work, each with a start and end id, that are processed in parallel. We relied heavily on JMS with Spring to re-queue the partitioned jobs.

  4. Fine-grained transaction control*: We ended up using user transactions with Spring to get fine control over the processing of each record. Because we adopted the "process and mark" strategy, each record could be marked in the database as successfully or unsuccessfully processed, using commit/rollback.

  5. Proper error handling and error monitoring*: The framework provided a robust error handling mechanism based on exception handling, which also generated error reports, and it used cross-cutting aspects built with Spring AOP to give visibility into the batch processing.

* The terms above are taken from a DevX article.
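The pause technique from rule 1 has a well-known shape: the ThreadPoolExecutor Javadoc includes a PausableThreadPoolExecutor example that overrides the beforeExecute hook. A condensed Java 5+ version is sketched below; it is adapted from that Javadoc example, not taken from the framework, whose own pausable implementation is not shown in this post. Pausing stops workers from starting new tasks, while tasks that are already running finish normally.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Thread pool whose workers can be held back before they start their next task. */
final class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused; // guarded by pauseLock

    PausableThreadPoolExecutor(int poolSize) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            // Called on the worker thread t: block here while the pool is paused
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ie) {
            t.interrupt(); // restore the worker's interrupt status
        } finally {
            pauseLock.unlock();
        }
    }

    /** Stops workers from starting new tasks; running tasks finish normally. */
    void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    /** Lets blocked workers continue with their queued tasks. */
    void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}
```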

Given the constraints of JDK 1.4, WebLogic Application Server 8.1, and Spring 2.5, the direction was to use a third-party library, backport-util-concurrent (its functionality is now part of Java 5 and Java 6 as java.util.concurrent), which provides good thread management and thread pools, while following the basic rules discussed above.

The framework starts batch processing from a JSP that posts a work description (an XML document describing the work to be done for a given run) to a JMS queue. An MDB listening on that queue receives the work description as a text message. The work description contains an action, which is a Spring bean ID; that bean (the pre-processor) is loaded from the Spring bean factory and invoked. The pre-processor is responsible for getting the total count of records available for processing and for deriving the start sysid and end sysid from the job partition size. Once the start_sysid and end_sysid are computed, they are added to the same work description, which is re-queued to the next action (the processor bean).

The processor bean hands the work description to a work manager, after any pre-processing that may be required. The work manager is a POJO that manages the thread pool and the ThreadPoolExecutor. It further chunks each partition into smaller chunks and submits one task per chunk to the ThreadPoolExecutor; each task invokes a Worker with the chunk details. The Worker is the component that does the real job: it reads the list of records between the chunk's start sysid and end sysid, iterates through the list processing each record (transforming DB records into file content using XMLBeans), and finally writes the mapped data to a file in a specified location on the server.
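The pre-processor and work manager steps above come down to range arithmetic over sysids: split the run into partitions of a fixed size, then split each partition into chunks. The sketch below assumes numeric sysids and inclusive [startId, endId] ranges; the class SysidRange and its split method are hypothetical names, and the same method serves both levels.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical inclusive sysid range [startId, endId]. */
final class SysidRange {
    final long startId;
    final long endId;

    SysidRange(long startId, long endId) {
        this.startId = startId;
        this.endId = endId;
    }

    /** Splits this range into consecutive sub-ranges of at most size ids each. */
    List<SysidRange> split(long size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<SysidRange> parts = new ArrayList<>();
        for (long start = startId; start <= endId; start += size) {
            // The last sub-range may be shorter than size
            long end = Math.min(start + size - 1, endId);
            parts.add(new SysidRange(start, end));
        }
        return parts;
    }

    @Override
    public String toString() {
        return "[" + startId + ".." + endId + "]";
    }
}
```

For example, new SysidRange(1, 2500).split(1000) yields [1..1000], [1001..2000] and [2001..2500], and splitting the first partition by 250 yields four chunks ending at [751..1000]. Because the end sysid is fixed when the run starts, rows inserted during the run fall outside every range and wait for the next run, which is what made sysids a stable alternative to row numbers. Gaps in the sysid sequence only mean that some chunks hold fewer records than their width.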

The difference between database-driven and file processing lies in the criteria each work description carries. A database-driven work description has criteria for reading database records, whereas a file-based work description has a file name pattern and a file location. The pre-processor gets the count either from the DB or from the file location, and the processor does the partitioning and chunking specified in the work description. For file processing, each file is assigned to one worker running on its own thread; for database-driven processing, each worker processes a chunk bounded by a start sysid and an end sysid.
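For the file-based flow, "one file per worker" can be sketched with java.nio.file and an ExecutorService on Java 7 and later. The FileBatch class, the glob pattern and the per-file handler are assumptions for illustration; in the framework, the pattern and location came from the work description and the handler would be the Worker.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical file-driven batch: one pooled task per file matching a glob. */
final class FileBatch {
    /**
     * Hands each file in dir that matches glob (e.g. "*.dat") to handler as its
     * own task, waits for all of them, and returns the number of files.
     */
    static int processFiles(Path dir, String glob, Consumer<Path> handler, int threads)
            throws IOException, InterruptedException, ExecutionException {
        // The matched file count plays the role of the pre-processor's count
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                files.add(p);
            }
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> results = new ArrayList<>();
            for (Path file : files) {
                results.add(pool.submit(() -> handler.accept(file)));
            }
            // get() rethrows a worker's failure wrapped in ExecutionException
            for (Future<?> result : results) {
                result.get();
            }
        } finally {
            pool.shutdown();
        }
        return files.size();
    }
}
```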

With this introduction to the batch processing technique, let's quickly discuss the backport-util-concurrent / java.util.concurrent components used in the framework above. The WorkManager uses a ThreadPoolExecutor and a LinkedBlockingQueue to manage threads. A ThreadPoolExecutor is an ExecutorService that executes each submitted task using one of possibly several pooled threads. Preconfigured pools are usually obtained from the Executors factory methods, e.g. newCachedThreadPool, newScheduledThreadPool, and newFixedThreadPool. A cached thread pool creates new threads as needed but reuses previously constructed threads when they are available, which typically improves performance when a run creates a large number of short-lived chunks. The other two pools are self-explanatory from their names: a fixed pool reuses a fixed number of threads, and a scheduled pool runs tasks after a delay or periodically. A ThreadPoolExecutor is configured with core and maximum pool sizes, a keep-alive time, a work queue, and optionally prestarting some or all core threads so they are ready in the pool.
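The Executors factory methods are thin wrappers over ThreadPoolExecutor. The sketch below uses Java 5+ java.util.concurrent (backport-util-concurrent offers the same classes under edu.emory.mathcs.backport.java.util.concurrent) and rebuilds a fixed and a cached pool by hand with the constructor arguments the JDK sources use for those factories; PoolKinds and describe are hypothetical helper names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hand-built equivalents of two Executors factory pools. */
final class PoolKinds {
    /** Fixed pool: n core threads, never grows, unbounded work queue. */
    static ThreadPoolExecutor fixed(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /** Cached pool: no core threads, grows on demand, idle threads die after 60 s. */
    static ThreadPoolExecutor cached() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    /** Summarizes the settings that distinguish one pool kind from another. */
    static String describe(ThreadPoolExecutor pool) {
        return "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + ", queue=" + pool.getQueue().getClass().getSimpleName();
    }
}
```

The cached pool's SynchronousQueue hands each task directly to a thread, creating one if none is idle, which is why it can grow without bound. Executors.newScheduledThreadPool(n) returns a ScheduledThreadPoolExecutor, a ThreadPoolExecutor subclass that adds delayed and periodic execution.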

Core pool size: the number of threads kept in the pool, even when they are idle, unless allowCoreThreadTimeOut is set.

Maximum pool size: the maximum number of threads allowed in the pool.

Keep-alive time: how long idle threads wait for new work before terminating. This applies only to threads beyond the core pool size, or to core threads as well when allowCoreThreadTimeOut is set.

Work queue: the queue that holds submitted tasks until a thread executes them. There are three queuing strategies (direct handoffs, unbounded queues, and bounded queues). We used the unbounded queue strategy with LinkedBlockingQueue, which has no predefined capacity. New tasks are queued whenever all core threads are busy, so no more than corePoolSize threads are ever created and the maximum pool size has no effect. We chose this strategy because each task executes independently of the others.
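The claim that an unbounded queue caps the pool at its core size is easy to check with Java 5+ java.util.concurrent. In the sketch below, the class name UnboundedQueueDemo and the numbers (2 core threads, a maximum of 1000, 20 tasks) are arbitrary choices; every task blocks on a latch so the pool stays saturated while it is measured.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows that an unbounded work queue keeps the pool at corePoolSize. */
final class UnboundedQueueDemo {
    /** Returns {poolSize, queuedTasks} after submitting tasks that all block. */
    static int[] saturate(int core, int max, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60L,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Threads beyond core are only added when the queue refuses a task,
            // which an unbounded LinkedBlockingQueue never does
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

With saturate(2, 1000, 20), the first two tasks start the two core threads and the remaining 18 wait in the queue; the maximum of 1000 is never reached. The same applies to the maximumPoolSize of 1000 configured in the WorkManager below.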

import java.util.Iterator;
import java.util.Map;

import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;

// Logger, IWorkManager, IChunker, IWorker, IErrorHandler, Chunk, WorkDescriptor,
// BatchException, MissingDependencyException, SpringLoader and FileUtils are
// project-specific classes that are not shown in this post.

/**
 * A work manager implementation that utilizes the backport util.concurrent
 * library for thread pool management.
 *
 * @author rpollepalli
 */
public class WorkManager implements IWorkManager
{
    private static final Logger logger = new Logger(WorkManager.class,
            "LOGGER_APP_NAME");

    // Thread executor settings
    private boolean allowCoreThreadTimeOut;

    // These must be set via bean properties in the Spring config
    private IChunker chunker;
    private IWorker worker;

    // These can be set via bean properties in the Spring config
    protected IErrorHandler errorHandler;
    private int corePoolSize;
    private long keepAliveTime;
    private int maximumPoolSize;
    private long monitorTime;
    private boolean isDatabaseWorkManager;

    // The thread executor and its work queue
    private ThreadPoolExecutor threadPool;
    private LinkedBlockingQueue workQueue;

    /**
     * Initializes a new work manager with the default settings.
     */
    public WorkManager()
    {
        super();
        allowCoreThreadTimeOut = false;
        keepAliveTime = 60;
        corePoolSize = 10;
        maximumPoolSize = 1000;
        monitorTime = 5000L;
        workQueue = new LinkedBlockingQueue();
    }

    /**
     * Initializes a new work manager with the default settings. The
     * descriptor is not used by this constructor.
     */
    public WorkManager(WorkDescriptor descriptor)
    {
        this();
    }

    /** @return the corePoolSize */
    public final int getCorePoolSize()
    {
        return corePoolSize;
    }

    /** @return the thread keep-alive time in seconds */
    public final long getKeepAliveTime()
    {
        return keepAliveTime;
    }

    /** @return the maximumPoolSize */
    public final int getMaximumPoolSize()
    {
        return maximumPoolSize;
    }

    /** @return the monitorTime in milliseconds */
    public final long getMonitorTime()
    {
        return monitorTime;
    }

    /** @return the allowCoreThreadTimeOut */
    public final boolean isAllowCoreThreadTimeOut()
    {
        return allowCoreThreadTimeOut;
    }

    /**
     * Verifies that the required dependencies were injected.
     */
    public void init() throws BatchException
    {
        if (chunker == null)
        {
            throw new MissingDependencyException(SpringLoader.getMessage(
                    "error.initialization.missing.dependency", new Object[] {
                            "chunker", IChunker.class.getName(),
                            getClass().getName() }));
        }

        if (worker == null)
        {
            throw new MissingDependencyException(SpringLoader.getMessage(
                    "error.initialization.missing.dependency", new Object[] {
                            "worker", IWorker.class.getName(),
                            getClass().getName() }));
        }
    }

    /**
     * Queues the chunks for execution. It handles nested chunks: chunks that
     * have child chunks as well as any number of grandchild chunks.
     *
     * @param chunk
     *            a chunk to be queued
     */
    private void queueChunk(final Chunk chunk)
    {
        if (chunk == null)
        {
            logger.error("The chunk is null, nothing to queue", null);
            return;
        }

        if (chunk.isParent())
        {
            // It's a parent chunk, so recursively queue its children
            Iterator iter = chunk.getChunks().iterator();
            while (iter.hasNext())
            {
                queueChunk((Chunk) iter.next());
            }
        }
        else
        {
            if (chunk.getDescriptor() == null)
            {
                logger.error("The descriptor for the chunk is null, skipping chunk", null);
                return;
            }

            // Queue the work with an anonymous inner class
            threadPool.submit(new Runnable()
            {
                /**
                 * Runs the worker for this chunk on a pooled thread.
                 *
                 * @see java.lang.Runnable#run()
                 */
                public void run()
                {
                    try
                    {
                        if (isDatabaseWorkManager)
                        {
                            // featureType, seqNum and serviceCode are not declared
                            // in the original listing; reading them from the
                            // descriptor params under these (hypothetical) keys
                            // is an assumption.
                            Map params = chunk.getDescriptor().getParams();
                            String featureType = (String) params.get("featureType");
                            Integer seqNum = (Integer) params.get("seqNum");
                            String serviceCode = (String) params.get("serviceCode");
                            String fileName = FileUtils.getFileName(featureType,
                                    seqNum.intValue(), serviceCode);
                            logger.debug("fileName: " + fileName, null);
                            chunk.getDescriptor().setOutputFile(fileName);
                        }
                        worker.executeChunk(chunk.getDescriptor());
                    }
                    catch (Exception ex)
                    {
                        logger.error("The worker threw an exception and processing may not have completed for the chunk", ex);
                        // Run the error handler if one is configured
                        if (errorHandler != null)
                        {
                            errorHandler.processError(chunk.getDescriptor(), ex);
                        }
                    }
                }
            });
        }
    }

    /**
     * Runs a job with the configured core pool size and keep-alive time.
     * Keep-alive time is defined in seconds; allowCoreThreadTimeOut defaults
     * to false, so core threads do not time out even if a keep-alive time is
     * specified.
     */
    public void runJob(WorkDescriptor workDescriptor) throws BatchException
    {
        // A fresh queue per run, so idle workers of a previous pool that is
        // still shutting down cannot take this run's tasks
        workQueue = new LinkedBlockingQueue();
        threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                keepAliveTime, TimeUnit.SECONDS, workQueue);
        threadPool.allowCoreThreadTimeOut(allowCoreThreadTimeOut);

        // By default, core threads are only created as tasks are submitted.
        // We don't know how many chunks we're dealing with, so we prestart
        // them to have them ready as soon as we start queueing.
        threadPool.prestartAllCoreThreads();

        Chunk chunk = chunker.getChunks(workDescriptor);
        queueChunk(chunk);

        if (logger.isDebugEnabled() && monitorTime > 0)
        {
            // When debugging, log the thread pool statistics periodically
            try
            {
                while (threadPool.getActiveCount() > 0)
                {
                    // ### of ### tasks have completed, with ### currently
                    // active. The current thread pool size is ### and the
                    // largest thread pool size seen so far was ###.
                    StringBuffer out = new StringBuffer(300);
                    out.append(threadPool.getCompletedTaskCount());
                    out.append(" of ");
                    out.append(threadPool.getTaskCount());
                    out.append(" have completed, with ");
                    out.append(threadPool.getActiveCount());
                    out.append(" currently active. The current thread pool size is ");
                    out.append(threadPool.getPoolSize());
                    out.append(" and the largest thread pool seen so far was ");
                    out.append(threadPool.getLargestPoolSize());
                    out.append(".");

                    if (allowCoreThreadTimeOut)
                    {
                        out.append(" Core threads are allowed to time out after ");
                        out.append(keepAliveTime);
                        out.append(" seconds.");
                    }

                    logger.debug(out.toString(), null);
                    Thread.sleep(monitorTime);
                }
            }
            catch (InterruptedException e)
            {
                // Stop monitoring, but preserve the interrupt status
                Thread.currentThread().interrupt();
            }
            catch (Exception e)
            {
                // Monitoring is best effort; stop on the first other exception
            }
        }

        // Orderly shutdown: queued chunks still run, no new tasks are accepted
        threadPool.shutdown();
    }

    /**
     * Executes a job on a work descriptor using the specified core pool size.
     * Core threads are not allowed to time out. Delegates to
     * runJob(WorkDescriptor) for the actual execution.
     *
     * @see #setCorePoolSize(int)
     */
    public void runJob(WorkDescriptor workDescriptor, int corePoolSize)
            throws BatchException
    {
        setAllowCoreThreadTimeOut(false);
        setCorePoolSize(corePoolSize);
        runJob(workDescriptor);
    }

    /**
     * Executes a job on a work descriptor using the specified core pool size
     * and keep-alive time (in seconds). Core threads are allowed to time out.
     * Delegates to runJob(WorkDescriptor) for the actual execution.
     *
     * @see #setAllowCoreThreadTimeOut(boolean)
     * @see #setKeepAliveTime(long)
     */
    public void runJob(WorkDescriptor workDescriptor, int corePoolSize,
            long keepAliveTime) throws BatchException
    {
        setCorePoolSize(corePoolSize);
        setAllowCoreThreadTimeOut(true);
        setKeepAliveTime(keepAliveTime);
        runJob(workDescriptor);
    }

    /**
     * If false (default), core threads stay alive even when idle. If true,
     * core threads use keepAliveTime to time out waiting for work.
     *
     * @param allowCoreThreadTimeOut
     *            true to let idle core threads time out after keepAliveTime
     */
    public final void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut)
    {
        this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
    }

    /**
     * @param chunker
     *            the chunker to use for splitting up the work descriptor
     */
    public final void setChunker(IChunker chunker)
    {
        this.chunker = chunker;
    }

    /**
     * Core pool size is the minimum number of workers to keep alive (and not
     * allow to time out) unless allowCoreThreadTimeOut is set, in which case
     * the minimum is zero.
     *
     * @param corePoolSize
     *            the minimum number of workers to keep alive
     */
    public final void setCorePoolSize(int corePoolSize)
    {
        this.corePoolSize = corePoolSize;
    }

    /**
     * @param isDatabaseWorkManager
     *            true if each chunk's descriptor must be given an output file
     *            name before the worker runs (database-driven processing)
     */
    public final void setDatabaseWorkManager(boolean isDatabaseWorkManager)
    {
        this.isDatabaseWorkManager = isDatabaseWorkManager;
    }

    /**
     * @param errorHandler
     *            the errorHandler to execute if an exception is thrown during
     *            the worker's execution
     */
    public final void setErrorHandler(IErrorHandler errorHandler)
    {
        this.errorHandler = errorHandler;
    }

    /**
     * Timeout in seconds for idle threads waiting for work. Threads use this
     * timeout when there are more than corePoolSize threads present, or if
     * allowCoreThreadTimeOut is set. Otherwise they wait forever for new work.
     *
     * @param keepAliveTime
     *            the time idle threads stay alive while waiting for work, in
     *            seconds
     */
    public final void setKeepAliveTime(long keepAliveTime)
    {
        this.keepAliveTime = keepAliveTime;
    }

    /**
     * Maximum number of threads in the pool. With the unbounded
     * LinkedBlockingQueue used here, the pool never grows beyond
     * corePoolSize, so this value has no practical effect. It must not be
     * smaller than corePoolSize.
     *
     * @param maximumPoolSize
     *            the maximum number of threads allowed in the pool
     */
    public final void setMaximumPoolSize(int maximumPoolSize)
    {
        this.maximumPoolSize = maximumPoolSize;
    }

    /**
     * @param monitorTime
     *            the interval, in milliseconds, between statistics log entries
     *            while debugging
     */
    public final void setMonitorTime(long monitorTime)
    {
        this.monitorTime = monitorTime;
    }

    /**
     * @param worker
     *            the worker used to process a chunk
     */
    public final void setWorker(IWorker worker)
    {
        this.worker = worker;
    }
}
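In the WorkManager above, runJob waits for the workers only while debug logging is enabled, because the monitoring loop is the only thing that blocks; otherwise it returns right after shutdown() while chunks are still running. If a caller needs runJob to return only once every chunk is done, one option on Java 5+ is to shut the pool down first and then block on awaitTermination, logging progress each time the wait times out. The sketch below is a simplified, hypothetical stand-in (chunks are plain Runnables, the class name BlockingWorkManager is invented, and progress goes to System.out), not a drop-in replacement for the class above.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical simplified work manager that blocks until all chunks finish. */
final class BlockingWorkManager {
    /** Runs every chunk on the pool and returns the completed task count. */
    static long runJob(List<Runnable> chunks, int corePoolSize, long monitorMillis)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.prestartAllCoreThreads();
        for (Runnable chunk : chunks) {
            pool.execute(chunk);
        }
        // No new chunks are accepted; already queued chunks still run
        pool.shutdown();
        // Wake up every monitorMillis to report progress until the pool drains
        while (!pool.awaitTermination(monitorMillis, TimeUnit.MILLISECONDS)) {
            System.out.println(pool.getCompletedTaskCount() + " of "
                    + pool.getTaskCount() + " completed, "
                    + pool.getActiveCount() + " active");
        }
        return pool.getCompletedTaskCount();
    }
}
```

Unlike polling getActiveCount(), which the ThreadPoolExecutor documentation describes as approximate and which can briefly read zero while tasks are still queued, awaitTermination returns true only after every queued task has run and all workers have exited.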

Reference:
a) Prior working experience with backport-util-concurrent.
