Note |
---|
The following files must follow the naming format <engine_name>_<name_of_the_file> (for example, example_aggregation_factory.py for an engine named "example") |
Create the <engine_name>_aggregation_factory.py file
on the aggregation factory file
Code Block | ||||
---|---|---|---|---|
| ||||
from typing import Union, List, Dict
from app.engines import AggregationFactory
from framework.aggregations import *
from models.engines.response import ResponseAgg, RangeAggEntry, TermAggEntry, StatsAggEntry |
This class defines how to build the aggregations for this specific engine, and it is used in different parts of the code
Info |
---|
The aggregation definitions are declared in the module framework.aggregations. These declarations are shared by all engine types and are used to declare a normalized engine request |
Code Block | ||||
---|---|---|---|---|
| ||||
class ExampleAggregationFactory(AggregationFactory): |
Code Block | ||||
---|---|---|---|---|
| ||||
@classmethod
def transform_aggregations(cls, aggs: any) -> List[ResponseAgg]:
# your code here, traverse and return the nested aggregation for the engine in the engine's format
# NOTE(review): `any` in the signature is the builtin function, not typing.Any — confirm which one the base class intends
pass
@staticmethod
def bool_agg(config: BoolAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def browse_agg(config: BrowseAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def date_histogram_agg(config: DateHistogramAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def date_range_agg(config: DateRangeAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def histogram_agg(config: HistogramAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def range_agg(config: RangeAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def slider_agg(config: SliderAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def term_agg(config: TermAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def top_hits_agg(config: TermAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
# NOTE(review): config is annotated as TermAgg, the same type term_agg receives — confirm whether
# framework.aggregations declares a dedicated top-hits type that should be used here instead
pass
@staticmethod
def filter_agg(config: FilterAgg) -> any:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def term_entry_transform(bucket: List[Dict]) -> List[TermAggEntry]:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def range_entry_transform(bucket: List[Dict]) -> Union[List[RangeAggEntry], None]:
# your code here, return the aggregation for the engine in the engine's format
pass
@staticmethod
def stats_entry_transform(agg: any) -> Union[List[StatsAggEntry], None]:
# your code here, return the aggregation for the engine in the engine's format
pass |
on the filter factory file
Code Block | ||||
---|---|---|---|---|
| ||||
from app.engines import FilterFactory
from framework.filters import * |
This class defines how to build the filters for this specific engine, and it is used in different parts of the code
Info |
---|
The filter definitions are declared in the module framework.filters. These declarations are shared by all engine types and are used to declare a normalized engine request |
Code Block | ||||
---|---|---|---|---|
| ||||
class ExampleFilterFactory(FilterFactory): |
Code Block | ||||
---|---|---|---|---|
| ||||
@staticmethod
def geo_shape_filter(config: GeoShapeFilter):
# your code here, return the filter for the engine in the engine's format
pass
@staticmethod
def range_filter(config: RangeFilter):
# your code here, return the filter for the engine in the engine's format
pass
@staticmethod
def wildcard_filter(config: WildcardFilter):
# your code here, return the filter for the engine in the engine's format
pass
@staticmethod
def term_filter(config: TermFilter):
# your code here, return the filter for the engine in the engine's format
pass
@staticmethod
def terms_filter(config: TermFilter):
# your code here, return the filter for the engine in the engine's format
# NOTE(review): config is annotated as TermFilter, the same type term_filter receives — confirm whether
# framework.filters declares a dedicated terms (multi-value) filter type that should be used here instead
pass
@staticmethod
def exist_filter(config: ExistFilter):
# your code here, return the filter for the engine in the engine's format
pass
@staticmethod
def bool_filter(config: BoolFilter):
# your code here, return the filter for the engine in the engine's format
pass |
on the config file
Code Block | ||||
---|---|---|---|---|
| ||||
from enum import Enum
from pydantic import Field
from models.engines import BaseEngineConnection |
Code Block | ||||
---|---|---|---|---|
| ||||
class BoolOption(Enum):
MUST = 'must'
SHOULD = 'should' |
Code Block | ||||
---|---|---|---|---|
| ||||
class ExampleConfig(BaseEngineConnection): |
on the engine file
Code Block | ||||
---|---|---|---|---|
| ||||
from typing import Union, List, Tuple, Any, Dict
from app.engines import BaseEngine, BulkEntry
from app.engines.example.example_aggregation_factory import ExampleAggregationFactory
from app.engines.example.example_config import ExampleConfig
from app.engines.example.example_filter_factory import ExampleFilterFactory
from models.engines import HighlightConfig
from models.engines.response import ErrorResponse, ByQueryResponse, CRUDResponse, ResponseHit, SearchRequest, SearchResponse
from utils.data import find |
This is where all the requests are defined
Code Block | ||||
---|---|---|---|---|
| ||||
class ExampleEngine(BaseEngine[ExampleConfig, ExampleFilterFactory, ExampleAggregationFactory]): |
The engine initialization must call the super __init__ with the configuration of the engine (this will be passed in by the Connection Manager), an instance of a FilterFactory, and an instance of an AggregationFactory
Code Block | ||||
---|---|---|---|---|
| ||||
def __init__(self, config: ExampleConfig):
# Rebuild the incoming config as an ExampleConfig from its dict form, and hand the base
# initializer fresh instances of this engine's filter and aggregation factories.
super().__init__(config=ExampleConfig(**config.dict()), filter_factory=ExampleFilterFactory(),
agg_factory=ExampleAggregationFactory())
@staticmethod
def generate_highlight(config: HighlightConfig) -> Any:
# your code here, generate the highlight config
pass
@staticmethod
def boolean_structure(body: Dict, must_not: List[Dict], must: List[Dict], should_not: List[Dict],
should: List[Dict]):
# your code here, generate the boolean structure
pass
@staticmethod
def transform_hit(hit: Dict) -> ResponseHit:
# return ResponseHit(index=, id=, score=, data=, highlight=,
# inner_hits=TestEngine.transform_inner_hits(find(hit, 'Inner hits location in hit')))
pass
@staticmethod
def transform_inner_hits(inner_hits: Dict) -> Union[Dict[str, SearchResponse], None]:
# If you are not interested in getting inner hits, just leave `pass`; otherwise follow the outline below
# if inner_hits is None or len(inner_hits) == 0:
# return None
#
# inner_hits_normalized: Dict[str, SearchResponse] = {}
#
# for key in inner_hits.keys():
# # your code here to fetch the inner hits information
#
# inner_hits_normalized[key] = SearchResponse(
# total=,
# max_score=,
# hits=[ExampleEngine.transform_hit(hit) for hit in find(inner_hits[key], 'hits location in inner hit', [])])
#
# return inner_hits_normalized
pass
@staticmethod
def transform_crud(response: Dict) -> CRUDResponse:
# return CRUDResponse(index=, id=, result=, version=)
pass
@staticmethod
def transform_by_query(response: Dict) -> ByQueryResponse:
# return ByQueryResponse(took=, timed_out=, total=, deleted=, updated=, created=, batches=, failures=)
pass
def search(self, index: str, data: SearchRequest, **kwargs) -> Union[SearchResponse, ErrorResponse]:
pass
def multi_search(self, ata: List[Tuple[str, SearchRequest]], **kwargs):
# your code here, run several searches in one call; each tuple carries a str and its SearchRequest
# NOTE(review): the parameter name `ata` looks like a typo for `data` (the sibling search methods
# name their request parameter `data`) — confirm against BaseEngine.multi_search before renaming
pass
def async_search(self, index: str, data: SearchRequest, **kwargs) -> str:
pass
def async_search_fetch(self, index: str, _id: str, **kwargs) -> Union[SearchResponse, ErrorResponse]:
pass
def async_search_delete(self, index: str, _id: str, **kwargs) -> Union[bool, ErrorResponse]:
    # Template stub: delete a previously submitted async search on this engine.
    # The return annotation is Union[bool, ErrorResponse], matching the other
    # bool-returning operations in this class (a bare [bool, ErrorResponse] is a
    # list literal, not a type).
    # your code here
    pass
def knn_search(self, index: str, data: SearchRequest, **kwargs) -> Union[SearchResponse, ErrorResponse]:
pass
def scroll_search(self, scroll_id: str, **kwargs) -> Union[Dict, ErrorResponse]:
pass
def clear_scroll_search(self, scroll_id: str, **kwargs) -> Union[bool, ErrorResponse]:
pass
def create_index_template(self, template_name: str, data: Dict, **kwargs) -> Union[bool, ErrorResponse]:
pass
def create_index(self, index: str, data: Dict, **kwargs) -> Union[bool, ErrorResponse]:
pass
def delete_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
pass
def get_index(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
pass
def exist_template(self, template_name: str, **kwargs) -> Union[bool, ErrorResponse]:
pass
def exist_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
pass
def open_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
pass
def close_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
pass
def stats_index(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
pass
def get_mapping(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
pass
def update_mapping(self, index: str, data: Dict, **kwargs):
pass
def get_settings(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
pass
def update_settings(self, index: str, data: Dict, **kwargs) -> None:
pass
def index_doc(self, index: str, doc: Dict, doc_id: str = None, routing: str = None, **kwargs) -> Union[
CRUDResponse, ErrorResponse]:
pass
def get_doc(self, index: str, doc_id: str, routing: str = None, **kwargs) -> Union[ResponseHit, ErrorResponse]:
pass
def update_doc(self, index: str, doc_id: str, doc: Dict, routing: str = None, **kwargs) -> Union[
CRUDResponse, ErrorResponse]:
pass
def partial_update_doc(self, index: str, doc_id: str, doc: Dict, routing: str = None, **kwargs) -> Union[
CRUDResponse, ErrorResponse]:
pass
def script_update_doc(self, index: str, doc_id: str, script, **kwargs) -> Union[CRUDResponse, ErrorResponse]:
pass
def delete_doc(self, index: str, doc_id: str, routing: str = None, **kwargs) -> Union[CRUDResponse, ErrorResponse]:
pass
def bulk(self, *entries: BulkEntry, **kwargs) -> None:
pass
def delete_by_query(self, index: str, data: Dict, **kwargs) -> Union[ByQueryResponse, ErrorResponse]:
pass
def update_by_query(self, index: str, data: Dict, **kwargs) -> Union[ByQueryResponse, ErrorResponse]:
pass
def reindex(self, data: Dict, **kwargs) -> None:
pass
def count(self, index: str, data: Dict = None, **kwargs) -> Union[int, ErrorResponse]:
pass
def transform_response(self, response: Dict) -> Union[SearchResponse, ErrorResponse]:
# return SearchResponse( scroll_id=, took=, timed_out=, total=, max_score=, aggs=,
# hits=[self.transform_hit(hit) for hit in find(response, 'Hits location in response', [])])
pass |
on the __init__ file of the engine package
Code Block | ||||
---|---|---|---|---|
| ||||
from ._aggregation_factory import AggregationFactory
from ._filter_factory import FilterFactory
from ._base_engine import BaseEngine, BulkEntry, BulkAction
from .elasticsearch import ElasticsearchEngine
from .opensearch import OpensearchEngine
# Your new engine
from .example import ExampleEngine
|
Code Block | ||||
---|---|---|---|---|
| ||||
class EngineTypes(str, Enum):
ELASTIC = 'elasticsearch'
OPENSEARCH = 'opensearch'
NEW = 'new' # <=== your engine type here |