With sapi.py (project manager)

Available since version 0.5

From the root of the Search API project, run the following command in a terminal:

python sapi.py new engine <name>

This generates a bare-minimum engine with all the steps and changes described in the Manual section below.


Manual

Use this section if sapi.py is not available in your version

1) Create the package

  • Start by creating a folder for the engine in the folder app/engines
  • Create the __init__.py file


The following files must follow the name format <engine_name>_<name_of_the_file>, for example example_engine.py for an engine named example

  • Create the _aggregation_factory.py file
  • Create the _filter_factory.py file
  • Create the _config.py file
  • Create the _engine.py file
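
The layout from step 1 can be sketched as a short script. This only illustrates the folder and naming convention; it is not what sapi.py does internally. The helper name create_engine_package and the engine name example are assumptions made for the illustration.

```python
from pathlib import Path
from typing import List

# Suffixes required by the naming convention <engine_name>_<name_of_the_file>.
FILE_SUFFIXES = ("aggregation_factory", "filter_factory", "config", "engine")


def create_engine_package(project_root: Path, engine_name: str) -> List[Path]:
    """Create app/engines/<engine_name>/ with empty module files (hypothetical helper)."""
    package_dir = project_root / "app" / "engines" / engine_name
    package_dir.mkdir(parents=True, exist_ok=True)

    created = [package_dir / "__init__.py"]
    created += [package_dir / f"{engine_name}_{suffix}.py" for suffix in FILE_SUFFIXES]
    for path in created:
        path.touch(exist_ok=True)  # creates the file if missing, keeps existing content
    return created
```

Calling create_engine_package(Path("."), "example") from the project root would produce app/engines/example/ containing __init__.py, example_aggregation_factory.py, example_filter_factory.py, example_config.py and example_engine.py.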





On the aggregation factory file



1) Add the imports

  • Pay special attention to AggregationFactory
from typing import Union, List, Dict

from app.engines import AggregationFactory
from framework.aggregations import *
from models.engines.response import ResponseAgg, RangeAggEntry, TermAggEntry, StatsAggEntry

2) Declare the AggregationFactory class

  • This class defines how to build the aggregations for this specific engine, and it is used in different parts of the code

    The aggregation definitions are declared in the module framework.aggregations. These declarations are shared by all engine types and are used to declare a normalized engine request

class ExampleAggregationFactory(AggregationFactory):

3) Implement the abstract methods

  • Each method implements a specific aggregation type, except for transform_aggregations, which iterates over nested aggregations
  • Use the other engines as examples of how to implement these methods
  • The return types depend on the engine being implemented
    @classmethod
    def transform_aggregations(cls, aggs: any) -> List[ResponseAgg]:
        # your code here, traverse the nested aggregations in the engine's response and return them normalized
        pass

    @staticmethod
    def bool_agg(config: BoolAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format
        pass

    @staticmethod
    def browse_agg(config: BrowseAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format 
        pass

    @staticmethod
    def date_histogram_agg(config: DateHistogramAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format  
        pass

    @staticmethod
    def date_range_agg(config: DateRangeAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format  
        pass

    @staticmethod
    def histogram_agg(config: HistogramAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format  
        pass

    @staticmethod
    def range_agg(config: RangeAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format  
        pass

    @staticmethod
    def slider_agg(config: SliderAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format  
        pass

    @staticmethod
    def term_agg(config: TermAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format  
        pass

    @staticmethod
    def top_hits_agg(config: TermAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format  
        pass

    @staticmethod
    def filter_agg(config: FilterAgg) -> any:
        # your code here, return the aggregation for the engine in the engine's format  
        pass

    @staticmethod
    def term_entry_transform(bucket: List[Dict]) -> List[TermAggEntry]:
        # your code here, transform the engine's term buckets into normalized TermAggEntry objects
        pass

    @staticmethod
    def range_entry_transform(bucket: List[Dict]) -> Union[List[RangeAggEntry], None]:
        # your code here, transform the engine's range buckets into normalized RangeAggEntry objects
        pass

    @staticmethod
    def stats_entry_transform(agg: any) -> Union[List[StatsAggEntry], None]:
        # your code here, transform the engine's stats result into normalized StatsAggEntry objects
        pass
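
As an illustration of the pattern above, here is a minimal sketch of one builder method, one entry transform and the nested traversal for an engine that speaks an Elasticsearch-style JSON DSL. The TermAgg and TermAggEntry classes below are stand-ins with assumed fields (name, field, size, key, count); the real classes live in framework.aggregations and models.engines.response and may differ. For brevity the traversal returns a dictionary rather than the List[ResponseAgg] required by the real abstract method.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class TermAgg:  # stand-in for framework.aggregations.TermAgg; fields are assumed
    name: str
    field: str
    size: int = 10


@dataclass
class TermAggEntry:  # stand-in for models.engines.response.TermAggEntry; fields are assumed
    key: str
    count: int


class SketchAggregationFactory:
    @staticmethod
    def term_agg(config: TermAgg) -> Dict[str, Any]:
        # Build the engine-specific request fragment (Elasticsearch-style here).
        return {config.name: {"terms": {"field": config.field, "size": config.size}}}

    @staticmethod
    def term_entry_transform(bucket: List[Dict]) -> List[TermAggEntry]:
        # Map the engine's raw buckets onto the normalized entry type.
        return [TermAggEntry(key=str(b["key"]), count=b["doc_count"]) for b in bucket]

    @classmethod
    def transform_aggregations(cls, aggs: Dict[str, Any]) -> Dict[str, List[TermAggEntry]]:
        # Walk the response; an aggregation without buckets may wrap nested ones.
        result: Dict[str, List[TermAggEntry]] = {}
        for name, body in (aggs or {}).items():
            if not isinstance(body, dict):
                continue  # scalar metadata such as doc_count
            if "buckets" in body:
                result[name] = cls.term_entry_transform(body["buckets"])
            else:
                result.update(cls.transform_aggregations(body))
        return result
```

The builder methods go from the normalized declaration to the engine's format, while the entry transforms go the other way, from the engine's response back to the normalized models.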




On the filter factory file



4) Add the imports

  • Pay special attention to FilterFactory
from app.engines import FilterFactory
from framework.filters import *

5) Declare the FilterFactory class

  • This class defines how to build the filters for this specific engine, and it is used in different parts of the code

    The filter definitions are declared in the module framework.filters. These declarations are shared by all engine types and are used to declare a normalized engine request


class ExampleFilterFactory(FilterFactory):

6) Implement the abstract methods

  • Each method is the implementation of a specific filter type
  • Use the other engines as examples of how to implement these methods
  • The return types depend on the engine being implemented
    @staticmethod
    def geo_shape_filter(config: GeoShapeFilter):
        # your code here, return the filter for the engine in the engine's format
        pass

    @staticmethod
    def range_filter(config: RangeFilter):
        # your code here, return the filter for the engine in the engine's format
        pass

    @staticmethod
    def wildcard_filter(config: WildcardFilter):
        # your code here, return the filter for the engine in the engine's format
        pass

    @staticmethod
    def term_filter(config: TermFilter):
        # your code here, return the filter for the engine in the engine's format
        pass

    @staticmethod
    def terms_filter(config: TermFilter):
        # your code here, return the filter for the engine in the engine's format
        pass

    @staticmethod
    def exist_filter(config: ExistFilter):
        # your code here, return the filter for the engine in the engine's format
        pass

    @staticmethod
    def bool_filter(config: BoolFilter):
        # your code here, return the filter for the engine in the engine's format
        pass
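
A minimal sketch of three of these methods for an Elasticsearch-style JSON DSL follows. The TermFilter, RangeFilter and BoolFilter classes are stand-ins with assumed fields; the real declarations live in framework.filters and may differ. The build dispatcher is hypothetical, and bool_filter is written as a classmethod here only so that it can reach the other methods.

```python
from dataclasses import dataclass, field as dc_field
from typing import Any, Dict, List, Optional, Union


@dataclass
class TermFilter:  # stand-in; field names are assumed
    field: str
    value: Any


@dataclass
class RangeFilter:  # stand-in; field names are assumed
    field: str
    gte: Optional[float] = None
    lte: Optional[float] = None


@dataclass
class BoolFilter:  # stand-in; field names are assumed
    must: List[Union[TermFilter, RangeFilter]] = dc_field(default_factory=list)
    should: List[Union[TermFilter, RangeFilter]] = dc_field(default_factory=list)


class SketchFilterFactory:
    @staticmethod
    def term_filter(config: TermFilter) -> Dict[str, Any]:
        return {"term": {config.field: config.value}}

    @staticmethod
    def range_filter(config: RangeFilter) -> Dict[str, Any]:
        # Only emit the bounds that were actually set.
        bounds = {k: v for k, v in (("gte", config.gte), ("lte", config.lte)) if v is not None}
        return {"range": {config.field: bounds}}

    @classmethod
    def build(cls, config: Any) -> Dict[str, Any]:
        # Hypothetical dispatcher from declaration type to factory method.
        if isinstance(config, TermFilter):
            return cls.term_filter(config)
        if isinstance(config, RangeFilter):
            return cls.range_filter(config)
        raise TypeError(f"unsupported filter: {type(config).__name__}")

    @classmethod
    def bool_filter(cls, config: BoolFilter) -> Dict[str, Any]:
        # Translate each nested declaration, skipping empty clause lists.
        clauses: Dict[str, List[Dict[str, Any]]] = {}
        if config.must:
            clauses["must"] = [cls.build(f) for f in config.must]
        if config.should:
            clauses["should"] = [cls.build(f) for f in config.should]
        return {"bool": clauses}
```

Because every engine receives the same declarations, only the dictionaries returned here change from one engine to another.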




On the config file



7) Add the imports

  • Pay special attention to BaseEngineConnection
from enum import Enum

from pydantic import Field

from models.engines import BaseEngineConnection

8) Add BoolOption Enum

  • Optional but recommended
  • This is used for the bool structure 
class BoolOption(Enum):
    MUST = 'must'
    SHOULD = 'should'
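
One possible use of this enum, sketched under assumptions: the option's value becomes the key under which a clause is stored in a bool structure. How Search API actually consumes BoolOption is not shown here, and the add_clause helper is hypothetical.

```python
from enum import Enum
from typing import Dict, List


class BoolOption(Enum):
    MUST = 'must'
    SHOULD = 'should'


def add_clause(body: Dict[str, List[dict]], option: BoolOption, clause: dict) -> Dict[str, List[dict]]:
    """Append a clause under the key named by the option (hypothetical helper)."""
    body.setdefault(option.value, []).append(clause)
    return body


# Constructing the enum from a configuration string validates it:
# an unknown value raises ValueError instead of producing a bad request.
default_option = BoolOption('should')
body = add_clause({}, default_option, {"term": {"status": "active"}})
```

Using an enum rather than a bare string keeps the accepted values in one place and lets pydantic reject invalid configuration early.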

9) Declare the Config class

  • Essentially a pydantic model; this is where your engine's configuration is declared
  • Inherits from BaseEngineConnection, so it automatically has all the parameters described in Engine Framework
class ExampleConfig(BaseEngineConnection):




On the engine file



10) Add the imports

  • Pay special attention to BaseEngine, and also import the FilterFactory, AggregationFactory and Config created for this engine
  • All responses and requests can be found in models.engines.response. These are the official responses and requests managed by the engine connection
from typing import Union, List, Tuple, Any, Dict

from app.engines import BaseEngine, BulkEntry
from app.engines.example.example_aggregation_factory import ExampleAggregationFactory
from app.engines.example.example_config import ExampleConfig
from app.engines.example.example_filter_factory import ExampleFilterFactory
from models.engines import HighlightConfig
from models.engines.response import ErrorResponse, ByQueryResponse, CRUDResponse, ResponseHit, SearchRequest, SearchResponse
from utils.data import find

11) Declare the Engine class

  • The class inherits from BaseEngine, which takes the Config, FilterFactory and AggregationFactory as type parameters, in that order
  • This is where all the requests are defined

class ExampleEngine(BaseEngine[ExampleConfig, ExampleFilterFactory, ExampleAggregationFactory]):
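
The subscript syntax in this declaration is the standard typing.Generic mechanism. The sketch below shows how such a base class could be parameterized and how a subclass hands the factories to it. SketchBaseEngine and the Demo classes are stand-ins invented for the illustration; the real BaseEngine in app.engines may be declared differently.

```python
from typing import Generic, TypeVar

ConfigT = TypeVar("ConfigT")
FilterT = TypeVar("FilterT")
AggT = TypeVar("AggT")


class SketchBaseEngine(Generic[ConfigT, FilterT, AggT]):
    """Stand-in for app.engines.BaseEngine; the real class may differ."""

    def __init__(self, config: ConfigT, filter_factory: FilterT, agg_factory: AggT):
        self.config = config
        self.filter_factory = filter_factory
        self.agg_factory = agg_factory


class DemoConfig:
    def __init__(self, host: str):
        self.host = host


class DemoFilterFactory:
    pass


class DemoAggregationFactory:
    pass


# Type parameters follow the same order as in the declaration above:
# config, filter factory, aggregation factory.
class DemoEngine(SketchBaseEngine[DemoConfig, DemoFilterFactory, DemoAggregationFactory]):
    def __init__(self, config: DemoConfig):
        super().__init__(config=config, filter_factory=DemoFilterFactory(),
                         agg_factory=DemoAggregationFactory())


engine = DemoEngine(DemoConfig(host="localhost"))
```

The type parameters have no runtime effect; they let type checkers and IDEs know that engine.config is a DemoConfig and engine.filter_factory is a DemoFilterFactory.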

12) Implement the abstract methods

  • __init__
    • The engine initialization. It must call the super __init__ with the engine configuration (passed in by the Connection Manager), an instance of the FilterFactory and an instance of the AggregationFactory
    • The Filter and Aggregation factory instances can be accessed anywhere in Search API via the Connection Manager
  • Abstract Methods
    • Each method is the implementation of a specific request to the engine
    • You can add additional functions to support the responses of these official requests
    • Use the other engines as examples of how to implement these methods
    • The types of the responses and errors are standardized
    • Some utility methods are suggested to simplify the code
      • generate_highlight - Use this to generate the highlight structure
      • boolean_structure - Use this to manipulate boolean structures
      • transform_hit - Use this to standardize the hit responses
      • transform_inner_hits - Same as above, but focused on nested hit structures
      • transform_crud - Transform for standard CRUD requests
      • transform_by_query - Transform for by-query CRUD requests such as delete_by_query, update_by_query, reindex, ...
    def __init__(self, config: ExampleConfig):
        super().__init__(config=ExampleConfig(**config.dict()), filter_factory=ExampleFilterFactory(),
                         agg_factory=ExampleAggregationFactory())

    @staticmethod
    def generate_highlight(config: HighlightConfig) -> Any:
        # your code here, generate the highlight config
        pass

    @staticmethod
    def boolean_structure(body: Dict, must_not: List[Dict], must: List[Dict], should_not: List[Dict],
                          should: List[Dict]):
        # your code here, generate the boolean structure
        pass

    @staticmethod
    def transform_hit(hit: Dict) -> ResponseHit:
        # return ResponseHit(index=, id=, score=, data=, highlight=,
        #                    inner_hits=ExampleEngine.transform_inner_hits(find(hit, 'Inner hits location in hit')))
        pass

    @staticmethod
    def transform_inner_hits(inner_hits: Dict) -> Union[Dict[str, SearchResponse], None]:
        # If not interested in getting inner hits just leave pass

        # if inner_hits is None or len(inner_hits) == 0:
        #     return None
        #
        # inner_hits_normalized: Dict[str, SearchResponse] = {}
        #
        # for key in inner_hits.keys():
        #     # your code here to fetch the inner hits information
        #
        #     inner_hits_normalized[key] = SearchResponse(
        #         total=,
        #         max_score=,
        #         hits=[ExampleEngine.transform_hit(hit) for hit in find(inner_hits[key], 'hits location in inner hit', [])])
        #
        # return inner_hits_normalized
        pass

    @staticmethod
    def transform_crud(response: Dict) -> CRUDResponse:
        # return CRUDResponse(index=, id=, result=, version=)
        pass

    @staticmethod
    def transform_by_query(response: Dict) -> ByQueryResponse:
        # return ByQueryResponse(took=, timed_out=, total=, deleted=, updated=, created=, batches=,  failures=)
        pass

    def search(self, index: str, data: SearchRequest, **kwargs) -> Union[SearchResponse, ErrorResponse]:
        pass

    def multi_search(self, data: List[Tuple[str, SearchRequest]], **kwargs):
        pass

    def async_search(self, index: str, data: SearchRequest, **kwargs) -> str:
        pass

    def async_search_fetch(self, index: str, _id: str, **kwargs) -> Union[SearchResponse, ErrorResponse]:
        pass

    def async_search_delete(self, index: str, _id: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def knn_search(self, index: str, data: SearchRequest, **kwargs) -> Union[SearchResponse, ErrorResponse]:
        pass

    def scroll_search(self, scroll_id: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def clear_scroll_search(self, scroll_id: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def create_index_template(self, template_name: str, data: Dict, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def create_index(self, index: str, data: Dict, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def delete_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def get_index(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def exist_template(self, template_name: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def exist_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def open_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def close_index(self, index: str, **kwargs) -> Union[bool, ErrorResponse]:
        pass

    def stats_index(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def get_mapping(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def update_mapping(self, index: str, data: Dict, **kwargs):
        pass

    def get_settings(self, index: str, **kwargs) -> Union[Dict, ErrorResponse]:
        pass

    def update_settings(self, index: str, data: Dict, **kwargs) -> None:
        pass

    def index_doc(self, index: str, doc: Dict, doc_id: str = None, routing: str = None, **kwargs) -> Union[
        CRUDResponse, ErrorResponse]:
        pass

    def get_doc(self, index: str, doc_id: str, routing: str = None, **kwargs) -> Union[ResponseHit, ErrorResponse]:
        pass

    def update_doc(self, index: str, doc_id: str, doc: Dict, routing: str = None, **kwargs) -> Union[
        CRUDResponse, ErrorResponse]:
        pass

    def partial_update_doc(self, index: str, doc_id: str, doc: Dict, routing: str = None, **kwargs) -> Union[
        CRUDResponse, ErrorResponse]:
        pass

    def script_update_doc(self, index: str, doc_id: str, script, **kwargs) -> Union[CRUDResponse, ErrorResponse]:
        pass

    def delete_doc(self, index: str, doc_id: str, routing: str = None, **kwargs) -> Union[CRUDResponse, ErrorResponse]:
        pass

    def bulk(self, *entries: BulkEntry, **kwargs) -> None:
        pass

    def delete_by_query(self, index: str, data: Dict, **kwargs) -> Union[ByQueryResponse, ErrorResponse]:
        pass

    def update_by_query(self, index: str, data: Dict, **kwargs) -> Union[ByQueryResponse, ErrorResponse]:
        pass

    def reindex(self, data: Dict, **kwargs) -> None:
        pass

    def count(self, index: str, data: Dict = None, **kwargs) -> Union[int, ErrorResponse]:
        pass

    def transform_response(self, response: Dict) -> Union[SearchResponse, ErrorResponse]:
        # return SearchResponse( scroll_id=, took=, timed_out=, total=, max_score=, aggs=,
        #     hits=[self.transform_hit(hit) for hit in find(response, 'Hits location in response', [])])
        pass
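
To make the commented templates in transform_hit and transform_response concrete, here is a minimal sketch for an engine that returns Elasticsearch-style JSON. Everything in it is an assumption made for illustration: ResponseHit and SearchResponse are reduced stand-ins for the models in models.engines.response, find is a stand-in for utils.data.find with a guessed dotted-path behaviour, and the response keys (_index, _source, hits.total.value, ...) are those of an Elasticsearch-like engine, not necessarily yours.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


def find(data: Dict, path: str, default: Any = None) -> Any:
    """Stand-in for utils.data.find: follow a dotted path, return default if missing."""
    current: Any = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


@dataclass
class ResponseHit:  # stand-in with a reduced set of fields
    index: str
    id: str
    score: Optional[float]
    data: Dict
    highlight: Optional[Dict] = None


@dataclass
class SearchResponse:  # stand-in with a reduced set of fields
    total: int
    max_score: Optional[float]
    hits: List[ResponseHit] = field(default_factory=list)
    took: Optional[int] = None
    timed_out: bool = False


def transform_hit(hit: Dict) -> ResponseHit:
    # Copy the engine-specific keys into the normalized hit model.
    return ResponseHit(index=hit["_index"], id=hit["_id"], score=hit.get("_score"),
                       data=hit.get("_source", {}), highlight=hit.get("highlight"))


def transform_response(response: Dict) -> SearchResponse:
    # Missing sections fall back to defaults so an empty response stays valid.
    return SearchResponse(
        took=response.get("took"),
        timed_out=response.get("timed_out", False),
        total=find(response, "hits.total.value", 0),
        max_score=find(response, "hits.max_score"),
        hits=[transform_hit(hit) for hit in find(response, "hits.hits", [])],
    )
```

In a real engine these would be the transform_hit static method and the transform_response instance method of the engine class, and search would typically end by returning self.transform_response(raw_response).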