This page offers a series of practical examples for each method of the engine connection, showcasing a variety of functionalities including indexing, querying, document management, and advanced search operations like vector searches and bulk actions.

Note

Each example is crafted to be engine-agnostic, ensuring that the principles and techniques demonstrated are not tied to any specific search engine provider.

Each example covers the method's purpose, input parameters, expected output, and error handling. The goal is to help developers and system architects use the engine connection effectively across a variety of applications.

Accessing the Connection Manager in Code

You can get the connection manager in your code using:

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager

Tip

We recommend making this a local import, placed exactly where it is needed and not before, to prevent dependency issues.
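
A minimal sketch of the local-import pattern described in the tip: the import statement sits inside the function body, so `gaia_core.rest` is only resolved when the function is called. The helper name `get_default_engine` is hypothetical and not part of GAIA.

```python
def get_default_engine():
    """Return the default engine connection, importing the manager lazily."""
    # Local import: executed when the function is called, not when this
    # module is loaded, so gaia_core.rest is not needed at import time.
    from gaia_core.rest import connection_manager

    return connection_manager.get_default()
```

Defining the function does not trigger the import; only calling it does.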

Getting Connection by Default

The default connection is the engine connection whose default flag is explicitly set to true or, if no connection has the flag, the first one in the list of connections. This connection is also used for all internal GAIA API transactions, such as logging, feedback, and analytics.

To get the default connection, just use the code line below: 

Code Block
languagepy
themeDJango
engine_conn = connection_manager.get_default()
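
The selection rule described in this section can be sketched as follows. This is an illustration only, not GAIA's implementation: the `pick_default` helper and the dictionary keys `name` and `default` are assumptions made for the example, as is the choice that the first flagged connection wins when several are flagged.

```python
from typing import Dict, List, Optional


def pick_default(connections: List[Dict]) -> Optional[Dict]:
    """Return the connection flagged as default, else the first one listed."""
    for conn in connections:
        # An explicit default flag set to true wins.
        if conn.get("default") is True:
            return conn
    # No flagged connection: fall back to the first entry, if any.
    return connections[0] if connections else None


# Hypothetical connection configurations
configs = [
    {"name": "primary", "default": False},
    {"name": "analytics", "default": True},
]
print(pick_default(configs)["name"])      # analytics
print(pick_default(configs[:1])["name"])  # primary
```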

Getting Connection by name

You can have multiple connections to the same or different types of NoSQL engines, each with a name you can use to retrieve it.

To get a connection by name, just use the code line below:

Code Block
languagepy
themeDJango
engine = connection_manager.get_engine(name=engine_name)


Example of Methods Usage

get_vector_query

Retrieves vector-based query results given specific parameters.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from typing import Any, Dict, List, Optional, Tuple
from gaia_core.models import SearchFilters, ErrorResponse
from gaia_core.models.engines.enums import VectorQueryType

self.engine_conn = connection_manager.get_default()

# Inputs
query: Dict = {"query": { "query_string": { "query": "sample text" }}}
query_type: VectorQueryType = VectorQueryType.SCORE_SCRIPT
min_score: float = 0.5
source: str = "knn_score"
field: str = "vector_field"
vector: List[float] = [0.5, 0.2, 0.1]
space_type: str = "l2"
child_type: Optional[str] = None

# Executing the method
result: Tuple[Any, SearchFilters] = self.engine_conn.get_vector_query(query, query_type, min_score, source, field, vector, space_type, child_type)

search

Executes a search operation on a specified index using provided search data.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "sample_index"
data: SearchRequest = SearchRequest(query={"match_all": {}})

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.search(index, data)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Search failed: {response.error}")
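
The `isinstance` check shown here recurs in most examples on this page, so it can be factored into a small helper. The sketch below is an illustration under assumptions: `unwrap` and `EngineError` are hypothetical names, and the `ErrorResponse` dataclass is a stand-in for `gaia_core.models.ErrorResponse`, assumed only to expose an `error` attribute as the examples suggest.

```python
from dataclasses import dataclass
from typing import TypeVar, Union

T = TypeVar("T")


@dataclass
class ErrorResponse:
    """Stand-in for gaia_core.models.ErrorResponse (assumed to expose .error)."""
    error: str


class EngineError(Exception):
    """Hypothetical exception raised when the engine returns an ErrorResponse."""


def unwrap(result: Union[T, ErrorResponse], action: str) -> T:
    """Return result unchanged, or raise EngineError if it is an ErrorResponse."""
    if isinstance(result, ErrorResponse):
        raise EngineError(f"{action} failed: {result.error}")
    return result


# Usage with plain values standing in for engine responses
print(unwrap({"hits": []}, "Search"))  # {'hits': []}
try:
    unwrap(ErrorResponse(error="index not found"), "Search")
except EngineError as exc:
    print(exc)  # Search failed: index not found
```

In real code the stand-in class would be replaced by the import from `gaia_core.models`.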

multi_search

Performs multiple search queries across different indices.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest
from typing import List, Tuple

self.engine_conn = connection_manager.get_default()

# Inputs
queries: List[Tuple[str, SearchRequest]] = [("index1", SearchRequest(query={"match": {"field": "value"}})),
                                           ("index2", SearchRequest(query={"term": {"user": "example"}}))]

# Executing the method
results = self.engine_conn.multi_search(queries)

# Note: Error handling would typically check each individual response; this example assumes every query succeeds.

async_search

Initiates an asynchronous search on the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "async_value"}})

# Executing the method
async_id: str = self.engine_conn.async_search(index, data)

# Asynchronous ID retrieval assumes successful initiation; real-world scenarios should handle possible exceptions.

async_search_fetch

Fetches the results of an ongoing asynchronous search using the search ID.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
search_id: str = "async_search_id"

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.async_search_fetch(index, search_id)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Asynchronous search fetch failed: {response.error}")

async_search_delete

Deletes an ongoing asynchronous search using the search ID.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "async_index"
search_id: str = "async_search_id"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.async_search_delete(index, search_id)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to delete asynchronous search: {result.error}")
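
Taken together, `async_search`, `async_search_fetch`, and `async_search_delete` suggest a start, fetch, and clean-up lifecycle. The sketch below shows one way to guarantee the clean-up step with `try`/`finally`. It is an illustration under assumptions: `run_async_search` is a hypothetical helper, and `FakeEngine` is a stub that only mimics the three call signatures used on this page so the example can run without an engine.

```python
from typing import Any, Dict


class FakeEngine:
    """Stub mimicking the three async-search calls on this page (not GAIA code)."""

    def __init__(self) -> None:
        self.active: Dict[str, Any] = {}

    def async_search(self, index: str, data: Any) -> str:
        search_id = f"{index}-1"
        self.active[search_id] = {"hits": [data]}
        return search_id

    def async_search_fetch(self, index: str, search_id: str) -> Any:
        return self.active[search_id]

    def async_search_delete(self, index: str, search_id: str) -> bool:
        return self.active.pop(search_id, None) is not None


def run_async_search(conn: Any, index: str, data: Any) -> Any:
    """Start an async search, fetch its result, and always delete it afterwards."""
    search_id = conn.async_search(index, data)
    try:
        return conn.async_search_fetch(index, search_id)
    finally:
        # Runs even if the fetch raises, so the search does not linger.
        conn.async_search_delete(index, search_id)


engine = FakeEngine()
result = run_async_search(engine, "async_index", {"match": {"field": "async_value"}})
print(result["hits"])  # [{'match': {'field': 'async_value'}}]
print(engine.active)   # {}
```

A real engine may report that the search is still running when it is fetched; how to detect that depends on the response model and is not covered here.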

knn_search

Performs a k-nearest neighbors search on the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "knn_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "value"}})

# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.knn_search(index, data)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"KNN search failed: {response.error}")

scroll_search

Continues a search from a specific point using a scroll identifier.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
scroll_id: str = "existing_scroll_id"

# Executing the method
response: Union[Dict, ErrorResponse] = self.engine_conn.scroll_search(scroll_id)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Scroll search continuation failed: {response.error}")

clear_scroll_search

Clears resources related to a scroll search identified by a scroll identifier.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
scroll_id: str = "scroll_id_to_clear"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.clear_scroll_search(scroll_id)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to clear scroll search: {result.error}")

create_index_template

Creates a template for indices specifying settings and mappings that should be applied automatically when new indices are created.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
template_name: str = "my_template"
data: Dict = {"settings": {"number_of_shards": 1}, "mappings": {"properties": {"field1": {"type": "text"}}}}

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.create_index_template(template_name, data)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to create index template: {result.error}")

create_index

Creates an index with specific settings and mappings based on provided data.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "new_index"
data: Dict = {"settings": {"number_of_shards": 2}, "mappings": {"properties": {"field1": {"type": "keyword"}}}}

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.create_index(index, data)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to create index: {result.error}")

delete_index

Deletes a specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_delete"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.delete_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to delete index: {result.error}")

get_index

Retrieves information about the specified index, such as settings and mappings.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "existing_index"

# Executing the method
response: Union[Dict, ErrorResponse] = self.engine_conn.get_index(index)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to retrieve index information: {response.error}")

exist_template

Checks if a specified template exists.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
template_name: str = "my_template"

# Executing the method
exists: Union[bool, ErrorResponse] = self.engine_conn.exist_template(template_name)

# Error Handling
if isinstance(exists, ErrorResponse):
    raise Exception(f"Failed to check if template exists: {exists.error}")

exist_index

Checks if a specified index exists.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_check"

# Executing the method
exists: Union[bool, ErrorResponse] = self.engine_conn.exist_index(index)

# Error Handling
if isinstance(exists, ErrorResponse):
    raise Exception(f"Failed to check if index exists: {exists.error}")
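
A common use of this check is to create an index only when it is missing. The sketch below combines `exist_index` with `create_index` as they are called on this page. `ensure_index` is a hypothetical helper, `ErrorResponse` is a stand-in for `gaia_core.models.ErrorResponse`, and `FakeEngine` is a stub used only to make the example runnable.

```python
from dataclasses import dataclass
from typing import Any, Dict, Set


@dataclass
class ErrorResponse:
    """Stand-in for gaia_core.models.ErrorResponse (assumed to expose .error)."""
    error: str


class FakeEngine:
    """In-memory stub with the two calls used below (not GAIA code)."""

    def __init__(self) -> None:
        self.indices: Set[str] = set()

    def exist_index(self, index: str) -> bool:
        return index in self.indices

    def create_index(self, index: str, data: Dict[str, Any]) -> bool:
        self.indices.add(index)
        return True


def ensure_index(conn: Any, index: str, data: Dict[str, Any]) -> bool:
    """Create the index if absent. Return True if created, False if it existed."""
    exists = conn.exist_index(index)
    if isinstance(exists, ErrorResponse):
        raise RuntimeError(f"Failed to check if index exists: {exists.error}")
    if exists:
        return False
    result = conn.create_index(index, data)
    if isinstance(result, ErrorResponse):
        raise RuntimeError(f"Failed to create index: {result.error}")
    return True


engine = FakeEngine()
settings = {"settings": {"number_of_shards": 2}}
print(ensure_index(engine, "new_index", settings))  # True
print(ensure_index(engine, "new_index", settings))  # False
```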

open_index

Opens a previously closed index to make it available for read and write operations.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_open"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.open_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to open index: {result.error}")

close_index

Closes an index, making it unavailable for read and write operations until it is reopened and reducing resource usage.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_close"

# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.close_index(index)

# Error Handling
if isinstance(result, ErrorResponse):
    raise Exception(f"Failed to close index: {result.error}")

stats_index

Retrieves statistical information about a specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_for_stats"

# Executing the method
stats: Union[Dict, ErrorResponse] = self.engine_conn.stats_index(index)

# Error Handling
if isinstance(stats, ErrorResponse):
    raise Exception(f"Failed to get index stats: {stats.error}")

get_mapping

Retrieves the mapping of the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_get_mapping"

# Executing the method
mapping: Union[Dict, ErrorResponse] = self.engine_conn.get_mapping(index)

# Error Handling
if isinstance(mapping, ErrorResponse):
    raise Exception(f"Failed to get mapping for index: {mapping.error}")

update_mapping

Updates the mapping for a specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from typing import Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_update_mapping"
data: Dict = {"properties": {"new_field": {"type": "text"}}}

# Executing the method
self.engine_conn.update_mapping(index, data)

# Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.

get_settings

Retrieves the settings of the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_get_settings"

# Executing the method
settings: Union[Dict, ErrorResponse] = self.engine_conn.get_settings(index)

# Error Handling
if isinstance(settings, ErrorResponse):
    raise Exception(f"Failed to get settings for index: {settings.error}")

update_settings

Updates the settings of the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "index_to_update_settings"
data: Dict = {"settings": {"refresh_interval": "1s"}}

# Executing the method
updated_settings: Union[Dict, ErrorResponse] = self.engine_conn.update_settings(index, data)

# Error Handling
if isinstance(updated_settings, ErrorResponse):
    raise Exception(f"Failed to update settings for index: {updated_settings.error}")
index_doc

Indexes a document in the specified index, potentially under a specific document ID.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc: Dict = {"field1": "value1"}
doc_id: str = "doc123"  # Optional
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.index_doc(index, doc, doc_id=doc_id, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to index document: {response.error}")

get_doc

Retrieves a document by ID from the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ResponseHit, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key"  # Optional

# Executing the method
document: Union[ResponseHit, ErrorResponse] = self.engine_conn.get_doc(index, doc_id, routing=routing)

# Error Handling
if isinstance(document, ErrorResponse):
    raise Exception(f"Failed to retrieve document: {document.error}")

update_doc

Updates a document by ID in the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "updated_value1"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.update_doc(index, doc_id, doc, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update document: {response.error}")

upsert_doc

Inserts a new document or updates an existing document by ID in the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "value1"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.upsert_doc(index, doc_id, doc, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to upsert document: {response.error}")

partial_update_doc

Partially updates a document by ID in the specified index, modifying only the specified fields.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
updates: Dict = {"field2": "new_value"}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.partial_update_doc(index, doc_id, updates, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to partially update document: {response.error}")

script_update_doc

Updates a document by applying a script. Useful for complex updates where direct field modifications are insufficient.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
script = "ctx._source.field2 += params.count"
params = {"count": 1}
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.script_update_doc(index, doc_id, script, params, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update document with script: {response.error}")

delete_doc

Deletes a document by ID from the specified index.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key"  # Optional
refresh: str = "true"  # Optional

# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.delete_doc(index, doc_id, routing=routing, refresh=refresh)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to delete document: {response.error}")

bulk

Executes multiple indexing, update, or delete operations in a single API call.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import BulkEntry, ErrorResponse
from typing import Union, List

self.engine_conn = connection_manager.get_default()

# Inputs
actions: List[BulkEntry] = [
    {"action": "index", "index": "my_index", "id": "1", "document": {"field1": "value1", "field2": "value2"}},
    {"action": "create", "index": "my_index", "id": "2", "document": {"field1": "new_value1"}},
    {"action": "update", "index": "my_index", "id": "1", "document": {"doc": {"field2": "updated_value2"}}},
    {"action": "delete", "index": "my_index", "id": "3"}
]

refresh: str = "true"

# Executing the method
results = self.engine_conn.bulk(*actions, refresh=refresh)
# Note: Assuming results handling based on the nature of bulk operations; each entry should be checked for errors.
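
When many documents need the same action, the entries can be generated rather than written by hand. The sketch below builds plain dictionaries in the same shape as the `actions` list in the bulk example. `build_index_actions` is a hypothetical helper, and whether the connection accepts plain dictionaries or requires `BulkEntry` instances is an assumption to verify against the API.

```python
from typing import Any, Dict, List


def build_index_actions(index: str, docs: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build one "index" bulk entry per (doc_id, document) pair."""
    return [
        {"action": "index", "index": index, "id": doc_id, "document": document}
        for doc_id, document in docs.items()
    ]


actions = build_index_actions(
    "my_index",
    {"1": {"field1": "value1"}, "2": {"field1": "value2"}},
)
print(len(actions))      # 2
print(actions[0]["id"])  # 1
```

The resulting list can then be unpacked into the call in the same way as the hand-written one, with `*actions`.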

delete_by_query

Deletes documents from the specified index that match the given query.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ByQueryResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
query: Dict = {"query": {"match": {"field1": "value_to_delete"}}}

# Executing the method
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.delete_by_query(index, query)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to delete by query: {response.error}")

update_by_query

Updates documents in the specified index based on a provided query and update rules.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ByQueryResponse, ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
update_query: Dict = {
    "query": {"match_all": {}},
    "script": {"source": "ctx._source.field1 = 'updated_value'"}
}

# Executing the method
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.update_by_query(index, update_query)

# Error Handling
if isinstance(response, ErrorResponse):
    raise Exception(f"Failed to update by query: {response.error}")

reindex

Reindexes documents from one index to another, potentially transforming document data during the process.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from typing import Dict

self.engine_conn = connection_manager.get_default()

# Inputs
reindex_data: Dict = {
    "source": {"index": "old_index"},
    "dest": {"index": "new_index"},
    "script": {"source": "ctx._source.new_field = 'value'"}
}

# Executing the method
self.engine_conn.reindex(reindex_data)
# Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.

count

Counts the number of documents in the specified index that match a given query.

Code Block
languagepy
themeDJango
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict

self.engine_conn = connection_manager.get_default()

# Inputs
index: str = "my_index"
count_query: Dict = {"query": {"match_all": {}}}

# Executing the method
count_result: Union[int, ErrorResponse] = self.engine_conn.count(index, count_query)

# Error Handling
if isinstance(count_result, ErrorResponse):
    raise Exception(f"Failed to count documents: {count_result.error}")