AuthN/AuthZ with SSL

Authentication and authorization can be enabled in StageR to restrict access to the REST APIs to HTTPS connections that present valid client certificates.

Content Encryption and Key Managers

Encrypted content that is stored in StageR is encrypted at the scope level, and an enckeyid tag is added to each encrypted content scope.

Content Compression

Compressed content that is stored in StageR is compressed at the scope level, and an isCompressed tag is added to each compressed content scope.

Content Processing

A storage unit has a set of events that are triggered during different actions performed while interacting with the storage unit. Content processing modules can be configured to be executed on these events. 

File Handlers

StageR can store binary files in a file storage independent from the database storage that is used to store JSON documents. The file handler is a pluggable module that provides StageR with the logic to store and read files from an external file storage application.

Reprocessing and Automatic Updates

Content reprocessing provides the ability to execute content processing pipelines over content that is already in a storage unit. Reprocessing is done at the scope level of a storage unit.

Remote Replication

Remote replication allows a StageR storage unit to subscribe to a remote StageR storage unit and replicate the data locally.


Content Encryption and Key Managers

Content can be stored encrypted in StageR. If enabled, StageR uses the aes-256-cbc algorithm from the NodeJS crypto library, with an Initialization Vector (IV) and a Data Encryption Key (DEK) provided by a key manager.

Content is encrypted at the scope level and an enckeyid tag is added to each encrypted content scope.

The option can be enabled/disabled per Storage Unit through the administration API.

Code Block
languagejs
themeRDark
PUT admin/enableContentEncryption/<storage-unit>/<true-false>
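
As an illustration of the mechanism described above, the following is a minimal sketch of encrypting a content scope with aes-256-cbc from the NodeJS crypto library. The dek and iv values are assumed to come from the configured key manager, and the shape of the returned object (including the enckeyid tag) is an assumption, not StageR's internal format.

Code Block
languagejs
themeRDark
var crypto = require('crypto')

// Minimal sketch: encrypt one content scope with a DEK and IV from the key manager.
// 'dek' is assumed to be a base64-encoded 32-byte key and 'iv' a base64-encoded 16-byte IV.
function encryptScope (scopeContent, dek, iv, enckeyid) {
  var cipher = crypto.createCipheriv('aes-256-cbc', Buffer.from(dek, 'base64'), Buffer.from(iv, 'base64'))
  var encrypted = cipher.update(JSON.stringify(scopeContent), 'utf8', 'base64') + cipher.final('base64')
  // The encrypted payload is stored together with the enckeyid tag so the matching
  // DEK can be requested from the key manager when decrypting (hypothetical shape).
  return { enckeyid: enckeyid, iv: iv, data: encrypted }
}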

A key manager is a pluggable module that provides data encryption keys (DEKs) to the application. The internal implementation of the key manager is up to the user's needs. The module needs to provide a way to get a DEK given an enckeyid.


Code Block
languagejs
themeRDark
var crypto = require('crypto')

function basicKeyManager (options) {
  var masterKey

  if (!options.masterKey) {
    throw new Error('options.masterKey is required')
  }

  masterKey = options.masterKey

  return {
    generateiv: function () {
      // IV needs to be 16 bytes
      return crypto.randomBytes(16).toString('base64')
    },
    generateKeyId: function () {
      return 'localKey'
    },
    getDek: function (storageUnitName, key, callback) {
      if (key === 'localKey') {
        return callback(null, masterKey)
      } else {
        return callback(new Error('Invalid Dek key: ' + key))
      }
    }
  }
}

module.exports = basicKeyManager
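
For reference, a usage sketch of the module above. The require path, the options object and the call sequence are assumptions about how such a key manager would typically be exercised, not StageR's internal wiring.

Code Block
languagejs
themeRDark
// Hypothetical usage of the basic key manager defined above.
var basicKeyManager = require('./basicKeyManager')   // assumed module path

var keyManager = basicKeyManager({ masterKey: 'MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMzI=' })

var iv = keyManager.generateiv()        // 16-byte IV, base64 encoded
var keyId = keyManager.generateKeyId()  // stored as the enckeyid tag

keyManager.getDek('myStorageUnit', keyId, function (err, dek) {
  if (err) { throw err }
  // 'dek' and 'iv' can now be used with aes-256-cbc to encrypt the scope content.
  console.log('Resolved DEK for key id %s, iv %s', keyId, iv)
})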

StageR provides 3 base key manager implementations:

  • Basic: Uses a single master DEK set as a configurable parameter.

Configuration:

Code Block
languagejs
themeRDark
keyManager:{
    type:'basic',
    basic:{
        masterKey:'MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMzI='
    }
}
  • File Based Master Key: a file containing a list of master keys used to encrypt the DEKs that in turn encrypt content. There is a finite, configurable number of DEKs per Storage Unit, stored in a Mongo database (DEK). The DEK table stores the encrypted DEK, the version of the master key and the IV used to encrypt the DEK (see the DEK-wrapping sketch after this list). The Master Key file location is set as a configurable parameter of this key manager.

File Example:

Code Block
languagetext
themeRDark
MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTE=   9
MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTI=   5
MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTM=   7

Configuration:

Code Block
languagejs
themeRDark
keyManager:{
    type:'filebased',
    keysNumber: 1000,
    filebased:{
        masterKeyLocation: 'config/MasterKey.txt'
    }
}
  • Hadoop KMS: uses the Hadoop Key Management Server for DEK encryption. Starting from a master key held in KMS, the key manager generates new keys that are used to encrypt the DEKs. There is a finite, configurable number of DEKs per Storage Unit, stored in a Mongo database (DEK). The DEK table stores the encrypted DEK, the IV, the master key and a proxy key/IV pair from KMS that were used to encrypt the DEK.

Configuration:

Code Block
languagejs
themeRDark
keyManager:{
    type:'clouderakms',
    keysNumber: 1000,
    clouderakms:{
        masterKey:'master_key_1',
        server: 'server-name',
        port: '16000',
        user: 'hdfs',
        sslEnabled: true,
        sslOptions: {
            keyLocation: './config/sslcerts/kms/sr_client_key.pem',
            certLocation: './config/sslcerts/kms/sr_client_cert.crt',
            caLocation: './config/sslcerts/kms/cacert.pem',
            passphrase: 'sibiu7$',
            requestCert: true,
            rejectUnauthorized: true
        }
    }
}
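
To make the file-based and KMS descriptions above concrete, here is a minimal, hypothetical sketch of wrapping a freshly generated DEK with a master key before it is persisted. The field names of the stored record are assumptions, not StageR's actual DEK schema.

Code Block
languagejs
themeRDark
var crypto = require('crypto')

// Hypothetical DEK-wrapping sketch: generate a DEK, then encrypt it with a
// master key so only the wrapped form is persisted alongside the content.
function wrapNewDek (masterKey, masterKeyVersion) {
  var dek = crypto.randomBytes(32)   // new data encryption key
  var iv = crypto.randomBytes(16)    // IV used to wrap the DEK
  var cipher = crypto.createCipheriv('aes-256-cbc', Buffer.from(masterKey, 'base64'), iv)
  var wrapped = Buffer.concat([cipher.update(dek), cipher.final()])
  // Assumed shape of a DEK table record: encrypted DEK, master key version and IV.
  return {
    encryptedDek: wrapped.toString('base64'),
    masterKeyVersion: masterKeyVersion,
    iv: iv.toString('base64')
  }
}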

Content Compression

Content can be stored compressed in StageR. If enabled, StageR uses the zlib library from NodeJS for compression. Both JSON and raw files stored in StageR can be compressed.

Content is compressed at the scope level and an isCompressed tag is added to each compressed content scope.

The option can be enabled/disabled per Storage Unit through the administration API.

Code Block
languagejs
themeRDark
PUT admin/enableContentCompression/<storage-unit>/<true-false>
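
A minimal sketch of the compression step, assuming scope content is serialized as JSON before being deflated with zlib; the returned object with its isCompressed flag only illustrates the tag mentioned above and is not StageR's storage format.

Code Block
languagejs
themeRDark
var zlib = require('zlib')

// Sketch: compress one content scope with zlib and mark it with the isCompressed tag.
function compressScope (scopeContent, callback) {
  zlib.deflate(JSON.stringify(scopeContent), function (err, buffer) {
    if (err) { return callback(err) }
    callback(null, { isCompressed: true, data: buffer.toString('base64') })
  })
}

// Reading the content back reverses the steps.
function decompressScope (storedScope, callback) {
  zlib.inflate(Buffer.from(storedScope.data, 'base64'), function (err, buffer) {
    if (err) { return callback(err) }
    callback(null, JSON.parse(buffer.toString('utf8')))
  })
}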

Content Processing

A Storage Unit has a set of events that are triggered by different actions performed while interacting with the Storage Unit. Content processing modules can be configured to be executed on these events. A content processing module is a JavaScript file that implements one or more events.

There are two different types of events: per document events and general events.

Per document events are triggered for each content record that is added, updated, deleted or fetched.

  • PreAdd: this event is triggered before the content scope is stored (added or updated) in the Storage Unit.
  • Process: this event is triggered after the content scope is stored (added or updated) in the Storage Unit. This event is also triggered when a record is reprocessed (see Reprocess API).
  • PreDelete: this event is triggered before the content scope is deleted from the Storage Unit.
  • PostDelete: this event is triggered after the content scope is deleted from the Storage Unit.
  • Fetch: this event is triggered after the content scope is fetched from the Storage Unit.
  • User Defined Document Events: Users can define other types of document events by invoking a transaction/execute or transaction/batch call with a custom action (see Transaction API) and a reference to the record key.

General events are triggered directly by the Storage Unit when specific operations occur.

  • BatchStart: this event is triggered when a new batch is created during content ingestion or content reprocessing. When a batch is created, a batch variable is added to the execution context which can be accessed by records on document events.
  • BatchEnd: this event is triggered when a batch is completed during content ingestion or content reprocessing.
  • User Defined General Events: Users can define other types of general events by invoking a transaction/execute or transaction/batch calls with a custom action (see Transaction API) and no record key.

The content processing JavaScript modules are placed inside the processing_modules  folder of the StageR server. Each module can implement one or more of the event functions. The name of the function needs to be the name of the implemented event. A module can have functions of both general and per document events.
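
The skeleton below is a minimal, hypothetical example of such a module, exporting one general event and one per document event. The function signatures match those documented in the sections that follow.

Code Block
languagejs
themeRDark
// processing_modules/ExampleModule.js - hypothetical skeleton of a content processing module.
// The exported function names must match the names of the events they implement.

exports.BatchStart = function (context, settings, callback) {
  context.exampleCounter = 0           // shared state visible to the document events of this batch
  callback(null, context)
}

exports.Process = function (key, content, context, settings, callback) {
  if (context.exampleCounter !== undefined) {
    context.exampleCounter++
  }
  callback(null, content, context)     // pass the (possibly modified) content along
}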

Per document events receive five parameters:

  • key: The id of the record.
  • content: A Javascript Object with the content of the scope of the record that is being processed.
  • context: A Javascript Object with configuration variables and references to utility functions.
  • settings: A Javascript Object with the available configuration properties for the module.
  • callback: A callback function that should be called when the per document event function completes its execution. Callback parameters are: err, content, context.
Code Block
languagejs
themeRDark
exports.Process = function(key, content, context, settings, callback){
    if (context.isBatch !== undefined && context.isBatch === true) {
        if (content) {
            context.batchArray.push({index: {_index: context.elasticSearch.index, _type: context.elasticSearch.type, _id: key}});
            context.batchArray.push(content);
        }
        callback(null, content, context);
    } else {
        initialize(settings, function(client, index, type) {
            client.index({
                index: index,
                type: type,
                id: key,
                body: content
            }, function (err) {
                client.close();
                callback(err, content, context);
            });
        });
    }
}; 
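
The examples on this page call an initialize(settings, ...) helper that is not shown. A plausible sketch, assuming the legacy elasticsearch NodeJS client and the settings keys used in the configuration example further down (elasticsearch-server, elasticsearch-port, elasticsearch-index, elasticsearch-type), could look like this:

Code Block
languagejs
themeRDark
var elasticsearch = require('elasticsearch')

// Hypothetical initialize() helper used by the event examples on this page.
// It builds an Elasticsearch client from the module settings and hands back
// the client together with the target index and type.
function initialize (settings, callback) {
  var client = new elasticsearch.Client({
    host: settings['elasticsearch-server'] + ':' + settings['elasticsearch-port']
  })
  callback(client, settings['elasticsearch-index'], settings['elasticsearch-type'])
}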

General events receive three parameters:

  • context: A Javascript Object with configuration variables and references to utility functions.
  • settings: A Javascript Object with the available configuration properties for the module.
  • callback: A callback function that should be called when the general event function completes its execution. Callback return parameters are: err, context.
Code Block
languagejs
themeRDark
exports.BatchStart = function(context, settings, callback){
    initialize(settings, function(client, index, type) {
        context.batchArray = [];
        context.elasticSearch = {};
        context.elasticSearch.client = client;
        context.elasticSearch.index = index;
        context.elasticSearch.type = type;
        callback(null, context);
    });
};
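
A hypothetical BatchEnd counterpart to the example above could flush the batchArray accumulated by the Process event using the client's bulk API. This sketch assumes the same initialize()-created legacy elasticsearch client stored in the context by BatchStart.

Code Block
languagejs
themeRDark
exports.BatchEnd = function (context, settings, callback) {
  // Flush the actions accumulated by the Process event during this batch.
  if (!context.elasticSearch || context.batchArray.length === 0) {
    return callback(null, context)
  }
  context.elasticSearch.client.bulk({ body: context.batchArray }, function (err) {
    context.elasticSearch.client.close()
    callback(err, context)
  })
}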

The admin/setContentProcessingModules API call configures content processing modules and module settings for a storage unit. Content processing modules are configured per scope. A default list of modules can be configured for scopes that are not explicitly defined. Each module can define its own list of settings. When an event is executed, the modules are invoked in the order in which they appear in the configuration array.

Content processing modules configuration consists of lists of modules for each scope and general settings.

Code Block
languagejs
themeRDark
{
    "modules" : {
        "connector": [ 
            {
                "module" : "FieldMapping"
            },
            {
                "settings" : {
                    "elasticsearch-index" : "aspiredocs",
                    "elasticsearch-type" : "aspiredoc"
                },
                "module" : "ESPublisher"
            }
        ], 
        "index" : [ 
            {
                "module" : "FieldMapping"
            }, 
            {
                "settings" : {
                    "elasticsearch-index" : "researchdocs",
                    "elasticsearch-type" : "researchdoc"
                },
                "module" : "ESPublisher"
            }
        ],
        "research" : [ 
            {
                "module" : "NormalizeCategory"
            }
        ]
    },
    "settings" : {
        "elasticsearch-port" : 9200,
        "elasticsearch-server" : "localhost"
    }
}

In this example the connector scope will execute events from FieldMapping and ESPublisher, in that order; the index scope will execute FieldMapping and ESPublisher; and the research scope will execute NormalizeCategory. For each event, the application looks for an implementation of that event in each content processing module; if one is available, it executes it, then moves on to the next module to look for the same event function, and so on.

General events are usually used to initialize common configuration for the document events. This configuration can be set in the context variable, which is shared with all document events that belong to the same general event. For example, all documents in a batch receive the same context variable that BatchStart returns, and BatchEnd receives that same context variable with any modifications made to its data/configuration by the other events.

Foreign Key Joins

StageR provides a content processing module ForeignKeyJoin for automatic merging of records from different storage units based on record keys. 1-to-N relations can be specified between storage unit records. If configured, ForeignKeyJoin will run on the Process and Fetch events of each document for the specified scope.

The content of the record needs to define a field with the following format:

Code Block
languagejs
themeRDark
"foreignKeys": {
    "FOREIGN_KEY_NAME_1": {
        "storageUnit": "FOREIGN_STORAGE_UNIT_NAME",
        "scope": "FOREIGN_SCOPE",
        "ids": [
            "RECORD_ID_1",
            "RECORD_ID_2",
            ...
            "RECORD_ID_N"
        ]
    },
    "FOREIGN_KEY_NAME_2": {
        "storageUnit": "FOREIGN_STORAGE_UNIT_NAME",
        "scope": "FOREIGN_SCOPE",
        "ids": [
            "RECORD_ID_1",
            "RECORD_ID_2",
            ...
            "RECORD_ID_N"
        ]
    },
    ...
    "FOREIGN_KEY_NAME_N": {
        "storageUnit": "FOREIGN_STORAGE_UNIT_NAME",
        "scope": "FOREIGN_SCOPE",
        "ids": [
            "RECORD_ID_1",
            "RECORD_ID_2",
            ...
            "RECORD_ID_N"
        ]
    },
}

Output:

Code Block
languagejs
themeRDark
"PRIMARY_RECORD_SCOPE":{
    ...
    ...
    ...
    FOREIGN_KEY_NAME_1:[
        {
            RECORD_ID_1_FOREIGN_SCOPE_DATA
        },
        {
            RECORD_ID_2_FOREIGN_SCOPE_DATA
        },
        ...
        {
            RECORD_ID_N_FOREIGN_SCOPE_DATA
        }
    ]
}

Example:

  • Record with foreign key references:
Code Block
languagejs
themeRDark
"connector":{
    "url": "file:///server/myfolder/file1.txt",
    "content":"test content",
    "foreignKeys": {
        "acls": {
            "storageUnit": "DocAcls",
            "scope": "acls",
            "ids": [
                "1",
                "3",
                "7"
            ]
        }
    }
}
  • Foreign Key Records:
Code Block
languagejs
themeRDark
{key:"1", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "user1","type": "user"}}},
{key:"2", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group2","type": "group"}}},
{key:"3", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group3","type": "group"}}},
{key:"4", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group4","type": "group"}}},
{key:"5", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group5","type": "group"}}},
{key:"6", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group6","type": "group"}}},
{key:"7", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group7","type": "group"}}}
  • Output:
Code Block
languagejs
themeRDark
"connector":{
    "url": "file:///server/myfolder/file1.txt",
    "content":"test content",
    "acls": [
        {
            "access": "allow",
            "domain": "search",
            "scope": "global",
            "name": "user1",
            "type": "user"
        },
        {
            "access": "allow",
            "domain": "search",
            "scope": "global",
            "name": "group3",
            "type": "group"
        },
        {
            "access": "allow",
            "domain": "search",
            "scope": "global",
            "name": "group7",
            "type": "group"
        }
    ]
}

To configure the ForeignKeyJoin module, add it to the scope's content processing configuration and make sure the scope content contains the foreignKeys field.

Code Block
languagejs
themeRDark
POST admin/setContentProcessingModules/STORAGE_UNIT
{
    "modules": {
        "connector": [
            {
                "module": "ForeignKeyJoin"
            },
            ...
        ]
    }
}


With this configuration, foreign key merges will happen for the connector scope on any Process or Fetch event.

Publishers

ElasticSearch Publisher

StageR provides a content processing module to publish content from a storage unit to Elasticsearch. This publisher triggers on Process and PostDelete events and will publish each content record as a new document to Elasticsearch using the record key as the id in the search engine.

Configure the Elasticsearch publisher by adding the module called HTTPESPublisher through the admin/setContentProcessingModules API call.

Code Block
languagejs
themeRDark
POST admin/setContentProcessingModules/STORAGE_UNIT
{
    "modules" : {
        "connector": [
            {
                "settings" : {
                    "elasticsearch-index" : "aspiredocs",
                    "elasticsearch-type" : "aspiredoc"
                },
                "module" : "HTTPESPublisher"
            }
        ],
        ...
    },
    "settings" : {
        "elasticsearch-hosts" : localhost:9200"
    }
}

The configuration above will publish all documents from the connector scope to Elasticsearch (located at localhost:9200), into the aspiredocs index as the aspiredoc document type.

Solr Publisher

StageR provides a content processing module to publish content from a storage unit to Solr. This publisher triggers on Process and PostDelete events and will publish each content record as a new document to Solr using the record key as the id in the search engine.

Configure the Solr publisher by adding the module called SolrPublisher through the admin/setContentProcessingModules API call.

Code Block
languagetext
themeRDark
POST admin/setContentProcessingModules/STORAGE_UNIT
{
    "modules" : {
        "connector": [
            {
                "settings" : {
                    "solr-collection" : "testcollection"
                },
                "module" : "SolrPublisher"
            }
        ],
        ...
    },
    "settings" : {
        "solr-hosts" : localhost:8983"
    }
}

The configuration above will publish all documents from the connector scope to Solr (located at localhost:8983), into the testcollection collection.

File Handlers

StageR can store binary files in a file storage independent from the database storage which is used to store JSON documents. The file handler is a pluggable module that provides StageR with the logic to store and read files from an external file storage application. The internal implementation of the file handler and the supporting file storage application used is up to the user's needs. The module needs to provide a way to read and write file streams to the underlying file storage application.

StageR provides a simple file handler which uses a local file system folder as its file storage application.

Use the sample code (of the local file handler) below as a template to create new file handlers.

Code Block
languagejs
themeRDark
'use strict'

var crypto = require('crypto')
var async = require('async')
var util = require('util')
var zlib = require('zlib')
var constants = require('../constants/general.server.constants')
var fs = require('fs-extra')
var path = require('path')

function localFileHandler (options) {
  options = options || {}
  var hashKey = options.hashkey || function (value) {
    return crypto.createHash('md5').update(value).digest('hex')
  }
  if (!options.encryption) {
    throw new Error('options.encryption is required')
  }
  var encryption = options.encryption
  if (!options.destination) {
    throw new Error('options.destination is required')
  }
  var destination = options.destination

  return {
    getFolderDestination: function (storageUnit, scope, key, callback) {
      var filePrefix = hashKey(key)
      var firstLevel = filePrefix.substring(0, 3)
      var secondLevel = filePrefix.substring(3, 6)
      var storeFileLocation = path.join(destination, storageUnit, scope, firstLevel, secondLevel)
      try {
        fs.existsSync(storeFileLocation) || fs.mkdirsSync(storeFileLocation)
      } catch (err) {
        return callback(err)
      }
      return callback(null, storeFileLocation)
    },
    saveStream: function (storageUnit, scope, key, filename, mimetype, stream, encrypt, compress, callback) {
      this.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
        if (err) {
          return callback(err)
        }

        var encodedFilename = hashKey(key) + '-' + filename
        var finalPath = path.join(destinationFolder, encodedFilename)
        var outStream = fs.createWriteStream(finalPath)

        async.waterfall([
          function (wfNext) {
            if (encrypt === true) {
              var encKey = encryption.generateKeyId()
              var encIv = encryption.generateiv()
              encryption.getCipher(storageUnit, encKey, encIv, function (err, cipher) {
                wfNext(err, cipher, encKey, encIv)
              })
            } else {
              wfNext(null, null, null, null)
            }
          },
          function (cipher, encKey, encIv, wfNext) {
            if (compress === true) {
              stream = stream.pipe(zlib.createDeflate())
            }
            if (encrypt === true) {
              stream = stream.pipe(cipher)
            }
            stream.pipe(outStream)
            outStream.on('error', wfNext)
            outStream.on('finish', function () {
              wfNext(null, destinationFolder, finalPath, outStream.bytesWritten, encKey, encIv, (compress === true))
            })
          }
        ], callback)
      })
    },
    getStream: function (storageUnit, scope, key, srFileId, encKey, encIv, isCompressed, callback) {
      var self = this
      async.waterfall([
        function (wfNext) {
          self.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
            if (err) {
              return wfNext(err)
            }
            var encodedFilename = hashKey(key) + '-' + srFileId
            var fileLocation = path.join(destinationFolder, encodedFilename)
            var fileStream = fs.createReadStream(fileLocation)
            wfNext(null, fileStream)
          })
        },
        function (fileStream, wfNext) {
          if (!util.isNullOrUndefined(encKey)) {
            encryption.getDecipher(storageUnit, encKey, encIv, function (err, decipher) {
              wfNext(err, fileStream.pipe(decipher))
            })
          } else {
            wfNext(null, fileStream)
          }
        }, function (fileStream, wfNext) {
          if (isCompressed === true) {
            wfNext(null, fileStream.pipe(zlib.createInflate()))
          } else {
            wfNext(null, fileStream)
          }
        }], callback)
    },
    deletePath: function (path, callback) {
      if (callback) {
        return fs.remove(path, callback)
      }
      return fs.removeSync(path)
    },
    deleteFileById: function (storageUnit, scope, key, srFileId, callback) {
      var self = this
      self.getFolderDestination(storageUnit, scope, key, function (err, destination) {
        if (err) {
          return callback(err)
        }
        var hashedKey = hashKey(key)
        var filePath = path.join(destination, hashedKey + '-' + srFileId)
        self.deletePath(filePath, callback)
      })
    },
    deleteFilesByKey: function (storageUnit, scope, key, callback) {
      var self = this
      var hashedKey = hashKey(key)
      if (scope === constants.recordScope) {
        var filePath = path.join(destination, storageUnit, '*', '*', '*', hashedKey + '-*')
        self.deletePath(filePath, callback)
      } else {
        self.getFolderDestination(storageUnit, scope, key, function (err, destination) {
          if (err) {
            return callback(err)
          }
          var filePath = path.join(destination, hashedKey + '-*')
          self.deletePath(filePath, callback)
        })
      }
    },
    deleteFilesByScope: function (storageUnit, scope, callback) {
      var scopeLocation = path.join(destination, storageUnit, scope)
      this.deletePath(scopeLocation, callback)
    },
    deleteFilesByStorageUnit: function (storageUnit, callback) {
      var storageUnitLocation = path.join(destination, storageUnit)
      this.deletePath(storageUnitLocation, callback)
    },
    changeFileNameToId: function (storageUnit, scope, key, filename, srFileId, callback) {
      this.getFolderDestination(storageUnit, scope, key, function (err, destination) {
        if (err) {
          return callback(err)
        }
        var hashedKey = hashKey(key)
        var inFilePath = path.join(destination, hashedKey + '-' + filename)
        var outFilePath = path.join(destination, hashedKey + '-' + srFileId)
        fs.rename(inFilePath, outFilePath, function (err) {
          callback(err, outFilePath)
        })
      })
    },
    fileExists: function (path, callback) {
      if (callback) {
        return fs.exists(path, callback)
      }
      return fs.existsSync(path)
    }
  }
}

module.exports = localFileHandler
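
For orientation, a hypothetical usage sketch of the local file handler above. The require path, the destination folder and the placeholder encryption object are assumptions; they simply satisfy the checks the handler performs in its constructor.

Code Block
languagejs
themeRDark
var fs = require('fs')
var localFileHandler = require('./localFileHandler')   // assumed path to the module above

// Placeholder encryption module: saveStream below is called with encrypt = false, so only
// the presence of the object matters here. A real implementation would expose generateKeyId,
// generateiv, getCipher and getDecipher (see the key manager section).
var encryptionModule = {}

var fileHandler = localFileHandler({
  destination: '/data/stager-files',
  encryption: encryptionModule
})

var input = fs.createReadStream('/tmp/file1.txt')
fileHandler.saveStream('myStorageUnit', 'connector', 'doc-1', 'file1.txt', 'text/plain',
  input, false, true, function (err, folder, finalPath, bytesWritten) {
    if (err) { throw err }
    console.log('Stored %d bytes at %s', bytesWritten, finalPath)
  })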

Reprocessing and Automatic Updates

Content reprocessing is a key feature of StageR. It provides the ability to execute content processing pipelines over content that is already stored in a storage unit. Reprocessing is done at the scope level of a storage unit.

Records can be reprocessed through the reprocess API calls which allow for single key, multiple key and whole scope reprocessing. Items selected for reprocessing are put in a queue that is then consumed by reprocessing worker processes. Multiple reprocessing workers can be configured for a single installation.

Reprocessing will execute the content processing modules configured for the specified scope of the key(s).

To configure reprocessing, specify the reprocessQueue properties (batchSize: number of documents to process as a batch; querySize: number of records each reprocess worker retrieves at a time; timeout: wait timeout when there are no items in the queue) and define the number of reprocessing workers in the StageR configuration file:

Code Block
languagetext
themeRDark
{
  ...,
  reprocessQueue: {
    batchSize: 20,
    querySize: 40,
    timeout: 5000
  },
  workers: {
    restapi: 1,
    reprocess: 4,
    replication: 1
  },
  ...
}

Specify the content processing modules, for the scope to be reprocessed, using the admin/setContentProcessingModules API call:

Code Block
languagetext
themeRDark
POST admin/setContentProcessingModules/STORAGE_UNIT
{
    "modules" : {
        "connector": [
            {
                "settings" : {
                    "solr-collection" : "testcollection"
                },
                "module" : "SolrPublisher"
            }
        ]
    },
    "settings" : {
        "solr-hosts" : localhost:8983"
    }
}

Enable the reprocessing queue at the storage unit level. This will register the storage unit to be scanned by the reprocess workers.

Code Block
languagetext
themeRDark
PUT admin/enableReprocessingQueue/STORAGE_UNIT/true

Automatic Updates

When foreign keys are defined for a content record and the ForeignKeyJoin processing module is configured for a storage unit, StageR creates a foreign key lookup table that lets a foreign item know which primary items reference it. Any update to the foreign item can then trigger an automatic update of the primary items (their keys are sent to be reprocessed).
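
The actual schema of this lookup table is not documented here; purely as an illustration of the inverse relationship it has to capture, an entry might conceptually look like the following (all field names and values are hypothetical).

Code Block
languagejs
themeRDark
// Hypothetical lookup entry: for one foreign record, the primary records that reference it.
// If the DocAcls record with key "3" changes, the primary keys below would be queued for reprocessing.
{
    "foreignStorageUnit": "DocAcls",
    "foreignKey": "3",
    "primaryStorageUnit": "Documents",
    "primaryKeys": ["doc-1", "doc-2"]
}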

As long as the reprocessing queue for the primary item's storage unit is enabled and there are reprocessing workers configured, automatic updates will be triggered.

Remote Replication

Remote replication allows a StageR storage unit to subscribe to a remote StageR storage unit and replicate the data locally. When a storage unit is subscribed for remote replication, the remote server sends all of its transactions to the subscribing storage unit, which therefore knows exactly how to replicate the incoming data.

Remote replication subscriptions are managed by the replication API. When specifying a remote subscription, the zoneId given on the replication calls is added to all incoming item keys as a prefix, to distinguish items from the remote server from any created locally.
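
A purely illustrative sketch of that key prefixing follows; the actual separator and key format used by StageR are not documented here and are assumptions.

Code Block
languagejs
themeRDark
// Hypothetical illustration of zoneId prefixing for replicated keys.
// A locally created record keeps its key; a replicated one carries the zone prefix.
function replicatedKey (zoneId, remoteKey) {
  return zoneId + '_' + remoteKey   // separator is an assumption
}

console.log(replicatedKey('zoneA', 'doc-42'))  // -> 'zoneA_doc-42'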

To configure remote replication, specify the replication properties (interval: wait timeout when no more transactions are available; querySize: number of records to retrieve at a time) and define the number of replication workers in the StageR configuration file:

Code Block
languagetext
themeRDark
{
  ...,
  replication: {
    interval: 15000,
    querySize: 40
  },
  workers: {
    restapi: 1,
    reprocess: 1,
    replication: 4
  },
  ...
}