
Code Block
'use strict'

var crypto = require('crypto')
var async = require('async')
var util = require('util')
var zlib = require('zlib')
var constants = require('../constants/general.server.constants')
var fs = require('fs-extra')
var path = require('path')

function localFileHandler (options) {
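  // Stores files on the local filesystem. Record keys are hashed (MD5 by
  // default) to build file names and fan-out folders; an encryption helper
  // and a destination root folder must be supplied by the caller.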
  options = options || {}
  var hashKey = options.hashkey || function (value) {
    return crypto.createHash('md5').update(value).digest('hex')
  }
  if (!options.encryption) {
    throw new Error('options.encryption is required')
  }
  var encryption = options.encryption
  if (!options.destination) {
    throw new Error('options.destination is required')
  }
  var destination = options.destination

  return {
    getFolderDestination: function (storageUnit, scope, key, callback) {
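      // Fan files out into two directory levels derived from the key hash
      // so that no single folder accumulates too many entries.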
      var filePrefix = hashKey(key)
      var firstLevel = filePrefix.substring(0, 3)
      var secondLevel = filePrefix.substring(3, 6)
      var storeFileLocation = path.join(destination, storageUnit, scope, firstLevel, secondLevel)
      try {
        fs.existsSync(storeFileLocation) || fs.mkdirsSync(storeFileLocation)
      } catch (err) {
        return callback(err)
      }
      return callback(null, storeFileLocation)
    },
    saveStream: function (storageUnit, scope, key, filename, mimetype, stream, encrypt, compress, callback) {
      this.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
        if (err) {
          return callback(err)
        }

        var encodedFilename = hashKey(key) + '-' + filename
        var finalPath = path.join(destinationFolder, encodedFilename)
        var outStream = fs.createWriteStream(finalPath)

        async.waterfall([
          function (wfNext) {
            if (encrypt === true) {
              var encKey = encryption.generateKeyId()
              var encIv = encryption.generateiv()
              encryption.getCipher(storageUnit, encKey, encIv, function (err, cipher) {
                wfNext(err, cipher, encKey, encIv)
              })
            } else {
              wfNext(null, null, null, null)
            }
          },
          function (cipher, encKey, encIv, wfNext) {
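            // Compress before encrypting: ciphertext is effectively
            // incompressible, so the pipeline order matters.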
            if (compress === true) {
              stream = stream.pipe(zlib.createDeflate())
            }
            if (encrypt === true) {
              stream = stream.pipe(cipher)
            }
            stream.pipe(outStream)
            outStream.on('error', wfNext)
            outStream.on('finish', function () {
              wfNext(null, destinationFolder, finalPath, outStream.bytesWritten, encKey, encIv, (compress === true))
            })
          }
        ], callback)
      })
    },
    getStream: function (storageUnit, scope, key, srFileId, encKey, encIv, isCompressed, callback) {
      var self = this
      async.waterfall([
        function (wfNext) {
          self.getFolderDestination(storageUnit, scope, key, function (err, destinationFolder) {
            if (err) {
              return wfNext(err)
            }
            var encodedFilename = hashKey(key) + '-' + srFileId
            var fileLocation = path.join(destinationFolder, encodedFilename)
            var fileStream = fs.createReadStream(fileLocation)
            wfNext(null, fileStream)
          })
        },
        function (fileStream, wfNext) {
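          // Undo the write pipeline in reverse order: decrypt here,
          // then decompress in the next step.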
          if (!util.isNullOrUndefined(encKey)) {
            encryption.getDecipher(storageUnit, encKey, encIv, function (err, decipher) {
              wfNext(err, fileStream.pipe(decipher))
            })
          } else {
            wfNext(null, fileStream)
          }
        }, function (fileStream, wfNext) {
          if (isCompressed === true) {
            wfNext(null, fileStream.pipe(zlib.createInflate()))
          } else {
            wfNext(null, fileStream)
          }
        }], callback)
    },
    deletePath: function (path, callback) {
      if (callback) {
        return fs.remove(path, callback)
      }
      return fs.removeSync(path)
    },
    deleteFileById: function (storageUnit, scope, key, srFileId, callback) {
      var self = this
      self.getFolderDestination(storageUnit, scope, key, function (err, destination) {
        if (err) {
          return callback(err)
        }
        var hashedKey = hashKey(key)
        var filePath = path.join(destination, hashedKey + '-' + srFileId)
        self.deletePath(filePath, callback)
      })
    },
    deleteFilesByKey: function (storageUnit, scope, key, callback) {
      var self = this
      var hashedKey = hashKey(key)
      if (scope === constants.recordScope) {
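        // Record scope: the file may sit under any fan-out folder, so match
        // every level with a glob (assumes the remove implementation expands globs).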
        var filePath = path.join(destination, storageUnit, '*', '*', '*', hashedKey + '-*')
        self.deletePath(filePath, callback)
      } else {
        self.getFolderDestination(storageUnit, scope, key, function (err, destination) {
          if (err) {
            return callback(err)
          }
          var filePath = path.join(destination, hashedKey + '-*')
          self.deletePath(filePath, callback)
        })
      }
    },
    deleteFilesByScope: function (storageUnit, scope, callback) {
      var scopeLocation = path.join(destination, storageUnit, scope)
      this.deletePath(scopeLocation, callback)
    },
    deleteFilesByStorageUnit: function (storageUnit, callback) {
      var storageUnitLocation = path.join(destination, storageUnit)
      this.deletePath(storageUnitLocation, callback)
    },
    changeFileNameToId: function (storageUnit, scope, key, filename, srFileId, callback) {
      this.getFolderDestination(storageUnit, scope, key, function (err, destination) {
        if (err) {
          return callback(err)
        }
        var hashedKey = hashKey(key)
        var inFilePath = path.join(destination, hashedKey + '-' + filename)
        var outFilePath = path.join(destination, hashedKey + '-' + srFileId)
        fs.rename(inFilePath, outFilePath, function (err) {
          callback(err, outFilePath)
        })
      })
    },
    fileExists: function (path, callback) {
      if (callback) {
        return fs.exists(path, callback)
      }
      return fs.existsSync(path)
    }
  }
}

module.exports = localFileHandler
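
A minimal usage sketch follows. The encryption helper here is a stand-in written for this example (the handler only requires an object exposing generateKeyId, generateiv, getCipher and getDecipher); StageR supplies the real implementation, and the module path is hypothetical:

Code Block
'use strict'

var crypto = require('crypto')
var fs = require('fs-extra')
var localFileHandler = require('./local.file.handler') // hypothetical module path

// Stand-in encryption helper matching the interface the handler expects.
var encryption = {
  generateKeyId: function () { return crypto.randomBytes(32).toString('hex') },
  generateiv: function () { return crypto.randomBytes(16).toString('hex') },
  getCipher: function (storageUnit, key, iv, callback) {
    callback(null, crypto.createCipheriv('aes-256-cbc', Buffer.from(key, 'hex'), Buffer.from(iv, 'hex')))
  },
  getDecipher: function (storageUnit, key, iv, callback) {
    callback(null, crypto.createDecipheriv('aes-256-cbc', Buffer.from(key, 'hex'), Buffer.from(iv, 'hex')))
  }
}

var handler = localFileHandler({
  destination: '/var/stager/files',
  encryption: encryption
})

// Store a compressed and encrypted copy of a local file.
var input = fs.createReadStream('report.pdf')
handler.saveStream('unit1', 'document', 'record-42', 'report.pdf', 'application/pdf', input, true, true,
  function (err, folder, filePath, bytes, encKey, encIv, compressed) {
    if (err) { return console.error(err) }
    console.log('stored', bytes, 'bytes at', filePath)
  })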

Reprocessing

Content reprocessing is a key feature of StageR. It provides the ability to execute content processing pipelines over content that is already stored in a storage unit. Reprocessing is done at the scope level of a storage unit.

Records can be reprocessed through the reprocess API calls, which allow for single-key, multiple-key, and whole-scope reprocessing. Items selected for reprocessing are placed in a queue that is consumed by reprocessing worker processes. Multiple reprocessing workers can be configured for a single installation.

Reprocessing executes the content processing modules configured for the scope of the specified key(s).
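
The exact reprocess endpoints are defined in the StageR API reference rather than on this page; purely as an illustrative assumption, a single-key reprocess request might take a shape like the following (path and body are not confirmed here):

Code Block
POST data/reprocess/STORAGE_UNIT/SCOPE
{
    "keys" : ["RECORD_KEY"]
}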

To configure reprocessing, set the reprocessQueue properties (batchSize: the number of documents to process as a batch; querySize: the number of records each reprocess worker retrieves at a time; timeout: how long to wait when there are no items in the queue) and define the number of reprocessing workers in the StageR configuration file:

Code Block
{
  ...,
  reprocessQueue: {
    batchSize: 20,
    querySize: 40,
    timeout: 5000
  },
  workers: {
    restapi: 1,
    reprocess: 4,
    replication: 1
  },
  ...
}

Specify the content processing modules for the scope to be reprocessed using the admin/setContentProcessingModules API call:

Code Block
POST admin/setContentProcessingModules/STORAGE_UNIT
{
    "modules" : {
        "connector": [
            {
                "settings" : {
                    "solr-collection" : "testcollection"
                },
                "module" : "SolrPublisher"
            }
        ]
    },
    "settings" : {
        "solr-hosts" : localhost:8983"
    }
}

Enable the reprocessing queue at the storage unit level. This will register the storage unit to be scanned by the reprocess workers.

Code Block
PUT admin/enableReprocessingQueue/STORAGE_UNIT/true

Automatic Updates

When foreign keys are defined for a content record and the ForeignKeyJoin processing module is configured for a storage unit, StageR builds a foreign-key lookup table. This table lets a foreign item know which primary items reference it, so any update to the foreign item triggers an automatic update of those primary items (their keys are sent to reprocess).

As long as the reprocessing queue for the primary item's storage unit is enabled and there are reprocessing workers configured, automatic updates will be triggered.
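
As an illustration only (the field names below are assumptions, not the actual lookup-table schema), the lookup table maps an updated foreign item back to the primary keys that must be requeued:

Code Block
// Primary record referencing a foreign record through a foreign key.
{ "key" : "doc-1", "authorKey" : "author-7" }

// Hypothetical lookup-table entry: when "author-7" is updated, "doc-1"
// is sent back to the reprocessing queue of its storage unit.
{ "foreignKey" : "author-7", "primaryKeys" : ["doc-1"] }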

Remote Replication

Remote replication allows a StageR storage unit to subscribe to a storage unit on a remote StageR server and replicate its data locally. When a storage unit is subscribed for remote replication, the remote server sends all of its transactions to the subscriber; this way, the subscribing storage unit knows exactly how to replay the incoming data.

Remote replication subscriptions are managed by the replication API.

To configure remote replication, specify the replication properties (interval: how long to wait when no more transactions are available; querySize: the number of records to retrieve at a time) and define the number of replication workers in the StageR configuration file:

Code Block
{
  ...,
  replication: {
    interval: 15000,
    querySize: 40
  },
  workers: {
    restapi: 1,
    reprocess: 1,
    replication: 4
  },
  ...
}

Warning
Feature under construction