The RDB Feeder component feeds AspireObjects from an RDB and can operate in full feed or incremental mode.

The current default is to feed records at one per second. Set <feedWait> to "0" (zero) to feed as fast as possible.

Note: This component is not the same as the RDB Sub Job Feeder.

This feeder is based on the Simple Feeder.

RDB Feeder
Factory Name: com.searchtechnologies.aspire:aspire-rdbfeeder
subType: default
Inputs: RDB
Outputs: Separate sub jobs for each RDB record, where each sub-job contains an AspireObject containing data from the RDB, published to the configured pipeline manager.

Full feed

Full mode takes SQL from the configuration and executes it against the database configured via the RDB connection stage. Each resulting row is formed into an Aspire document, using the column names as document elements, and this document is submitted to a pipeline manager using the event configured for inserts. As the document is created, the value of the column identified in the configuration as the primary key is inserted into the fetchUrl element of the document. The value "insert" is placed in the action attribute of the document.

Example AspireObject from a full feed

 <doc action="insert">
   <ID source="RDBFeederImpl">1</ID>
   <fetchUrl source="RDBFeederImpl">1</fetchUrl>
   <PG_ID source="RDBFeederImpl">1</PG_ID>
   <ENTITY_TYPE source="RDBFeederImpl">productgroup</ENTITY_TYPE>
   <PG_UUID source="RDBFeederImpl">uuid val 1</PG_UUID>
   <PG_WEBSITE source="RDBFeederImpl">bbc.co.uk/eastenders</PG_WEBSITE>
   <PG_TYPE source="RDBFeederImpl">Series</PG_TYPE>
   <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
   <feederType source="RDBFeederImpl">RDBFeeder</feederType>
 </doc>
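
As a rough illustration of how the configuration maps to the document above, a full feed along the following lines could produce it. This is only a sketch: the table name PRODUCT_GROUP is an assumption made for illustration (the column names are taken from the example document), the pipeline manager path is borrowed from the Simple Configuration example, and connection details are left to the RDB connection component.

 <component name="EM3FullFeeder" subType="default" factoryName="aspire-rdbfeeder">
   <branches>
     <!-- Full feeds publish against the event configured for inserts (onPublish by default) -->
     <branch event="onPublish" pipelineManager="/system/standard-pipe-manager"/>
   </branches>
   <!-- Appears as <feederLabel> in each published document -->
   <feederLabel>EM3FullFeeder</feederLabel>
   <!-- The value of this column is also copied into <fetchUrl> -->
   <id column="ID"/>
   <sql>
     <!-- PRODUCT_GROUP is a hypothetical table name; each selected column becomes a document element -->
     <fullSelect><![CDATA[select ID, PG_ID, ENTITY_TYPE, PG_UUID, PG_WEBSITE, PG_TYPE from PRODUCT_GROUP]]></fullSelect>
   </sql>
 </component>

Each selected column becomes an element named after the column, and because ID is configured as the id column, its value appears in both the ID and fetchUrl elements.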

Incremental feed

Incremental mode also uses SQL from the configuration file. An optional "pre-update" command is run first, followed by a command to extract the data. As each job returns from the pipeline, a "post-update" command is run for that individual job.

In order to handle updates:

  1. Configure a queue table in your database. (The actual name of the table can be anything, as long as it is referenced consistently in the SQL statements below.)
    When a row changes in the table, a trigger should fire and insert a row into this queue table.
    1. The queue table must have (at least) four columns: sequence, id, action and status.
      (The columns can be named anything; the first three are set using config parameters, and status is used only in the SQL statements below.)
      1. The sequence column is a unique incrementing ID for this change.
      2. id indicates the ID (primary key) of the row in the main data table that changed.
      3. action is a single character indicating the type of change (I for insert, U for update, D for delete, or N for no change).
      4. status is a flag indicating the current status of the update (suggested values are W for waiting to be processed, I for in progress, and C for completed).
  2. Your "pre-update" SQL can then update the status (from W to I) for updates you wish to process on this pass. This could be all updates that are not complete, or the first n.
  3. Your data extraction SQL should then join this update table with the main data table,
    1. selecting the columns you wish to index, plus the id, sequence and action at a minimum, for queue rows with a status of I,
    2. ordering the rows by increasing sequence number.
      The column names returned for id, sequence and action must match the column names specified in the <id>, <sequence> and <action> parameters - you may need to use the SQL AS construct.
  4. AspireObjects will then be formed for each row returned and submitted to the pipeline manager using the event defined for the appropriate action.
    1. You can configure events for insert, update and delete, which means that these actions can use differing pipelines.
      1. As the document is created, the value of the column identified (in the configuration) as the primary key is inserted into the fetchUrl element of the document.
      2. The extracted action value ('insert', 'update', or 'delete') will be placed in the action attribute of the document.
  5. The feeder will ensure that jobs created for a particular document id are finished before another for the same id is started, but jobs for differing ids could be submitted out of sequence order.
  6. When a job is completed successfully, the "post-update" SQL will be executed.
    1. This SQL will have the token :SEQ replaced with the sequence number of the job that has completed.
    2. This SQL should update the status in the queue table (from I to C) to indicate the update has been processed.
      Alternative Process: If you wish to keep the number of rows in the queue table to a minimum, use the "post-update" SQL to delete the row instead.
      In this case you may not need the "pre-update" SQL, and it can be omitted from the configuration.
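
The alternative process described above might be configured roughly as follows. This is a sketch only: it reuses the table names queue and main and the column names seq, id and action from the Simple Configuration example on this page, and the component name is hypothetical. With no "pre-update" SQL, the extraction SQL reads every row still in the queue, and the "post-update" SQL deletes each row once its job has completed.

 <component name="QueueDeleteFeeder" subType="default" factoryName="aspire-rdbfeeder">
   <!-- branches and other settings omitted for brevity -->
   <id column="ID"/>
   <sequence column="SEQ"/>
   <action column="ACTION"/>
   <sql>
     <!-- No preUpdate element: rows are never marked as in progress -->
     <update><![CDATA[select queue.seq AS SEQ, queue.id AS ID, queue.action AS ACTION, main.col1, main.col2
                      from main right outer join queue
                      on main.id = queue.id
                      order by queue.seq asc]]></update>
     <!-- :SEQ is replaced with the sequence number of the completed job -->
     <postUpdate><![CDATA[delete from queue where seq = :SEQ]]></postUpdate>
   </sql>
 </component>

Because nothing marks a row as in progress here, check against your own setup whether rows whose jobs are still running could be read again on a later pass.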

Example AspireObject for an update

 <doc action="update">
   <ID source="RDBFeederImpl">1</ID>
   <fetchUrl source="RDBFeederImpl">1</fetchUrl>
   <PG_ID source="RDBFeederImpl">1</PG_ID>
   <ENTITY_TYPE source="RDBFeederImpl">productgroup</ENTITY_TYPE>
   <PG_UUID source="RDBFeederImpl">uuid val 1</PG_UUID>
   <PG_WEBSITE source="RDBFeederImpl">bbc.co.uk/eastenders/test</PG_WEBSITE>
   <PG_TYPE source="RDBFeederImpl">Series</PG_TYPE>
   <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
   <feederType source="RDBFeederImpl">RDBFeeder</feederType>
 </doc>

Example AspireObject for a delete

 <doc action="delete">
   <ID source="RDBFeederImpl">1</ID>
   <fetchUrl source="RDBFeederImpl">1</fetchUrl>
   <PG_ID source="RDBFeederImpl">1</PG_ID>
   <feederLabel source="RDBFeederImpl">EM3FullFeeder</feederLabel>
   <feederType source="RDBFeederImpl">RDBFeeder</feederType>
 </doc>

Configuration

This feeder takes all parameters from the Simple Feeder plus the following:

Element | Type | Default | Description
feederLabel | String | RDBFeed | The feeder label submitted in the <feederLabel> of the published document.
rdbLocation | String | rdb | The Aspire component name/path to the RDB service, which is an RDB Connection component which maintains a pool of RDB connections.
id/@column | String | None | The name of the column that holds the primary key (id) for the row. NOTE: This must match the column name returned in the sql/update parameter. You may need to use the SQL AS construct.
sequence/@column | String | None | The name of the column that holds the sequence from the queue table. NOTE: This must match the column name returned in the sql/update parameter. You may need to use the SQL AS construct.
action/@column | String | None | The name of the column that holds the action from the queue table. NOTE: This must match the column name returned in the sql/update parameter. You may need to use the SQL AS construct.
sql | String | None | The various pieces of SQL to run. See below.
events | String | See below | The events to publish documents against. See below.

SQL Configuration

The RDB feeder uses configurable SQL to specify what to feed.

Element | Type | Default | Description
sql/preUpdate | String | None | The SQL to be run before an incremental update takes place. (optional)
sql/update | String | None | The SQL to be run to extract data for the incremental update. Should include the id, sequence and action as a minimum.
sql/postUpdate | String | None | The SQL to be run as each job for an incremental update completes. The token :SEQ will be replaced by the sequence for the update that completed.
sql/fullSelect | String | None | The SQL to be run to extract data for the full feed. (optional)

Event Configuration

The RDB feeder publishes each type of action against its own configurable event, so different pipelines can be used to handle different actions.

Element | Type | Default | Description
events/insert/@event | String | onPublish | The event to publish against for an insert.
events/update/@event | String | onPublish | The event to publish against for an update.
events/delete/@event | String | onDelete | The event to publish against for a delete.

Branch Configuration

The RDB feeder publishes documents using the branch manager. It publishes using the events configured above. You must therefore include <branches> for these events in the configuration to publish to a pipeline within a pipeline manager. See Branch Handler for more details.
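
To illustrate how events and branches fit together, the following fragment routes inserts, updates and deletes to three different pipelines. It is a sketch based on the element paths in the Event Configuration table: the event names onInsert and onUpdate, the component name and the pipeline names are all assumptions chosen for illustration, and the pipeline manager path is borrowed from the Simple Configuration example. It also assumes that any event name configured under <events> can be matched by a <branch> with the same name.

 <component name="RoutedFeeder" subType="default" factoryName="aspire-rdbfeeder">
   <events>
     <!-- Override the defaults (onPublish, onPublish, onDelete) so each action has its own event -->
     <insert event="onInsert"/>
     <update event="onUpdate"/>
     <delete event="onDelete"/>
   </events>
   <branches>
     <!-- One branch per event; pipeline names are hypothetical -->
     <branch event="onInsert" pipelineManager="/system/standard-pipe-manager" pipeline="insert-pipe"/>
     <branch event="onUpdate" pipelineManager="/system/standard-pipe-manager" pipeline="update-pipe"/>
     <branch event="onDelete" pipelineManager="/system/standard-pipe-manager" pipeline="delete-pipe"/>
   </branches>
   <!-- id, sequence, action and sql settings omitted for brevity -->
 </component>

If the <events> element is left out, inserts and updates share the default onPublish event, so they cannot be routed to separate pipelines by event alone.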

Example Configurations

Simple Configuration

 <component name="EM3FullFeeder" subType="default" factoryName="aspire-rdbfeeder">
   <branches>
     <branch event="onPublish" pipelineManager="/system/standard-pipe-manager"/>
     <branch event="onDelete" pipelineManager="/system/standard-pipe-manager" pipeline="delete-pipe"/>
   </branches>
   <feedWait>0</feedWait>
   <id column="ID"/>
   <sequence column="SEQ"/>
   <action column="ACTION"/>
   <sql>
     <preUpdate><![CDATA[update queue set status='I' where seq in
                           (select top 3 seq from queue where status='W' order by seq asc)]]></preUpdate>
     <update><![CDATA[select queue.seq AS SEQ, queue.id AS ID, queue.action AS ACTION, main.col1, main.col2, main.col3
                        from main right outer join queue
                        on main.id = queue.id
                        where queue.status = 'I'
                        order by queue.seq asc]]></update>
     <postUpdate><![CDATA[update queue set status='C' where seq = :SEQ]]></postUpdate>
     <fullSelect><![CDATA[select main.id, main.col1, main.col2, main.col3 from main]]></fullSelect>
   </sql>
 </component>

Simple Configuration: Only Full Feeds

This example shows a simple configuration where only full re-feeds are required (i.e. no incremental updates).

Note: The ROWNUM pseudocolumn used below is specific to Oracle.

 <component name="CVsFullFeeder" subType="default" factoryName="aspire-rdbfeeder">
   <rdbLocation>RDBConnection</rdbLocation>
   <branches>
     <branch event="onPublish" pipelineManager="ProcessCVsBatchXMLFile"/>
   </branches>
   <id column="applicant_id"/>
   <feedWait>0</feedWait>
   <sql>
     <fullSelect><![CDATA[
       SELECT applicant_id, roles, main_skills, text_cv
       FROM
       ( 
         select applicant_id, roles, main_skills, text_cv from stcvs
       )
       WHERE ROWNUM <= 25
     ]]></fullSelect>
   </sql>
 </component>

Complex Configuration

  <component name="FeedRealTimeUpdates" subType="default" factoryName="aspire-rdbfeeder">
     <feederLabel>FeedRealTimeUpdates</feederLabel>
     <rdbLocation>RDBConnection</rdbLocation>
     <branches>
       <branch event="onPublish"  pipelineManager="ProcessRegistrant" pipeline="process-registrant-action"/>
       <branch event="onDelete"  pipelineManager="ProcessRegistrant" pipeline="process-registrant-delete"/>
     </branches>
     <loopWait>60000</loopWait>
     <feedWait>0</feedWait>
     <id column="REGISTRATION_ID"/>
     <sequence column="UPDATE_ID"/>
     <action column="ACTION"/>
     <sql>
       <preUpdate>
         <![CDATA[
         update REGISTRANT_UPDATES set status = 'I' where status = 'W'
         ]]>
       </preUpdate>
       <update>
         <![CDATA[
         SELECT queue.update_id AS UPDATE_ID, queue.action AS ACTION,
                'true' AS USE_COMMIT_WITHIN,
                r.REGISTRATION_ID,
                FIRST_NAME,
                CURRENT_LAST_NAME,
                CURRENT_STATE,
                to_char(BIRTH_DATE,'yyyy-mm-dd') AS BIRTH_DATE,
                GENDER,
                DISPLAY_STATE,
                to_char(CREATION_DATE,'yyyy-mm-dd') AS CREATION_DATE,
                to_char(LAST_UPDATE_DATE,'yyyy-mm-dd') AS LAST_UPDATE_DATE
         FROM   registrations r right outer join REGISTRANT_UPDATES queue
                on r.registration_id = queue.registration_id
         WHERE  DISPLAY_STATE >= 0  AND  DISPLAY_STATE <= 2
                and queue.status = 'I'
         order by queue.update_id asc
         ]]>
       </update>
       <postUpdate>
         <![CDATA[
         UPDATE REGISTRANT_UPDATES set status = 'C'
                       WHERE update_id = :SEQ
                             and status = 'I'
         ]]>
       </postUpdate>
     </sql>
  </component>

