This page offers a series of practical examples for each method of the engine connection, showcasing a variety of functionalities including indexing, querying, document management, and advanced search operations like vector searches and bulk actions.

Note

Each example is crafted to be engine-agnostic, ensuring that the principles and techniques demonstrated are not tied to any specific search engine provider.

The content covers method descriptions, input parameters, expected outputs, and error handling. The goal is to help developers and system architects use search engine capabilities effectively across a variety of applications.


Accessing the Connection Manager in Code

You can get the connection manager in your code with:

Code Block
languagepy
themeDJango
from app.rest import connection_manager
Tip

We recommend using this as a local import, placed exactly where it is required and not before, to prevent dependency issues.
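The local-import pattern from the tip can be sketched as follows. Because `app.rest` is only importable inside the application, the standard-library `json` module stands in for it here, and the function name `serialize_payload` is hypothetical.

```python
def serialize_payload(payload: dict) -> str:
    # Local import: resolved the first time the function runs,
    # not when the enclosing module is loaded.
    import json  # stand-in for: from app.rest import connection_manager

    return json.dumps(payload, sort_keys=True)


print(serialize_payload({"b": 1, "a": 2}))
```

Deferring the import to call time means the importing module can be loaded even if the imported package is not ready yet.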

Getting Connection by Default

The default connection is the engine connection whose default flag is explicitly set to true; if no connection has the flag, the first connection in the list is used. GAIA API also uses this connection for all of its internal transactions, such as logging, feedback, and analytics.

To get the default connection, just use the code line below: 

Code Block
languagepy
themeDJango
engine_conn = connection_manager.get_default()
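The selection rule described above (explicit default flag first, otherwise the first connection in the list) can be illustrated with a minimal sketch. `ConnectionConfig` and `pick_default` are hypothetical names invented for this illustration; they are not part of the GAIA API, and the real connection manager may resolve the default differently.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ConnectionConfig:
    """Hypothetical stand-in for one configured engine connection."""
    name: str
    default: bool = False


def pick_default(connections: List[ConnectionConfig]) -> Optional[ConnectionConfig]:
    # Prefer the first connection explicitly flagged as default.
    for conn in connections:
        if conn.default:
            return conn
    # Otherwise fall back to the first connection, if there is one.
    return connections[0] if connections else None


configs = [ConnectionConfig("logs"), ConnectionConfig("main", default=True)]
print(pick_default(configs).name)
```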

Getting Connection by Name

You can have multiple connections to the same or different types of NoSQL engines, each with a name you can use to retrieve it.

To get a connection by name, just use the code line below:

Code Block
languagepy
themeDJango
engine = connection_manager.get_engine(name=engine_name)


Example of Methods Usage


get_vector_query

Retrieves vector-based query results given specific parameters.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from typing import Any, Dict, List, Optional, Tuple
from gaia_core.models import SearchFilters, ErrorResponse
from gaia_core.models.engines.enums import VectorQueryType

self.engine_conn = connection_manager.get_default()

# Inputs
query: Dict = {"query": { "query_string": { "query": "sample text" }}}
query_type: VectorQueryType = VectorQueryType.SCORE_SCRIPT
min_score: float = 0.5
source: str = "knn_score"
field: str = "vector_field"
vector: List[float] = [0.5, 0.2, 0.1]
space_type: str = "l2"
child_type: Optional[str] = None

# Executing the method
result: Tuple[Any, SearchFilters] = self.engine_conn.get_vector_query(query, query_type, min_score, source, field, vector, space_type, child_type)
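For orientation, `space_type = "l2"` conventionally denotes Euclidean (L2) distance between vectors. The sketch below computes that distance with the standard library. How a given engine converts the distance into a score that is compared against `min_score` is engine-specific and not described on this page, and `stored_vector` is a hypothetical document vector.

```python
import math
from typing import List


def l2_distance(a: List[float], b: List[float]) -> float:
    # Euclidean distance is only defined for vectors of equal dimension.
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    return math.dist(a, b)


query_vector = [0.5, 0.2, 0.1]
stored_vector = [0.5, 0.2, 0.1]  # hypothetical indexed vector
print(l2_distance(query_vector, stored_vector))
```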

search

Executes a search operation on a specified index using provided search data.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "sample_index"
data: SearchRequest = SearchRequest(query={"match_all": {}})

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.search(index, data)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Search failed: {response.error}")
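Most examples on this page repeat the same `isinstance(..., ErrorResponse)` check. One possible way to centralize it is a small helper, sketched below. The `ErrorResponse` class here is a local stand-in with only an `error` attribute (the real class lives in `gaia_core.models`), and `EngineError` and `unwrap` are hypothetical names.

```python
from dataclasses import dataclass
from typing import TypeVar, Union


@dataclass
class ErrorResponse:
    """Local stand-in for gaia_core.models.ErrorResponse (assumed to expose `.error`)."""
    error: str


class EngineError(RuntimeError):
    """Hypothetical exception raised when an engine call returns an ErrorResponse."""


T = TypeVar("T")


def unwrap(result: Union[T, ErrorResponse], action: str) -> T:
    # Turn an error value into an exception; pass successful results through.
    if isinstance(result, ErrorResponse):
        raise EngineError(f"{action} failed: {result.error}")
    return result


print(unwrap({"total": 3}, "Search"))
```

With such a helper, a call site could read `response = unwrap(self.engine_conn.search(index, data), "Search")`.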

multi_search

Performs multiple search queries across different indices.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest
from typing import List, Tuple

self.engine_conn = connection_manager.get_default()

# Inputs
queries: List[Tuple[str, SearchRequest]] = [("index1", SearchRequest(query={"match": {"field": "value"}})),
                                           ("index2", SearchRequest(query={"term": {"user": "example"}}))]

# Executing the method
results = self.engine_conn.multi_search(queries)

# Note: Error handling would typically check each individual response; this example assumes every query succeeds.
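The per-response check mentioned in the note could look like the sketch below. It assumes, without confirmation from this page, that `multi_search` returns a list with one entry per query and that a failed entry is an `ErrorResponse`. The `ErrorResponse` class is a local stand-in and `split_results` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class ErrorResponse:
    """Local stand-in for gaia_core.models.ErrorResponse (assumed to expose `.error`)."""
    error: str


def split_results(results: List[Any]) -> Tuple[List[Tuple[int, Any]], List[Tuple[int, ErrorResponse]]]:
    # Keep each entry's position so a failure can be traced back to its query.
    succeeded, failed = [], []
    for position, result in enumerate(results):
        if isinstance(result, ErrorResponse):
            failed.append((position, result))
        else:
            succeeded.append((position, result))
    return succeeded, failed


sample = [{"hits": 2}, ErrorResponse(error="index2 not found")]
succeeded, failed = split_results(sample)
print(len(succeeded), len(failed))
```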

async_search

Initiates an asynchronous search on the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "async_value"}})

# Executing the method
async_id: str = self.engine_conn.async_search(index, data)

# Asynchronous ID retrieval assumes successful initiation; real-world scenarios should handle possible exceptions.

async_search_fetch

Fetches the results of an ongoing asynchronous search using the search ID.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
search_id: str = "async_search_id"

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.async_search_fetch(index, search_id)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Asynchronous search fetch failed: {response.error}")

async_search_delete

Deletes an ongoing asynchronous search using the search ID.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
search_id: str = "async_search_id"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.async_search_delete(index, search_id)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to delete asynchronous search: {result.error}")
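The three asynchronous methods are naturally used together: start the search, fetch its results, and delete it afterwards. The sketch below shows that lifecycle with `try`/`finally` so the search is deleted even if the fetch fails. `FakeEngineConnection` is a hypothetical in-memory stand-in that only mirrors the method names and argument order used on this page; real return types and any polling behavior are not modeled.

```python
class FakeEngineConnection:
    """Hypothetical in-memory stand-in for an engine connection."""

    def __init__(self):
        self._searches = {}

    def async_search(self, index, data):
        search_id = f"{index}-{len(self._searches) + 1}"
        self._searches[search_id] = {"index": index, "hits": []}
        return search_id

    def async_search_fetch(self, index, search_id):
        return self._searches[search_id]

    def async_search_delete(self, index, search_id):
        return self._searches.pop(search_id, None) is not None


def run_async_search(conn, index, data):
    search_id = conn.async_search(index, data)
    try:
        return conn.async_search_fetch(index, search_id)
    finally:
        # Always release the server-side search, even when the fetch raises.
        conn.async_search_delete(index, search_id)


conn = FakeEngineConnection()
result = run_async_search(conn, "async_index", {"match_all": {}})
print(result["index"])
```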

knn_search

Performs a k-nearest neighbors search on the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "knn_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "value"}})

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.knn_search(index, data)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"KNN search failed: {response.error}")

scroll_search

Continues a search from a specific point using a scroll identifier.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
scroll_id: str = "existing_scroll_id"

# Executing the method
response: Union[Dict, ErrorResponse] = self.engine_conn.scroll_search(scroll_id)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Scroll search continuation failed: {response.error}")

clear_scroll_search

Clears resources related to a scroll search identified by a scroll identifier.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
scroll_id: str = "scroll_id_to_clear"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.clear_scroll_search(scroll_id)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to clear scroll search: {result.error}")
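`scroll_search` and `clear_scroll_search` are typically combined in a loop that pages until no hits remain and then releases the scroll. The sketch below assumes an Elasticsearch-style response shape (`"_scroll_id"` plus `"hits" -> "hits"`), which this page does not confirm. `FakeScrollConnection` and `iterate_scroll` are hypothetical names.

```python
from typing import Dict, Iterator, List


class FakeScrollConnection:
    """Hypothetical stand-in that serves pre-built pages of hits."""

    def __init__(self, pages: List[List[dict]]):
        self._pages = list(pages)
        self.cleared: List[str] = []

    def scroll_search(self, scroll_id: str) -> Dict:
        hits = self._pages.pop(0) if self._pages else []
        return {"_scroll_id": scroll_id, "hits": {"hits": hits}}

    def clear_scroll_search(self, scroll_id: str) -> bool:
        self.cleared.append(scroll_id)
        return True


def iterate_scroll(conn, scroll_id: str) -> Iterator[dict]:
    try:
        while True:
            page = conn.scroll_search(scroll_id)
            hits = page["hits"]["hits"]
            if not hits:
                break  # an empty page marks the end of the scroll
            yield from hits
            scroll_id = page.get("_scroll_id", scroll_id)
    finally:
        # Release scroll resources whether or not iteration completed.
        conn.clear_scroll_search(scroll_id)


conn = FakeScrollConnection([[{"_id": "1"}, {"_id": "2"}], [{"_id": "3"}]])
docs = list(iterate_scroll(conn, "s1"))
print([d["_id"] for d in docs])
```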

create_index_template

Creates a template for indices specifying settings and mappings that should be applied automatically when new indices are created.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
template_name: str = "my_template"
data: Dict = {"settings": {"number_of_shards": 1}, "mappings": {"properties": {"field1": {"type": "text"}}}}

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.create_index_template(template_name, data)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to create index template: {result.error}")

create_index

Creates an index with specific settings and mappings based on provided data.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "new_index"
data: Dict = {"settings": {"number_of_shards": 2}, "mappings": {"properties": {"field1": {"type": "keyword"}}}}

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.create_index(index, data)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to create index: {result.error}")

delete_index

Deletes a specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_delete"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.delete_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to delete index: {result.error}")

get_index

Retrieves information about the specified index, such as settings and mappings.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "existing_index"

# Executing the method
response: Union[Dict, ErrorResponse] = self.engine_conn.get_index(index)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to retrieve index information: {response.error}")

exist_template

Checks if a specified index template exists.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
template_name: str = "my_template"

# Executing the method
exists: Union[bool, ErrorResponse] = self.engine_conn.exist_template(template_name)

# Error Handling
if isinstance(exists, ErrorResponse):
    raise Exception(f"Failed to check if template exists: {exists.error}")

exist_index

Checks if a specified index exists.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_check"

# Executing the method
exists: Union[bool, ErrorResponse] = self.engine_conn.exist_index(index)

# Error Handling
if isinstance(exists, ErrorResponse):
    raise Exception(f"Failed to check if index exists: {exists.error}")
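A common use of an existence check is to create an index only when it is missing. The sketch below illustrates that pattern against a hypothetical in-memory stand-in. `FakeIndexConnection` and `ensure_index` are invented for this illustration, and `ErrorResponse` handling is omitted for brevity.

```python
from typing import Dict


class FakeIndexConnection:
    """Hypothetical stand-in mirroring exist_index and create_index."""

    def __init__(self):
        self.indices: Dict[str, Dict] = {}

    def exist_index(self, index: str) -> bool:
        return index in self.indices

    def create_index(self, index: str, data: Dict) -> bool:
        self.indices[index] = data
        return True


def ensure_index(conn, index: str, data: Dict) -> bool:
    """Create the index only if it is missing; return True when it was created."""
    if conn.exist_index(index):
        return False
    conn.create_index(index, data)
    return True


conn = FakeIndexConnection()
settings = {"settings": {"number_of_shards": 1}}
first = ensure_index(conn, "new_index", settings)
second = ensure_index(conn, "new_index", settings)
print(first, second)
```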

open_index

Opens a previously closed index to make it available for read and write operations.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_open"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.open_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to open index: {result.error}")

close_index

Closes an index to make it read-only, reducing resource usage.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_close"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.close_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to close index: {result.error}")

stats_index

Retrieves statistical information about a specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_for_stats"

# Executing the method
stats: Union[Dict, ErrorResponse] = self.engine_conn.stats_index(index)

# Error Handling
if isinstance(stats, ErrorResponse):
    raise Exception(f"Failed to get index stats: {stats.error}")

get_mapping

Retrieves the mapping of the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_get_mapping"

# Executing the method
mapping: Union[Dict, ErrorResponse] = self.engine_conn.get_mapping(index)

# Error Handling
if isinstance(mapping, ErrorResponse):
    raise Exception(f"Failed to get mapping for index: {mapping.error}")

update_mapping

Updates the mapping for a specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_update_mapping"
data: Dict = {"properties": {"new_field": {"type": "text"}}}

# Executing the method
self.engine_conn.update_mapping(index, data)
# Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.

get_settings

Retrieves the settings of the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_get_settings"

# Executing the method
settings: Union[Dict, ErrorResponse] = self.engine_conn.get_settings(index)

# Error Handling
if isinstance(settings, ErrorResponse):
    raise Exception(f"Failed to get settings for index: {settings.error}")

update_settings

Updates the settings of the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_update_settings"
data: Dict = {"settings": {"refresh_interval": "1s"}}

# Executing the method
updated_settings: Union[Dict, ErrorResponse] = self.engine_conn.update_settings(index, data)

# Error Handling
if isinstance(updated_settings, ErrorResponse):
    raise Exception(f"Failed to update settings for index: {updated_settings.error}")

index_doc

Indexes a document in the specified index, potentially under a specific document ID.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc: Dict = {"field1": "value1"}
doc_id: str = "doc123"  # Optional
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.index_doc(index, doc, doc_id=doc_id, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to index document: {response.error}")

get_doc

Retrieves a document by ID from the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ResponseHit, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key"  # Optional

# Executing the method
document: Union[ResponseHit, ErrorResponse] = self.engine_conn.get_doc(index, doc_id, routing=routing)

# Error Handling
if isinstance(document, ErrorResponse):
    raise Exception(f"Failed to retrieve document: {document.error}")

update_doc

Updates a document by ID in the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "updated_value1"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.update_doc(index, doc_id, doc, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update document: {response.error}")

upsert_doc

Inserts a new document or updates an existing document by ID in the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "value1"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.upsert_doc(index, doc_id, doc, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to upsert document: {response.error}")

partial_update_doc

Partially updates a document by ID in the specified index, modifying only the specified fields.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
updates: Dict = {"field2": "new_value"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.partial_update_doc(index, doc_id, updates, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to partially update document: {response.error}")

script_update_doc

Updates a document by applying a script. Useful for complex updates where direct field modifications are insufficient.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
script = "ctx._source.field2 += params.count"
params = {"count": 1}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.script_update_doc(index, doc_id, script, params, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update document with script: {response.error}")

delete_doc

Deletes a document by ID from the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.delete_doc(index, doc_id, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to delete document: {response.error}")

bulk

Executes multiple indexing, update, or delete operations in a single API call.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import BulkEntry, ErrorResponse
from typing import Union, List

self.engine_conn = connection_manager.get_default()

# Inputs
actions: List[BulkEntry] = [
    {"action": "index", "index": "my_index", "id": "1", "document": {"field1": "value1", "field2": "value2"}},
    {"action": "create", "index": "my_index", "id": "2", "document": {"field1": "new_value1"}},
    {"action": "update", "index": "my_index", "id": "1", "document": {"doc": {"field2": "updated_value2"}}},
    {"action": "delete", "index": "my_index", "id": "3"}
]

refresh: str = "true"

# Executing the method
results = self.engine_conn.bulk(*actions, refresh=refresh)
# Note: Assuming results handling based on the nature of bulk operations; each entry should be checked for errors.
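When the action list is large, it is often split into smaller batches so that each `bulk` call stays a manageable size. The sketch below shows one way to do that. `chunked` is a hypothetical helper, the batch size of 2 is purely illustrative, and this page does not state any size limit for `bulk`.

```python
from typing import Iterator, List, Sequence


def chunked(actions: Sequence[dict], size: int) -> Iterator[List[dict]]:
    # Yield consecutive slices of at most `size` actions, preserving order.
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(actions), size):
        yield list(actions[start:start + size])


actions = [
    {"action": "index", "index": "my_index", "id": "1", "document": {"field1": "value1"}},
    {"action": "create", "index": "my_index", "id": "2", "document": {"field1": "new_value1"}},
    {"action": "delete", "index": "my_index", "id": "3"},
]
batches = list(chunked(actions, 2))
print([len(batch) for batch in batches])
```

Each batch could then be passed to the connection as `self.engine_conn.bulk(*batch, refresh=refresh)`, with the results of every call checked individually.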

delete_by_query

Deletes documents from the specified index that match the given query.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ByQueryResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
query: Dict = {"query": {"match": {"field1": "value_to_delete"}}}

# Executing the method
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.delete_by_query(index, query)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to delete by query: {response.error}")

update_by_query

Updates documents in the specified index based on a provided query and update rules.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ByQueryResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
update_query: Dict = {
    "query": {"match_all": {}},
    "script": {"source": "ctx._source.field1 = 'updated_value'"}
}

# Executing the method
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.update_by_query(index, update_query)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update by query: {response.error}")

reindex

Reindexes documents from one index to another, potentially transforming document data during the process.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from typing import Dict

self.engine_conn = connection_manager.get_default()

# Inputs
reindex_data: Dict = {
    "source": {"index": "old_index"},
    "dest": {"index": "new_index"},
    "script": {"source": "ctx._source.new_field = 'value'"}
}

# Executing the method
self.engine_conn.reindex(reindex_data)
# Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.

count

Counts the number of documents in the specified index that match a given query.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
count_query: Dict = {"query": {"match_all": {}}}

# Executing the method
count_result: Union[int, ErrorResponse] = self.engine_conn.count(index, count_query)

# Error Handling
if isinstance(count_result, ErrorResponse):
    raise Exception(f"Failed to count documents: {count_result.error}")