This page offers a series of practical examples for each method of the engine connection, showcasing a variety of functionalities including indexing, querying, document management, and advanced search operations like vector searches and bulk actions.
Each example is crafted to be engine-agnostic, ensuring that the principles and techniques demonstrated are not tied to any specific search engine provider.
Designed to enhance understanding and implementation, the content outlines method descriptions, input parameters, expected outputs, and error handling strategies comprehensively. The goal is to equip developers and system architects with the necessary insights to effectively leverage search engine capabilities, streamline functionalities, and enhance system performance in a variety of applications.
Retrieves vector-based query results given specific parameters.
from typing import Any, Dict, List, Optional, Tuple

from gaia_core.models import ErrorResponse, SearchFilters
from gaia_core.models.engines.enums import VectorQueryType
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
query: Dict = {"query": {"query_string": {"query": "sample text"}}}
query_type: VectorQueryType = VectorQueryType.SCORE_SCRIPT
min_score: float = 0.5
source: str = "knn_score"
field: str = "vector_field"
vector: List[float] = [0.5, 0.2, 0.1]
space_type: str = "l2"
child_type: Optional[str] = None  # no child type in this example

# Executing the method; the result is annotated as a (value, SearchFilters) pair.
result: Tuple[Any, SearchFilters] = self.engine_conn.get_vector_query(
    query, query_type, min_score, source, field, vector, space_type, child_type
)
Executes a search operation on a specified index using provided search data.
from typing import Union

from gaia_core.models import ErrorResponse, SearchRequest, SearchResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Target index and the search payload (here: match every document).
index: str = "sample_index"
data: SearchRequest = SearchRequest(query={"match_all": {}})

# Run the search; the result is annotated as SearchResponse or ErrorResponse.
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.search(index, data)

# Surface a failed search as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Search failed: {response.error}")
Performs multiple search queries across different indices.
from typing import List, Tuple

from gaia_core.models import SearchRequest
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Each entry pairs an index name with the request to run against that index.
queries: List[Tuple[str, SearchRequest]] = [
    ("index1", SearchRequest(query={"match": {"field": "value"}})),
    ("index2", SearchRequest(query={"term": {"user": "example"}})),
]

# Submit all queries in a single call.
results = self.engine_conn.multi_search(queries)

# Note: this example does not inspect the individual responses; production
# code would typically check each one for errors.
Initiates an asynchronous search on the specified index.
from gaia_core.models import SearchRequest
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "async_value"}})

# Executing the method; the returned value is annotated as the search ID string.
async_id: str = self.engine_conn.async_search(index, data)

# Note: this example assumes the search was initiated successfully;
# real-world code should handle possible exceptions.
Fetches the results of an ongoing asynchronous search using the search ID.
from typing import Union

from gaia_core.models import ErrorResponse, SearchResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Index the asynchronous search was started on, and its search ID.
index: str = "async_index"
search_id: str = "async_search_id"

# Fetch the results of the asynchronous search.
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.async_search_fetch(
    index, search_id
)

# Surface a failed fetch as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Asynchronous search fetch failed: {response.error}")
Deletes an ongoing asynchronous search using the search ID.
from typing import Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
search_id: str = "async_search_id"

# Executing the method (`bool` is a builtin and needs no import).
result: Union[bool, ErrorResponse] = self.engine_conn.async_search_delete(
    index, search_id
)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to delete asynchronous search: {result.error}")
Performs a k-nearest neighbors search on the specified index.
from typing import Union

from gaia_core.models import ErrorResponse, SearchRequest, SearchResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Target index and the request payload.
index: str = "knn_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "value"}})

# Run the k-nearest neighbors search.
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.knn_search(
    index, data
)

# Surface a failed search as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"KNN search failed: {response.error}")
Continues a search from a specific point using a scroll identifier.
from typing import Dict, Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Identifier of the scroll to continue.
scroll_id: str = "existing_scroll_id"

# Continue the search from the given scroll.
response: Union[Dict, ErrorResponse] = self.engine_conn.scroll_search(scroll_id)

# Surface a failed continuation as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Scroll search continuation failed: {response.error}")
Clears resources related to a scroll search identified by a scroll identifier.
from typing import Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
scroll_id: str = "scroll_id_to_clear"

# Executing the method (`bool` is a builtin and needs no import).
result: Union[bool, ErrorResponse] = self.engine_conn.clear_scroll_search(scroll_id)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to clear scroll search: {result.error}")
Creates a template for indices specifying settings and mappings that should be applied automatically when new indices are created.
from typing import Dict, Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
template_name: str = "my_template"
data: Dict = {
    "settings": {"number_of_shards": 1},
    "mappings": {"properties": {"field1": {"type": "text"}}},
}

