Skip to content

Commit

Permalink
feat: wait for meilisearch index creation to succeed
Browse files Browse the repository at this point in the history
In `search.meilisearch.create_indexes`, we were not waiting for the
index creation tasks to complete. This was causing a potential race
condition, where the `create_indexes` function would fail because it
took a few seconds for the index creation to succeed.

See the relevant conversation here:
openedx/edx-platform#35743 (comment)
  • Loading branch information
regisb committed Oct 31, 2024
1 parent 91686e9 commit dad2cf8
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 28 deletions.
2 changes: 1 addition & 1 deletion edxsearch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
""" Container module for testing / demoing search """

__version__ = '4.1.0'
__version__ = '4.1.1'
95 changes: 68 additions & 27 deletions search/meilisearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,20 @@ class MeilisearchEngine(SearchEngine):
compliant with edx-search's ElasticSearchEngine.
"""

def __init__(self, index=None):
def __init__(self, index=None) -> None:
super().__init__(index=index)
self.meilisearch_index = get_meilisearch_index(self.index_name)
self._meilisearch_index: t.Optional[meilisearch.index.Index] = None

@property
def meilisearch_index(self) -> meilisearch.index.Index:
"""
Lazy load meilisearch index.
"""
if self._meilisearch_index is None:
meilisearch_index_name = get_meilisearch_index_name(self.index_name)
meilisearch_client = get_meilisearch_client()
self._meilisearch_index = meilisearch_client.index(meilisearch_index_name)
return self._meilisearch_index

@property
def meilisearch_index_name(self):
Expand Down Expand Up @@ -211,7 +222,7 @@ def print_failed_meilisearch_tasks(count: int = 10):
print(result)


def create_indexes(index_filterables: dict[str, list[str]] = None):
def create_indexes(index_filterables: t.Optional[dict[str, list[str]]] = None):
"""
This is an initialization function that creates indexes and makes sure that they
support the right facetting.
Expand All @@ -225,38 +236,68 @@ def create_indexes(index_filterables: dict[str, list[str]] = None):
client = get_meilisearch_client()
for index_name, filterables in index_filterables.items():
meilisearch_index_name = get_meilisearch_index_name(index_name)
try:
index = client.get_index(meilisearch_index_name)
except meilisearch.errors.MeilisearchApiError as e:
if e.code != "index_not_found":
raise
client.create_index(
meilisearch_index_name, {"primaryKey": PRIMARY_KEY_FIELD_NAME}
)
# Get the index again
index = client.get_index(meilisearch_index_name)
index = get_or_create_meilisearch_index(client, meilisearch_index_name)
update_index_filterables(client, index, filterables)

# Update filterables if there are some new elements
if filterables:
existing_filterables = set(index.get_filterable_attributes())
if not set(filterables).issubset(existing_filterables):
all_filterables = list(existing_filterables.union(filterables))
index.update_filterable_attributes(all_filterables)

def get_or_create_meilisearch_index(
client: meilisearch.Client, index_name: str
) -> meilisearch.index.Index:
"""
Get an index. If it does not exist, create it.
def get_meilisearch_index(index_name: str):
This will fail with a RuntimeError if we fail to create the index. It will fail with
a MeilisearchApiError in other failure cases.
"""
Return a meilisearch index.
try:
return client.get_index(index_name)
except meilisearch.errors.MeilisearchApiError as e:
if e.code != "index_not_found":
raise
task_info = client.create_index(
index_name, {"primaryKey": PRIMARY_KEY_FIELD_NAME}
)
wait_for_task_to_succeed(client, task_info)
# Get the index again
return client.get_index(index_name)

Note that the index may not exist, and it will be created on first insertion.
ideally, the initialisation function `create_indexes` should be run first.

def update_index_filterables(
client: meilisearch.Client, index: meilisearch.index.Index, filterables: list[str]
) -> None:
"""
meilisearch_client = get_meilisearch_client()
meilisearch_index_name = get_meilisearch_index_name(index_name)
return meilisearch_client.index(meilisearch_index_name)
Make sure that the filterable fields of an index include the given list of fields.
If existing fields are present, they are preserved.
"""
if not filterables:
return
existing_filterables = set(index.get_filterable_attributes())
if set(filterables).issubset(existing_filterables):
# all filterables fields are already present
return
all_filterables = list(existing_filterables.union(filterables))
task_info = index.update_filterable_attributes(all_filterables)
wait_for_task_to_succeed(client, task_info)


def wait_for_task_to_succeed(
client: meilisearch.Client,
task_info: meilisearch.task.TaskInfo,
timeout_in_ms: int = 5000,
) -> None:
"""
Wait for a Meilisearch task to succeed. If it does not, raise RuntimeError.
"""
task = client.wait_for_task(task_info.task_uid, timeout_in_ms=timeout_in_ms)
if task.status != "succeeded":
raise RuntimeError(f"Failed meilisearch task: {task}")


def get_meilisearch_client():
"""
Return a Meilisearch client with the right settings.
"""
return meilisearch.Client(MEILISEARCH_URL, api_key=MEILISEARCH_API_KEY)


Expand Down Expand Up @@ -332,7 +373,7 @@ def get_search_params(
Return a dictionary of parameters that should be passed to the Meilisearch client
`.search()` method.
"""
params = {"showRankingScore": True}
params: dict[str, t.Any] = {"showRankingScore": True}

# Aggregation
if aggregation_terms:
Expand Down

0 comments on commit dad2cf8

Please sign in to comment.