Skip to content

Commit

Permalink
Vec 416: Index update feature (#64)
Browse files Browse the repository at this point in the history
Index update functionality.
  • Loading branch information
rahul-aerospike authored Nov 13, 2024
1 parent 11fe217 commit 0f7cfc5
Show file tree
Hide file tree
Showing 8 changed files with 516 additions and 9 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/extensive_vector_search_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Run long running vector search tests
on:
push:
branches:
- dev
- main

jobs:
test-exhaustive-vector-search:
Expand All @@ -18,10 +18,10 @@ jobs:

steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/integration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ jobs:

steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

Expand Down
52 changes: 51 additions & 1 deletion src/aerospike_vector_search/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def index_create(
index_params: Optional[types.HnswParams] = None,
index_labels: Optional[dict[str, str]] = None,
index_storage: Optional[types.IndexStorage] = None,
timeout: Optional[int] = None,
timeout: Optional[int] = 100_000,
) -> None:
"""
Create an index.
Expand Down Expand Up @@ -172,6 +172,56 @@ def index_create(
logger.error("Failed waiting for creation with error: %s", e)
raise types.AVSServerError(rpc_error=e)

def index_update(
    self,
    *,
    namespace: str,
    name: str,
    index_labels: Optional[dict[str, str]] = None,
    hnsw_update_params: Optional[types.HnswIndexUpdate] = None,
    timeout: Optional[int] = 100_000,
) -> None:
    """
    Send an update request for an existing index to the server.

    :param namespace: The namespace for the index.
    :type namespace: str

    :param name: The name of the index.
    :type name: str

    :param index_labels: Optional labels associated with the index. Defaults to None.
    :type index_labels: Optional[dict[str, str]]

    :param hnsw_update_params: Parameters for updating HNSW index settings. Defaults to None.
    :type hnsw_update_params: Optional[types.HnswIndexUpdate]

    :param timeout: Time in seconds (default 100_000) this operation will wait before raising an error.
    :type timeout: Optional[int]

    Raises:
        AVSServerError: Raised if an error occurs during the RPC communication
            with the server while attempting to update the index.
    """
    # The shared helper logs the call and builds the stub, the request and
    # the per-call options that are splatted into the RPC below.
    stub, request, call_options = self._prepare_index_update(
        namespace=namespace,
        name=name,
        index_labels=index_labels,
        hnsw_update_params=hnsw_update_params,
        timeout=timeout,
        logger=logger,
    )

    try:
        credentials = self._channel_provider.get_token()
        stub.Update(request, credentials=credentials, **call_options)
    except grpc.RpcError as rpc_err:
        # Surface transport/server failures as the library's own error type.
        logger.error("Failed to update index with error: %s", rpc_err)
        raise types.AVSServerError(rpc_error=rpc_err)


def index_drop(
self, *, namespace: str, name: str, timeout: Optional[int] = None
) -> None:
Expand Down
58 changes: 56 additions & 2 deletions src/aerospike_vector_search/aio/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def index_create(
index_params: Optional[types.HnswParams] = None,
index_labels: Optional[dict[str, str]] = None,
index_storage: Optional[types.IndexStorage] = None,
timeout: Optional[int] = None,
timeout: Optional[int] = 100_000,
) -> None:
"""
Create an index.
Expand Down Expand Up @@ -140,7 +140,7 @@ async def index_create(
Note:
This method creates an index with the specified parameters and waits for the index creation to complete.
It waits for up to 100,000 seconds for the index creation to complete.
It waits for up to 100,000 seconds or the specified timeout for the index creation to complete.
"""

await self._channel_provider._is_ready()
Expand Down Expand Up @@ -176,6 +176,60 @@ async def index_create(
logger.error("Failed waiting for creation with error: %s", e)
raise types.AVSServerError(rpc_error=e)


async def index_update(
    self,
    *,
    namespace: str,
    name: str,
    index_labels: Optional[dict[str, str]] = None,
    hnsw_update_params: Optional[types.HnswIndexUpdate] = None,
    timeout: Optional[int] = 100_000,
) -> None:
    """
    Send an update request for an existing index to the server (asynchronous variant).

    :param namespace: The namespace for the index.
    :type namespace: str

    :param name: The name of the index.
    :type name: str

    :param index_labels: Optional labels associated with the index. Defaults to None.
    :type index_labels: Optional[dict[str, str]]

    :param hnsw_update_params: Parameters for updating HNSW index settings. Defaults to None.
    :type hnsw_update_params: Optional[types.HnswIndexUpdate]

    :param timeout: Timeout in seconds for internal index update tasks. Defaults to 100_000.
    :type timeout: Optional[int]

    Raises:
        AVSServerError: Raised if an error occurs during the RPC communication
            with the server while attempting to update the index.
    """
    # Wait for the channel provider before building or sending anything.
    await self._channel_provider._is_ready()

    # The shared helper logs the call and builds the stub, the request and
    # the per-call options that are splatted into the RPC below.
    stub, request, call_options = self._prepare_index_update(
        namespace=namespace,
        name=name,
        index_labels=index_labels,
        hnsw_update_params=hnsw_update_params,
        timeout=timeout,
        logger=logger,
    )

    try:
        credentials = self._channel_provider.get_token()
        await stub.Update(request, credentials=credentials, **call_options)
    except grpc.RpcError as rpc_err:
        # Surface transport/server failures as the library's own error type.
        logger.error("Failed to update index with error: %s", rpc_err)
        raise types.AVSServerError(rpc_error=rpc_err)


async def index_drop(
self, *, namespace: str, name: str, timeout: Optional[int] = None
) -> None:
Expand Down
46 changes: 45 additions & 1 deletion src/aerospike_vector_search/shared/admin_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
from typing import Any, Optional, Union
from typing import Any, Optional, Tuple, Dict
import time

import google.protobuf.empty_pb2
Expand Down Expand Up @@ -83,6 +83,50 @@ def _prepare_index_create(
index_create_request = index_pb2.IndexCreateRequest(definition=index_definition)
return (index_stub, index_create_request, kwargs)

def _prepare_index_update(
    self,
    namespace: str,
    name: str,
    index_labels: Optional[Dict[str, str]],
    hnsw_update_params: Optional[types.HnswIndexUpdate],
    timeout: Optional[int],
    logger: logging.Logger
) -> tuple[index_pb2_grpc.IndexServiceStub, index_pb2.IndexUpdateRequest, dict[str, Any]]:
    """
    Build everything needed to issue an index update RPC.

    Logs the request at debug level, then returns a 3-tuple of
    ``(index_stub, index_update_request, kwargs)`` where ``kwargs`` holds the
    extra keyword arguments for the RPC call: ``{"timeout": timeout}`` when a
    timeout was given, otherwise an empty dict.
    """
    logger.debug(
        "Updating index: namespace=%s, name=%s, labels=%s, hnsw_update_params=%s, timeout=%s",
        namespace,
        name,
        index_labels,
        hnsw_update_params,
        timeout,
    )

    # Only forward a timeout the caller actually supplied.
    call_options = {} if timeout is None else {"timeout": timeout}

    stub = self._get_index_stub()
    target_index = self._get_index_id(namespace, name)

    # Convert the HNSW settings to their protobuf form only when provided.
    hnsw_pb = None if hnsw_update_params is None else hnsw_update_params._to_pb2()

    request = index_pb2.IndexUpdateRequest(
        indexId=target_index,
        labels=index_labels,
        hnswIndexUpdate=hnsw_pb,
    )

    return (stub, request, call_options)


def _prepare_index_drop(self, namespace, name, timeout, logger) -> None:

logger.debug(
Expand Down
120 changes: 120 additions & 0 deletions src/aerospike_vector_search/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,126 @@ def _to_pb2(self):
return params


class HnswIndexUpdate:
    """
    Represents parameters for updating HNSW index settings.

    Every field is optional. A field left as None is not written to the
    protobuf message produced by :meth:`_to_pb2`.

    :param batching_params: Configures batching behavior for batch-based index update.
    :type batching_params: Optional[HnswBatchingParams]

    :param max_mem_queue_size: Maximum size of in-memory queue for inserted/updated vector records.
    :type max_mem_queue_size: Optional[int]

    :param index_caching_params: Configures caching for HNSW index.
    :type index_caching_params: Optional[HnswCachingParams]

    :param healer_params: Configures index healer parameters.
    :type healer_params: Optional[HnswHealerParams]

    :param merge_params: Configures merging of batch indices to the main index.
    :type merge_params: Optional[HnswIndexMergeParams]

    :param enable_vector_integrity_check: Verifies if the underlying vector has changed before
        returning the kANN result. Defaults to True, so an update built without this argument
        still sets the flag to True in the request; pass None explicitly to leave it out.
    :type enable_vector_integrity_check: Optional[bool]

    :param record_caching_params: Configures caching for vector records.
    :type record_caching_params: Optional[HnswCachingParams]
    """

    def __init__(
        self,
        *,
        batching_params: Optional[HnswBatchingParams] = None,
        max_mem_queue_size: Optional[int] = None,
        index_caching_params: Optional[HnswCachingParams] = None,
        healer_params: Optional[HnswHealerParams] = None,
        merge_params: Optional[HnswIndexMergeParams] = None,
        enable_vector_integrity_check: Optional[bool] = True,
        record_caching_params: Optional[HnswCachingParams] = None,
    ) -> None:
        self.batching_params = batching_params
        self.max_mem_queue_size = max_mem_queue_size
        self.index_caching_params = index_caching_params
        self.healer_params = healer_params
        self.merge_params = merge_params
        self.enable_vector_integrity_check = enable_vector_integrity_check
        self.record_caching_params = record_caching_params

    def _to_pb2(self) -> types_pb2.HnswIndexUpdate:
        """
        Converts the HnswIndexUpdate instance to its protobuf representation.

        Only fields that are not None are written to the message. Nested
        parameter objects are tested with ``is not None`` (not truthiness) so
        that an object which happens to evaluate as falsy is still sent.
        """
        params: types_pb2.HnswIndexUpdate = types_pb2.HnswIndexUpdate()

        if self.batching_params is not None:
            params.batchingParams.CopyFrom(self.batching_params._to_pb2())

        if self.max_mem_queue_size is not None:
            params.maxMemQueueSize = self.max_mem_queue_size

        if self.index_caching_params is not None:
            params.indexCachingParams.CopyFrom(self.index_caching_params._to_pb2())

        if self.healer_params is not None:
            params.healerParams.CopyFrom(self.healer_params._to_pb2())

        if self.merge_params is not None:
            params.mergeParams.CopyFrom(self.merge_params._to_pb2())

        if self.enable_vector_integrity_check is not None:
            params.enableVectorIntegrityCheck = self.enable_vector_integrity_check

        if self.record_caching_params is not None:
            params.recordCachingParams.CopyFrom(self.record_caching_params._to_pb2())

        return params

    def __repr__(self) -> str:
        """Return a single-line representation listing every field."""
        return (
            f"HnswIndexUpdate(batching_params={self.batching_params}, "
            f"max_mem_queue_size={self.max_mem_queue_size}, "
            f"index_caching_params={self.index_caching_params}, "
            f"healer_params={self.healer_params}, "
            f"merge_params={self.merge_params}, "
            f"enable_vector_integrity_check={self.enable_vector_integrity_check}, "
            f"record_caching_params={self.record_caching_params})"
        )

    def __str__(self) -> str:
        """Return a multi-line, human-readable listing of every field."""
        return (
            f"HnswIndexUpdate {{\n"
            f" batching_params: {self.batching_params},\n"
            f" max_mem_queue_size: {self.max_mem_queue_size},\n"
            f" index_caching_params: {self.index_caching_params},\n"
            f" healer_params: {self.healer_params},\n"
            f" merge_params: {self.merge_params},\n"
            f" enable_vector_integrity_check: {self.enable_vector_integrity_check},\n"
            f" record_caching_params: {self.record_caching_params}\n"
            f"}}"
        )

    def __eq__(self, other) -> bool:
        """
        Compare field by field with another HnswIndexUpdate.

        Returns NotImplemented for any other type. Because ``__eq__`` is
        defined without ``__hash__``, instances are unhashable.
        """
        if not isinstance(other, HnswIndexUpdate):
            return NotImplemented
        return (
            self.batching_params == other.batching_params
            and self.max_mem_queue_size == other.max_mem_queue_size
            and self.index_caching_params == other.index_caching_params
            and self.healer_params == other.healer_params
            and self.merge_params == other.merge_params
            and self.enable_vector_integrity_check == other.enable_vector_integrity_check
            and self.record_caching_params == other.record_caching_params
        )

    def __getitem__(self, key):
        """
        Dict-style read access to an attribute, e.g. ``update["merge_params"]``.

        Raises AttributeError (not KeyError) when the attribute does not exist.
        """
        if not hasattr(self, key):
            raise AttributeError(f"'HnswIndexUpdate' object has no attribute '{key}'")
        return getattr(self, key)

    def __setitem__(self, key, value):
        """Dict-style write access; sets the attribute named ``key`` to ``value``."""
        return setattr(self, key, value)


class IndexStorage(object):
"""
Helper class primarily used to specify which namespace and set to build the index on.
Expand Down
Loading

0 comments on commit 0f7cfc5

Please sign in to comment.