# Executing the method (`bool` is a builtin and needs no import).
result: Union[bool, ErrorResponse] = self.engine_conn.create_index_template(
    template_name, data
)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to create index template: {result.error}")
Creates an index with specific settings and mappings based on provided data.
from typing import Dict, Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "new_index"
data: Dict = {
    "settings": {"number_of_shards": 2},
    "mappings": {"properties": {"field1": {"type": "keyword"}}},
}

# Executing the method (`bool` is a builtin and needs no import).
result: Union[bool, ErrorResponse] = self.engine_conn.create_index(index, data)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to create index: {result.error}")
Deletes a specified index.
from typing import Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_delete"

# Executing the method (`bool` is a builtin and needs no import).
result: Union[bool, ErrorResponse] = self.engine_conn.delete_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to delete index: {result.error}")
Retrieves information about the specified index, such as settings and mappings.
from typing import Dict, Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "existing_index"

# Executing the method
response: Union[Dict, ErrorResponse] = self.engine_conn.get_index(index)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to retrieve index information: {response.error}")
Checks if a specified index exists.
from typing import Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_check"

# Executing the method (`bool` is a builtin and needs no import).
exists: Union[bool, ErrorResponse] = self.engine_conn.exist_index(index)

# Error Handling
if isinstance(exists, ErrorResponse):
    raise Exception(f"Failed to check if index exists: {exists.error}")
Opens a previously closed index to make it available for read and write operations.
from typing import Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_open"

# Executing the method (`bool` is a builtin and needs no import).
result: Union[bool, ErrorResponse] = self.engine_conn.open_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to open index: {result.error}")
Closes an index, making it unavailable for read and write operations until it is reopened and reducing resource usage.
from typing import Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_close"

# Executing the method (`bool` is a builtin and needs no import).
result: Union[bool, ErrorResponse] = self.engine_conn.close_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to close index: {result.error}")
Retrieves statistical information about a specified index.
from typing import Dict, Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Index whose statistics are requested.
index: str = "index_for_stats"

# Request the statistics.
stats: Union[Dict, ErrorResponse] = self.engine_conn.stats_index(index)

# Surface a failed request as an exception.
if isinstance(stats, ErrorResponse):
    raise Exception(f"Failed to get index stats: {stats.error}")
Retrieves the mapping of the specified index.
from typing import Dict, Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Index whose mapping is requested.
index: str = "index_to_get_mapping"

# Request the mapping.
mapping: Union[Dict, ErrorResponse] = self.engine_conn.get_mapping(index)

# Surface a failed request as an exception.
if isinstance(mapping, ErrorResponse):
    raise Exception(f"Failed to get mapping for index: {mapping.error}")
Updates the mapping for a specified index.
from typing import Dict

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Index to modify and the mapping changes to apply.
index: str = "index_to_update_mapping"
data: Dict = {"properties": {"new_field": {"type": "text"}}}

# Apply the mapping update; the return value is not inspected here.
self.engine_conn.update_mapping(index, data)

# Note: this example assumes the method does not return an ErrorResponse
# directly. Real-world code should add error handling as per the API
# specifications.
Retrieves the settings of the specified index.
from typing import Dict, Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Index whose settings are requested.
index: str = "index_to_get_settings"

# Request the settings.
settings: Union[Dict, ErrorResponse] = self.engine_conn.get_settings(index)

# Surface a failed request as an exception.
if isinstance(settings, ErrorResponse):
    raise Exception(f"Failed to get settings for index: {settings.error}")
Indexes a document in the specified index, potentially under a specific document ID.
from typing import Dict, Union

from gaia_core.models import CRUDResponse, ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Target index and the document body.
index: str = "my_index"
doc: Dict = {"field1": "value1"}

# Optional arguments.
doc_id: str = "doc123"
routing: str = "routing_key"
refresh: str = "true"

# Index the document.
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.index_doc(
    index,
    doc,
    doc_id=doc_id,
    routing=routing,
    refresh=refresh,
)

# Surface a failed indexing operation as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to index document: {response.error}")
Retrieves a document by ID from the specified index.
from typing import Union

from gaia_core.models import ErrorResponse, ResponseHit
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key"  # Optional

# Executing the method
document: Union[ResponseHit, ErrorResponse] = self.engine_conn.get_doc(
    index, doc_id, routing=routing
)

# Error Handling
if isinstance(document, ErrorResponse):
    raise Exception(f"Failed to retrieve document: {document.error}")
Updates a document by ID in the specified index.
from typing import Dict, Union

from gaia_core.models import CRUDResponse, ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "updated_value1"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.update_doc(
    index, doc_id, doc, routing=routing, refresh=refresh
)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update document: {response.error}")
Inserts a new document or updates an existing document by ID in the specified index.
from typing import Dict, Union

from gaia_core.models import CRUDResponse, ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Target index, document ID, and the document body.
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "value1"}

