This page offers a series of practical examples for each method of the engine connection, showcasing a variety of functionalities including indexing, querying, document management, and advanced search operations like vector searches and bulk actions.

Each example is crafted to be engine-agnostic, ensuring that the principles and techniques demonstrated are not tied to any specific search engine provider.

Designed to enhance understanding and implementation, the content outlines method descriptions, input parameters, expected outputs, and error handling strategies comprehensively. The goal is to equip developers and system architects with the necessary insights to effectively leverage search engine capabilities, streamline functionalities, and enhance system performance in a variety of applications.


Accessing the Connection Manager in Code

You can get the connection Manager in your code using:

from app.rest import connection_manager

We recommend using this import as a local import, calling it exactly when is required and not before, this is to prevent issues with dependencies

Getting Connection by Default

The default connection is the engine connection with the explicit default flag in true, or if none with the flag, the first in the list of connections. This connection is used as well for all internal transactions of GAIA API  such as logging, feedback, analytics,...

To get the default connection, just use the code line below: 

engine_conn = connection_manager.get_default()

Getting Connection by name

You can have multiple connections to the same or different types of NonSQL engines, all with a name you can refer to, in order to get said connection. 

To get a connection by name, just use the code line below:

engine = connection_manager.get_engine(name=engine_name)


Example of Methods Usage

get_vector_query

Retrieves vector-based query results given specific parameters.

from gaia_core.rest import connection_manager
from typing import Tuple, Dict, List
from gaia_core.models import SearchFilters, ErrorResponse
from gaia_core.models.engines.enums import VectorQueryType

self.engine_conn = connection_manager.get_default()

# Inputs
query: Dict = {"query": { "query_string": { "query": "sample text" }}}
query_type: VectorQueryType = VectorQueryType.SCORE_SCRIPT
min_score: float = 0.5
source: str = "knn_score"
field: str = "vector_field"
vector: List[float] = [0.5, 0.2, 0.1]
space_type: str = "l2"
child_type: Optional[str] = None

# Executing the method
result: Tuple[Any, SearchFilters] = self.engine_conn.get_vector_query(query, query_type, min_score, source, field, vector, space_type, child_type)

Executes a search operation on a specified index using provided search data.

from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "sample_index"
data: SearchRequest = SearchRequest(query={"match_all": {}})

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.search(index, data)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Search failed: {response.error}")

Performs multiple search queries across different indices.

from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest
from typing import List, Tuple

self.engine_conn = connection_manager.get_default()

# Inputs
queries: List[Tuple[str, SearchRequest]] = [("index1", SearchRequest(query={"match": {"field": "value"}})),
                                           ("index2", SearchRequest(query={"term": {"user": "example"}}))]

# Executing the method
results = self.engine_conn.multi_search(queries)

# Note: Error handling would typically check each individual response, this method assumes successful implementation.

Initiates an asynchronous search on the specified index.

from gaia_core.rest = connection_manager
from gaia_core.models = SearchRequest
from typing import str

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "async_value"}})

# Executing the method
async_id: str = self.engine_conn.async_search(index, data)

# Asynchronous ID retrieval assumes successful initiation; real-world scenarios should handle possible exceptions.

async_search_fetch

Fetches the results of an ongoing asynchronous search using the search ID.

from gaia_core.rest import connection_manager
from gaia_core.models import SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
search_id: str = "async_search_id"

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.async_search_fetch(index, search_id)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Asynchronous search fetch failed: {response.error}")

async_search_delete

Deletes an ongoing asynchronous search using the search ID.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, bool

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
search_id: str = "async_search_id"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.async_search_delete(index, search_id)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to delete asynchronous search: {result.error}")

Performs a k-nearest neighbors search on the specified index.

from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "knn_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "value"}})

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.knn_search(index, data)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"KNN search failed: {response.error}")

Continues a search from a specific point using a scroll identifier.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
scroll_id: str = "existing_scroll_id"

# Executing the method
response: Union[Dict, ErrorResponse] = self.engine_conn.scroll_search(scroll_id)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Scroll search continuation failed: {response.error}")

Clears resources related to a scroll search identified by a scroll identifier.

from gaia_core.rest = connection_manager
from gaia_core.models = ErrorResponse
from typing import Union, bool

