The RDB Feeder component feeds AspireObjects from an RDB and can operate in full feed or incremental mode.
The current default is to feed records at one per second. Set <feedWait> to "0" (zero) to feed as fast as possible.
Note: This component is not the same as the RDB Sub Job Feeder.
This feeder is based on the Simple Feeder.
RDB Feeder | |
---|---|
Factory Name | com.searchtechnologies.aspire:aspire-rdbfeeder |
subType | default |
Inputs | RDB |
Outputs | Separate sub jobs for each RDB record, where each sub-job contains an AspireObject containing data from the RDB, published to the configured pipeline manager. |
Full mode takes SQL from the configuration and executes it against the database configured via the RDB connection stage. Each resulting row is formed into an Aspire document, using the column names as document elements, and this document is submitted to a pipeline manager using the event configured for inserts. As the document is created, the value of the column identified in the configuration as the primary key is inserted into the fetchUrl element of the document. The value "insert" is placed in the action attribute of the document.
<doc action="insert">
  <ID source="RDBFeederImpl">1</ID>
  <fetchUrl source="RDBFeederImpl">1</fetchUrl>
  <PG_ID source="RDBFeederImpl">1</PG_ID>
  <ENTITY_TYPE source="RDBFeederImpl">productgroup</ENTITY_TYPE>
  <PG_UUID source="RDBFeederImpl">uuid val 1</PG_UUID>
  <PG_WEBSITE source="RDBFeederImpl">bbc.co.uk/eastenders</PG_WEBSITE>
  <PG_TYPE source="RDBFeederImpl">Series</PG_TYPE>
  <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
  <feederType source="RDBFeederImpl">RDBFeeder</feederType>
</doc>
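The row-to-document mapping described above can be sketched in a few lines of Python. This is only an illustration of the document shape, not the feeder's actual implementation: the function name `row_to_doc` and its parameters are hypothetical, and the element order it produces is not meant to be significant.

```python
import xml.etree.ElementTree as ET

SOURCE = "RDBFeederImpl"  # source attribute value seen in the sample document


def row_to_doc(row, id_column, action="insert",
               feeder_label="RDBFeed", feeder_type="RDBFeeder"):
    """Build a <doc> element from one result row (a dict of column -> value).

    Hypothetical helper for illustration only.
    """
    doc = ET.Element("doc", {"action": action})

    def add(name, value):
        child = ET.SubElement(doc, name, {"source": SOURCE})
        child.text = str(value)

    # The value of the primary key column is copied into fetchUrl.
    add("fetchUrl", row[id_column])
    # Every returned column becomes an element named after the column.
    for column, value in row.items():
        add(column, value)
    # The feeder label and type are appended to every document.
    add("feederLabel", feeder_label)
    add("feederType", feeder_type)
    return doc


row = {"ID": 1, "PG_ID": 1, "ENTITY_TYPE": "productgroup", "PG_TYPE": "Series"}
doc = row_to_doc(row, id_column="ID", feeder_label="EM3FullFeeder")
print(ET.tostring(doc, encoding="unicode"))
```

The same helper, called with `action="update"` or `action="delete"`, would yield documents shaped like those published in incremental mode.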
Incremental mode also uses SQL from the configuration file. An optional "pre-update" command is run first, followed by a command to extract the data. As each job returns from the pipeline, a "post-update" command is run for that individual job.
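In order to handle updates, the update SQL should return, as a minimum, the id, sequence and action columns for each change. The action is placed in the action attribute of the published document, as in the following update and delete documents: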
<doc action="update">
  <ID source="RDBFeederImpl">1</ID>
  <fetchUrl source="RDBFeederImpl">1</fetchUrl>
  <PG_ID source="RDBFeederImpl">1</PG_ID>
  <ENTITY_TYPE source="RDBFeederImpl">productgroup</ENTITY_TYPE>
  <PG_UUID source="RDBFeederImpl">uuid val 1</PG_UUID>
  <PG_WEBSITE source="RDBFeederImpl">bbc.co.uk/eastenders/test</PG_WEBSITE>
  <PG_TYPE source="RDBFeederImpl">Series</PG_TYPE>
  <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
  <feederType source="RDBFeederImpl">RDBFeeder</feederType>
</doc>
<doc action="delete">
  <ID source="RDBFeederImpl">1</ID>
  <fetchUrl source="RDBFeederImpl">1</fetchUrl>
  <PG_ID source="RDBFeederImpl">1</PG_ID>
  <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
  <feederType source="RDBFeederImpl">RDBFeeder</feederType>
</doc>
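The incremental cycle (pre-update, extract, then a post-update per completed job) can be tried out against an in-memory SQLite database. The sketch below is an assumption-laden illustration rather than the feeder's code: the `main` and `queue` tables, the status values 'W', 'I' and 'C', and the `process_job` stand-in are all hypothetical, and the `:SEQ` token is handled here with SQLite's named-parameter binding rather than whatever substitution the feeder performs internally.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE main  (id INTEGER PRIMARY KEY, col1 TEXT);
    CREATE TABLE queue (seq INTEGER PRIMARY KEY, id INTEGER,
                        action TEXT, status TEXT);
    INSERT INTO main  VALUES (1, 'alpha'), (2, 'beta');
    -- Row 3 no longer exists in main, so only the queue knows about it.
    INSERT INTO queue VALUES (10, 1, 'update', 'W'),
                             (11, 3, 'delete', 'W'),
                             (12, 2, 'insert', 'W');
""")

# Hypothetical equivalents of sql/preUpdate, sql/update and sql/postUpdate.
PRE_UPDATE = "UPDATE queue SET status = 'I' WHERE status = 'W'"
UPDATE = """
    SELECT queue.seq AS SEQ, queue.id AS ID, queue.action AS ACTION, main.col1
    FROM queue LEFT OUTER JOIN main ON main.id = queue.id
    WHERE queue.status = 'I'
    ORDER BY queue.seq ASC
"""
POST_UPDATE = "UPDATE queue SET status = 'C' WHERE seq = :SEQ"

processed = []


def process_job(seq, row_id, action):
    """Stand-in for the pipeline: just record what was fed."""
    processed.append((seq, row_id, action))


conn.execute(PRE_UPDATE)                       # mark waiting rows as in progress
for seq, row_id, action, col1 in conn.execute(UPDATE).fetchall():
    process_job(seq, row_id, action)           # the job "returns from the pipeline"
    conn.execute(POST_UPDATE, {"SEQ": seq})    # mark this one sequence as complete
conn.commit()

remaining = conn.execute(
    "SELECT COUNT(*) FROM queue WHERE status != 'C'").fetchone()[0]
print(processed, remaining)
```

Joining from the queue table outwards keeps deleted rows in the result set: the delete entry for id 3 is still fed even though `main` no longer holds that row.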
This feeder takes all parameters from the Simple Feeder plus the following:
Element | Type | Default | Description |
---|---|---|---|
feederLabel | String | RDBFeed | The feeder label placed in the <feederLabel> element of the published document. |
rdbLocation | String | rdb | The Aspire component name/path of the RDB service: an RDB Connection component that maintains a pool of RDB connections. |
id/@column | String | None | The name of the column that holds the primary key (id) for the row. NOTE: This must match the column name returned in the sql/update parameter. You may need to use the SQL AS construct. |
sequence/@column | String | None | The name of the column that holds the sequence from the queue table. NOTE: This must match the column name returned in the sql/update parameter. You may need to use the SQL AS construct. |
action/@column | String | None | The name of the column that holds the action from the queue table. NOTE: This must match the column name returned in the sql/update parameter. You may need to use the SQL AS construct. |
sql | String | None | The various pieces of SQL to run. See below. |
events | String | See below | The events to publish documents against. See below. |
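Because the id, sequence and action column names must match what the update SQL returns, it can help to check a query's result columns before configuring the feeder. The following is a minimal sketch using SQLite; `missing_columns` is a hypothetical helper, and the exact (case-sensitive) comparison is an assumption of this sketch, not a statement about how the feeder matches names.

```python
import sqlite3


def missing_columns(conn, sql, required):
    """Return the configured column names that the query does not return."""
    cursor = conn.execute(sql)
    # cursor.description holds one entry per result column; item 0 is its name.
    returned = {description[0] for description in cursor.description}
    return [name for name in required if name not in returned]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE queue (seq INTEGER, id INTEGER, action TEXT)")

# Values that would go in id/@column, sequence/@column and action/@column.
configured = ["ID", "SEQ", "ACTION"]

without_alias = missing_columns(
    conn, "SELECT seq, id, action FROM queue", configured)
with_alias = missing_columns(
    conn, "SELECT seq AS SEQ, id AS ID, action AS ACTION FROM queue", configured)

print(without_alias)
print(with_alias)
```

In this sketch the unaliased query fails the check, while the query using the SQL AS construct returns every configured name.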
The RDB feeder uses configurable SQL to specify what to feed.
Element | Type | Default | Description |
---|---|---|---|
sql/preUpdate | String | None | The SQL to be run before an incremental update takes place. (optional) |
sql/update | String | None | The SQL to be run to extract data for the incremental update. Should include the id, sequence and action as a minimum. |
sql/postUpdate | String | None | The SQL to be run as each job of an incremental update completes. The token :SEQ will be replaced by the sequence for the update that completed. |
sql/fullSelect | String | None | The SQL to be run to extract data for the full feed. (optional) |
The RDB feeder publishes different actions against different events, so different pipelines can be used to handle different actions.
Element | Type | Default | Description |
---|---|---|---|
events/insert/@event | String | onPublish | The event to publish against for an insert. |
events/update/@event | String | onPublish | The event to publish against for an update. |
events/delete/@event | String | onDelete | The event to publish against for a delete. |
The RDB feeder publishes documents using the branch manager. It publishes using the events configured above. You must therefore include <branches> for these events in the configuration to publish to a pipeline within a pipeline manager. See Branch Handler for more details.
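The relationship between actions, events and branches can be modelled as two lookups. This is a simplified sketch, not the branch handler itself: the `route` function and the `BRANCHES` table are hypothetical, the pipeline manager and pipeline names are illustrative, and raising an error for an event with no branch is a choice made for this sketch, since the behaviour of Aspire in that case is not described here.

```python
# Defaults taken from the events table above.
DEFAULT_EVENTS = {
    "insert": "onPublish",
    "update": "onPublish",
    "delete": "onDelete",
}

# Hypothetical stand-in for the <branches> section:
# event -> (pipeline manager, pipeline or None for the manager's default).
BRANCHES = {
    "onPublish": ("/system/standard-pipe-manager", None),
    "onDelete": ("/system/standard-pipe-manager", "delete-pipe"),
}


def route(action, events=DEFAULT_EVENTS, branches=BRANCHES):
    """Resolve an action to (event, pipeline manager, pipeline)."""
    event = events[action]
    if event not in branches:
        # Sketch-only behaviour: flag events that have no matching branch.
        raise LookupError(f"no branch configured for event {event!r}")
    manager, pipeline = branches[event]
    return event, manager, pipeline


for action in ("insert", "update", "delete"):
    print(action, route(action))
```

With the defaults, inserts and updates share the onPublish branch, while deletes are routed separately through onDelete.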
<component name="EM3FullFeeder" subType="default" factoryName="aspire-rdbfeeder">
  <branches>
    <branch event="onPublish" pipelineManager="/system/standard-pipe-manager"/>
    <branch event="onDelete" pipelineManager="/system/standard-pipe-manager" pipeline="delete-pipe"/>
  </branches>
  <feedWait>0</feedWait>
  <id column="ID"/>
  <sequence column="SEQ"/>
  <action column="ACTION"/>
  <sql>
    <preUpdate><![CDATA[update queue set status='I' where seq in (select top 3 seq from queue where status!='I' order by seq asc)]]></preUpdate>
    <update><![CDATA[select queue.seq AS SEQ, queue.id AS ID, queue.action AS ACTION, main.col1, main.col2, main.col3 from main right outer join queue on main.id = queue.id where queue.status = 'I' order by queue.seq asc]]></update>
    <postUpdate><![CDATA[update queue set status='C' where seq = :SEQ]]></postUpdate>
    <fullSelect><![CDATA[select main.id, main.col1, main.col2, main.col3 from main]]></fullSelect>
  </sql>
</component>
This example shows a simple configuration where only full re-feeds are required (i.e. no incremental updates).
Note: ROWNUM column shown below is only appropriate for Oracle.
<component name="CVsFullFeeder" subType="default" factoryName="aspire-rdbfeeder">
  <rdbLocation>RDBConnection</rdbLocation>
  <branches>
    <branch event="onPublish" pipelineManager="ProcessCVsBatchXMLFile"/>
  </branches>
  <id column="applicant_id"/>
  <feedWait>0</feedWait>
  <sql>
    <fullSelect><![CDATA[
      SELECT applicant_id, roles, main_skills, text_cv
      FROM ( select applicant_id, roles, main_skills, text_cv from stcvs )
      WHERE ROWNUM <= 25
    ]]></fullSelect>
  </sql>
</component>
<component name="FeedRealTimeUpdates" subType="default" factoryName="aspire-rdbfeeder">
  <feederLabel>FeedRealTimeUpdates</feederLabel>
  <rdbLocation>RDBConnection</rdbLocation>
  <branches>
    <branch event="onPublish" pipelineManager="ProcessRegistrant" pipeline="process-registrant-action"/>
    <branch event="onDelete" pipelineManager="ProcessRegistrant" pipeline="process-registrant-delete"/>
  </branches>
  <loopWait>60000</loopWait>
  <feedWait>0</feedWait>
  <id column="REGISTRATION_ID"/>
  <sequence column="UPDATE_ID"/>
  <action column="ACTION"/>
  <sql>
    <preUpdate>
      <![CDATA[
        update REGISTRANT_UPDATES set status = 'I' where status = 'W'
      ]]>
    </preUpdate>
    <update>
      <![CDATA[
        SELECT queue.update_id AS UPDATE_ID,
               queue.action AS ACTION,
               'true' AS USE_COMMIT_WITHIN,
               r.REGISTRATION_ID,
               FIRST_NAME,
               CURRENT_LAST_NAME,
               CURRENT_STATE,
               to_char(BIRTH_DATE,'yyyy-mm-dd') AS BIRTH_DATE,
               GENDER,
               DISPLAY_STATE,
               to_char(CREATION_DATE,'yyyy-mm-dd') AS CREATION_DATE,
               to_char(LAST_UPDATE_DATE,'yyyy-mm-dd') AS LAST_UPDATE_DATE
        FROM registrations r
             right outer join REGISTRANT_UPDATES queue
             on r.registration_id = queue.registration_id
        WHERE DISPLAY_STATE >= 0 AND DISPLAY_STATE <= 2
              and queue.status = 'I'
        order by queue.update_id asc
      ]]>
    </update>
    <postUpdate>
      <![CDATA[
        UPDATE REGISTRANT_UPDATES set status = 'C' WHERE update_id = :SEQ and status = 'I'
      ]]>
    </postUpdate>
  </sql>
</component>