# pip install elasticsearch
from elasticsearch import Elasticsearch
from datetime import datetime
DEFAULT_ES_READ_TIMEOUT = 120
HOSTS = ['localhost']
BATCH_SIZE = 10000
CSV_PATH = 'inputFile.csv'
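# Assumed input format, inferred from the parsing in main() (the example line
# is illustrative, not taken from the original data):
#   id;display;confAdjust;pattern1,pattern2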
TAG = 'testy'
WORKSPACE = "test"
EPOCH = datetime.utcfromtimestamp(0)
# Defaults for new database
DEFAULT_PIPELINE = 'baseline-pipeline'
DEFAULT_PROVIDER_NAME = 'saga-provider'
# Utility: convert a datetime to Unix epoch milliseconds.
def unix_time_millis(dt):
    return (dt - EPOCH).total_seconds() * 1000.0
# Thin wrapper around the Elasticsearch client for document indexing.
class ElasticSearchClient:
    def __init__(self):
        self.es_client = Elasticsearch(HOSTS, timeout=DEFAULT_ES_READ_TIMEOUT)
        self.batch_size = BATCH_SIZE

    def publish(self, index, doc, doc_type, doc_id=None):
        # Index a single document; Elasticsearch generates an id when doc_id is None.
        self.es_client.index(index=index, doc_type=doc_type, body=doc, id=doc_id)
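
    # Hedged sketch, not in the original script: BATCH_SIZE is defined above
    # but never used, which suggests bulk indexing was intended. publish_bulk
    # is a hypothetical helper built on the standard elasticsearch.helpers.bulk
    # API; adjust the action keys to your client version as needed.
    def publish_bulk(self, index, docs, doc_type):
        from elasticsearch import helpers  # local import, used only by this sketch
        actions = (
            {'_index': index, '_type': doc_type, '_source': doc}
            for doc in docs
        )
        helpers.bulk(self.es_client, actions, chunk_size=self.batch_size)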
def main():
    es_client = ElasticSearchClient()
    # Tag metadata document, indexed into the <workspace>_tags index.
    tag_doc = {
        'name': TAG,
        'assigned': {
            'DictionaryTaggerStage': {
                'stage': 'DictionaryTaggerStage',
                'display': 'Entity1',
                'config': {
                    'dictionary': DEFAULT_PROVIDER_NAME + ':' + WORKSPACE + '_entities',
                    'skipFlags': [],
                    'boundaryFlags': [
                        'TEXT_BLOCK_SPLIT'
                    ],
                    'requiredFlags': [
                        'TOKEN'
                    ],
                    'atLeastOneFlag': [
                        'ALL_LOWER_CASE'
                    ],
                    'debug': False
                },
                'enable': True,
                'baseline-pipeline': DEFAULT_PIPELINE
            }
        },
        'updatedAt': unix_time_millis(datetime.utcnow()),
        'createdAt': unix_time_millis(datetime.utcnow()),
    }
    es_client.publish(WORKSPACE + '_tags', tag_doc, 'tag', TAG)
    # Read the CSV and index one entity document per row.
    with open(CSV_PATH, encoding='utf8') as fp:
        next(fp, None)  # skip the header row
        for line in fp:
            row = line.split(';')
            try:
                # Four fields are required: id, display, confAdjust, patterns.
                if len(row) >= 4:
                    print(row)
                    entry_doc = {
                        'id': row[0].strip(),
                        'display': row[1].strip(),
                        'fields': {},
                        'confAdjust': row[2].strip(),
                        'updatedAt': unix_time_millis(datetime.utcnow()),
                        'createdAt': unix_time_millis(datetime.utcnow()),
                        'tag': TAG,
                        'patterns': row[3].strip().split(',')
                    }
                    es_client.publish(WORKSPACE + '_entities', entry_doc, 'entity')
                else:
                    print("Missing fields: " + line)
            except Exception as e:
                print("Error on: " + line + " (" + str(e) + ")")
    print('Done')
if __name__ == '__main__':
    main()
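
# Hedged follow-up sketch, not part of the original script: once the import
# has run, the entity documents can be spot-checked with a term query. The
# index name and query below are assumptions based on the documents written
# above.
#
#   es = Elasticsearch(HOSTS, timeout=DEFAULT_ES_READ_TIMEOUT)
#   result = es.search(index=WORKSPACE + '_entities',
#                      body={'query': {'term': {'tag': TAG}}})
#   print(result['hits']['total'])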