The Avro Files Extractor provides the following functionality:

  • Opens each Avro file discovered by a connector.
  • Streams the records in the Avro file to the connector workflows as additional jobs (logical documents) for further processing.
  • Stores the IDs of extracted documents, together with their parent ID, in a NoSQL database. If a delete request for the Avro file arrives later, these IDs are used to send "delete" jobs for the previously extracted subjobs.
Avro Files Extractor

Factory Name: com.searchtechnologies.aspire:aspire-avro-extractor
subType: default
Inputs: A job containing a data stream (object['contentStream'], a stream to the Avro file to process).
Outputs: One subDocument for each Avro record in the Avro file, submitted as a subjob.

Aspire Enterprise, Aspire Premium

Configuration

Element         Type     Default   Description
subJobTimeout   long     600000    Timeout for subjobs, in milliseconds (600000 ms = 10 minutes).
debug           boolean  false     If true, the component logs debug information.
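A minimal sketch of setting both parameters on the extractor. It assumes that, like `debug` in the example configuration, `subJobTimeout` is given as a child element named after the Element column. It also assumes the CompletePM and DeletePM pipeline managers from the example configuration exist. The 300000 ms value is purely illustrative.

```xml
<component name="AvroSubJobExtractor" subType="default" factoryName="aspire-avro-extractor">
   <!-- Illustrative subjob timeout of 300000 ms (5 minutes) instead of the 600000 ms default -->
   <subJobTimeout>300000</subJobTimeout>
   <!-- Log debug information from the component -->
   <debug>true</debug>
   <branches>
      <branch event="onAddUpdateSubJob" pipelineManager="CompletePM" batching="false" />
      <branch event="onDeleteSubJob" pipelineManager="DeletePM" batching="false" />
   </branches>
</component>
```

Omitting either element keeps the default shown in the table.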

Example Configurations

<component name="AvroSubJobExtractor" subType="default" factoryName="aspire-avro-extractor">
   <debug>${debug}</debug>
   <branches>
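      <!-- Add/update subjobs (one per extracted Avro record) are sent to CompletePM -->
      <branch event="onAddUpdateSubJob" pipelineManager="CompletePM" batching="false" />
      <!-- Delete subjobs for previously extracted records are sent to DeletePM -->
      <branch event="onDeleteSubJob" pipelineManager="DeletePM" batching="false" />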
   </branches>
</component>

<component name="CompletePM" subType="pipeline" factoryName="aspire-application">
   <debug>${debug}</debug>
   <gatherStatistics>${debug}</gatherStatistics>
   <pipelines>
      <pipeline name="completePipeline" default="true">
         <script>
            <![CDATA[
               // Log the subjob to addUpdate.jobs, then route it on for add/update processing
               job | AddUpdateJobLogger
               job.addRoute("/"+doc.sourceId.text()+"/ProcessPipelineManager@${addUpdatePM}")
            ]]>
         </script>
      </pipeline>
   </pipelines>
   <components>
      <component name="AddUpdateJobLogger" subType="jobLogger" factoryName="aspire-tools">
         <debug>${debug}</debug>
         <logFile>log/${app.name}/addUpdate.jobs</logFile>
      </component>
   </components>
</component>

<component name="DeletePM" subType="pipeline" factoryName="aspire-application">
   <debug>${debug}</debug>
   <gatherStatistics>${debug}</gatherStatistics>
   <pipelines>
      <pipeline name="deletePipeline" default="true">
         <script>
            <![CDATA[
               // Log the delete subjob to deleted.jobs, then route it on for delete processing
               job | DeletedJobLogger
               job.addRoute("/"+doc.sourceId.text()+"/ProcessPipelineManager@${deletePM}")
            ]]>
         </script>
      </pipeline>
   </pipelines>
   <components>
      <component name="DeletedJobLogger" subType="jobLogger" factoryName="aspire-tools">
         <debug>${debug}</debug>
         <logFile>log/${app.name}/deleted.jobs</logFile>
      </component>
   </components>
</component>