This stage is primarily intended to split a Parquet file containing a list of items and then process each record individually, as a sub-job on its own pipeline.
This stage takes a job that contains a data stream. It assumes that the data stream represents a Parquet file and processes every entry in the file to create sub-job documents.
Due to the nature of Parquet files, the component does not monitor previously processed files for changes.
Parquet Reader | |
---|---|
Factory Name | com.searchtechnologies.aspire:aspire-parquet-extractor |
subType | default |
Inputs | Job containing a data stream (object['contentStream'], which is a stream to the Parquet file to process). |
Outputs | One subDocument for each entry in the parquet file, submitted as a subjob. |
Element | Type | Default | Description |
---|---|---|---|
debug | boolean | false | Show debug messages |
subJobTimeout | long | 600000 | Timeout for sub-jobs, in milliseconds |
notStoreIds | boolean | false | If true, item IDs are NOT stored for future delete processing |
noInfoMessages | boolean | false | If true, info messages are not written to the log |
bulkSize | int | 1000 | The bulk size for NoSql |
bulkTimeout | long | 1000 | The bulk timeout for NoSql in ms |
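
As an illustration of the elements in the table above, the following sketch sets each one to its documented default as a literal value. It assumes that every element is written as a direct child of the component, and it reuses the branch layout from the example configuration on this page; adjust the pipeline manager names to match your own application.

```xml
<!-- Sketch only: literal values are the defaults listed in the table above. -->
<component name="ParquetSubJobExtractor" subType="default" factoryName="aspire-parquet-extractor">
  <debug>false</debug>
  <!-- Sub-job timeout in milliseconds (600000 ms = 10 minutes) -->
  <subJobTimeout>600000</subJobTimeout>
  <notStoreIds>false</notStoreIds>
  <noInfoMessages>false</noInfoMessages>
  <!-- NoSql bulk settings: bulk size and bulk timeout in ms -->
  <bulkSize>1000</bulkSize>
  <bulkTimeout>1000</bulkTimeout>
  <branches>
    <branch event="onAddUpdateSubJob" pipelineManager="AddUpdatePM" batching="false" />
    <branch event="onErrorSubJob" pipelineManager="ErrorPM" batching="false" />
    <branch event="onDeleteSubJob" pipelineManager="DeletePM" batching="false" />
  </branches>
</component>
```
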
    <component name="ParquetSubJobExtractor" subType="default" factoryName="aspire-parquet-extractor">
      <debug>${debug}</debug>
      <notStoreIds>${notStoreIds}</notStoreIds>
      <noInfoMessages>${noInfoMessages}</noInfoMessages>
      <bulkSize>${bulkSize}</bulkSize>
      <bulkTimeout>${bulkTimeout}</bulkTimeout>
      <branches>
        <branch event="onAddUpdateSubJob" pipelineManager="AddUpdatePM" batching="false" />
        <branch event="onErrorSubJob" pipelineManager="ErrorPM" batching="false" />
        <branch event="onDeleteSubJob" pipelineManager="DeletePM" batching="false" />
      </branches>
    </component>

Note that you need to configure three pipeline managers, one for each branch: the "AddUpdate", "Delete" and "Error" sub-jobs. Additionally, you can add an Extract Text component to get the content of every entry.

    <component name="AddUpdatePM" subType="pipeline" factoryName="aspire-application">
      <debug>${debug}</debug>
      <gatherStatistics>${debug}</gatherStatistics>
      <pipelines>
        <pipeline name="addUpdatePipeline" default="true">
          <script> <![CDATA[ ..... ]]> </script>
        </pipeline>
      </pipelines>
    </component>

    <component name="DeletePM" subType="pipeline" factoryName="aspire-application">
      <debug>${debug}</debug>
      <gatherStatistics>${debug}</gatherStatistics>
      <pipelines>
        <pipeline name="deletePipeline" default="true">
          <script> <![CDATA[ ..... ]]> </script>
        </pipeline>
      </pipelines>
    </component>

    <component name="ErrorPM" subType="pipeline" factoryName="aspire-application">
      <debug>${debug}</debug>
      <gatherStatistics>${debug}</gatherStatistics>
      <pipelines>
        <pipeline name="errorPipeline" default="true">
          <script> <![CDATA[ ..... ]]> </script>
        </pipeline>
      </pipelines>
    </component>
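
The Extract Text suggestion could be wired in roughly as follows. This is a sketch under assumptions, not a confirmed configuration: the component name `ExtractText`, the factory name `aspire-extract-text`, and the `<components>` / `<stages>` layout are not taken from this page, so check them against the documentation for your Aspire version.

```xml
<!-- Sketch only: an add/update pipeline manager that runs text extraction on each sub-job. -->
<component name="AddUpdatePM" subType="pipeline" factoryName="aspire-application">
  <debug>${debug}</debug>
  <gatherStatistics>${debug}</gatherStatistics>
  <components>
    <!-- Assumed component name and factory name for the Extract Text stage -->
    <component name="ExtractText" subType="default" factoryName="aspire-extract-text" />
  </components>
  <pipelines>
    <pipeline name="addUpdatePipeline" default="true">
      <stages>
        <!-- Each sub-job created from a Parquet entry passes through this stage -->
        <stage component="ExtractText" />
      </stages>
    </pipeline>
  </pipelines>
</component>
```

Declaring the stage inside the pipeline manager that receives the `onAddUpdateSubJob` branch keeps extraction limited to added or updated entries, so delete and error sub-jobs skip it.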