Documentation Under construction


AuthN/AuthZ with SSL

Authentication and authorization can be enabled in StageR to restrict use of the REST APIs to HTTPS connections that present valid client certificates.

To enable secure access, the application requires a valid server certificate/private key pair for the server hosting StageR (registered to the name of the server) and the certificate authority (CA) certificate. Client applications require a valid client certificate issued by the same certificate authority. Authorized client certificates must be registered through the security API.

Configuration

Specify the certificate configuration and the client certificate name of the default administration user under the secure configuration property.

{
  ...,
  secure: {
    adminUser: 'TestUser',
    keyLocation: 'config/sslcerts/server/server.key',
    certLocation: 'config/sslcerts/server/server.crt',
    caLocation: 'config/sslcerts/ca/ca.crt',
    passphrase: '123456',
    requestCert: true,
    rejectUnauthorized: false
  },
  ...
}

The adminUser has full permission rights over the administration and security APIs; it can add new administration users and assign user permissions (read/write) on different Storage Units through the security API.
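As an illustration only, a client application holding a registered certificate could call the REST API over HTTPS using Node.js core modules. The host name, port, endpoint path and certificate locations below are placeholder assumptions, not StageR defaults:

var https = require('https')
var fs = require('fs')

// Placeholder values: use the host/port StageR is configured with and the
// client certificate that was registered through the security API.
var options = {
  host: 'stager-server',
  port: 443,
  path: '/admin/storageUnits', // hypothetical endpoint, for illustration
  method: 'GET',
  key: fs.readFileSync('config/sslcerts/client/client.key'),
  cert: fs.readFileSync('config/sslcerts/client/client.crt'),
  ca: fs.readFileSync('config/sslcerts/ca/ca.crt')
}

var req = https.request(options, function (res) {
  var body = ''
  res.on('data', function (chunk) { body += chunk })
  res.on('end', function () { console.log(res.statusCode, body) })
})
req.on('error', console.error)
req.end()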

Content Encryption and Key Managers

Content can be stored encrypted in StageR. If enabled, StageR uses the aes-256-cbc algorithm from the Node.js crypto library, with an Initialization Vector (IV) and a Data Encryption Key (DEK) provided by a key manager.

Content is encrypted at the scope level and an enckeyid tag is added to each encrypted content scope.

The option can be enabled/disabled per Storage Unit through the administration API.

PUT admin/enableContentEncryption/<storage-unit>/<true-false>
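For reference, the scheme described above maps directly onto the Node.js crypto API. The following is a minimal sketch of the round trip, not StageR's internal code, assuming a 32-byte DEK and a 16-byte IV:

var crypto = require('crypto')

// Sketch only: in StageR the DEK and IV come from the configured key manager.
var dek = crypto.randomBytes(32) // 256-bit Data Encryption Key
var iv = crypto.randomBytes(16)  // 16-byte Initialization Vector

var cipher = crypto.createCipheriv('aes-256-cbc', dek, iv)
var encrypted = Buffer.concat([cipher.update('scope content', 'utf8'), cipher.final()])

var decipher = crypto.createDecipheriv('aes-256-cbc', dek, iv)
var decrypted = Buffer.concat([decipher.update(encrypted), decipher.final()]).toString('utf8')
// decrypted === 'scope content'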

A key manager is a pluggable module that provides Data Encryption Keys (DEKs) to the application. The internal implementation of the key manager is up to the user's needs. The module needs to provide a way to get a DEK given an enckeyid. The sample key manager below illustrates the required interface:


var crypto = require('crypto')

function basicKeyManager (options) {
  var masterKey

  if (!options.masterKey) {
    throw new Error('options.masterKey is required')
  }

  masterKey = options.masterKey

  return {
    generateiv: function () {
      // IV needs to be 16 bytes
      return crypto.randomBytes(16).toString('base64')
    },
    generateKeyId: function () {
      // Single static key id; every scope is encrypted with the master key
      return 'localKey'
    },
    getDek: function (storageUnitName, key, callback) {
      // Return the master key for the known key id, fail for anything else
      if (key === 'localKey') {
        return callback(null, masterKey)
      } else {
        return callback(new Error('Invalid Dek key: ' + key))
      }
    }
  }
}

module.exports = basicKeyManager

StageR provides 3 base key manager implementations:

  • Basic: Uses a single master DEK set as a configurable parameter.

Configuration:

keyManager: {
    type: 'basic',
    basic: {
        masterKey: 'MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMzI='
    }
}
  • File Based Master Key: uses a file containing a list of master keys to encrypt the DEKs that in turn encrypt content. There is a finite (configurable) number of DEKs per Storage Unit, stored in a Mongo database (DEK). The DEK table stores the encrypted DEK, the version of the master key, and the IV used to encrypt the DEK. The master key file location is set as a configurable parameter of this key manager.

File Example:

MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTE=   9
MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTI=   5
MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTM=   7

Configuration:

keyManager: {
    type: 'filebased',
    keysNumber: 1000,
    filebased: {
        masterKeyLocation: 'config/MasterKey.txt'
    }
}
  • Hadoop KMS: uses the Hadoop Key Management Server for DEK encryption. Starting from a master key held in KMS, the key manager generates new keys that are used to encrypt the DEKs. There is a finite (configurable) number of DEKs per Storage Unit, stored in a Mongo database (DEK). The DEK table stores the encrypted DEK, the IV, the master key and a proxy key/IV pair from KMS that were used to encrypt the DEK.

Configuration:

keyManager: {
    type: 'clouderakms',
    keysNumber: 1000,
    clouderakms: {
        masterKey:'master_key_1',
        server: 'server-name',
        port: '16000',
        user: 'hdfs',
        sslEnabled: true,
        sslOptions: {
            keyLocation: './config/sslcerts/kms/sr_client_key.pem',
            certLocation: './config/sslcerts/kms/sr_client_cert.crt',
            caLocation: './config/sslcerts/kms/cacert.pem',
            passphrase: 'sibiu7$',
            requestCert: true,
            rejectUnauthorized: true
        }
    }
}
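For both the file based and KMS key managers, the DEK stored in the database is itself encrypted under a master key. The sketch below illustrates that wrapping step conceptually; the field names and the use of aes-256-cbc for the wrap are assumptions for illustration:

var crypto = require('crypto')

// Conceptual sketch: the content DEK is stored encrypted under a 32-byte
// master key, together with the IV and the master key version.
function wrapDek (dek, masterKey, masterKeyVersion) {
  var iv = crypto.randomBytes(16)
  var cipher = crypto.createCipheriv('aes-256-cbc', masterKey, iv)
  var encryptedDek = Buffer.concat([cipher.update(dek), cipher.final()])
  return {
    encryptedDek: encryptedDek.toString('base64'), // what the DEK table stores
    iv: iv.toString('base64'),
    masterKeyVersion: masterKeyVersion
  }
}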

Content Compression

Content can be stored compressed in StageR. If enabled, StageR uses the zlib library from Node.js for compression. Both JSON and raw files stored in StageR can be compressed.

Content is compressed at the scope level and an isCompressed tag is added to each compressed content scope.

The option can be enabled/disabled per Storage Unit through the administration API.

PUT admin/enableContentCompression/<storage-unit>/<true-false>
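The compression itself is standard zlib deflate. As a rough illustration of the round trip (not StageR's internal code):

var zlib = require('zlib')

var json = JSON.stringify({ content: 'test content' })
zlib.deflate(json, function (err, compressed) {
  if (err) throw err
  zlib.inflate(compressed, function (err, restored) {
    if (err) throw err
    console.log(restored.toString() === json) // true: lossless round trip
  })
})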

Content Processing

A Storage Unit has a set of events that are triggered by different actions performed while interacting with the Storage Unit. Content processing modules can be configured to execute on these events. A content processing module is a JavaScript file that implements one or more events.

There are two different types of events: per document events and general events.

Per document events are triggered for each content record that is added, updated, deleted or fetched.

  • PreAdd: this event is triggered before the content scope is stored (added or updated) in the Storage Unit.
  • Process: this event is triggered after the content scope is stored (added or updated) in the Storage Unit. This event is also triggered when a record is reprocessed (see Reprocess API).
  • PreDelete: this event is triggered before the content scope is deleted from the Storage Unit.
  • PostDelete: this event is triggered after the content scope is deleted from the Storage Unit.
  • Fetch: this event is triggered after the content scope is fetched from the Storage Unit.
  • User Defined Document Events: Users can define other types of document events by invoking a transaction/execute or transaction/batch call with a custom action (see Transaction API) and a reference to the record key.

General events are triggered directly by the Storage Unit when specific operations occur.

  • BatchStart: this event is triggered when a new batch is created during content ingestion or content reprocessing. When a batch is created, a batch variable is added to the execution context which can be accessed by records on document events.
  • BatchEnd: this event is triggered when a batch is completed during content ingestion or content reprocessing.
  • User Defined General Events: Users can define other types of general events by invoking a transaction/execute or transaction/batch call with a custom action (see Transaction API) and no record key.

The content processing JavaScript modules are placed inside the processing_modules folder of the StageR server. Each module can implement one or more of the event functions. The name of the function needs to be the name of the implemented event. A module can implement functions for both general and per document events.

Per document events receive five parameters:

  • key: The id of the record.
  • content: A Javascript Object with the content of the scope of the record that is being processed.
  • context: A Javascript Object with configuration variables and references to utility functions.
  • settings: A Javascript Object with the available configuration properties for the module.
  • callback: A callback function that should be called when the per document event function completes its execution. Callback parameters are: err, content, context.

For example, a Process event implementation that indexes content into Elasticsearch:
exports.Process = function(key, content, context, settings, callback){
    if (context.isBatch !== undefined && context.isBatch === true) {
        if (content) {
            context.batchArray.push({index: {_index: context.elasticSearch.index, _type: context.elasticSearch.type, _id: key}});
            context.batchArray.push(content);
        }
        callback(null, content, context);
    } else {
        initialize(settings, function(client, index, type) {
            client.index({
                index: index,
                type: type,
                id: key,
                body: content
            }, function (err) {
                client.close();
                callback(err, content, context);
            });
        });
    }
}; 

General events receive three parameters:

  • context: A Javascript Object with configuration variables and references to utility functions.
  • settings: A Javascript Object with the available configuration properties for the module.
  • callback: A callback function that should be called when the general event function completes its execution. Callback parameters are: err, context.

For example, a BatchStart event that prepares a shared Elasticsearch context for the whole batch:
exports.BatchStart = function(context, settings, callback){
    initialize(settings, function(client, index, type) {
        context.batchArray = [];
        context.elasticSearch = {};
        context.elasticSearch.client = client;
        context.elasticSearch.index = index;
        context.elasticSearch.type = type;
        callback(null, context);
    });
};
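Both examples above call an initialize helper that is not shown. A minimal sketch of what such a helper could look like, assuming the legacy elasticsearch client package and assuming the per-module and general settings from the configuration below are merged into a single settings object:

var elasticsearch = require('elasticsearch')

// Hypothetical helper for the event examples above; the settings keys match
// the module configuration shown below.
function initialize (settings, callback) {
  var client = new elasticsearch.Client({
    host: settings['elasticsearch-server'] + ':' + settings['elasticsearch-port']
  })
  callback(client, settings['elasticsearch-index'], settings['elasticsearch-type'])
}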

The admin/setContentProcessingModules API call configures content processing modules and module settings for a Storage Unit. Content processing modules are configured per scope. A default list of modules can be configured for scopes that are not explicitly defined. Each module can define its own list of settings. When an event fires, modules are executed in the order in which they appear in the configuration array.

Content processing modules configuration consists of lists of modules for each scope and general settings.

{
    "modules" : {
        "connector": [ 
            {
                "module" : "FieldMapping"
            },
            {
                "settings" : {
                    "elasticsearch-index" : "aspiredocs",
                    "elasticsearch-type" : "aspiredoc"
                },
                "module" : "ESPublisher"
            }
        ], 
        "index" : [ 
            {
                "module" : "FieldMapping"
            }, 
            {
                "settings" : {
                    "elasticsearch-index" : "researchdocs",
                    "elasticsearch-type" : "researchdoc"
                },
                "module" : "ESPublisher"
            }
        ],
        "research" : [ 
            {
                "module" : "NormalizeCategory"
            }
        ]
    },
    "settings" : {
        "elasticsearch-port" : 9200,
        "elasticsearch-server" : "localhost"
    }
}

In this example the connector scope will execute events from FieldMapping and ESPublisher, in that order; the index scope will execute FieldMapping and ESPublisher; and the research scope will execute NormalizeCategory. For each event, the application looks for an implementation of that event in each content processing module; if one is available, it executes it, then moves to the next module and repeats.

General events are typically used to initialize common configuration for document events. This configuration can be set in the context variable, which is shared with all document events that belong to the same general event. For example, all documents that belong to a batch receive the same context variable that BatchStart returns, and BatchEnd receives that same context variable with any modifications other events made to its data/configuration.

Foreign Key Joins

StageR provides a content processing module, ForeignKeyJoin, for automatic merging of records from different Storage Units based on record keys. 1-to-N relations can be specified between Storage Unit records. If configured, ForeignKeyJoin runs on the Process and Fetch events of each document for the specified scope.

The content of the record needs to define a field with the following format:

"foreignKeys": {
    "FOREIGN_KEY_NAME_1": {
        "storageUnit": "FOREIGN_STORAGE_UNIT_NAME",
        "scope": "FOREIGN_SCOPE",
        "ids": [
            "RECORD_ID_1",
            "RECORD_ID_2",
            ...
            "RECORD_ID_N"
        ]
    },
    "FOREIGN_KEY_NAME_2": {
        "storageUnit": "FOREIGN_STORAGE_UNIT_NAME",
        "scope": "FOREIGN_SCOPE",
        "ids": [
            "RECORD_ID_1",
            "RECORD_ID_2",
            ...
            "RECORD_ID_N"
        ]
    },
    ...
    "FOREIGN_KEY_NAME_N": {
        "storageUnit": "FOREIGN_STORAGE_UNIT_NAME",
        "scope": "FOREIGN_SCOPE",
        "ids": [
            "RECORD_ID_1",
            "RECORD_ID_2",
            ...
            "RECORD_ID_N"
        ]
    }
}

Output:

"PRIMARY_RECORD_SCOPE":{
    ...
    ...
    ...
    FOREIGN_KEY_NAME_1:[
        {
            RECORD_ID_1_FOREIGN_SCOPE_DATA
        },
        {
            RECORD_ID_2_FOREIGN_SCOPE_DATA
        },
        ...
        {
            RECORD_ID_N_FOREIGN_SCOPE_DATA
        }
    ]
}

Example:

  • Record with foreign key references:
"connector":{
    "url": "file:///server/myfolder/file1.txt",
    "content":"test content",
    "foreignKeys": {
        "acls": {
            "storageUnit": "DocAcls",
            "scope": "acls",
            "ids": [
                "1",
                "3",
                "7"
            ]
        }
    }
}
  • Foreign Key Records:
{key:"1", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "user1","type": "user"}}},
{key:"2", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group2","type": "group"}}},
{key:"3", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group3","type": "group"}}},
{key:"4", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group4","type": "group"}}},
{key:"5", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group5","type": "group"}}},
{key:"6", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group6","type": "group"}}},
{key:"7", content:{acls:{"access": "allow","domain": "search","scope": "global","name": "group7","type": "group"}}}
  • Output:
"connector":{
    "url": "file:///server/myfolder/file1.txt",
    "content":"test content",
    "acls": [
        {
            "access": "allow",
            "domain": "search",
            "scope": "global",
            "name": "user1",
            "type": "user"
        },
        {
            "access": "allow",
            "domain": "search",
            "scope": "global",
            "name": "group3",
            "type": "group"
        },
        {
            "access": "allow",
            "domain": "search",
            "scope": "global",
            "name": "group7",
            "type": "group"
        }
    ]
}

To configure the ForeignKeyJoin module, add it to the scope's content processing configuration and make sure the scope content contains the foreignKeys field.

POST admin/setContentProcessingModules/STORAGE_UNIT
{
    "modules": {
        "connector": [
            {
                "module": "ForeignKeyJoin"
            },
            ...
        ]
    }
}

With this configuration, foreign key merges will happen for the connector scope on any Process or Fetch event.

Publishers

ElasticSearch Publisher

StageR provides a content processing module to publish content from a storage unit to Elasticsearch. This publisher triggers on Process and PostDelete events and will publish each content record as a new document to Elasticsearch using the record key as the id in the search engine.

Configure the Elasticsearch publisher by adding the module called HTTPESPublisher through the admin/setContentProcessingModules API call.

POST admin/setContentProcessingModules/STORAGE_UNIT
{
    "modules" : {
        "connector": [
            {
                "settings" : {
                    "elasticsearch-index" : "aspiredocs",
                    "elasticsearch-type" : "aspiredoc"
                },
                "module" : "HTTPESPublisher"
            }
        ],
        ...
    },
    "settings" : {
        "elasticsearch-hosts" : localhost:9200"
    }
}

The configuration above will publish all documents from the connector scope to Elasticsearch (at localhost:9200), storing them in the aspiredocs index as the aspiredoc document type.

Solr Publisher

StageR provides a content processing module to publish content from a storage unit to Solr. This publisher triggers on Process and PostDelete events and will publish each content record as a new document to Solr using the record key as the id in the search engine.

Configure the Solr publisher by adding the module called SolrPublisher through the admin/setContentProcessingModules API call.

POST admin/setContentProcessingModules/STORAGE_UNIT
{
    "modules" : {
        "connector": [
            {
                "settings" : {
                    "solr-collection" : "testcollection"
                },
                "module" : "SolrPublisher"
            }
        ],
        ...
    },
    "settings" : {
        "solr-hosts" : localhost:8983"
    }
}

The configuration above will publish all documents from the connector scope to Solr (at localhost:8983), storing them in the testcollection collection.

File Handlers

StageR can store binary files in a file storage independent from the database storage used for JSON documents. The file handler is a pluggable module that provides StageR with the logic to store and read files from an external file storage application. The internal implementation of the file handler, and the file storage application behind it, is up to the user's needs. The module needs to provide a way to read and write file streams to the underlying file storage application.

StageR provides a simple file handler which uses a local file system folder as its file storage.

Use the sample code below (the local file handler) as a template for creating new file handlers.

'use strict'

var crypto = require('crypto')
var async = require('async')
var util = require('util')
var zlib = require('zlib')
var constants = require('../constants/general.server.constants')
var fs = require('fs-extra')
var path = require('path')

function localFileHandler (options) {
  options = options || {}
  var hashKey = options.hashkey || function (value) {
    return crypto.createHash('md5').update(value).digest('hex')
  }
  if (!options.encryption) {
    throw new Error('options.encryption is required')
  }
  var encryption = options.encryption
  if (!options.destination) {
    throw new Error('options.destination is required')
  }
  var destination = options.destination

  return {
    getFolderDestination: function (storageUnit, scope, key, callback) {
      var filePrefix = hashKey(key)
      var firstLevel = filePrefix.substring(0, 3)
      var secondLevel = filePrefix.substring(3, 6)
      var storeFileLocation = path.join(destination, storageUnit, scope, firstLevel, secondLevel)
      try {
        fs.existsSync(storeFileLocation) || fs.mkdirsSync(storeFileLocation)
      } catch (err) {
        return callback(err)
      }
      return callback(null, storeFileLocation)
    },
    saveStream: function (storageUnit, scope, key, filename, mimetype, stream, encrypt, compress, callback) {
      this.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
        if (err) {
          return callback(err)
        }

        var encodedFilename = hashKey(key) + '-' + filename
        var finalPath = path.join(destinationFolder, encodedFilename)
        var outStream = fs.createWriteStream(finalPath)

        async.waterfall([
          function (wfNext) {
            if (encrypt === true) {
              var encKey = encryption.generateKeyId()
              var encIv = encryption.generateiv()
              encryption.getCipher(storageUnit, encKey, encIv, function (err, cipher) {
                wfNext(err, cipher, encKey, encIv)
              })
            } else {
              wfNext(null, null, null, null)
            }
          },
          function (cipher, encKey, encIv, wfNext) {
            if (compress === true) {
              stream = stream.pipe(zlib.createDeflate())
            }
            if (encrypt === true) {
              stream = stream.pipe(cipher)
            }
            stream.pipe(outStream)
            outStream.on('error', wfNext)
            outStream.on('finish', function () {
              wfNext(null, destinationFolder, finalPath, outStream.bytesWritten, encKey, encIv, (compress === true))
            })
          }
        ], callback)
      })
    },
    getStream: function (storageUnit, scope, key, srFileId, encKey, encIv, isCompressed, callback) {
      var self = this
      async.waterfall([
        function (wfNext) {
          self.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
            if (err) {
              return wfNext(err)
            }
            var encodedFilename = hashKey(key) + '-' + srFileId
            var fileLocation = path.join(destinationFolder, encodedFilename)
            var fileStream = fs.createReadStream(fileLocation)
            wfNext(null, fileStream)
          })
        },
        function (fileStream, wfNext) {
          if (!util.isNullOrUndefined(encKey)) {
            encryption.getDecipher(storageUnit, encKey, encIv, function (err, decipher) {
              wfNext(err, fileStream.pipe(decipher))
            })
          } else {
            wfNext(null, fileStream)
          }
        }, function (fileStream, wfNext) {
          if (isCompressed === true) {
            wfNext(null, fileStream.pipe(zlib.createInflate()))
          } else {
            wfNext(null, fileStream)
          }
        }], callback)
    },
    deletePath: function (path, callback) {
      if (callback) {
        return fs.remove(path, callback)
      }
      return fs.removeSync(path)
    },
    deleteFileById: function (storageUnit, scope, key, srFileId, callback) {
      var self = this
      self.getFolderDestination(storageUnit, scope, key, function (err, destination) {
        if (err) {
          return callback(err)
        }
        var hashedKey = hashKey(key)
        var filePath = path.join(destination, hashedKey + '-' + srFileId)
        self.deletePath(filePath, callback)
      })
    },
    deleteFilesByKey: function (storageUnit, scope, key, callback) {
      var self = this
      var hashedKey = hashKey(key)
      if (scope === constants.recordScope) {
        var filePath = path.join(destination, storageUnit, '*', '*', '*', hashedKey + '-*')
        self.deletePath(filePath, callback)
      } else {
        self.getFolderDestination(storageUnit, scope, key, function (err, destination) {
          if (err) {
            return callback(err)
          }
          var filePath = path.join(destination, hashedKey + '-*')
          self.deletePath(filePath, callback)
        })
      }
    },
    deleteFilesByScope: function (storageUnit, scope, callback) {
      var scopeLocation = path.join(destination, storageUnit, scope)
      this.deletePath(scopeLocation, callback)
    },
    deleteFilesByStorageUnit: function (storageUnit, callback) {
      var storageUnitLocation = path.join(destination, storageUnit)
      this.deletePath(storageUnitLocation, callback)
    },
    changeFileNameToId: function (storageUnit, scope, key, filename, srFileId, callback) {
      this.getFolderDestination(storageUnit, scope, key, function (err, destination) {
        if (err) {
          return callback(err)
        }
        var hashedKey = hashKey(key)
        var inFilePath = path.join(destination, hashedKey + '-' + filename)
        var outFilePath = path.join(destination, hashedKey + '-' + srFileId)
        fs.rename(inFilePath, outFilePath, function (err) {
          callback(err, outFilePath)
        })
      })
    },
    fileExists: function (path, callback) {
      if (callback) {
        return fs.exists(path, callback)
      }
      return fs.existsSync(path)
    }
  }
}

module.exports = localFileHandler

Reprocessing Queue and Automatic Updates


Remote Replication

Feature Under construction
