The Branch Handler can be configured to handle batches of jobs. Pipelines typically operate a job at a time, and Aspire is mostly about splitting items up into pieces and processing the smaller pieces (aka, "sub jobs"). However, there are situations where batching is appropriate, mostly when you need communicate with external systems, for example to send a batch of documents to a search engine.

Introduction


When batching is enabled (using configuration parameters on the Branch Handler), the Branch Handler will create a Batch object to represent the batch as a whole. All jobs which belong to the batch will be attached to the batch. You can use the methods Job.isOnBatch() to see if a job is part of a larger batch.

Each component that supports job batching will implement a custom ComponentBatch that contains the required logic for opening/closing events on the batch and processing single jobs that are part of a existing batch. The parent Batch object will contain, essentially, a map of all of the ComponentBatch objects for all components which have implemented batching.

Once all of the jobs in a batch are "complete" (and when the batch is released by the Branch Handler), the batch will be officially "complete". This then causes the batch operations to be performed. Basically this means that all of the ComponentBatch objects will be closed (using the close() method). Each ComponentBatch object will then perform the batch-ed job operation which they need to perform.

Program for Batching In Your Components


Important: Most components operate on documents a job at a time. Batches will flow through these components unmodified. No special programming is required to handle batches unless your component needs to do some special batch operation (such as save up all IDs and do a batch update of a relational database, for example).

Follow these steps to allow your pipeline stage to take advantage of job batching.

  1. Create a new class that implements ComponentBatch.
    This class will handle the open/close batch events. Also take care of single job adds to the batch. Is recommended that in the implementation of this class you use streaming as much as possible (don't store all jobs data, since that would be memory inefficient).
    Also, pay attention to thread synchronization. All methods on ComponentBatchObject are likely to be called by multiple threads at the same time.

Example: A ComponentBatch - Notice that job IDs are kept just for testing purposes.

 class TestBatch implements ComponentBatch {
   /**
    * Just store the IDs of the jobs in this batch. Normally a "real" batching component should do streaming or something with the incoming jobs.
    */
   StringBuffer currentBatchedJobs = new StringBuffer();
   @Override
   public void open() {
     System.out.println("[TestBatch] Component batch object opened.");
     synchronized (currentBatchedJobs) {
       currentBatchedJobs.setLength(0);
     }
   }

   @Override
   public void close() {
     // This is called when the last job in the batch is closed, AND when the component creating the batches
     // has closed the batch (usually when the batch is flushed, or timed out)

     synchronized (currentBatchedJobs) {
       System.out.println("[TestBatch] Batch closed.. Batch is: \n" + currentBatchedJobs + "\n");
       //
       // HERE:  Do all processing required to process the batch as a whole
       //
       currentBatchedJobs.setLength(0);
     }
   }

   @Override
   public void process(Job j) {
     synchronized (currentBatchedJobs) {
       currentBatchedJobs.append('\t');
       currentBatchedJobs.append(j.getJobId());
       currentBatchedJobs.append('\n');
     }
   }
   
   @Override
   public void creatorCloseEvent(Batch batch) throws AspireException {
     // This will be called when the component who is creating the batches has finished writing jobs to the batch
     // It is only used in special circumstances when hibernating jobs as part of a batch process
     
   }
   
   @Override
   public void reset() throws AspireException {
     close();
   }
 }

2. Make your process() method on your pipeline stage look similar to this:

 public void process(Job j) throws AspireException {
   TestBatch theBatch = null;

   //Check if the received job is on a batch.
   if(j.isOnBatch()) {
     //Get existing component batch object.
     theBatch = (TestBatch) j.getComponentBatchObject(this.getName());
     synchronized (this) {
       //If null,  then create a new one.
       if(theBatch == null) {
         theBatch = new TestBatch();
         theBatch.open();
         j.setComponentBatchObject(this.getName(), theBatch);
       }
     }

     //Add job to existing batch.
     theBatch.process(j);
   }
   else {
     //Work on normal operation (single, non-batched job).
   }

 }

In process() we make use of the custom ComponentBatch class.

  • No labels