ALPHA VERSION

The Avro Files Extractor provides the following functionality:

  • Opens each Avro file discovered by a connector.
  • Streams the records within the Avro file to the connector workflows as additional jobs (logical documents) for further processing.
  • Stores the IDs of extracted documents, together with their parent ID, in a NoSQL database. If a delete request for the Avro file arrives later, these IDs are used to send "delete" jobs for the previously extracted subjobs.
Avro Files Extractor

Factory Name: com.searchtechnologies.aspire:aspire-avro-extractor
subType: default
Inputs: A job containing a data stream (object['contentStream'], a stream to the Avro file to process).
Outputs: One subdocument for each Avro record in the Avro file, submitted as a subjob.

Configuration

Element          Type      Default   Description
subJobTimeout    long      600000    Timeout for subjobs, in milliseconds.
debug            boolean   false     If true, the component logs debug information.
notStoreIds      boolean   false     If true, the IDs of extracted subjobs are NOT stored, so delete jobs cannot be sent for them later.
noInfoMessages   boolean   false     If true, the component does not write info messages to the log.
bulkSize         int       1000      Bulk size for NoSQL operations.
bulkTimeout      long      1000      Bulk timeout for NoSQL operations, in milliseconds.
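As a quick reference, the fragment below is a minimal sketch of the extractor component with every option from the table written out at its listed default, using literal values instead of the ${...} property placeholders used under Example Configurations. Placing subJobTimeout at the same level as the other elements is an assumption based on the table (it does not appear in the example configurations). The <branches> element is left out for brevity; a working configuration still needs it.

```xml
<!-- Sketch only: each option set to the default listed in the table.
     Assumption: subJobTimeout sits alongside the other elements.
     The <branches> element (onAddUpdateSubJob / onDeleteSubJob) is omitted here. -->
<component name="AvroSubJobExtractor" subType="default" factoryName="aspire-avro-extractor">
   <subJobTimeout>600000</subJobTimeout>     <!-- subjob timeout, in milliseconds -->
   <debug>false</debug>                      <!-- no debug logging -->
   <notStoreIds>false</notStoreIds>          <!-- IDs are stored so later deletes can be sent -->
   <noInfoMessages>false</noInfoMessages>    <!-- info messages are written to the log -->
   <bulkSize>1000</bulkSize>                 <!-- NoSQL bulk size -->
   <bulkTimeout>1000</bulkTimeout>           <!-- NoSQL bulk timeout, in milliseconds -->
</component>
```

Setting notStoreIds to true skips storing the extracted IDs, which means delete requests for the Avro file cannot be expanded into delete jobs for its records.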

Example Configurations

<component name="AvroSubJobExtractor" subType="default" factoryName="aspire-avro-extractor">
   <debug>${debug}</debug>
   <notStoreIds>${notStoreIds}</notStoreIds>
   <noInfoMessages>${noInfoMessages}</noInfoMessages>
   <bulkSize>${bulkSize}</bulkSize>
   <bulkTimeout>${bulkTimeout}</bulkTimeout>
   <branches>
      <branch event="onAddUpdateSubJob" pipelineManager="CompletePM" batching="false" />
      <branch event="onDeleteSubJob" pipelineManager="DeletePM" batching="false" />
   </branches>
</component>

<component name="CompletePM" subType="pipeline" factoryName="aspire-application">
   <debug>${debug}</debug>
   <gatherStatistics>${debug}</gatherStatistics>
   <pipelines>
      <pipeline name="completePipeline" default="true">
         <script>
            <![CDATA[
               job | AddUpdateJobLogger
               job.addRoute("/"+doc.sourceId.text()+"/ProcessPipelineManager@${addUpdatePM}")
            ]]>
         </script>
      </pipeline>
   </pipelines>
   <components>
      <component name="AddUpdateJobLogger" subType="jobLogger" factoryName="aspire-tools">
         <debug>${debug}</debug>
         <logFile>log/${app.name}/addUpdate.jobs</logFile>
      </component>
   </components>
</component>

<component name="DeletePM" subType="pipeline" factoryName="aspire-application">
   <debug>${debug}</debug>
   <gatherStatistics>${debug}</gatherStatistics>
   <pipelines>
      <pipeline name="deletePipeline" default="true">
         <script>
            <![CDATA[
               job | DeletedJobLogger
               job.addRoute("/"+doc.sourceId.text()+"/ProcessPipelineManager@${deletePM}")
            ]]>
         </script>
      </pipeline>
   </pipelines>
   <components>
      <component name="DeletedJobLogger" subType="jobLogger" factoryName="aspire-tools">
         <debug>${debug}</debug>
         <logFile>log/${app.name}/deleted.jobs</logFile>
      </component>
   </components>
</component>
