Groovy pipelines are pipelines in which you control the flow of jobs through the stages with a groovy script instead of a list of stages. For example:



 <pipelines>
  <pipeline name="doc-process" default="true">
    <script><![CDATA[
      job | FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile | Feed2Solr
    ]]></script>
  </pipeline>
</pipelines>


Variables

job (Java Type: Job)

References the job being processed by this groovy pipeline. You can use this variable to send the job through stages.

doc (Java Type: AspireObject)

The AspireObject which holds all of the metadata for the current document being processed by this groovy pipeline. This is the same as job.get() or job.doc - the job's data object.

StageName

Every stage component configured in the current Pipeline Manager is bound by its name to this groovy pipeline, so you can reference stages by their configured names (e.g. job | FetchUrl | ExtractText).
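
For example, a short sketch that reads metadata through the doc variable and then pushes the job through stages (the "myField" name and the condition are only illustrative):


   // Read metadata through doc, add a field through the job's data object, then run two stages
   if (doc.action == "add")
     job | stage{it.doc.add("myField","myValue")} | FetchUrl | ExtractText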

External Stages

If you want to reference a stage configured outside the current Pipeline Manager, you can reference it by using the path to that stage component:


  job | stage("../OtherPipelineManager/HierarchyExtractor")


Stage Listing

Groovy pipelines allow you to dynamically build a list of stages to execute. This gives you finer and easier control over which stages should and shouldn't be processed, based on the input job's metadata.


   def myPath = ((doc.action == "add" || doc.action == "update")?
                  FetchUrl |         //Stages to process if "add" or "update" action was received
                      ExtractText |
                      ExtractDomain : 
                  PrintToFile         //Stages to process if no "add" or "update" action was received
               ) | Feed2Solr          //Stage to process every time after all stages
 
  job | myPath

Redirects

You can use the redirect feature to write the contents of the jobs received by the current groovy pipeline to a file, using the ">>" operator followed by the target file path.


job | FetchUrl | ExtractText >> "out.xml" | Feed2Solr

In the previous example the redirect is executed before the "Feed2Solr" stage, so if that stage adds or modifies any content in the job metadata, the changes will not be reflected in the "out.xml" file.
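
If you need the redirect to capture the changes made by "Feed2Solr", place it after that stage instead; a minimal sketch:


  job | FetchUrl | ExtractText | Feed2Solr >> "out.xml"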

Closure Stages

A Closure Stage is a stage embedded in the Groovy Pipeline that receives a groovy closure to execute. For example:


 job | stage{it.doc.add("fetchUrl","http://www.searchtechnologies.com")} | FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile | Feed2Solr


You can use this to configure other job flows too:


   job | stage{
           it.doc.add("fetchUrl","http://www.searchtechnologies.com");
           it | FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain;
           println "End of Closure Stage"
        } | PrintToFile | Feed2Solr

Control flow

Groovy control flow statements can be used to decide which stages to execute under any condition you want:


   job | FetchUrl | ExtractText;
 
  if (doc.type.text == "text/xml")
   job | XMLProcessor | Post2Solr >> "xmlFiles.xml";
  else if (doc.type.text == "text/html")
    job | HTTPProcessor | Post2Solr >> "htmlFiles.xml";
  else
    job | Post2Solr >> "otherFiles.xml";

Iterations

You can loop through some stages as needed:


   for (i in 0..9) {
    job | stage {doc.add("stageNum",i)}
  }


The previous example will produce the following job:




 <doc>
  <stageNum>0</stageNum>
  <stageNum>1</stageNum>
  <stageNum>2</stageNum>
  <stageNum>3</stageNum>
  <stageNum>4</stageNum>
  <stageNum>5</stageNum>
  <stageNum>6</stageNum>
  <stageNum>7</stageNum>
  <stageNum>8</stageNum>
  <stageNum>9</stageNum>
</doc>

Pipeline branches

Groovy pipelines can also be configured with branches which determine what happens to a job/document when certain events occur. Those branches are configured the same way as in normal pipelines:



 <pipelines>
  <pipeline name="doc-process" default="true">
    <script><![CDATA[
      job | FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile | Feed2Solr
    ]]></script>
    <branches>
      <branch event="onError" pipeline="error-pipeline" />
      <branch event="onComplete" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" />
      <branch event="onMyEvent" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" stage="some-stage"/>
    </branches>
  </pipeline>
 
  <pipeline name="error-pipeline">
    <!-- process packages for which exception errors are thrown -->
    .
    .
    .
  </pipeline>
</pipelines>

Stage exceptions

Stage exceptions are a way, inside groovy pipelines, to get the same control over branches and errors, but handled independently per stage. To configure them, call the exceptions() method of the stage to be configured; it receives a Map of event labels to a Stage (or a List of Stages). For example:



    <pipeline name="doc-process" default="true">
    <script><![CDATA[
      job | FetchUrl.exceptions([
              onComplete:  stage{it >> "fetchUrlCompleted.xml"} | stage{println "FetchUrl completed for "+it.jobId}
             ])| ExtractText |
                 Splitter |
                 DateChooser |
                 ExtractDomain |
                 PrintToFile |
                 Feed2Solr.exceptions([
                    onError: stage{it >> "Feed2SolrErrors.xml"},
                    onComplete: stage{it >> "Feed2SolrCompleted.xml"}
                 ]) >> "finished.xml"
    ]]></script>
    <branches>
      <branch event="onError" pipeline="error-pipeline" />
      <branch event="onComplete" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" />
      <branch event="onMyEvent" pipelineManager="SomeOtherPipemgr" pipeline="some-other-pipeline" stage="some-stage"/>
    </branches>
  </pipeline>

In this case, when a job successfully completes the FetchUrl stage, it will execute stage{it >> "fetchUrlCompleted.xml"} | stage{println "FetchUrl completed for "+it.jobId} before continuing with ExtractText. The same applies to onTerminate and onError exceptions.

  • If a stage specifies an exception for onTerminate, onError or any other event label (e.g. one set with job.setBranch("onAdd")), subsequent stages will receive the job without the job termination or job branch (see the sketch after this list).
  • If an exception or branch is generated in a stage with no exceptions declaration to handle it, it will propagate up until it finds a stage that handles it.
  • If no stage handles the exception/branch, the job will be branched according to the <branches> section of the groovy pipeline.
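
As an illustrative sketch of the custom-label case (the "onAdd" label and the choice of stages here are hypothetical), a branch raised inside a stage path can be handled by that path's exceptions map, and later stages then receive the job without the branch:


   def myPath = stage{ if (it.doc.action == "add") it.setBranch("onAdd") } | ExtractText;
   job | myPath.exceptions([
         onAdd: FetchUrl | PrintToFile      // executed when the "onAdd" branch is raised inside myPath
       ]) | Feed2Solr                       // receives the job without the "onAdd" branch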

In the previous example, if Feed2Solr has an error, it will execute stage{it >> "Feed2SolrErrors.xml"} and then the job will continue to the next stage, which is the redirect to "finished.xml"; at the end, the "onComplete" branch of the pipeline will be executed. If the "onError" exception were not configured on the Feed2Solr stage, any error thrown in that stage would be handled by the "onError" branch of the pipeline, and execution of the pipeline would end at that moment, without executing the redirect to "finished.xml".

You can also configure exceptions to lists of Stages:


   def myStagePath = FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile ;
  job | myStagePath.exceptions([
        onComplete: Feed2Solr
      ]);


Nested exception handling is also available:


   def myStagePath = FetchUrl | ExtractText | Splitter | DateChooser | ExtractDomain | PrintToFile ;
  job | myStagePath.exceptions([
        onComplete: Feed2Solr.exceptions([
                      onError: stage{it >> "fetchUrlError.xml"},
                      onComplete: stage{it >> "indexedJobs.xml"}
                    ])
      ]);

Handling Subjobs

Groovy pipelines provide a way of controlling the flow of subjobs through stages. Using the subJobs() method of a stage, you can specify what to execute for any subjobs generated in that stage. It receives either a single Groovy closure, or a Map of event labels (used when the subjob was branched) to a Stage (or a List of Stages):


   job | FetchUrl | XmlSubJobExtractor.subJobs([
                     onSubJob: stage{it | FetchUrl | ExtractText | PostHttp >> "subjobs.xml"}
                   ])



or just a single closure that will be executed regardless of the branch labels of the subjobs:


   job | FetchUrl | XmlSubJobExtractor.subJobs(
                     {it | FetchUrl | ExtractText | PostHttp >> "subjobs.xml"}
                   )


Note: Sub Job extractors need a dummy <branches>.

In the current design, when you configure a sub job extractor for use in Groovy pipelines, you will need to give it a dummy branch handler. For example:


  <component name="XmlSubJobExtractor" subType="xmlSubJobExtractor" factoryName="aspire-xml-files">
   <branches><branch event="onSubJob" pipelineManager="DUMMY" /></branches>
 </component>


Otherwise the component will flag an error ("missing branch handler") when it is loaded.

Controlling the number of threads and the threading pool

A different Thread Pool Manager will be assigned to each Stage and parent job to process their subjobs.

The maximum number of thread pools and their sizes are configured with the following attributes on the <script> element:

pipeline/script/@maxThreadPools (default: 10)

The maximum number of thread pools handled simultaneously by this Groovy pipeline for subjobs. If the maximum number of thread pools in use has been reached, jobs that want to create new subjobs will have to wait until a thread pool is released by another job.

pipeline/script/@maxThreadsPerPool (default: 10)

The maximum number of threads to create (per thread pool) to handle subjobs.

pipeline/script/@maxQueueSizePerPool (default: 30)

The size of the queue (per thread pool) for processing subjobs. If the job queue is full, feeders that attempt to put a new job on the queue will be blocked until the queue has room. It is recommended that the queue size be at least as large as the number of threads, if not two or three times larger.


Example:


   <pipeline name="doc-process" default="true">
    <script maxThreadPools="10" maxThreadsPerPool="10" maxQueueSizePerPool="30"><![CDATA[
        job | FetchUrl | XmlSubJobExtractor.subJobs([
                     onSubJob: stage{it | FetchUrl | ExtractText | PostHttp >> "subjobs.xml"}
                   ])
    ]]></script>
  </pipeline>

Creating jobs

You can create jobs inside a Groovy Pipeline by using the createJob method:


    contactsJob = createJob('<doc><url>'+doc.url.text()+'/contacts.html</url></doc>')
   contactsJob | FetchUrl | ExtractText

Filesystem job feeder

You can use groovy pipelines to create jobs for each file and directory under a given path. For this purpose, groovy pipelines provide a function named 'dir'. It takes up to three arguments:

Path (default: Aspire Home path)

Directory from which the files and directories will be fetched. If running AspireShell, this can be changed using the 'cd' (change directory) command.

Closure

Closure to execute with every job created for each file and directory.

Arguments (default: "")

Specifies whether directories should also create jobs (using "+d") and whether files should be extracted recursively (using "+r"). By default, if no Arguments are specified, only files create jobs and the crawl is not recursive.

This function can be used in 4 different ways:

  • dir (Path, Closure, Arguments)
  • dir (Path, Closure)
  • dir (Closure, Arguments)
  • dir (Closure)

Each job created will have an <url> field pointing to the corresponding file/directory.

Example:


  dir {it | FetchUrl | ExtractText >> "files.xml"}               //Only files inside the Aspire_Home directory
  dir ({it | FetchUrl | ExtractText >> "files+dir.xml"},"+d")    //Files and directories inside the Aspire_Home directory
  dir ({it | FetchUrl | ExtractText >> "files+dir.xml"},"+d+r")  //Files and directories recursively inside the Aspire_Home directory
  dir ("data",{it | FetchUrl | ExtractText >> "data_files.xml"}) //Only files inside the Aspire_Home/data directory
  dir ("data",{it | FetchUrl | ExtractText >> "data_files+dir.xml"},"+d") //Files and directories inside the Aspire_Home/data directory


