This stage is primarily intended to split a Parquet file containing a list of items and then process each record individually, as sub-jobs on their own pipeline.

This stage takes a job that contains a data stream. It assumes that the data stream represents a Parquet file and processes every entry in the file to create sub-job documents.


Warning

Due to the nature of Parquet files, the component does not monitor previously processed files for changes.

The Parquet Reader provides the following functionality:

  • Opens all Parquet files as discovered by the CIFS Connector.
  • Streams records within the Parquet file to the connector workflows as additional jobs (logical documents) for further processing.
  • Available as a standard download from the Aspire Maven Repository and as a Red Hat Package Manager (RPM) package.
  • Integrated into the Aspire workflows via drag and drop in the Admin UI.




    Parquet Reader

    | Property     | Value |
    |--------------|-------|
    | Factory Name | com.searchtechnologies.aspire:aspire-parquet-extractor |
    | subType      | default |
    | Inputs       | Job containing a data stream (object['contentStream'], which is a stream to the Parquet file to process). |
    | Outputs      | One subDocument for each entry in the Parquet file, submitted as a sub-job. |


    Configuration

    | Element        | Type    | Default | Description |
    |----------------|---------|---------|-------------|
    | debug          | boolean | false   | Show debug messages. |
    | subJobTimeout  | long    | 600000  | Timeout for sub-jobs, in milliseconds. |
    | notStoreIds    | boolean | false   | If true, IDs will NOT be stored for processing future deletes. |
    | noInfoMessages | boolean | false   | If true, info messages will not be written to the log. |
    | bulkSize       | int     | 1000    | The bulk size for NoSQL. |
    | bulkTimeout    | long    | 1000    | The bulk timeout for NoSQL, in milliseconds. |
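To make the table above concrete, here is a minimal sketch that sets every listed element to its documented default using literal values instead of property placeholders. It assumes that subJobTimeout is configured as a child element in the same way as the other settings, which this page does not show explicitly. Branch definitions are left out for brevity.

```xml
<!-- Sketch only: the literal values are the defaults listed in the configuration table. -->
<component name="ParquetSubJobExtractor" subType="default" factoryName="aspire-parquet-extractor">
    <debug>false</debug>
    <!-- Assumption: subJobTimeout is a child element like the other options (value in milliseconds). -->
    <subJobTimeout>600000</subJobTimeout>
    <notStoreIds>false</notStoreIds>
    <noInfoMessages>false</noInfoMessages>
    <bulkSize>1000</bulkSize>
    <bulkTimeout>1000</bulkTimeout>
    <!-- A <branches> section routing sub-jobs to pipeline managers would normally follow here. -->
</component>
```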

    Example Configurations

    <component name="ParquetSubJobExtractor" subType="default" factoryName="aspire-parquet-extractor">
        <debug>${debug}</debug>
        <notStoreIds>${notStoreIds}</notStoreIds>
        <noInfoMessages>${noInfoMessages}</noInfoMessages>
        <bulkSize>${bulkSize}</bulkSize>
        <bulkTimeout>${bulkTimeout}</bulkTimeout>
        <branches>
            <branch event="onAddUpdateSubJob" pipelineManager="AddUpdatePM" batching="false" />
            <branch event="onErrorSubJob" pipelineManager="ErrorPM" batching="false" />
            <branch event="onDeleteSubJob" pipelineManager="DeletePM" batching="false" />
        </branches>
    </component>
    
    Note that you need to configure three pipelines, one for each of the "AddUpdate", "Delete" and "Error" sub-job branches. Additionally, you can add an Extract Text component to get the content of every entry.
    
    <component name="AddUpdatePM" subType="pipeline" factoryName="aspire-application">
       <debug>${debug}</debug>
       <gatherStatistics>${debug}</gatherStatistics>
       <pipelines>
          <pipeline name="addUpdatePipeline" default="true">
             <script>
                <![CDATA[
                   .....
                ]]>
             </script>
          </pipeline>
       </pipelines>
    </component>
    
    <component name="DeletePM" subType="pipeline" factoryName="aspire-application">
       <debug>${debug}</debug>
       <gatherStatistics>${debug}</gatherStatistics>
       <pipelines>
          <pipeline name="deletePipeline" default="true">
             <script>
                <![CDATA[
                   .....
                ]]>
             </script>
          </pipeline>
       </pipelines>
    </component>
    
    <component name="ErrorPM" subType="pipeline" factoryName="aspire-application">
       <debug>${debug}</debug>
       <gatherStatistics>${debug}</gatherStatistics>
       <pipelines>
          <pipeline name="errorPipeline" default="true">
             <script>
                <![CDATA[
                   .....
                ]]>
             </script>
          </pipeline>
       </pipelines>
    </component>
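
The note above mentions adding an Extract Text component to obtain the content of each entry. The following is a hedged sketch of how that might look inside the AddUpdatePM pipeline manager. The stage list syntax, the component name ExtractText and the factory name aspire-extract-text are assumptions based on common Aspire application layouts, not details taken from this page, so check them against your Aspire version before use.

```xml
<component name="AddUpdatePM" subType="pipeline" factoryName="aspire-application">
    <debug>${debug}</debug>
    <gatherStatistics>${debug}</gatherStatistics>
    <pipelines>
        <pipeline name="addUpdatePipeline" default="true">
            <stages>
                <!-- Assumed stage reference; it must match a component declared below. -->
                <stage component="ExtractText" />
            </stages>
        </pipeline>
    </pipelines>
    <components>
        <!-- Assumed name and factory for the Extract Text component. -->
        <component name="ExtractText" subType="default" factoryName="aspire-extract-text" />
    </components>
</component>
```

In this sketch, each sub-job routed through the onAddUpdateSubJob branch would pass through the text extraction stage before any further processing stages you add.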