...

  1. Create a new class that implements ComponentBatch.
    This class handles the batch open and close events, as well as adding individual jobs to the batch. In the implementation, stream job data as much as possible rather than storing all of it, since keeping every job's data in memory is inefficient.
    Also, pay attention to thread synchronization: the methods of the ComponentBatch object are likely to be called by several threads at the same time.

 

Example: a ComponentBatch implementation. Note that job IDs are kept only for testing purposes.

 class TestBatch implements ComponentBatch {
   /**
    * Stores only the IDs of the jobs in this batch. A "real" batching component would normally
    * stream the incoming jobs or otherwise process them as they arrive.
    * The field is final because it is also used as the lock object.
    */
   private final StringBuffer currentBatchedJobs = new StringBuffer();
   @Override
   public void open() {
     System.out.println("[TestBatch] Component batch object opened.");
     synchronized (currentBatchedJobs) {
       currentBatchedJobs.setLength(0);
     }
   }

   @Override
   public void close() {
     // This is called when the last job in the batch is closed, AND when the component creating the batches
     // has closed the batch (usually when the batch is flushed, or timed out)
     synchronized (currentBatchedJobs) {
       System.out.println("[TestBatch] Batch closed. Batch is: \n" + currentBatchedJobs + "\n");
       //
       // HERE:  Do all processing required to process the batch as a whole
       //
       currentBatchedJobs.setLength(0);
     }
   }

   @Override
   public void process(Job j) {
     synchronized (currentBatchedJobs) {
       currentBatchedJobs.append('\t');
       currentBatchedJobs.append(j.getJobId());
       currentBatchedJobs.append('\n');
     }
   }
   
   @Override
   public void creatorCloseEvent(Batch batch) throws AspireException {
     // Called when the component that creates the batches has finished writing jobs to the batch.
     // It is only used in special circumstances, when hibernating jobs as part of a batch process.
     
   }
   
   @Override
   public void reset() throws AspireException {
     close();
   }
 }
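The streaming and synchronization advice in step 1 can be illustrated without the Aspire classes. The sketch below is a minimal, self-contained approximation, not the Aspire API: `SimpleBatch` is a hypothetical interface standing in for the open/process/close part of ComponentBatch, jobs are reduced to string IDs, and `StreamingBatch` is an illustrative name. Each job ID is written to a `Writer` as it arrives instead of being accumulated, and a single lock guards both the writer and the job counter so that several threads can call `process()` at once.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StreamingBatchSketch {

  /** Hypothetical stand-in for the open/process/close part of ComponentBatch. */
  interface SimpleBatch {
    void open();
    void process(String jobId);
    void close();
  }

  /** Streams each job ID to a Writer as it arrives; one lock guards the writer and the counter. */
  static class StreamingBatch implements SimpleBatch {
    private final Object lock = new Object();
    private final Writer out;
    private int count;

    StreamingBatch(Writer out) {
      this.out = out;
    }

    @Override
    public void open() {
      synchronized (lock) {
        count = 0;
      }
    }

    @Override
    public void process(String jobId) {
      synchronized (lock) {
        try {
          // Write the job out immediately instead of keeping it in memory.
          out.write(jobId);
          out.write('\n');
          count++;
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    }

    @Override
    public void close() {
      synchronized (lock) {
        try {
          out.flush();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        System.out.println("Batch closed after " + count + " jobs");
      }
    }

    int jobsWritten() {
      synchronized (lock) {
        return count;
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    StringWriter sink = new StringWriter();
    StreamingBatch batch = new StreamingBatch(sink);
    batch.open();

    // Several worker threads add jobs to the same batch concurrently.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 100; i++) {
      final String id = "job-" + i;
      pool.submit(() -> batch.process(id));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);

    batch.close();
    // Every job is written exactly once; the order depends on thread scheduling.
    System.out.println(sink.toString().split("\n").length);
  }
}
```

In a real component the `Writer` would more likely be a file or network stream; that choice is an assumption of this sketch, and the point is only that memory use stays constant regardless of batch size.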

2. Make your process() method on your pipeline stage look similar to this:

 public void process(Job j) throws AspireException {
   TestBatch theBatch = null;

   // Check whether the received job is part of a batch.
   if (j.isOnBatch()) {
     synchronized (this) {
       // Look up the existing component batch object inside the lock, so that two
       // threads cannot both see null and each create their own batch object.
       theBatch = (TestBatch) j.getComponentBatchObject(this.getName());
       // If there is none yet, create, open and register a new one.
       if (theBatch == null) {
         theBatch = new TestBatch();
         theBatch.open();
         j.setComponentBatchObject(this.getName(), theBatch);
       }
     }

     // Add the job to the batch (TestBatch synchronizes internally).
     theBatch.process(j);
   }
   else {
     // Normal operation (single, non-batched job).
   }

 }

Note: process() uses the custom ComponentBatch implementation (TestBatch) from step 1.
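The batch object has to be looked up and created under the same lock: with a separate check-then-create, two threads could both see null and each register a different object, and jobs added to the first one would be lost. The self-contained sketch below reproduces the pattern outside Aspire to show that only one object is created per batch. `FakeBatch`, `FakeJob`, `CollectingBatch` and `Stage` are hypothetical stand-ins; only the method names `isOnBatch`, `getComponentBatchObject` and `setComponentBatchObject` mirror the calls used in the stage code above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GetOrCreateSketch {

  /** Hypothetical batch that stores one object per component name. */
  static class FakeBatch {
    final Map<String, Object> componentObjects = new ConcurrentHashMap<>();
  }

  /** Hypothetical job; only the batch-related calls are modeled. */
  static class FakeJob {
    final String id;
    final FakeBatch batch; // null when the job is not on a batch

    FakeJob(String id, FakeBatch batch) {
      this.id = id;
      this.batch = batch;
    }

    boolean isOnBatch() { return batch != null; }
    Object getComponentBatchObject(String name) { return batch.componentObjects.get(name); }
    void setComponentBatchObject(String name, Object o) { batch.componentObjects.put(name, o); }
  }

  /** Collects job IDs and counts how many instances were ever created. */
  static class CollectingBatch {
    static final AtomicInteger CREATED = new AtomicInteger();
    final StringBuffer ids = new StringBuffer();

    CollectingBatch() { CREATED.incrementAndGet(); }
    void process(FakeJob j) { ids.append(j.id).append('\n'); }
  }

  static class Stage {
    final String name = "collector";

    void process(FakeJob j) {
      if (!j.isOnBatch()) {
        return; // single-job path omitted in this sketch
      }
      CollectingBatch theBatch;
      synchronized (this) {
        // Read and create under the same lock, so only one thread can create the object.
        theBatch = (CollectingBatch) j.getComponentBatchObject(name);
        if (theBatch == null) {
          theBatch = new CollectingBatch();
          j.setComponentBatchObject(name, theBatch);
        }
      }
      theBatch.process(j); // outside the lock; CollectingBatch is itself thread-safe
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FakeBatch batch = new FakeBatch();
    Stage stage = new Stage();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < 1000; i++) {
      FakeJob job = new FakeJob("job-" + i, batch);
      pool.submit(() -> stage.process(job));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println("batch objects created: " + CollectingBatch.CREATED.get()); // prints 1
  }
}
```

Because the lock is the stage itself, object creation is serialized across all batches, but the critical section is short and the per-job work runs outside it.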