self.engine_conn = connection_manager.get_default()

# Inputs
scroll_id: str = "scroll_id_to_clear"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.clear_scroll_search(scroll_id)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to clear scroll search: {result.error}")

create_index_template

Creates a template for indices specifying settings and mappings that should be applied automatically when new indices are created.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict, bool

self.engine_conn = connection_manager.get_default()

# Inputs
template_name: str = "my_template"
data: Dict = {"settings": {"number_of_shards": 1}, "mappings": {"properties": {"field1": {"type": "text"}}}}

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.create_index_template(template_name, data)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to create index template: {result.error}")

create_index

Creates an index with specific settings and mappings based on provided data.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict, bool

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "new_index"
data: Dict = {"settings": {"number_of_shards": 2}, "mappings": {"properties": {"field1": {"type": "keyword"}}}}

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.create_index(index, data)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to create index: {result.error}")

delete_index

Deletes a specified index.

from gaia_core.rest import connection_manager
from gaia_core.models = ErrorResponse
from typing import Union, bool

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_delete"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.delete_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to delete index: {result.error}")

get_index

Retrieves information about the specified index, such as settings and mappings.

from gaia_core.rest = connection_manager
from gaia_core.models = ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "existing_index"

# Executing the method
response: Union[Dict, ErrorResponse] = self.engine_conn.get_index(index)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to retrieve index information: {response.error}")

update_mapping

exist_template

Checks if a specified template exists.

from gaia_core.rest = connection_manager
from gaia_core.models = ErrorResponse
from typing = Union, bool

self.engine_conn = connection_manager.get_default()

# Inputs
template: str = "my_template"

# Executing the method
exists: Union[bool, ErrorResponse] = self.engine_conn.exist_template(template_name)

# Error Handling
if isinstance(exists, ErrorResponse):
    raise Exception(f"Failed to check if template
 exists: {exists.error}")

exist_index

Checks if a specified index exists.

from gaia_core.rest = connection_manager
from gaia_core.models = ErrorResponse
from typing = Union, bool

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_check"

# Executing the method
exists: Union[bool, ErrorResponse] = self.engine_conn.exist_index(index)

# Error Handling
if isinstance(exists, ErrorResponse):
    raise Exception(f"Failed to check if index exists: {exists.error}")

open_index

Opens a previously closed index to make it available for read and write operations.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, bool

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_open"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.open_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to open index: {result.error}")

close_index

Closes an index to make it read-only, reducing resource usage.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, bool

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_close"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.close_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to close index: {result.error}")

stats_index

Retrieves statistical information about a specified index.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_for_stats"

# Executing the method
stats: Union[Dict, ErrorResponse] = self.engine_conn.stats_index(index)

# Error Handling
if isinstance(stats, ErrorResponse):
    raise Exception(f"Failed to get index stats: {stats.error}")

get_mapping

Retrieves the mapping of the specified index.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_get_mapping"

# Executing the method
mapping: Union[Dict, ErrorResponse] = self.engine_conn.get_mapping(index)

# Error Handling
if isinstance(mapping, ErrorResponse):
    raise Exception(f"Failed to get mapping for index: {mapping.error}")

update_mapping

Updates the mapping for a specified index.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_update_mapping"
data: Dict = {"properties": {"new_field": {"type": "text"}}}

# Executing the method
self.engine_conn.update_mapping(index, data)
# Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.

get_settings

Retrieves the settings of the specified index.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_get_settings"

# Executing the method
settings: Union[Dict, ErrorResponse] = self.engine_conn.get_settings(index)

# Error Handling
if isinstance(settings, ErrorResponse):
    raise Exception(f"Failed to get settings for index: {settings.error}")

update_settings

Retrieves the settings of the specified index.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_update_settings" 
data = {"settings": {"refresh_interval": "1s"}}

# Executing the method 
updated_settings: Union[Dict, ErrorResponse] = self.engine_conn.update_settings(index, data)

# Error Handling
if isinstance(settings, ErrorResponse):
    raise Exception(f"Failed to update settings for index: {updated_settings.error}")

index_doc

Indexes a document in the specified index, potentially under a specific document ID.

