Note |
---|
The following files need to follow a specific name format which is <engine_name>_<name_of_the_file> |
Create the _aggragation_factory.py file
on the aggregation factory file
Code Block | ||||
---|---|---|---|---|
| ||||
from typing import Union, List, Dict from app.engines import AggregationFactory from framework.aggregations import * from models.engines.response import ResponseAgg, RangeAggEntry, TermAggEntry, StatsAggEntry |
This class will tell how to build the aggregations for this specific engine, and it will be used in different parts of the code
Info |
---|
The aggregations definitions are declared in the module framework.aggregations, these aggregation declarations are used by all engine type, and used for the declaration of a normalize engine request |
Code Block | ||||
---|---|---|---|---|
| ||||
class ExampleAggregationFactory(AggregationFactory): |
Code Block | ||||
---|---|---|---|---|
| ||||
@classmethod def transform_aggregations(cls, aggs: any) -> List[ResponseAgg]: # your code here, travers and return the nested aggregation for the engine in the engine's format pass @staticmethod def bool_agg(config: BoolAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def browse_agg(config: BrowseAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def date_histogram_agg(config: DateHistogramAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def date_range_agg(config: DateRangeAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def histogram_agg(config: HistogramAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def range_agg(config: RangeAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def slider_agg(config: SliderAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def term_agg(config: TermAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def top_hits_agg(config: TermAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def filter_agg(config: FilterAgg) -> any: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def term_entry_transform(bucket: List[Dict]) -> List[TermAggEntry]: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def range_entry_transform(bucket: List[Dict]) -> Union[List[RangeAggEntry], None]: # your code here, return the aggregation for the engine in the engine's format pass @staticmethod def stats_entry_transform(agg: any) -> Union[List[StatsAggEntry], None]: # your code here, return the aggregation for the engine in the engine's format pass |
on the filter factory file
Code Block | ||||
---|---|---|---|---|
| ||||
from app.engines import FilterFactory from framework.filters import * |
This class will tell how to build the filters for this specific engine, and it will be used in different parts of the code
Info |
---|
The filters definitions are declared in the module framework.filters, these filter declarations are used by all engine type, and used for the declaration of a normalize engine request |
Code Block | ||||
---|---|---|---|---|
| ||||
class ExampleFilterFactory(FilterFactory): |
Code Block | ||||
---|---|---|---|---|
| ||||
@staticmethod def geo_shape_filter(config: GeoShapeFilter): # your code here, return the filter for the engine in the engine's format pass @staticmethod def range_filter(config: RangeFilter): # your code here, return the filter for the engine in the engine's format pass @staticmethod def wildcard_filter(config: WildcardFilter): # your code here, return the filter for the engine in the engine's format pass @staticmethod def term_filter(config: TermFilter): # your code here, return the filter for the engine in the engine's format pass @staticmethod def terms_filter(config: TermFilter): # your code here, return the filter for the engine in the engine's format pass @staticmethod def exist_filter(config: ExistFilter): # your code here, return the filter for the engine in the engine's format pass @staticmethod def bool_filter(config: BoolFilter): # your code here, return the filter for the engine in the engine's format pass |
on the config file
Code Block | ||||
---|---|---|---|---|
| ||||
from enum import Enum from pydantic import Field from models.engines import BaseEngineConnection |
Code Block | ||||
---|---|---|---|---|
| ||||
class BoolOption(Enum): MUST = 'must' SHOULD = 'should' |
Code Block | ||||
---|---|---|---|---|
| ||||
class ExampleConfig(BaseEngineConnection): |
on the config file
Code Block | ||||
---|---|---|---|---|
| ||||
from typing import Union, List, Tuple, Any, Dict from app.engines import BaseEngine, BulkEntry from app.engines.example.example_aggregation_factory import ExampleAggregationFactory from app.engines.example.example_config import ExampleConfig from app.engines.example.example_filter_factory import ExampleFilterFactory from models.engines import HighlightConfig from models.engines.response import ErrorResponse, ByQueryResponse, CRUDResponse, ResponseHit, SearchRequest, SearchResponse from utils.data import find |
Here is where all the request will be defined
Code Block | ||||
---|---|---|---|---|
| ||||
class ExampleEngine(BaseEngine[ExampleConfig, ExampleFilterFactory, ExampleAggregationFactory]): |
The engine initialization, required to call the super __init__ with the configuration of the engine (this will be passed by the Connection Manager), and the instance of a FilterFactory and the instance of an AggregationFactory
Code Block | ||||
---|---|---|---|---|
| ||||
def __init__(self, config: ExampleConfig): super().__init__(config=ExampleConfig(**config.dict()), filter_factory=ExampleFilterFactory(), agg_factory=ExampleAggregationFactory()) @staticmethod def generate_highlight(config: HighlightConfig) -> Any: # your code here, generate the highlight config pass @staticmethod def boolean_structure(body: Dict, must_not: List[Dict], must: List[Dict], should_not: List[Dict], should: List[Dict]): # your code here, generate the boolean structure pass @staticmethod def transform_hit(hit: Dict) -> ResponseHit: # return ResponseHit(index=, id=, score=, data=, highlight=, # inner_hits=TestEngine.transform_inner_hits(find(hit, 'Inner hits location in hit'))) pass @staticmethod def transform_inner_hits(inner_hits: Dict) -> Union[Dict[str, SearchResponse], None]: # If not interested in getting inner hits just leave pass # if inner_hits is None or len(inner_hits) == 0: # return None # # inner_hits_normalized: Dict[str, SearchResponse] = {} # # for key in inner_hits.keys(): # # your code here to fetch the inner hits information # # inner_hits_normalized[key] = SearchResponse( # total=, # max_score=, # hits=[TestEngine.transform_hit(hit) for hit in find(inner_hits[key], 'hits location in inner hit', [])]) # # return inner_hits_normalized pass @staticmethod def transform_crud(response: Dict) -> CRUDResponse: # return CRUDResponse(index=, id=, result=, version=) pass @staticmethod def transform_by_query(response: Dict) -> ByQueryResponse: # return ByQueryResponse(took=, timed_out=, total=, deleted=, updated=, created=, batches=, failures=) pass def search(self, index: str, data: SearchRequest, **kwargs) -> Union[SearchResponse, ErrorResponse]: pass def multi_search(self, ata: List[Tuple[str, SearchRequest]], **kwargs): pass def async_search(self, index: str, data: SearchRequest, **kwargs) -> str: pass def async_search_fetch(self, index: str, _id: str, **kwargs) -> Union[SearchResponse, ErrorResponse]: pass def async_search_delete(self, index: str, _id: str, **kwargs) -> [bool, ErrorResponse]: pass def knn_search(self, index: str, data: SearchRequest, **kwargs) -> Union[SearchResponse, ErrorResponse]: pass def scroll_search(self, scroll_id: str, **kwargs) -> Union[Dict, ErrorResponse]: pass def clear_scroll_search(self, scroll_id: str, **kwargs) -> Union[bool, ErrorResponse]: pass def create_index_template(self, template_name: str, data: Dict, **kwargs) -> Union[bool, ErrorResponse]: pass def create_index(self, index: str, data: Dict, **kwargs) -> Union[bool, ErrorResponse]: pass def delete_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]: pass def get_index(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]: pass def exist_template(self, template_name: str, **kwargs) -> Union[bool, ErrorResponse]: pass def exist_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]: pass def open_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]: pass def close_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]: pass def stats_index(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]: pass def get_mapping(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]: pass def update_mapping(self, index: str, data: Dict, **kwargs): pass def get_settings(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]: pass def update_settings(self, index: str, data: Dict, **kwargs) -> None: pass def index_doc(self, index: str, doc: Dict, doc_id: str = None, routing: str = None, **kwargs) -> Union[ CRUDResponse, ErrorResponse]: pass def get_doc(self, index: str, doc_id: str, routing: str = None, **kwargs) -> Union[ResponseHit, ErrorResponse]: pass def update_doc(self, index: str, doc_id: str, doc: Dict, routing: str = None, **kwargs) -> Union[ CRUDResponse, ErrorResponse]: pass def partial_update_doc(self, index: str, doc_id: str, doc: Dict, routing: str = None, **kwargs) -> Union[ CRUDResponse, ErrorResponse]: pass def script_update_doc(self, index: str, doc_id: str, script, **kwargs) -> Union[CRUDResponse, ErrorResponse]: pass def delete_doc(self, index: str, doc_id: str, routing: str = None, **kwargs) -> Union[CRUDResponse, ErrorResponse]: pass def bulk(self, *entries: BulkEntry, **kwargs) -> None: pass def delete_by_query(self, index: str, data: Dict, **kwargs) -> Union[ByQueryResponse, ErrorResponse]: pass def update_by_query(self, index: str, data: Dict, **kwargs) -> Union[ByQueryResponse, ErrorResponse]: pass def reindex(self, data: Dict, **kwargs) -> None: pass def count(self, index: str, data: Dict = None, **kwargs) -> Union[int, ErrorResponse]: pass def transform_response(self, response: Dict) -> Union[SearchResponse, ErrorResponse]: # return SearchResponse( scroll_id=, took=, timed_out=, total=, max_score=, aggs=, # hits=[self.transform_hit(hit) for hit in find(response, 'Hits location in response', [])]) pass |
on the __init__ file of the engine package
Code Block | ||||
---|---|---|---|---|
| ||||
from ._aggregation_factory import AggregationFactory from ._filter_factory import FilterFactory from ._base_engine import BaseEngine, BulkEntry, BulkAction from .elasticsearch import ElasticsearchEngine from .opensearch import OpensearchEngine # Your new engine from .example import ExampleEngine |
Code Block | ||||
---|---|---|---|---|
| ||||
class EngineTypes(str, Enum): ELASTIC = 'elasticsearch' OPENSEARCH = 'opensearch' NEW = 'new' # <=== your engine type here |