...

The Pipeline Manager is responsible for processing jobs (which contain documents to process) through pipelines. Pipeline managers are essentially passive: they wait for jobs to arrive (from feeders). Incoming jobs are put on an internal job queue and are then picked up by execution threads (a thread pool), which process each job through the pipeline to completion.

 

Contents

...

Key Concepts

Jobs

Pipeline managers process jobs. Jobs can come from either of two sources:

...

The basic structure for configuring a pipeline manager is as follows:

...

Code Block
languagexml
linenumberstrue
<component name="PIPELINE-MANAGER-NAME" subType="pipeline" factoryName="aspire-application">
      <queueSize>30</queueSize>
      <maxThreads>10</maxThreads>
      <queueTimeout>300000</queueTimeout>
      <shutdownTimeout>300000</shutdownTimeout>

      <components>
        <!-- Identify and configure all components here, any order -->
        <component name="COMPONENT1" subType="SUBTYPE" factoryName="aspire-component1"> ... </component>
        <component name="COMPONENT2" subType="SUBTYPE" factoryName="aspire-component2"> ... </component>
        <component name="COMPONENT3" subType="SUBTYPE" factoryName="aspire-component3"> ... </component>
        .
        .
        .
      </components>

      <pipelines>
        <!-- The list of pipelines go here -->
        <pipeline name="PIPELINE1" default="true">
          <stages>
            <!-- List all stages in the pipeline here, in order. -->
            <stage component="COMPONENT1" />
            <stage component="COMPONENT2" />
            <stage component="COMPONENT3" />
            .
            .
            .
          </stages>
        </pipeline>

        <!-- A pipeline manager can manage any number of pipelines -->
        <pipeline name="PIPELINE1"> ... </pipeline>
        <pipeline name="PIPELINE2"> ... </pipeline>
        .
        .
        .
      </pipelines>
    </component>

 

 

Notes:

  • The pipeline manager configuration contains a list of components, a list of pipelines, and, for each pipeline, a list of stages.
  • Each stage references a component by name from the list of components configured under the <components> tag.

...

Most pipeline configurations are a simple list of stages, for example:

...

Code Block
languagexml
linenumberstrue
 <pipelines>
  <pipeline name="doc-process" default="true">
    <stages>
      <stage component="fetchUrl" />
      <stage component="extractText" />
      <stage component="splitter" />
      <stage component="dateChooser" />
      <stage component="extractDomain" />
      <stage component="printToFile" />
      <stage component="feed2Solr" />
    </stages>
  </pipeline>
</pipelines>
 

Enabling and Disabling Pipelines and Stages

...

These flags are useful for turning pipelines and stage references on or off in response to property settings (either from an App Bundle or via properties specified in the settings.xml file).

Example:

...

Code Block
languagexml
linenumberstrue
 <pipelines>
  <!-- The next two pipelines are declared, but disabled. -->
  <pipeline name="doc-process1" enable="false">
    <stages>
      <stage component="fetchUrl" />
      <stage component="extractText" />
      <stage component="splitter" />
      <stage component="dateChooser" />
      <stage component="extractDomain" />
      <stage component="printToFile" />
      <stage component="feed2Solr" />
    </stages>
  </pipeline>
  <pipeline name="doc-process2" disable="true">
    <stages>
      <stage component="fetchUrl" />
      <stage component="extractText" />
      <stage component="splitter" />
      <stage component="dateChooser" />
      <stage component="extractDomain" />
      <stage component="printToFile" />
      <stage component="feed2Solr" />
    </stages>
  </pipeline>
  
  <!-- The next pipeline is enabled, but disables the 'splitter', 'dateChooser' and 'extractDomain' components. -->
  <pipeline name="doc-process3" enable="true">
    <stages>
      <stage component="fetchUrl" />
      <stage component="extractText" />
      <stage component="splitter" enable="false" />
      <stage component="dateChooser" disable="true" />
      <stage component="extractDomain" enable="false" />
      <stage component="printToFile" />
      <stage component="feed2Solr" />
    </stages>
  </pipeline>
</pipelines>
 

If neither @enable nor @disable is present, the pipeline or stage is assumed to be enabled.
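
In practice these flags are usually driven by properties rather than hard-coded. A minimal sketch of that pattern, assuming a boolean property named useSplitter has been defined in settings.xml (the property name is hypothetical):

Code Block
languagexml
linenumberstrue
 <pipeline name="doc-process" default="true">
  <stages>
    <stage component="fetchUrl" />
    <stage component="extractText" />
    <!-- Enabled or disabled at load time by the 'useSplitter' property -->
    <stage component="splitter" enable="${useSplitter}" />
    <stage component="feed2Solr" />
  </stages>
</pipeline>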

...

Pipelines can also be configured with branches, which determine what happens to a job/document when certain events occur. Branches are configured inside the pipeline using a <branches> tag, as shown below:

...

Code Block
languagexml
linenumberstrue
 <pipelines>
  <pipeline name="doc-process" default="true">
    <stages>
      <stage component="FetchUrl" />
      <stage component="ExtractText" />
      <stage component="Splitter" />
      <stage component="DateChooser" />
      <stage component="ExtractDomain" />
      <stage component="PrintToFile" />
      <stage component="Feed2Solr" />
    </stages>
    <branches>
      <branch event="onError" pipeline="error-pipeline" />
      <branch event="onComplete" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" />
      <branch event="onMyEvent" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" stage="some-stage"/>
    </branches>
  </pipeline>

  <pipeline name="error-pipeline">
    <!-- process packages for which exception errors are thrown -->
    .
    .
    .
  </pipeline>
</pipelines>

...

 

If @pipelineManager is not specified, the event branches within the same pipeline manager. If @pipeline is not specified, the event branches to the same pipeline (when @pipelineManager is not given) or to the default pipeline of the specified pipeline manager. If @stage is specified, processing of the job continues at that stage (which may be in the middle of the pipeline), on the pipeline manager and pipeline determined by the rules above.
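
A minimal sketch of these defaults (the event names are placeholders, reusing the stage names from the example above):

Code Block
languagexml
linenumberstrue
 <branches>
  <!-- No @pipelineManager or @pipeline: branch within this pipeline manager,
       to the same pipeline, continuing at the 'DateChooser' stage -->
  <branch event="onMyEvent" stage="DateChooser" />

  <!-- @pipelineManager only: branch to the default pipeline of 'SomeOtherPipemgr' -->
  <branch event="onComplete" pipelineManager="SomeOtherPipemgr" />
</branches>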

...

Also note that pipelines can have branches, and that a new optional "onTerminate" event has been added to the pipeline manager.

For example:

Code Block
languagexml
linenumberstrue
   <pipeline name="test2">
    <stages>
      <stage component="Schwarzenegger"/>
      <stage component="OldFashionedSgml"/>
    </stages>
    <branches>
      <branch event="onTerminate" pipeline="process-terminate"/>
    </branches>
  </pipeline>
   <pipeline name="process-terminate">
    <stages>
      <stage component="NewFangledXml"/>
      <stage component="AndAnother"/>
    </stages>
  </pipeline>
 

In the above example, the "Schwarzenegger" stage causes the job to be terminated (Arnold is the Terminator, right?). This is trapped by the pipeline's "onTerminate" branch, which then sends the job to the "process-terminate" pipeline where it continues.

...

Groovy pipelines are pipelines in which you control the flow of jobs through the stages using a Groovy script instead of a list of stages. For example:

...

Code Block
languagexml
linenumberstrue
 <pipelines>
  <pipeline name="doc-process" default="true">
    <script><![CDATA[
      job | FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile | Feed2Solr
    ]]></script>
  </pipeline>
</pipelines>
 

Variables

  • job (Java Type: Job) - References the job being processed by this Groovy pipeline. Use this variable to send the job through stages.
  • doc (Java Type: AspireObject) - The AspireObject that holds all of the metadata for the current document being processed by this Groovy pipeline. This is the same as job.get() or job.doc (the job's data object).
  • StageName - Every stage component configured in the enclosing pipeline manager is bound by its name to this Groovy pipeline, so you can reference stages by their configured names (e.g., job | FetchUrl | ExtractText).
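
A minimal sketch tying these variables together (the field names are hypothetical; the stage names are the ones configured in this page's examples):

Code Block
languagexml
linenumberstrue
 // 'doc' is the job's data object, so these two calls are equivalent ways to add metadata:
 doc.add("source", "groovy");
 job.get().add("sourceCopy", "groovy");

 // Stage components are bound by name and invoked with the pipe operator:
 job | FetchUrl | ExtractText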

...

If you want to reference a stage configured outside the enclosing pipeline manager, you can do so by using the path to that stage component:

...

Code Block
languagexml
linenumberstrue
  job | stage("../OtherPipelineManager/HierarchyExtractor")
 

Stage Listing

Groovy pipelines allow you to dynamically build a list of stages to execute. This gives you better and easier control over which stages should and shouldn't be processed, based on the input job's metadata.

Code Block
languagexml
linenumberstrue
  def myPath = ((doc.action == "add" || doc.action == "update") ?
                  FetchUrl |          //Stages to process if an "add" or "update" action was received
                  ExtractText |
                  Splitter |
                  DateChooser |
                  ExtractDomain :
                  PrintToFile         //Stage to process if no "add" or "update" action was received
               ) | Feed2Solr          //Stage to process every time, after all other stages

  job | myPath
 

Redirects

You can use the redirect feature to write the contents of the jobs received by the current Groovy pipeline to a file, using the ">>" operator followed by the target file path.
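
A minimal sketch of the operator (the file name is hypothetical; the stage names are the ones configured in this page's examples):

Code Block
languagexml
linenumberstrue
 //Write each job's document to jobs.xml after text extraction
 job | FetchUrl | ExtractText >> "jobs.xml"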

...

A Closure Stage is a stage embedded in the Groovy pipeline that receives a Groovy closure to execute. For example:

Code Block
languagexml
linenumberstrue
 job | stage{it.doc.add("fetchUrl","http://www.searchtechnologies.com")} | FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile | Feed2Solr

...

  You can use this to configure other job flows too:
Code Block
languagexml
linenumberstrue
   job | stage{
           it.doc.add("fetchUrl","http://www.searchtechnologies.com");
           it | FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain;
           println "End of Closure Stage"
        } | PrintToFile | Feed2Solr
 

Control flow

Groovy control-flow statements can be used to decide which pipeline to execute under any condition you want:

Code Block
languagexml
linenumberstrue
  job | FetchUrl | ExtractText;

  if (doc.type.text == "text/xml")
    job | XMLProcessor | Post2Solr >> "xmlFiles.xml";
  else if (doc.type.text == "text/html")
    job | HTTPProcessor | Post2Solr >> "htmlFiles.xml";
  else
    job | Post2Solr >> "otherFiles.xml";

    

Iterations

You can loop through some stages as needed:

Code Block
languagexml
linenumberstrue
   for (i in 0..9) { 
    job | stage {doc.add("stageNum",i)}
  }

...

 

The previous example will produce the following job:

...

Code Block
languagexml
linenumberstrue
 <doc>
  <stageNum>0</stageNum>
  <stageNum>1</stageNum>
  <stageNum>2</stageNum>
  <stageNum>3</stageNum>
  <stageNum>4</stageNum>
  <stageNum>5</stageNum>
  <stageNum>6</stageNum>
  <stageNum>7</stageNum>
  <stageNum>8</stageNum>
  <stageNum>9</stageNum>
</doc>
 

Pipeline branches

Groovy pipelines can also be configured with branches, which determine what happens to a job/document when certain events occur. These branches are configured the same way as in normal pipelines:

...

Code Block
languagexml
linenumberstrue
 <pipelines>
  <pipeline name="doc-process" default="true">
    <script><![CDATA[
      job | FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile | Feed2Solr
    ]]></script>
    <branches>
      <branch event="onError" pipeline="error-pipeline" />
      <branch event="onComplete" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" />
      <branch event="onMyEvent" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" stage="some-stage"/>
    </branches>
  </pipeline>

  <pipeline name="error-pipeline">
    <!-- process packages for which exception errors are thrown -->
    .
    .
    .
  </pipeline>
</pipelines>
 
Stage exceptions

Stage exceptions are a way, inside Groovy pipelines, to get the same control over branches and errors, but handled independently per stage. To configure them, call the exceptions() method of the stage to be configured; it receives a Map of event labels to a stage (or a list of stages). For example:

Code Block
languagexml
linenumberstrue
    <pipeline name="doc-process" default="true">
    <script><![CDATA[
      job | FetchUrl.exceptions([
              onComplete:  stage{it >> "fetchUrlCompleted.xml"} | stage{println "FetchUrl completed for "+it.jobId}
             ])| ExtractText | 
                 Splitter | 
                 DateChooser | 
                 ExtractDomain | 
                 PrintToFile | 
                 Feed2Solr.exceptions([
                    onError: stage{it >> "Feed2SolrErrors.xml"},
                    onComplete: stage{it >> "Feed2SolrCompleted.xml"}
                 ]) >> "finished.xml"
    ]]></script>
    <branches>
      <branch event="onError" pipeline="error-pipeline" />
      <branch event="onComplete" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" />
      <branch event="onMyEvent" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" stage="some-stage"/>
    </branches>
  </pipeline>

...

 

In this case, when a job completes the FetchUrl stage successfully, it executes stage{it >> "fetchUrlCompleted.xml"} | stage{println "FetchUrl completed for "+it.jobId} before continuing with ExtractText. The same applies to onTerminate and onError exceptions. If a stage specifies an exception handler for onTerminate, onError, or any other event label (e.g., one set with job.setBranch("onAdd")), subsequent stages receive the job without the job termination or branch. If an exception or branch is generated in a stage that has no exceptions declaration to handle it, it propagates up until it finds a stage that handles it. If no stage handles the exception or branch, the job is branched according to the <branches> section of the Groovy pipeline.
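
For illustration, a custom label raised with job.setBranch() can be trapped by that stage's own exceptions() map. A minimal sketch, assuming a closure stage accepts exceptions() like any other stage (the label and file name are hypothetical):

Code Block
languagexml
linenumberstrue
 job | FetchUrl |
      stage{it.setBranch("onAdd")}.exceptions([
        onAdd: stage{it >> "addedJobs.xml"}    //Trap the label and dump the job, then continue
      ]) | ExtractText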

...

You can also configure exceptions on lists of stages:

Code Block
languagexml
linenumberstrue
  def myStagePath = FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile;

  job | myStagePath.exceptions([
        onComplete: Feed2Solr
      ]);

...

 

Nested exception handling is also available:

Code Block
languagexml
linenumberstrue
  def myStagePath = FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile;

  job | myStagePath.exceptions([
        onComplete: Feed2Solr.exceptions([
                      onError: stage{it >> 'fetchUrlError.xml'},
                      onComplete: stage{it >> 'indexedJobs.xml'}
                    ])
      ]);
 

Handling Subjobs

Groovy pipelines provide a way of controlling the flow of subjobs through stages. Using the subJobs() method of a stage, you can specify what to execute for any subjobs generated in that stage. It receives either a single Groovy closure or a Map of event labels (used when the subjob was branched) to a stage (or a list of stages):

Code Block
languagexml
linenumberstrue
   job | FetchUrl | XmlSubJobExtractor.subJobs([
                     onSubJob: stage{it | FetchUrl | ExtractText | PostHttp >> "subjobs.xml"}
                   ])

...

 

or just a single closure that is executed regardless of the subjobs' branch labels:

Code Block
languagexml
linenumberstrue
   job | FetchUrl | XmlSubJobExtractor.subJobs(
                     {it | FetchUrl | ExtractText | PostHttp >> "subjobs.xml"}
                   )

...

 


Note: Sub-job extractors need a dummy <branches>.

In the current design, when you create a sub-job extractor for use in Groovy pipelines, you will need to give it a dummy branch declaration. For example:

Code Block
languagexml
linenumberstrue
  <component name="XmlSubJobExtractor" subType="xmlSubJobExtractor" factoryName="aspire-xml-files">
   <branches><branch event="onSubJob" pipelineManager="DUMMY" /></branches>
 </component>

...

 

Otherwise the component will flag an error ("missing branch handler") when it is loaded.

...

  • pipeline/script/@maxThreadPools (default: 10) - The maximum number of thread pools this Groovy pipeline will handle simultaneously for subjobs. If the maximum number of thread pools in use has been reached, jobs that want to create new subjobs must wait until a thread pool is released by another job.
  • pipeline/script/@maxThreadsPerPool (default: 10) - The maximum number of threads to create (per thread pool) to handle subjobs.
  • pipeline/script/@maxQueueSizePerPool (default: 30) - The size of the queue (per thread pool) for processing subjobs. If the job queue is full, feeders that attempt to put a new job on the queue will block until the queue has room. It is recommended that the queue size be at least as large as the number of threads, if not two or three times larger.

For example:

Code Block
languagexml
linenumberstrue
   <pipeline name="doc-process" default="true">
    <script maxThreadPools="10" maxThreadsPerPool="10" maxQueueSizePerPool="30"><![CDATA[
        job | FetchUrl | XmlSubJobExtractor.subJobs([
                     onSubJob: stage{it | FetchUrl | ExtractText | PostHttp >> "subjobs.xml"}
                   ])
    ]]></script>
  </pipeline>
 

Creating jobs

You can create jobs inside a Groovy pipeline by using the createJob method:

Code Block
languagexml
linenumberstrue
    contactsJob = createJob('<doc><url>'+doc.url.text()+'/contacts.html</url></doc>')
   contactsJob | FetchUrl | ExtractText
 

Filesystem job feeder

You can use Groovy pipelines to create jobs for each file and directory under a given path. For this purpose, Groovy pipelines provide a function named 'dir'. There are three possible arguments:

...

Each job created will have an <url> field pointing to the corresponding file/directory.

Example:

Code Block
languagexml
linenumberstrue
   dir {it | FetchUrl | ExtractText >> "files.xml"}               //Only files inside the Aspire_Home directory
  dir ({it | FetchUrl | ExtractText >> "files+dir.xml"},"+d")    //Files and directories inside the Aspire_Home directory
  dir ({it | FetchUrl | ExtractText >> "files+dir.xml"},"+d+r")  //Files and directories recursively inside the Aspire_Home directory
  dir ("data",{it | FetchUrl | ExtractText >> "data_files.xml"}) //Only files inside the Aspire_Home/data directory
  dir ("data",{it | FetchUrl | ExtractText >> "data_files+dir.xml"},"+d") //Files and directories inside the Aspire_Home/data directory
 

Configuring Health Checks

...

The following is an example configuration:

Code Block
languagexml
linenumberstrue
  <component name="MyPipelineManager" subType="pipeline" factoryName="aspire-application">
   <healthChecks>
       
     <timestamp name="Test Timestamp" redThreshold="4000" yellowThreshold="1000" 
                history="5" />
       
     <initialization name="Test Long Initializer">
        <check componentRef="/pipeline/LongInitializer"/>
        <check componentRef="/pipeline/Concat-Test"/>
     </initialization>
       
     <jobCount redThreshold="1" />
       
   </healthChecks>
     
   <!-- Configure your pipelines here -->
     
   <!-- Configure your components here -->
   
 </component>
 

Note that all health checks can have a user-friendly name attached to them (the @name attribute).

...

This health check is used to check components to see if they are still initializing. If any of them are, the health of the system will be reported as "INITIALIZING".

Configuration:

Code Block
languagexml
linenumberstrue
      <initialization name="Test Long Initializer">
        <check componentRef="/pipeline/LongInitializer"/>
        <check componentRef="/pipeline/Concat-Test"/>
     </initialization>
 
  • <check> - Specifies the component to check.

...

This health check provides a total count of jobs and determines the health of the system based on the total number of failed jobs.

Configuration:

Code Block
languagexml
linenumberstrue
  <jobCount name="Count of Document Jobs" redThreshold="3" yellowThreshold="1"/>
 

Attributes:

  • @redThreshold - If the total number of failed jobs is greater than or equal to this number, system health is RED. Should be 1 or more.
    • If @redThreshold is "0", your system will be RED all the time! So, set it to 1 or more.

...

Timestamp health checks are used to check the duration of every job and are typically used for occasional jobs (for example, nightly) that take a long time to run (e.g., hours).

Configuration:

Code Block
languagexml
linenumberstrue
  <timestamp name="Rebuild Dictionary Token Stats" history="5" redThreshold="10000" yellowThreshold="2000"/>
 

Attributes:

  • @history - The number of past timestamps to display on the health detail page.
  • @redThreshold - (milliseconds) If a job takes this much time to complete (or more), health will be RED.
  • @yellowThreshold - (milliseconds) If a job takes this much time to complete (or more), health will be at least YELLOW.

...

Configuration: (defaults to 15-minute intervals over 24 hours)

Code Block
languagexml
linenumberstrue
  <latency name="Process Single Document" jobsToAverage="5" isSticky="true" 
          redThreshold="15000" yellowThreshold="5000" />

...

 

Configuration: (specify the interval and history length)

Code Block
languagexml
linenumberstrue
  <latency name="Process Single Document" jobsToAverage="5" isSticky="true" 
          redThreshold="15000" yellowThreshold="5000"
          interval="3600000" history="48" />
 

Attributes for Moving Averages:

...