...
Example: A ComponentBatch - Notice that job IDs are kept just for testing purposes.
class TestBatch implements ComponentBatch {
/**
* Just store the IDs of the jobs in this batch. Normally a "real" batching component should do streaming or something with the incoming jobs.
*/
StringBuffer currentBatchedJobs = new StringBuffer();
@Override
public void open() {
System.out.println("[TestBatch] Component batch object opened.");
synchronized (currentBatchedJobs) {
currentBatchedJobs.setLength(0);
}
}
@Override
public void close() {
// This is called when the last job in the batch is closed, AND when the component creating the batches
// has closed the batch (usually when the batch is flushed, or timed out)
synchronized (currentBatchedJobs) {
System.out.println("[TestBatch] Batch closed.. Batch is: \n" + currentBatchedJobs + "\n");
//
// HERE: Do all processing required to process the batch as a whole
//
currentBatchedJobs.setLength(0);
}
}
@Override
public void process(Job j) {
synchronized (currentBatchedJobs) {
currentBatchedJobs.append('\t');
currentBatchedJobs.append(j.getJobId());
currentBatchedJobs.append('\n');
}
}
@Override
public void creatorCloseEvent(Batch batch) throws AspireException {
// This will be called when the component who is creating the batches has finished writing jobs to the batch
// It is only used in special circumstances when hibernating jobs as part of a batch process
}
@Override
public void reset() throws AspireException {
close();
}
}
2. Make your process() method on your pipeline stage look similar to this:
public void process(Job j) throws AspireException {
TestBatch theBatch = null;
//Check if the received job is on a batch.
if(j.isOnBatch()) {
//Get existing component batch object.
theBatch = (TestBatch) j.getComponentBatchObject(this.getName());
synchronized (this) {
//If null, then create a new one.
if(theBatch == null) {
theBatch = new TestBatch();
theBatch.open();
j.setComponentBatchObject(this.getName(), theBatch);
}
}
//Add job to existing batch.
theBatch.process(j);
}
else {
//Work on normal operation (single, non-batched job).
}
}
Note: In process() we make use of the custom ComponentBatch class.