get_vector_query
Retrieves vector-based query results given specific parameters.
gaia_coreget_vector_query
Retrieves vector-based query results given specific parameters.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from typing import Tuple, Dict, List
from gaia_core.models import SearchFilters, ErrorResponse
from gaia_core.models.engines.enums import VectorQueryType
self.engine_conn = connection_manager.get_default()
# Inputs
query: Dict = {"query": { "query_string": { "query": "sample text" }}}
query_type: VectorQueryType = VectorQueryType.SCORE_SCRIPT
min_score: float = 0.5
source: str = "knn_score"
field: str = "vector_field"
vector: List[float] = [0.5, 0.2, 0.1]
space_type: str = "l2"
child_type: Optional[str] = None
# Executing the method
result: Tuple[Any, SearchFilters] = self.engine_conn.get_vector_query(query, query_type, min_score, source, field, vector, space_type, child_type) |
search
Executes a search operation on a specified index using provided search data.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "sample_index"
data: SearchRequest = SearchRequest(query={"match_all": {}})
# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.search(index, data)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Search failed: {response.error}")
|
multi_search
Performs multiple search queries across different indices.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from typing import Tuple, Dict, List
from gaia_core.models import SearchFilters, ErrorResponse
from gaia_core.models.engines.enums import SearchRequest
from typing import VectorQueryTypeList, Tuple
self.engine_conn = connection_manager.get_default()
# Inputs
query: Dict = {"query": { "query_string": { "query": "sample text" }}}
query_type: VectorQueryType = VectorQueryType.SCORE_SCRIPT
min_score: float = 0.5
source: str = "knn_score"
field: str = "vector_field"
vector: List[float] = [0.5, 0.2, 0.1]
space_type: str = "l2"
child_type: Optional[str] = None
# Executing the method
result: Tuple[Any, SearchFilters] = self.engine_conn.get_vector_query(query, query_type, min_score, source, field, vector, space_type, child_type) |
search
Executes a search operation on a specified index using provided search data.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "sample_index"
data: SearchRequest = SearchRequest(query={"match_all": {}})()
# Inputs
queries: List[Tuple[str, SearchRequest]] = [("index1", SearchRequest(query={"match": {"field": "value"}})),
("index2", SearchRequest(query={"term": {"user": "example"}}))]
# Executing the method
response: Union[SearchResponse, ErrorResponse]results = self.engine_conn.multi_search(index, dataqueries)
# Note: Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Search failed: {response.error}")
handling would typically check each individual response, this method assumes successful implementation.
|
async_search
Initiates an asynchronous search on the specified index
multi_search
Performs multiple search queries across different indices.
Code Block |
---|
|
from gaia_core.rest import= connection_manager
from gaia_core.models import= SearchRequest
from typing import List, Tuplestr
self.engine_conn = connection_manager.get_default()
# Inputs
queries: List[Tuple[str, SearchRequest]] = [("index1",
index: str = "async_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "async_value"}})),
# Executing the method
async_id: str = self.engine_conn.async_search(index, data)
# Asynchronous ID retrieval assumes successful initiation; real-world scenarios should handle possible exceptions.
|
async_search_fetch
Fetches the results of an ongoing asynchronous search using the search ID.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import SearchResponse, ErrorResponse
from typing import Union
self.engine_conn ("index2", SearchRequest(query={"term": {"user": "example"}}))]= connection_manager.get_default()
# Inputs
index: str = "async_index"
search_id: str = "async_search_id"
# Executing the method
resultsresponse: Union[SearchResponse, ErrorResponse] = self.engine_conn.multiasync_search(queries_fetch(index, search_id)
# Note: Error handling would typically check each individual response, this method assumes successful implementation.Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Asynchronous search fetch failed: {response.error}")
|
async_search_delete
Initiates Deletes an ongoing asynchronous search on using the specified indexsearch ID.
Code Block |
---|
|
from gaia_core.rest =import connection_manager
from gaia_core.models =import SearchRequestErrorResponse
from typing import strUnion, bool
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "async_index"
datasearch_id: SearchRequest = SearchRequest(query={"match": {"field": "async_value"}})str = "async_search_id"
# Executing the method
async_idresult: strUnion[bool, ErrorResponse] = self.engine_conn.async_search(index, data)
# Asynchronous ID retrieval assumes successful initiation; real-world scenarios should handle possible exceptions.
_delete(index, search_id)
# Error Handling
if isinstance(result, ErrorResponse):
raise Exception(f"Failed to delete asynchronous search: {result.error}")
|
knn_search
Performs a k-nearest neighbors search on the specified index
async_search_fetch
Fetches the results of an ongoing asynchronous search using the search ID.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import SearchRequest, SearchResponse, ErrorResponse
from typing import Union
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "async_index"
search_id: str = "async_search_id" = "knn_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "value"}})
# Executing the method
response: Union[SearchResponse, ErrorResponse] = self.engine_conn.asyncknn_search_fetch(index, search_iddata)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"AsynchronousKNN search fetch failed: {response.error}")
|
async_deleteContinues a search from a specific point using a scroll identifierDeletes an ongoing asynchronous search using the search ID.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, boolDict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "async_index"
searchscroll_id: str = "asyncexisting_searchscroll_id"
# Executing the method
resultresponse: Union[boolDict, ErrorResponse] = self.engine_conn.async_search_delete(index, searchscroll_search(scroll_id)
# Error Handling
if isinstance(resultresponse, ErrorResponse):
raise Exception(f"FailedScroll tosearch deletecontinuation asynchronous searchfailed: {resultresponse.error}")
|
knnPerforms a k-nearest neighbors search on the specified indexClears resources related to a scroll search identified by a scroll identifier.
Code Block |
---|
|
from gaia_core.rest import= connection_manager
from gaia_core.models import SearchRequest, SearchResponse, = ErrorResponse
from typing import Union, bool
self.engine_conn = connection_manager.get_default()
# Inputs
indexscroll_id: str = "knn_index"
data: SearchRequest = SearchRequest(query={"match": {"field": "value"}})scroll_id_to_clear"
# Executing the method
responseresult: Union[SearchResponsebool, ErrorResponse] = self.engine_conn.knnclear_scroll_search(index, datascroll_id)
# Error Handling
if isinstance(responseresult, ErrorResponse):
raise Exception(f"KNNFailed to clear scroll search failed: {responseresult.error}")
|
scroll create_index_
searchtemplate
Continues a search from a specific point using a scroll identifierCreates a template for indices specifying settings and mappings that should be applied automatically when new indices are created.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict, bool
self.engine_conn = connection_manager.get_default()
# Inputs
scroll_id: str = "existing_scroll_id" Inputs
template_name: str = "my_template"
data: Dict = {"settings": {"number_of_shards": 1}, "mappings": {"properties": {"field1": {"type": "text"}}}}
# Executing the method
responseresult: Union[Dictbool, ErrorResponse] = self.engine_conn.scrollcreate_index_searchtemplate(scroll_idtemplate_name, data)
# Error Handling
if isinstance(responseresult, ErrorResponse):
raise Exception(f"ScrollFailed to searchcreate continuationindex failedtemplate: {responseresult.error}")
|
clearcreate_
scroll_searchindex
Creates an index with specific settings and mappings based on provided dataClears resources related to a scroll search identified by a scroll identifier.
Code Block |
---|
|
from gaia_core.rest =import connection_manager
from gaia_core.models =import ErrorResponse
from typing import Union, Dict, bool
self.engine_conn = connection_manager.get_default()
# Inputs
scroll_id: str = "scroll_id_to_clear" = connection_manager.get_default()
# Inputs
index: str = "new_index"
data: Dict = {"settings": {"number_of_shards": 2}, "mappings": {"properties": {"field1": {"type": "keyword"}}}}
# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.clearcreate_scroll_search(scroll_idindex(index, data)
# Error Handling
if isinstance(result, ErrorResponse):
raise Exception(f"Failed to clearcreate scroll searchindex: {result.error}")
|
delete_index
Deletes a specified index
create_index_template
Creates a template for indices specifying settings and mappings that should be applied automatically when new indices are created.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import= ErrorResponse
from typing import Union, Dict, bool
self.engine_conn = connection_manager.get_default()
# Inputs
template_nameindex: str = "my_template"
data: Dict = {"settings": {"number_of_shards": 1}, "mappings": {"properties": {"field1": {"type": "text"}}}}index_to_delete"
# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.createdelete_index_template(template_name, data(index)
# Error Handling
if isinstance(result, ErrorResponse):
raise Exception(f"Failed to createdelete index template: {result.error}")
|
createget_index
Creates an index with specific Retrieves information about the specified index, such as settings and mappings based on provided data.
Code Block |
---|
|
from gaia_core.rest import= connection_manager
from gaia_core.models import= ErrorResponse
from typing import Union, Dict, bool
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "new_index"
data: Dict = {"settings": {"number_of_shards": 2}, "mappings": {"properties": {"field1": {"type": "keyword"}}}}_conn = connection_manager.get_default()
# Inputs
index: str = "existing_index"
# Executing the method
resultresponse: Union[boolDict, ErrorResponse] = self.engine_conn.createget_index(index, data)
# Error Handling
if isinstance(resultresponse, ErrorResponse):
raise Exception(f"Failed to createretrieve index information: {resultresponse.error}")
|
update_mapping
exist_template
Checks if a specified template exists
delete_index
Deletes a specified index.
Code Block |
---|
|
from gaia_core.rest import= connection_manager
from gaia_core.models = ErrorResponse
from typing import= Union, bool
self.engine_conn = connection_manager.get_default()
# Inputs
indextemplate: str = "indexmy_to_deletetemplate"
# Executing the method
resultexists: Union[bool, ErrorResponse] = self.engine_conn.deleteexist_indextemplate(indextemplate_name)
# Error Handling
if isinstance(resultexists, ErrorResponse):
raise Exception(f"Failed to delete indexcheck if template
exists: {resultexists.error}")
|
getexist_index
Retrieves information about the Checks if a specified index , such as settings and mappingsexists.
Code Block |
---|
|
from gaia_core.rest = connection_manager
from gaia_core.models = ErrorResponse
from typing import= Union, Dictbool
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "existingindex_to_indexcheck"
# Executing the method
responseexists: Union[Dictbool, ErrorResponse] = self.engine_conn.getexist_index(index)
# Error Handling
if isinstance(responseexists, ErrorResponse):
raise Exception(f"Failed to retrievecheck if index informationexists: {responseexists.error}")
|
exist open_index
Checks if a specified index existsOpens a previously closed index to make it available for read and write operations.
Code Block |
---|
|
from gaia_core.rest =import connection_manager
from gaia_core.models =import ErrorResponse
from typing =import Union, bool
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "index_to_checkopen"
# Executing the method
existsresult: Union[bool, ErrorResponse] = self.engine_conn.existopen_index(index)
# Error Handling
if isinstance(existsresult, ErrorResponse):
raise Exception(f"Failed to checkopen if index exists: {existsresult.error}")
|
openclose_index
Opens a previously closed Closes an index to make it available for read and write operationsread-only, reducing resource usage.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, bool
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "index_to_openclose"
# Executing the method
result: Union[bool, ErrorResponse] = self.engine_conn.openclose_index(index)
# Error Handling
if isinstance(result, ErrorResponse):
raise Exception(f"Failed to openclose index: {result.error}")
|
closestats_index
Closes an index to make it read-only, reducing resource usageRetrieves statistical information about a specified index.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, boolDict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "index_tofor_closestats"
# Executing the method
resultstats: Union[boolDict, ErrorResponse] = self.engine_conn.closestats_index(index)
# Error Handling
if isinstance(resultstats, ErrorResponse):
raise Exception(f"Failed to closeget index indexstats: {resultstats.error}")
|
statsget_
indexmapping
Retrieves statistical information about a the mapping of the specified index.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "index_to_forget_statsmapping"
# Executing the method
statsmapping: Union[Dict, ErrorResponse] = self.engine_conn.statsget_indexmapping(index)
# Error Handling
if isinstance(statsmapping, ErrorResponse):
raise Exception(f"Failed to get mapping for index stats: {statsmapping.error}")
|
getupdate_mapping
Retrieves Updates the mapping of the for a specified index.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "index: str = "index_to_get_mapping"_to_update_mapping"
data: Dict = {"properties": {"new_field": {"type": "text"}}}
# Executing the method
mapping: Union[Dict, ErrorResponse] = self.engine_conn.getupdate_mapping(index, data)
# Error Handling
if isinstance(mapping, ErrorResponse):
raise Exception(f"Failed to get mapping for index: {mapping.error}")
|
update_mapping
Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.
|
get_settings
Retrieves the settings of the Updates the mapping for a specified index.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "index_to_updateget_mapping"
data: Dict = {"properties": {"new_field": {"type": "text"}}}
settings"
# Executing the method
settings: Union[Dict, ErrorResponse] = self.engine_conn.updateget_mappingsettings(index, data)
# Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.
Error Handling
if isinstance(settings, ErrorResponse):
raise Exception(f"Failed to get settings for index: {settings.error}")
|
update
get_settings
Retrieves the settings of the specified index.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "index_to_get_settings"update_settings"
data = {"settings": {"refresh_interval": "1s"}}
# Executing the method
updated_settings: Union[Dict, ErrorResponse] = self.engine_conn.getupdate_settings(index, data)
# Error Handling
if isinstance(settings, ErrorResponse):
raise Exception(f"Failed to getupdate settings for index: {updated_settings.error}")
|
index_doc
Indexes a document in the specified index, potentially under a specific document ID.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
doc: Dict = {"field1": "value1"}
doc_id: str = "doc123" # Optional
routing: str = "routing_key" # Optional
refresh: str = "true" # Optional
# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.index_doc(index, doc, doc_id=doc_id, routing=routing, refresh=refresh)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Failed to index document: {response.error}")
|
get_doc
Retrieves a document by ID from the specified index.
Code Block |
---|
|
from gaia_core.rest = connection_manager
from gaia_core.models = ResponseHit, ErrorResponse
from typing = Union
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key" # Optional
# Executing the method
document: Union[ResponseHit, ErrorResponse] = self.engine_conn.get_doc(index, doc_id, routing=routing)
# Error Handling
if isinstance(document, ErrorResponse):
raise Exception(f"Failed to retrieve document: {document.error}")
|
update_doc
Updates a document by ID in the specified index.
Code Block |
---|
|
from gaia_core.rest = connection_manager
from gaia_core.models = CRUDResponse, ErrorResponse
from typing = Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "updated_value1"}
routing: str = "routing_key" # Optional
refresh: str = "true" # Optional
# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.update_doc(index, doc_id, doc, routing=routing, refresh=refresh)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Failed to update document: {response.error}")
|
upsert_doc
Inserts a new document or updates an existing document by ID in the specified index.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
doc_id: str = "doc123"
doc: Dict = {"field1": "value1"}
routing: str = "routing_key" # Optional
refresh: str = "true" # Optional
# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.upsert_doc(index, doc_id, doc, routing=routing, refresh=refresh)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Failed to upsert document: {response.error}")
|
partial_update_doc
Partially updates a document by ID in the specified index, modifying only the specified fields.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
doc_id: str = "doc123"
updates: Dict = {"field2": "new_value"}
routing: str = "routing_key" # Optional
refresh: str = "true" # Optional
# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.partial_update_doc(index, doc_id, updates, routing=routing, refresh=refresh)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Failed to partially update document: {response.error}")
|
script_update_doc
Updates a document by applying a script. Useful for complex updates where direct field modifications are insufficient.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import CRUDResponse, ErrorResponse
from typing import Union
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
doc_id: str = "doc123"
script = "ctx._source.field2 += params.count"
params = {"count": 1}
routing: str = "routing_key" # Optional
refresh: str = "true" # Optional
# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.script_update_doc(index, doc_id, script, params, routing=routing, refresh=refresh)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Failed to update document with script: {response.error}")
|
delete_doc
Deletes a document by ID from the specified index.
Code Block |
---|
|
from gaia_core.rest = connection_manager
from gaia_core.models = CRUDResponse, ErrorResponse
from typing = Union
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
doc_id: str = "doc123"
routing: str = "routing_key" # Optional
refresh: str = "true" # Optional
# Executing the method
response: Union[CRUDResponse, ErrorResponse] = self.engine_conn.delete_doc(index, doc_id, routing=routing, refresh=refresh)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Failed to delete document: {response.error}")
|
bulk
Executes multiple indexing, update, or delete operations in a single API call.
Code Block |
---|
|
from gaia_core.rest = connection_manager
from gaia_core.models = BulkEntry, ErrorResponse
from typing = Union, List
self.engine_conn = connection_manager.get_default()
# Inputs
actions: List[BulkEntry] = [
{"action": "index", "index": "my_index", "id": "1", "document": {"field1": "value1", "field2": "value2"}},
{"action": "create", "index": "my_index", "id": "2", "document": {"field1": "new_value1"}},
{"action": "update", "index": "my_index", "id": "1", "document": {"doc": {"field2": "updated_value2"}}},
{"action": "delete", "index": "my_index", "id": "3"}
]
refresh: str = "true"
# Executing the method
results = self.engine_conn.bulk(*actions, refresh=refresh)
# Note: Assuming results handling based on the nature of bulk operations; each entry should be checked for errors.
|
delete_by_query
Deletes documents from the specified index that match the given query.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ByQueryResponse, ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
query: Dict = {"query": {"match": {"field1": "value_to_delete"}}}
# Executing the method
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.delete_by_query(index, query)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Failed to delete by query: {response.error}")
|
update_by_query
Updates documents in the specified index based on a provided query and update rules.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ByQueryResponse, ErrorResponse
from typing import Union, Dict
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
update_query: Dict = {
"query": {"match_all": {}},
"script": {"source": "ctx._source.field1 = 'updated_value'"}
}
# Executing the method
response: Union[ByQueryResponse, ErrorResponse] = self.engine_conn.update_by_query(index, update_query)
# Error Handling
if isinstance(response, ErrorResponse):
raise Exception(f"Failed to update by query: {response.error}")
|
reindex
Reindexes documents from one index to another, potentially transforming document data during the process.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from typing import Dict
self.engine_conn = connection_manager.get_default()
# Inputs
reindex_data: Dict = {
"source": {"index": "old_index"},
"dest": {"index": "new_index"},
"script": {"source": "ctx._source.new_field = 'value'"}
}
# Executing the method
self.engine_conn.reindex(reindex_data)
# Note: Assuming the method executes without returning an ErrorResponse directly.
# Real-world implementation should include error handling as per API specifications.
|
count
Counts the number of documents in the specified index that match a given query.
Code Block |
---|
|
from gaia_core.rest import connection_manager
from gaia_core.models import ErrorResponse
from typing import Union, Dict, int
self.engine_conn = connection_manager.get_default()
# Inputs
index: str = "my_index"
count_query: Dict = {"query": {"match_all": {}}}
# Executing the method
count_result: Union[int, ErrorResponse] = self.engine_conn.count(index, count_query)
# Error Handling
if isinstance(count_result, ErrorResponse):
raise Exception(f"Failed to count documents: {count_result.error}")
|