Page History
...
Code Block | ||||
---|---|---|---|---|
| ||||
'use strict'

var crypto = require('crypto')
var async = require('async')
var zlib = require('zlib')
var constants = require('../constants/general.server.constants')
var fs = require('fs-extra')
var path = require('path')

/**
 * Creates a file handler that stores files on the local file system.
 *
 * Files are placed under
 * `<destination>/<storageUnit>/<scope>/<hash[0..3)>/<hash[3..6)>/` and are
 * named `<hash>-<filename or file id>`, where `hash` is `hashKey(key)`.
 *
 * @param {Object} options
 * @param {Object} options.encryption - Required. Must provide
 *   `generateKeyId()`, `generateiv()`, `getCipher(storageUnit, keyId, iv, cb)`
 *   and `getDecipher(storageUnit, keyId, iv, cb)`.
 * @param {string} options.destination - Required. Root folder for stored files.
 * @param {function(string): string} [options.hashkey] - Hash function applied
 *   to record keys; defaults to the hex MD5 digest of the value.
 * @returns {Object} The handler object (see the methods below).
 * @throws {Error} When `options.encryption` or `options.destination` is missing.
 */
function localFileHandler (options) {
  options = options || {}

  var hashKey = options.hashkey || function (value) {
    return crypto.createHash('md5').update(value).digest('hex')
  }

  if (!options.encryption) {
    throw new Error('options.encryption is required')
  }
  var encryption = options.encryption

  if (!options.destination) {
    throw new Error('options.destination is required')
  }
  var destination = options.destination

  return {
    /**
     * Resolves the folder for a key, creating it when it does not exist.
     * The folder is created synchronously; the callback is invoked before
     * this function returns.
     *
     * @param {string} storageUnit
     * @param {string} scope
     * @param {string} key
     * @param {function(Error=, string=)} callback - Receives the folder path.
     */
    getFolderDestination: function (storageUnit, scope, key, callback) {
      var filePrefix = hashKey(key)
      // Two directory levels taken from the hash prefix spread files across folders.
      var firstLevel = filePrefix.substring(0, 3)
      var secondLevel = filePrefix.substring(3, 6)
      var storeFileLocation = path.join(destination, storageUnit, scope, firstLevel, secondLevel)

      try {
        if (!fs.existsSync(storeFileLocation)) {
          fs.mkdirsSync(storeFileLocation)
        }
      } catch (err) {
        return callback(err)
      }
      return callback(null, storeFileLocation)
    },

    /**
     * Writes a stream to `<folder>/<hash>-<filename>`, optionally deflating
     * and/or encrypting it on the way (compression is applied first).
     *
     * @param {string} storageUnit
     * @param {string} scope
     * @param {string} key
     * @param {string} filename
     * @param {string} mimetype - Not used by this handler.
     * @param {stream.Readable} stream - Content to store.
     * @param {boolean} encrypt - Encrypts only when strictly `true`.
     * @param {boolean} compress - Compresses only when strictly `true`.
     * @param {Function} callback - Called with `(err, destinationFolder,
     *   finalPath, bytesWritten, encKey, encIv, isCompressed)`. `encKey` and
     *   `encIv` are `null` when the content was not encrypted.
     */
    saveStream: function (storageUnit, scope, key, filename, mimetype, stream, encrypt, compress, callback) {
      this.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
        if (err) {
          return callback(err)
        }

        var encodedFilename = hashKey(key) + '-' + filename
        var finalPath = path.join(destinationFolder, encodedFilename)

        async.waterfall([
          function (wfNext) {
            if (encrypt !== true) {
              return wfNext(null, null, null, null)
            }
            var encKey = encryption.generateKeyId()
            var encIv = encryption.generateiv()
            encryption.getCipher(storageUnit, encKey, encIv, function (err, cipher) {
              wfNext(err, cipher, encKey, encIv)
            })
          },
          function (cipher, encKey, encIv, wfNext) {
            // Open the output file only once the cipher is available, so a
            // cipher failure does not leave an open descriptor or empty file.
            var outStream = fs.createWriteStream(finalPath)
            var settled = false

            // Several streams can report a failure; make sure the waterfall
            // callback is invoked exactly once.
            var done = function (err) {
              if (settled) {
                return
              }
              settled = true
              if (err) {
                outStream.destroy()
                return wfNext(err)
              }
              wfNext(null, destinationFolder, finalPath, outStream.bytesWritten, encKey, encIv, (compress === true))
            }

            // pipe() does not forward errors, so each stage is observed.
            var pipeline = stream
            pipeline.on('error', done)
            if (compress === true) {
              pipeline = pipeline.pipe(zlib.createDeflate())
              pipeline.on('error', done)
            }
            if (encrypt === true) {
              pipeline = pipeline.pipe(cipher)
              pipeline.on('error', done)
            }

            outStream.on('error', done)
            outStream.on('finish', function () {
              done(null)
            })
            pipeline.pipe(outStream)
          }
        ], callback)
      })
    },

    /**
     * Opens a stored file as a readable stream, decrypting it when `encKey`
     * is provided and inflating it when `isCompressed` is strictly `true`
     * (the reverse order of `saveStream`).
     *
     * @param {string} storageUnit
     * @param {string} scope
     * @param {string} key
     * @param {string} srFileId - File id used in the stored file name.
     * @param {*} encKey - Encryption key id; `null`/`undefined` means not encrypted.
     * @param {*} encIv - Initialization vector passed to `getDecipher`.
     * @param {boolean} isCompressed
     * @param {function(Error=, stream.Readable=)} callback
     */
    getStream: function (storageUnit, scope, key, srFileId, encKey, encIv, isCompressed, callback) {
      var self = this

      async.waterfall([
        function (wfNext) {
          self.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
            if (err) {
              return wfNext(err)
            }
            var encodedFilename = hashKey(key) + '-' + srFileId
            var fileLocation = path.join(destinationFolder, encodedFilename)
            wfNext(null, fs.createReadStream(fileLocation))
          })
        },
        function (fileStream, wfNext) {
          if (encKey === null || encKey === undefined) {
            return wfNext(null, fileStream)
          }
          encryption.getDecipher(storageUnit, encKey, encIv, function (err, decipher) {
            if (err) {
              // Never pipe into a missing decipher; release the file instead.
              fileStream.destroy()
              return wfNext(err)
            }
            wfNext(null, fileStream.pipe(decipher))
          })
        },
        function (fileStream, wfNext) {
          if (isCompressed === true) {
            return wfNext(null, fileStream.pipe(zlib.createInflate()))
          }
          wfNext(null, fileStream)
        }
      ], callback)
    },

    /**
     * Removes a path. Asynchronous when a callback is given, synchronous
     * otherwise.
     *
     * @param {string} targetPath
     * @param {Function} [callback]
     */
    deletePath: function (targetPath, callback) {
      if (callback) {
        return fs.remove(targetPath, callback)
      }
      return fs.removeSync(targetPath)
    },

    /**
     * Removes the file stored as `<hash>-<srFileId>` for the given key.
     *
     * @param {string} storageUnit
     * @param {string} scope
     * @param {string} key
     * @param {string} srFileId
     * @param {Function} callback
     */
    deleteFileById: function (storageUnit, scope, key, srFileId, callback) {
      var self = this
      self.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
        if (err) {
          return callback(err)
        }
        var filePath = path.join(destinationFolder, hashKey(key) + '-' + srFileId)
        self.deletePath(filePath, callback)
      })
    },

    /**
     * Removes every file stored for a key. For `constants.recordScope` the
     * pattern spans all scopes of the storage unit; otherwise only the key's
     * own folder in the given scope.
     *
     * NOTE(review): the paths built here contain `*` wildcards, so this
     * relies on `fs.remove` expanding glob patterns - confirm that the
     * installed fs-extra version does so.
     *
     * @param {string} storageUnit
     * @param {string} scope
     * @param {string} key
     * @param {Function} callback
     */
    deleteFilesByKey: function (storageUnit, scope, key, callback) {
      var self = this
      var hashedKey = hashKey(key)

      if (scope === constants.recordScope) {
        var allScopesPattern = path.join(destination, storageUnit, '*', '*', '*', hashedKey + '-*')
        return self.deletePath(allScopesPattern, callback)
      }

      self.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
        if (err) {
          return callback(err)
        }
        var filePath = path.join(destinationFolder, hashedKey + '-*')
        self.deletePath(filePath, callback)
      })
    },

    /**
     * Removes the whole folder of a scope inside a storage unit.
     *
     * @param {string} storageUnit
     * @param {string} scope
     * @param {Function} [callback]
     */
    deleteFilesByScope: function (storageUnit, scope, callback) {
      var scopeLocation = path.join(destination, storageUnit, scope)
      this.deletePath(scopeLocation, callback)
    },

    /**
     * Removes the whole folder of a storage unit.
     *
     * @param {string} storageUnit
     * @param {Function} [callback]
     */
    deleteFilesByStorageUnit: function (storageUnit, callback) {
      var storageUnitLocation = path.join(destination, storageUnit)
      this.deletePath(storageUnitLocation, callback)
    },

    /**
     * Renames `<hash>-<filename>` to `<hash>-<srFileId>` within the key's folder.
     *
     * @param {string} storageUnit
     * @param {string} scope
     * @param {string} key
     * @param {string} filename
     * @param {string} srFileId
     * @param {function(Error=, string)} callback - Receives the new file path.
     */
    changeFileNameToId: function (storageUnit, scope, key, filename, srFileId, callback) {
      this.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
        if (err) {
          return callback(err)
        }
        var hashedKey = hashKey(key)
        var inFilePath = path.join(destinationFolder, hashedKey + '-' + filename)
        var outFilePath = path.join(destinationFolder, hashedKey + '-' + srFileId)
        fs.rename(inFilePath, outFilePath, function (err) {
          callback(err, outFilePath)
        })
      })
    },

    /**
     * Checks whether a path exists. Asynchronous when a callback is given
     * (the callback is handed to `fs.exists` unchanged), synchronous otherwise.
     *
     * @param {string} targetPath
     * @param {Function} [callback]
     * @returns {boolean|*} The result of `fs.existsSync` in synchronous mode.
     */
    fileExists: function (targetPath, callback) {
      if (callback) {
        return fs.exists(targetPath, callback)
      }
      return fs.existsSync(targetPath)
    }
  }
}

