The following files must follow the naming format <engine_name>_<name_of_the_file>.
Create the <engine_name>_aggregation_factory.py file
on the aggregation factory file
from typing import Any, Dict, List, Union

from app.engines import AggregationFactory
from framework.aggregations import *
from models.engines.response import ResponseAgg, RangeAggEntry, TermAggEntry, StatsAggEntry
This class defines how to build the aggregations for this specific engine; it is used in different parts of the code.
The aggregation definitions are declared in the module framework.aggregations. These declarations are shared by all engine types and are used to declare a normalized engine request.
class ExampleAggregationFactory(AggregationFactory):
    """Template aggregation factory for a new engine.

    Every method is a stub to be filled in. The ``*_agg`` builders receive an
    aggregation definition from ``framework.aggregations`` and must return
    that aggregation in the engine's own format. The ``transform_*`` methods
    are annotated as producing the normalized response models
    (``ResponseAgg``, ``TermAggEntry``, ``RangeAggEntry``, ``StatsAggEntry``).
    """

    @classmethod
    def transform_aggregations(cls, aggs: Any) -> List[ResponseAgg]:
        """Traverse the (possibly nested) aggregations and return them."""
        # your code here, traverse and return the nested aggregation for the
        # engine in the engine's format
        pass

    @staticmethod
    def bool_agg(config: BoolAgg) -> Any:
        """Return the bool aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def browse_agg(config: BrowseAgg) -> Any:
        """Return the browse aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def date_histogram_agg(config: DateHistogramAgg) -> Any:
        """Return the date-histogram aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def date_range_agg(config: DateRangeAgg) -> Any:
        """Return the date-range aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def histogram_agg(config: HistogramAgg) -> Any:
        """Return the histogram aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def range_agg(config: RangeAgg) -> Any:
        """Return the range aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def slider_agg(config: SliderAgg) -> Any:
        """Return the slider aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def term_agg(config: TermAgg) -> Any:
        """Return the term aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def top_hits_agg(config: TermAgg) -> Any:
        """Return the top-hits aggregation in the engine's format."""
        # NOTE(review): annotated with TermAgg, same as term_agg - confirm
        # whether a dedicated top-hits definition type is intended here.
        # your code here
        pass

    @staticmethod
    def filter_agg(config: FilterAgg) -> Any:
        """Return the filter aggregation in the engine's format."""
        # your code here
        pass

    @staticmethod
    def term_entry_transform(bucket: List[Dict]) -> List[TermAggEntry]:
        """Turn a list of bucket dicts into ``TermAggEntry`` items."""
        # your code here
        pass

    @staticmethod
    def range_entry_transform(bucket: List[Dict]) -> Union[List[RangeAggEntry], None]:
        """Turn a list of bucket dicts into ``RangeAggEntry`` items, or None."""
        # your code here
        pass

    @staticmethod
    def stats_entry_transform(agg: Any) -> Union[List[StatsAggEntry], None]:
        """Turn a stats aggregation into ``StatsAggEntry`` items, or None."""
        # your code here
        pass
on the filter factory file
from app.engines import FilterFactory from framework.filters import *
This class defines how to build the filters for this specific engine; it is used in different parts of the code.
The filter definitions are declared in the module framework.filters. These declarations are shared by all engine types and are used to declare a normalized engine request.
class ExampleFilterFactory(FilterFactory):
    """Template filter factory for a new engine.

    Every method is a stub to be filled in: it receives a filter definition
    from ``framework.filters`` and must return that filter in the engine's
    own format.
    """

    @staticmethod
    def geo_shape_filter(config: GeoShapeFilter):
        """Return the geo-shape filter in the engine's format."""
        # your code here
        pass

    @staticmethod
    def range_filter(config: RangeFilter):
        """Return the range filter in the engine's format."""
        # your code here
        pass

    @staticmethod
    def wildcard_filter(config: WildcardFilter):
        """Return the wildcard filter in the engine's format."""
        # your code here
        pass

    @staticmethod
    def term_filter(config: TermFilter):
        """Return the term filter in the engine's format."""
        # your code here
        pass

    @staticmethod
    def terms_filter(config: TermFilter):
        """Return the terms filter in the engine's format."""
        # NOTE(review): annotated with TermFilter, same as term_filter -
        # confirm whether a separate multi-value filter type is intended.
        # your code here
        pass

    @staticmethod
    def exist_filter(config: ExistFilter):
        """Return the exist filter in the engine's format."""
        # your code here
        pass

    @staticmethod
    def bool_filter(config: BoolFilter):
        """Return the bool filter in the engine's format."""
        # your code here
        pass