from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc: Dict = {"field1": "value1"}
doc_id: str = "doc123"  # Optional
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.index_doc(index, doc, doc_id=doc_id, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to index document: {response.error}")

get_doc

Retrieves a document by ID from the specified index.

from gaia_core.rest = connection_manager
from gaia_core.models = ResponseHit, ErrorResponse
from typing = Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key"  # Optional

# Executing the method
document: Union[ResponseHit, ErrorResponse] = self.engine_conn.get_doc(index, doc_id, routing=routing)

# Error Handling
if isinstance(document, ErrorResponse):
    raise Exception(f"Failed to retrieve document: {document.error}")

update_doc

Updates a document by ID in the specified index.

from gaia_core.rest = connection_manager
from gaia_core.models = CRUDResponse, ErrorResponse
from typing = Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "updated_value1"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.update_doc(index, doc_id, doc, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update document: {response.error}")

upsert_doc

Inserts a new document or updates an existing document by ID in the specified index.

from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "value1"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.upsert_doc(index, doc_id, doc, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to upsert document: {response.error}")

partial_update_doc

Partially updates a document by ID in the specified index, modifying only the specified fields.

from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
updates: Dict = {"field2": "new_value"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.partial_update_doc(index, doc_id, updates, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to partially update document: {response.error}")

script_update_doc

Updates a document by applying a script. Useful for complex updates where direct field modifications are insufficient.

from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
script = "ctx._source.field2 += params.count"
params = {"count": 1}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.script_update_doc(index, doc_id, script, params, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update document with script: {response.error}")

delete_doc

Deletes a document by ID from the specified index.

from gaia_core.rest = connection_manager
from gaia_core.models = CRUDResponse, ErrorResponse
from typing = Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.delete_doc(index, doc_id, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to delete document: {response.error}")

bulk

Executes multiple indexing, update, or delete operations in a single API call.

from gaia_core.rest = connection_manager
from gaia_core.models = BulkEntry, ErrorResponse
from typing = Union, List

self.engine_conn = connection_manager.get_default()

# Inputs
actions: List[BulkEntry] = [
    {"action": "index", "index": "my_index", "id": "1", "document": {"field1": "value1", "field2": "value2"}},
    {"action": "create", "index": "my_index", "id": "2", "document": {"field1": "new_value1"}},
    {"action": "update", "index": "my_index", "id": "1", "document": {"doc": {"field2": "updated_value2"}}},
    {"action": "delete", "index": "my_index", "id": "3"}
]

refresh: str = "true"

# Executing the method
results = self.engine_conn.bulk(*actions, refresh=refresh)
# Note: Assuming results handling based on the nature of bulk operations; each entry should be checked for errors.

delete_by_query

Deletes documents from the specified index that match the given query.

from gaia_core.rest import connection_manager
from gaia_core.models import ByQueryResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
query: Dict = {"query": {"match": {"field1": "value_to_delete"}}}

# Executing the method
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.delete_by_query(index, query)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to delete by query: {response.error}")

update_by_query

Updates documents in the specified index based on a provided query and update rules.

from gaia_core.rest import connection_manager
from gaia_core.models import ByQueryResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
update_query: Dict = {
    "query": {"match_all": {}},
    "script": {"source": "ctx._source.field1 = 'updated_value'"}
}

# Executing the method
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.update_by_query(index, update_query)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update by query: {response.error}")

reindex

Reindexes documents from one index to another, potentially transforming document data during the process.

from gaia_core.rest import connection_manager
from typing import Dict

self.engine_conn = connection_manager.get_default()

# Inputs
reindex_data: Dict = {
    "source": {"index": "old_index"},
    "dest": {"index": "new_index"},
    "script": {"source": "ctx._source.new_field = 'value'"}
}

# Executing the method
self.engine_conn.reindex(reindex_data)
# Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.

count

Counts the number of documents in the specified index that match a given query.

from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict, int

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
count_query: Dict = {"query": {"match_all": {}}}

# Executing the method
count_result: Union[int, ErrorResponse] = self.engine_conn.count(index, count_query)

# Error Handling
if isinstance(count_result, ErrorResponse):
    raise Exception(f"Failed to count documents: {count_result.error}")