Dictionaries are resources used to define entities, usually based on pattern recognition.
Reference Dictionary Tagger Stage.
The Dictionary Tagger Stage and Simple Regex Stage are examples of the use of dictionaries as resources. These resources are loaded into Elasticsearch as part of the Saga session.
A comprehensive definition of the structure of dictionary resources can be seen here. Dictionary resources for different recognizers may differ from one another; please check the documentation of the recognizer for the correct specification.
This section will show an example of how to import a dictionary directly into Elasticsearch for the Entity Recognizer (Dictionary Tagger Stage), using Python and an Elasticsearch client. First, create an entity recognizer to assign a tag:
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
{ "name": "tagname", "assigned": { "DictionaryTaggerStage": { "stage": "DictionaryTaggerStage", "display": "Entity", "config": { "dictionary": "<provider_name>:<workspace>_entities", "skipFlags": [], "boundaryFlags": [ "TEXT_BLOCK_SPLIT" ], "requiredFlags": [ "TOKEN" ], "atLeastOneFlag": [ "ALL_LOWER_CASE" ], "debug": false }, "enable": true, "baseline-pipeline": "baseline-pipeline" } }, "updatedAt": <time>, "createdAt": <time> } |
This is the basic structure for the dictionary entries:
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
{ "id": <document_id>, "display": <display_label>, "fields": {}, "confAdjust": <confidence_adjustment_value>, "updatedAt": <time>, "createdAt": <time>, "tag": <tag_name>, "patterns" : ["<pattern1>", "<pattern2>", "<patternN>"] } |
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
# pip install elasticsearch
# pip install datetime
import json
from elasticsearch import Elasticsearch
from datetime import datetime
DEFAULT_ES_READ_TIMEOUT = 120
HOSTS = ['localhost']
BATCH_SIZE = 10000
CSV_PATH = 'inputFile.csv'
TAG = 'testy'
WORKSPACE = "test"
# Unix epoch as a naive datetime. Equivalent to datetime.utcfromtimestamp(0),
# which is deprecated since Python 3.12.
EPOCH = datetime(1970, 1, 1)
# Defaults for new database
DEFAULT_PIPELINE = 'baseline-pipeline'
DEFAULT_PROVIDER_NAME = 'saga-provider'
# util method for timestamp
def unix_time_millis(dt):
    """Return *dt* as milliseconds elapsed since the Unix epoch (EPOCH)."""
    delta = dt - EPOCH
    return delta.total_seconds() * 1000.0
# util class for document indexing
class ElasticSearchClient:
    """Thin wrapper around the Elasticsearch client for indexing single documents."""

    def __init__(self):
        # One shared client; the timeout guards against slow bulk reads.
        self.es_client = Elasticsearch(HOSTS, timeout=DEFAULT_ES_READ_TIMEOUT)
        self.batch_size = BATCH_SIZE

    def publish(self, index, doc, doc_type, id=None):
        """Index *doc* into *index*; when *id* is None, Elasticsearch assigns one.

        NOTE(review): doc_type was removed from the index API in Elasticsearch
        8.x clients — confirm the installed client version supports it.
        """
        self.es_client.index(index=index, doc_type=doc_type, body=doc, id=id)
def main():
    """Create the tag document, then load dictionary entries from CSV_PATH.

    Each CSV row is ';'-separated: id;display;confAdjust;pattern1,pattern2,...
    """
    es_client = ElasticSearchClient()
    # Tag document configuring the DictionaryTaggerStage for this workspace.
    tag_doc = {
        'name': TAG,
        'assigned': {
            'DictionaryTaggerStage': {
                'stage': 'DictionaryTaggerStage',
                'display': 'Entity1',
                'config': {
                    'dictionary': DEFAULT_PROVIDER_NAME + ':' + WORKSPACE + '_entities',
                    'skipFlags': [],
                    'boundaryFlags': [
                        'TEXT_BLOCK_SPLIT'
                    ],
                    'requiredFlags': [
                        'TOKEN'
                    ],
                    'atLeastOneFlag': [
                        'ALL_LOWER_CASE'
                    ],
                    'debug': False
                },
                'enable': True,
                'baseline-pipeline': DEFAULT_PIPELINE
            }
        },
        'updatedAt': unix_time_millis(datetime.now()),
        'createdAt': unix_time_millis(datetime.now()),
    }
    es_client.publish(WORKSPACE + '_tags', tag_doc, 'tag', TAG)
    with open(CSV_PATH, encoding='utf8') as fp:
        # BUG FIX: the old readline() loop skipped the first line of the file
        # and processed the empty EOF sentinel; iterating the file handles both.
        for line in fp:
            row = line.split(';')
            try:
                # BUG FIX: row[3] is accessed below, so four fields are required
                # (the old 'len(row) >= 3' guard allowed an IndexError).
                if len(row) >= 4:
                    print(row)
                    entry_doc = {
                        'id': row[0].strip(),
                        'display': row[1].strip(),
                        'fields': {},
                        # NOTE(review): kept as a string as before — confirm the
                        # index mapping coerces it to a number.
                        'confAdjust': row[2].strip(),
                        'updatedAt': unix_time_millis(datetime.now()),
                        'createdAt': unix_time_millis(datetime.now()),
                        'tag': TAG,
                        'patterns': row[3].strip().split(',')
                    }
                    es_client.publish(WORKSPACE + '_entities', entry_doc, 'entity')
                else:
                    # Message fixed: the delimiter is ';', not tabs.
                    print("Missing fields: " + line)
            except Exception:
                # Broadened from MemoryError so one malformed row does not
                # abort the whole import; the bad line is reported and skipped.
                print("Error on: " + line)
    print('Done')


if __name__ == '__main__':
    main()
Panel | |
---|---|
In this page:
|