# Optional arguments.
routing: str = "routing_key"
refresh: str = "true"

# Insert the document, or update it if the ID already exists.
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.upsert_doc(
    index,
    doc_id,
    doc,
    routing=routing,
    refresh=refresh,
)

# Surface a failed upsert as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to upsert document: {response.error}")
Partially updates a document by ID in the specified index, modifying only the specified fields.
from typing import Dict, Union

from gaia_core.models import CRUDResponse, ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Target index, document ID, and only the fields to change.
index: str = "my_index"
doc_id: str = "doc123"
updates: Dict = {"field2": "new_value"}

# Optional arguments.
routing: str = "routing_key"
refresh: str = "true"

# Apply the partial update.
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.partial_update_doc(
    index,
    doc_id,
    updates,
    routing=routing,
    refresh=refresh,
)

# Surface a failed partial update as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to partially update document: {response.error}")
Updates a document by applying a script. Useful for complex updates where direct field modifications are insufficient.
from typing import Union

from gaia_core.models import CRUDResponse, ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Target index and document ID.
index: str = "my_index"
doc_id: str = "doc123"

# Update script and the parameters it references.
script = "ctx._source.field2 += params.count"
params = {"count": 1}

# Optional arguments.
routing: str = "routing_key"
refresh: str = "true"

# Apply the scripted update.
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.script_update_doc(
    index,
    doc_id,
    script,
    params,
    routing=routing,
    refresh=refresh,
)

# Surface a failed scripted update as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update document with script: {response.error}")
Deletes a document by ID from the specified index.
from typing import Union

from gaia_core.models import CRUDResponse, ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.delete_doc(
    index, doc_id, routing=routing, refresh=refresh
)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to delete document: {response.error}")
Executes multiple indexing, update, or delete operations in a single API call.
from typing import List, Union

from gaia_core.models import BulkEntry, ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs: one entry per operation.
# NOTE(review): entries are written as plain dicts although the list is
# annotated as List[BulkEntry] — confirm that dicts are accepted.
actions: List[BulkEntry] = [
    {
        "action": "index",
        "index": "my_index",
        "id": "1",
        "document": {"field1": "value1", "field2": "value2"},
    },
    {
        "action": "create",
        "index": "my_index",
        "id": "2",
        "document": {"field1": "new_value1"},
    },
    {
        "action": "update",
        "index": "my_index",
        "id": "1",
        "document": {"doc": {"field2": "updated_value2"}},
    },
    {"action": "delete", "index": "my_index", "id": "3"},
]
refresh: str = "true"

# Executing the method; each action is passed as a separate positional argument.
results = self.engine_conn.bulk(*actions, refresh=refresh)

# Note: results are not inspected here; each entry should be checked for errors.
Deletes documents from the specified index that match the given query.
from typing import Dict, Union

from gaia_core.models import ByQueryResponse, ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Target index and the query selecting the documents to delete.
index: str = "my_index"
query: Dict = {"query": {"match": {"field1": "value_to_delete"}}}

# Delete every matching document.
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.delete_by_query(
    index, query
)

# Surface a failed delete-by-query as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to delete by query: {response.error}")
Updates documents in the specified index based on a provided query and update rules.
from typing import Dict, Union

from gaia_core.models import ByQueryResponse, ErrorResponse
from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Target index, plus the query and the script applied to matching documents.
index: str = "my_index"
update_query: Dict = {
    "query": {"match_all": {}},
    "script": {"source": "ctx._source.field1 = 'updated_value'"},
}

# Update every matching document.
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.update_by_query(
    index, update_query
)

# Surface a failed update-by-query as an exception.
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update by query: {response.error}")
Reindexes documents from one index to another, potentially transforming document data during the process.
from typing import Dict

from gaia_core.rest import connection_manager

# Grab the default engine connection.
self.engine_conn = connection_manager.get_default()

# Source index, destination index, and a script applied to each document.
reindex_data: Dict = {
    "source": {"index": "old_index"},
    "dest": {"index": "new_index"},
    "script": {"source": "ctx._source.new_field = 'value'"},
}

# Start the reindex; the return value is not inspected here.
self.engine_conn.reindex(reindex_data)

# Note: this example assumes the method does not return an ErrorResponse
# directly. Real-world code should add error handling as per the API
# specifications.
Counts the number of documents in the specified index that match a given query.
from typing import Dict, Union

from gaia_core.models import ErrorResponse
from gaia_core.rest import connection_manager

# Obtain the default engine connection.
self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
count_query: Dict = {"query": {"match_all": {}}}

# Executing the method (`int` is a builtin and needs no import).
count_result: Union[int, ErrorResponse] = self.engine_conn.count(index, count_query)

# Error Handling
if isinstance(count_result, ErrorResponse):
    raise Exception(f"Failed to count documents: {count_result.error}")