on the config file
from enum import Enum from pydantic import Field from models.engines import BaseEngineConnection
class BoolOption(Enum):
    """Boolean clause options available to the example engine config."""

    MUST = 'must'
    SHOULD = 'should'
class ExampleConfig(BaseEngineConnection):
on the engine file
from typing import Union, List, Tuple, Any, Dict from app.engines import BaseEngine, BulkEntry from app.engines.example.example_aggregation_factory import ExampleAggregationFactory from app.engines.example.example_config import ExampleConfig from app.engines.example.example_filter_factory import ExampleFilterFactory from models.engines import HighlightConfig from models.engines.response import ErrorResponse, ByQueryResponse, CRUDResponse, ResponseHit, SearchRequest, SearchResponse from utils.data import find
This is where all the requests are defined
class ExampleEngine(BaseEngine[ExampleConfig, ExampleFilterFactory, ExampleAggregationFactory]):
    """Template engine: every request the engine supports is defined here.

    Apart from ``__init__``, all methods are stubs to be implemented for the
    concrete engine.
    """

    def __init__(self, config: ExampleConfig):
        """Initialize the engine.

        The super ``__init__`` must be called with the engine configuration
        (passed in by the Connection Manager), an instance of a FilterFactory
        and an instance of an AggregationFactory.
        """
        super().__init__(config=ExampleConfig(**config.dict()),
                         filter_factory=ExampleFilterFactory(),
                         agg_factory=ExampleAggregationFactory())

    # --- Request building helpers -------------------------------------

    @staticmethod
    def generate_highlight(config: HighlightConfig) -> Any:
        # your code here, generate the highlight config
        pass

    @staticmethod
    def boolean_structure(body: Dict, must_not: List[Dict], must: List[Dict],
                          should_not: List[Dict], should: List[Dict]):
        # your code here, generate the boolean structure
        pass

    # --- Response normalization ---------------------------------------

    @staticmethod
    def transform_hit(hit: Dict) -> ResponseHit:
        # return ResponseHit(index=, id=, score=, data=, highlight=,
        #                    inner_hits=ExampleEngine.transform_inner_hits(find(hit, 'Inner hits location in hit')))
        pass

    @staticmethod
    def transform_inner_hits(inner_hits: Dict) -> Union[Dict[str, SearchResponse], None]:
        # If not interested in getting inner hits just leave pass
        # if inner_hits is None or len(inner_hits) == 0:
        #     return None
        #
        # inner_hits_normalized: Dict[str, SearchResponse] = {}
        #
        # for key in inner_hits.keys():
        #     # your code here to fetch the inner hits information
        #
        #     inner_hits_normalized[key] = SearchResponse(
        #         total=,
        #         max_score=,
        #         hits=[ExampleEngine.transform_hit(hit) for hit in find(inner_hits[key], 'hits location in inner hit', [])])
        #
        # return inner_hits_normalized
        pass

    @staticmethod
    def transform_crud(response: Dict) -> CRUDResponse:
        # return CRUDResponse(index=, id=, result=, version=)
        pass

    @staticmethod
    def transform_by_query(response: Dict) -> ByQueryResponse:
        # return ByQueryResponse(took=, timed_out=, total=, deleted=, updated=, created=, batches=, failures=)
        pass

    # --- Search requests ----------------------------------------------

    def search(self, index: str, data: SearchRequest, **kwargs) -> Union[SearchResponse, ErrorResponse]:
        pass

    def multi_search(self, data: List[Tuple[str, SearchRequest]], **kwargs):
        pass

    def async_search(self, index: str, data: SearchRequest, **kwargs) -> str:
        pass

    def async_search_fetch(self, index: str, _id: str, **kwargs) -> Union[SearchResponse, ErrorResponse]:
        pass

    def async_search_delete(self, index: str, _id: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def knn_search(self, index: str, data: SearchRequest, **kwargs) -> Union[SearchResponse, ErrorResponse]:
        pass

    def scroll_search(self, scroll_id: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def clear_scroll_search(self, scroll_id: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    # --- Index, template, mapping and settings requests ---------------

    def create_index_template(self, template_name: str, data: Dict, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def create_index(self, index: str, data: Dict, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def delete_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def get_index(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def exist_template(self, template_name: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def exist_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def open_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def close_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def stats_index(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def get_mapping(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def update_mapping(self, index: str, data: Dict, **kwargs):
        pass

    def get_settings(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def update_settings(self, index: str, data: Dict, **kwargs) -> None:
        pass

    # --- Document requests --------------------------------------------

    def index_doc(self, index: str, doc: Dict, doc_id: Union[str, None] = None,
                  routing: Union[str, None] = None, **kwargs) -> Union[CRUDResponse, ErrorResponse]:
        pass

    def get_doc(self, index: str, doc_id: str, routing: Union[str, None] = None,
                **kwargs) -> Union[ResponseHit, ErrorResponse]:
        pass

    def update_doc(self, index: str, doc_id: str, doc: Dict, routing: Union[str, None] = None,
                   **kwargs) -> Union[CRUDResponse, ErrorResponse]:
        pass

    def partial_update_doc(self, index: str, doc_id: str, doc: Dict, routing: Union[str, None] = None,
                           **kwargs) -> Union[CRUDResponse, ErrorResponse]:
        pass

    def script_update_doc(self, index: str, doc_id: str, script, **kwargs) -> Union[CRUDResponse, ErrorResponse]:
        pass

    def delete_doc(self, index: str, doc_id: str, routing: Union[str, None] = None,
                   **kwargs) -> Union[CRUDResponse, ErrorResponse]:
        pass

    # --- Bulk and by-query requests -----------------------------------

    def bulk(self, *entries: BulkEntry, **kwargs) -> None:
        pass

    def delete_by_query(self, index: str, data: Dict, **kwargs) -> Union[ByQueryResponse, ErrorResponse]:
        pass

    def update_by_query(self, index: str, data: Dict, **kwargs) -> Union[ByQueryResponse, ErrorResponse]:
        pass

    def reindex(self, data: Dict, **kwargs) -> None:
        pass

    def count(self, index: str, data: Union[Dict, None] = None, **kwargs) -> Union[int, ErrorResponse]:
        pass

    def transform_response(self, response: Dict) -> Union[SearchResponse, ErrorResponse]:
        # return SearchResponse(scroll_id=, took=, timed_out=, total=, max_score=, aggs=,
        #                       hits=[self.transform_hit(hit) for hit in find(response, 'Hits location in response', [])])
        pass
on the __init__ file of the engine package
from ._aggregation_factory import AggregationFactory from ._filter_factory import FilterFactory from ._base_engine import BaseEngine, BulkEntry, BulkAction from .elasticsearch import ElasticsearchEngine from .opensearch import OpensearchEngine # Your new engine from .example import ExampleEngine
class EngineTypes(str, Enum):
    """Engine type identifiers; each member is also a plain ``str``."""

    ELASTIC = 'elasticsearch'
    OPENSEARCH = 'opensearch'
    NEW = 'new'  # <=== your engine type here