module.exports = localFileHandler
Anchor | ||||
---|---|---|---|---|
|
...
and Automatic Updates
Remote Replication
Content reprocessing is a key feature of StageR. It provides the ability to execute content processing pipelines over content that is already stored in a storage unit. Reprocessing is done at the scope level of a storage unit.
Records can be reprocessed through the reprocess API calls which allow for single key, multiple key and whole scope reprocessing. Items selected for reprocessing are put in a queue that is then consumed by reprocessing worker processes. Multiple reprocessing workers can be configured for a single installation.
Reprocessing will execute the content processing modules configured for the specified scope of the key(s).
To configure reprocessing, specify the reprocessQueue properties (batchSize: number of documents to process as a batch; querySize: number of records each reprocess worker retrieves at a time; timeout: wait timeout when there are no items in the queue) and define the number of reprocessing workers in the StageR configuration file:
Code Block | ||||
---|---|---|---|---|
| ||||
{
...,
reprocessQueue: {
batchSize: 20,
querySize: 40,
timeout: 5000
},
workers: {
restapi: 1,
reprocess: 4,
replication: 1
},
...
} |
Specify the content processing modules, for the scope to be reprocessed, using the admin/setContentProcessingModules API call:
Code Block | ||||
---|---|---|---|---|
| ||||
POST admin/setContentProcessingModules/STORAGE_UNIT
{
"modules" : {
"connector": [
{
"settings" : {
"solr-collection" : "testcollection"
},
"module" : "SolrPublisher"
}
]
},
"settings" : {
"solr-hosts" : "localhost:8983"
}
} |
Enable the reprocessing queue at the storage unit level. This will register the storage unit to be scanned by the reprocess workers.
Code Block | ||||
---|---|---|---|---|
| ||||
PUT admin/enableReprocessingQueue/STORAGE_UNIT/true |
Automatic Updates
When foreign keys are defined for a content record and the ForeignKeyJoin processing module is defined for a storage unit, a foreign key lookup table is created in StageR. This table lets the foreign item know which primary items reference it, so that any update to the foreign item triggers an automatic update of those primary items (their keys are sent for reprocessing).
As long as the reprocessing queue for the primary item's storage unit is enabled and there are reprocessing workers configured, automatic updates will be triggered.
Remote Replication
Remote replication allows a StageR storage unit to subscribe to a remote StageR storage unit and replicate the data locally. When a storage unit is subscribed for remote replication, the remote server sends all of its transactions to the subscribed storage unit. This way, the subscribing storage unit knows exactly how to replicate the incoming data.
Remote replication subscriptions are managed by the replication API.
To configure remote replication, specify the replication properties (interval: wait timeout when no more transactions are available; querySize: number of records to retrieve at a time) and define the number of replication workers in the StageR configuration file:
Code Block | ||||
---|---|---|---|---|
| ||||
{
...,
replication: {
interval: 15000,
querySize: 40
},
workers: {
restapi: 1,
reprocess: 1,
replication: 4
},
...
} | ||||
Warning | ||||
Feature Under construction |