-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathmeilisearch.py
582 lines (488 loc) · 19.8 KB
/
meilisearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
"""
This is a search engine for Meilisearch. It implements the edx-search's SearchEngine
API, such that it can be set up as a drop-in replacement for the ElasticSearchEngine. To
switch to this engine, you should run a Meilisearch instance and define the following
setting:
SEARCH_ENGINE = "search.meilisearch.MeilisearchEngine"
You will then need to create the new indices by running:
./manage.py lms shell -c "import search.meilisearch; search.meilisearch.create_indexes()"
For more information about the Meilisearch API in Python, check
https://github.com/meilisearch/meilisearch-python
When implementing a new index, you might discover that you need to list explicit filterable
fields. Typically, you try to index new documents, and Meilisearch fails with the
following response:
meilisearch.errors.MeilisearchApiError: MeilisearchApiError. Error code: invalid_search_filter.
Error message: Attribute `field3` is not filterable. Available filterable attributes are:
`field1 field2 _pk`.
In such cases, the filterable field should be added to INDEX_FILTERABLES below. And you should
then run the `create_indexes()` function again, as indicated above.
This search engine was tested for the following indexes:
1. course_info ("course discovery"):
- Enable the course discovery feature: FEATURES["ENABLE_COURSE_DISCOVERY"] = True
- A search bar appears in the LMS landing page.
- Content is automatically indexed every time a course's "schedule & details" are
edited in the studio, course content is edited or the "reindex" button is clicked.
2. courseware_content ("courseware search"):
- Enable the courseware search waffle flag:
./manage.py lms waffle_flag --create --everyone courseware.mfe_courseware_search
- Enable the following feature flags:
FEATURES["ENABLE_COURSEWARE_INDEX"] = True
FEATURES["ENABLE_COURSEWARE_SEARCH"] = True
- Courseware content will be indexed by editing course sections and units.
- Alternatively, click the "Reindex" button in the Studio.
- Alternatively, index all courses by running: ./manage.py cms reindex_course --active
- In the learning MFE, a course search bar appears when opening a course.
Note that the index names could be tuned with the COURSEWARE_INFO_INDEX_NAME and
COURSEWARE_CONTENT_INDEX_NAME settings. However, if you decide to change these settings,
beware that many other applications do not respect them...
When facing issues with Meilisearch during indexing, you may want to look at the
Meilisearch logs. You might notice that some indexing tasks failed. In such cases, you
can troubleshoot these tasks by printing them with:
./manage.py lms shell -c "import search.meilisearch; search.meilisearch.print_failed_meilisearch_tasks()"
"""
from copy import deepcopy
from datetime import datetime
import hashlib
import json
import logging
import typing as t
import meilisearch
from django.conf import settings
from django.utils import timezone
from search.search_engine_base import SearchEngine
from search.utils import ValueRange
# Connection parameters, read once at import time from the Django settings. Each one
# falls back to the default given here when the setting is not defined.
MEILISEARCH_API_KEY = getattr(settings, "MEILISEARCH_API_KEY", "")
MEILISEARCH_URL = getattr(settings, "MEILISEARCH_URL", "http://meilisearch")
# Prefix prepended to every index name (see get_meilisearch_index_name).
MEILISEARCH_INDEX_PREFIX = getattr(settings, "MEILISEARCH_INDEX_PREFIX", "")

logger = logging.getLogger(__name__)

# Name of the document field that is used as the Meilisearch primary key.
PRIMARY_KEY_FIELD_NAME = "_pk"
# Suffix of the companion field that stores the UTC offset of an indexed datetime field.
UTC_OFFSET_SUFFIX = "__utcoffset"
# In Meilisearch, we need to explicitly list fields for which we expect to define
# filters and aggregation functions.
# This is different than Elasticsearch where we can aggregate results over any field.
# Here, we list facet fields per index.
# Reference: https://www.meilisearch.com/docs/learn/filtering_and_sorting/search_with_facet_filters
# Note that index names default to the values hard-coded here ("course_info" and
# "courseware_content"), but they can be overridden with the COURSEWARE_INFO_INDEX_NAME
# and COURSEWARE_CONTENT_INDEX_NAME settings.
INDEX_FILTERABLES: dict[str, list[str]] = {
    getattr(settings, "COURSEWARE_INFO_INDEX_NAME", "course_info"): [
        "language",  # aggregate by language, mode, org
        "modes",
        "org",
        "catalog_visibility",  # exclude visibility="none"
        "enrollment_end",  # include only enrollable courses
    ],
    getattr(settings, "COURSEWARE_CONTENT_INDEX_NAME", "courseware_content"): [
        PRIMARY_KEY_FIELD_NAME,  # exclude some specific documents based on ID
        "course",  # search courseware content by course
        "org",  # used during indexing
        "catalog_visibility",  # exclude visibility="none"
        "start_date",  # limit search to started courses
    ],
}
class MeilisearchEngine(SearchEngine):
    """
    Search engine backed by Meilisearch.

    The output of every method is shaped to be compliant with what edx-search's
    ElasticSearchEngine produces.
    """

    def __init__(self, index=None) -> None:
        super().__init__(index=index)
        # The Meilisearch index object is created on first access only.
        self._meilisearch_index: t.Optional[meilisearch.index.Index] = None

    @property
    def meilisearch_index(self) -> meilisearch.index.Index:
        """
        Meilisearch index object, instantiated on first access and cached afterwards.
        """
        if self._meilisearch_index is not None:
            return self._meilisearch_index
        prefixed_name = get_meilisearch_index_name(self.index_name)
        client = get_meilisearch_client()
        self._meilisearch_index = client.index(prefixed_name)
        return self._meilisearch_index

    @property
    def meilisearch_index_name(self):
        """
        Name of the Meilisearch index, which is the index UID.
        """
        return self.meilisearch_index.uid

    def index(self, sources: list[dict[str, t.Any]], **kwargs):
        """
        Add documents to the index.

        Each source document goes through `process_document` before being sent to
        Meilisearch, and is serialized with `DocumentEncoder`.
        """
        logger.info(
            "Index request: index=%s sources=%s kwargs=%s",
            self.meilisearch_index_name,
            sources,
            kwargs,
        )
        documents = list(map(process_document, sources))
        self.meilisearch_index.add_documents(documents, serializer=DocumentEncoder)

    def search(
        self,
        query_string=None,
        field_dictionary=None,
        filter_dictionary=None,
        exclude_dictionary=None,
        aggregation_terms=None,
        # exclude_ids=None, # deprecated
        # use_field_match=False, # deprecated
        log_search_params=False,
        **kwargs,
    ):  # pylint: disable=too-many-arguments
        """
        Run a search query and return results in the edx-search result format.

        The dictionaries and extra keyword arguments are converted to Meilisearch
        parameters by `get_search_params`; the raw response is converted back by
        `process_results`.
        See meilisearch docs: https://www.meilisearch.com/docs/reference/api/search
        """
        search_params = get_search_params(
            field_dictionary=field_dictionary,
            filter_dictionary=filter_dictionary,
            exclude_dictionary=exclude_dictionary,
            aggregation_terms=aggregation_terms,
            **kwargs,
        )
        if log_search_params:
            logger.info("Search query: opt_params=%s", search_params)
        raw_results = self.meilisearch_index.search(query_string, search_params)
        return process_results(raw_results, self.index_name)

    def remove(self, doc_ids, **kwargs):
        """
        Remove documents from the index.

        Document ids are converted to Meilisearch primary keys, and the documents with
        these keys are deleted. Nothing is sent to Meilisearch when there is no id.
        """
        logger.info(
            "Remove request: index=%s, doc_ids=%s kwargs=%s",
            self.meilisearch_index_name,
            doc_ids,
            kwargs,
        )
        primary_keys = list(map(id2pk, doc_ids))
        if not primary_keys:
            return
        self.meilisearch_index.delete_documents(primary_keys)
class DocumentEncoder(json.JSONEncoder):
    """
    JSON encoder used to serialize documents sent to Meilisearch.

    On top of the types supported by the standard encoder, datetime objects are
    serialized with `str()`.
    Ref: https://github.com/meilisearch/meilisearch-python?tab=readme-ov-file#custom-serializer-for-documents-
    """

    def default(self, o):
        if not isinstance(o, datetime):
            # Anything else is delegated to the base class, which raises TypeError for
            # unsupported types.
            return super().default(o)
        return str(o)
def print_failed_meilisearch_tasks(count: int = 10):
    """
    Print failed Meilisearch tasks to stdout, for troubleshooting purposes.

    Indexing tasks are asynchronous: when one of them fails, it is tricky to figure out
    why. At most `count` failed tasks are requested from Meilisearch.
    """
    task_query = {"statuses": "failed", "limit": count}
    failed_tasks = get_meilisearch_client().task_handler.get_tasks(task_query)
    for failed_task in failed_tasks.results:
        print(failed_task)
def create_indexes(index_filterables: t.Optional[dict[str, list[str]]] = None):
    """
    Initialization function: create the indexes and configure their filterable fields.

    `index_filterables` maps index names to lists of filterable fields. When it is
    undefined, `INDEX_FILTERABLES` is used instead. Developers can pass their own
    mapping to configure their own indices.
    """
    filterables_by_index = (
        INDEX_FILTERABLES if index_filterables is None else index_filterables
    )
    client = get_meilisearch_client()
    for name, fields in filterables_by_index.items():
        prefixed_name = get_meilisearch_index_name(name)
        meilisearch_index = get_or_create_meilisearch_index(client, prefixed_name)
        update_index_filterables(client, meilisearch_index, fields)
def get_or_create_meilisearch_index(
    client: meilisearch.Client, index_name: str
) -> meilisearch.index.Index:
    """
    Return the index with the given name, creating it first if it does not exist.

    Raises RuntimeError when the index creation task does not succeed, and
    MeilisearchApiError in other failure cases.
    """
    try:
        existing_index = client.get_index(index_name)
    except meilisearch.errors.MeilisearchApiError as error:
        # Only a missing index is an expected situation here.
        if error.code != "index_not_found":
            raise
    else:
        return existing_index

    # The index is missing: create it with our primary key field and wait for completion.
    creation_task = client.create_index(
        index_name, {"primaryKey": PRIMARY_KEY_FIELD_NAME}
    )
    wait_for_task_to_succeed(client, creation_task)
    # Fetch the freshly created index
    return client.get_index(index_name)
def update_index_filterables(
    client: meilisearch.Client, index: meilisearch.index.Index, filterables: list[str]
) -> None:
    """
    Ensure that the given fields are part of the filterable attributes of an index.

    Filterable attributes that already exist on the index are preserved. No update is
    sent when `filterables` is empty or when all of its fields are already filterable.
    """
    if not filterables:
        return
    current_filterables = set(index.get_filterable_attributes())
    missing_filterables = set(filterables) - current_filterables
    if not missing_filterables:
        # nothing to add
        return
    merged_filterables = list(current_filterables.union(filterables))
    update_task = index.update_filterable_attributes(merged_filterables)
    wait_for_task_to_succeed(client, update_task)
def wait_for_task_to_succeed(
    client: meilisearch.Client,
    task_info: meilisearch.task.TaskInfo,
    timeout_in_ms: int = 5000,
) -> None:
    """
    Block until a Meilisearch task completes, and raise RuntimeError unless its final
    status is "succeeded".
    """
    finished_task = client.wait_for_task(
        task_info.task_uid, timeout_in_ms=timeout_in_ms
    )
    if finished_task.status == "succeeded":
        return
    raise RuntimeError(f"Failed meilisearch task: {finished_task}")
def get_meilisearch_client():
    """
    Build a Meilisearch client from the module-level URL and API key.
    """
    client = meilisearch.Client(MEILISEARCH_URL, api_key=MEILISEARCH_API_KEY)
    return client
def get_meilisearch_index_name(index_name: str) -> str:
    """
    Map an edx-search index name to the corresponding index name in Meilisearch.

    The Meilisearch name is the edx-search name with the configured prefix prepended.
    This is useful for multi-tenant Meilisearch: just define a different prefix for
    every tenant.
    Usually, meilisearch API keys are allowed to access only certain index prefixes.
    Make sure that your API key matches the prefix.
    """
    prefixed_name = MEILISEARCH_INDEX_PREFIX + index_name
    return prefixed_name
def process_document(doc: dict[str, t.Any]) -> dict[str, t.Any]:
    """
    Prepare a source document for indexing.

    The result is a new dict built by `process_nested_document`, to which the primary
    key field (derived from the document "id") is added.
    """
    return {
        **process_nested_document(doc),
        # Primary key field
        PRIMARY_KEY_FIELD_NAME: id2pk(doc["id"]),
    }
def process_nested_document(doc: dict[str, t.Any]) -> dict[str, t.Any]:
    """
    Process a dict (top-level document or nested dict) before indexing.

    A new dict is returned in which:

    - datetime values are replaced by their POSIX timestamp, and their UTC offset (in
      seconds, or None for naive datetimes) is stored in a companion field whose name is
      the original key followed by UTC_OFFSET_SUFFIX.
    - dict values are processed recursively.
    - all other values are kept as-is.
    """
    processed = {}
    for key, value in doc.items():
        if isinstance(value, datetime):
            # Convert datetime objects to timestamp, and store the timezone in a
            # separate field with a suffix given by UTC_OFFSET_SUFFIX.
            offset = value.utcoffset()
            processed[key] = value.timestamp()
            # `timedelta.seconds` is only the (always positive) seconds component of
            # the normalized timedelta: for UTC-5 it is 68400, not -18000. We need the
            # signed total duration.
            processed[f"{key}{UTC_OFFSET_SUFFIX}"] = (
                None if offset is None else int(offset.total_seconds())
            )
        elif isinstance(value, dict):
            processed[key] = process_nested_document(value)
        else:
            # Pray that there are not datetime objects inside lists.
            # If there are, they will be converted to str by the DocumentEncoder.
            processed[key] = value
    return processed
def id2pk(value: str) -> str:
    """
    Derive a Meilisearch-compatible primary key from a document "id" field.

    The "id" is typically a course id, which includes colon ":" characters, which are
    not supported by Meilisearch. The primary key is the hexadecimal SHA-1 digest of the
    encoded id. Source:
    https://www.meilisearch.com/docs/learn/getting_started/primary_key#formatting-the-document-id
    """
    hasher = hashlib.sha1()
    hasher.update(value.encode())
    return hasher.hexdigest()
def get_search_params(
    field_dictionary=None,
    filter_dictionary=None,
    exclude_dictionary=None,
    aggregation_terms=None,
    **kwargs,
) -> dict[str, t.Any]:
    """
    Build the parameters that should be passed to the Meilisearch client `.search()`
    method.

    - `aggregation_terms` keys become the requested facets.
    - `field_dictionary`, `filter_dictionary` (optional rules) and `exclude_dictionary`
      (exclusion rules) are converted to filter rules, in that order.
    - the `from_` and `size` keyword arguments become `offset` and `limit`. Other
      keyword arguments are ignored.
    """
    search_params: dict[str, t.Any] = {"showRankingScore": True}

    # Aggregation
    if aggregation_terms:
        search_params["facets"] = [*aggregation_terms.keys()]

    # Exclusion and inclusion filters
    rule_sources = (
        (field_dictionary, {}),
        (filter_dictionary, {"optional": True}),
        (exclude_dictionary, {"exclude": True}),
    )
    filter_rules = []
    for rule_dict, rule_options in rule_sources:
        if rule_dict:
            filter_rules.extend(get_filter_rules(rule_dict, **rule_options))
    if filter_rules:
        search_params["filter"] = filter_rules

    # Offset/Size
    for kwarg_name, param_name in (("from_", "offset"), ("size", "limit")):
        if kwarg_name in kwargs:
            search_params[param_name] = kwargs[kwarg_name]

    return search_params
def get_filter_rules(
    rule_dict: dict[str, t.Any], exclude: bool = False, optional: bool = False
) -> list[str]:
    """
    Convert a dictionary of inclusion/exclusion rules to a list of Meilisearch filter
    rules.

    One rule is generated per value: a list value yields one rule per item, any other
    value yields a single rule.
    """
    return [
        get_filter_rule(field, single_value, exclude=exclude, optional=optional)
        for field, field_value in rule_dict.items()
        for single_value in (
            field_value if isinstance(field_value, list) else [field_value]
        )
    ]
def get_filter_rule(
    key: str, value: str, exclude: bool = False, optional: bool = False
) -> str:
    """
    Build a single Meilisearch filter rule.

    See: https://www.meilisearch.com/docs/learn/filtering_and_sorting/filter_expression_reference

    Arguments:
        key: name of the field to filter on. The "id" field is replaced by the primary
            key field, and the value by the corresponding primary key.
        value: either a string (equality rule) or a ValueRange (lower/upper bounds
            rule). Datetime bounds are converted to timestamps.
        exclude: negate the rule with "NOT".
        optional: also match documents in which the field does not exist.

    Raises:
        ValueError: if the value is neither a string nor a ValueRange.
    """
    prefix = "NOT " if exclude else ""
    if key == "id":
        key = PRIMARY_KEY_FIELD_NAME
        value = id2pk(value)
    if isinstance(value, str):
        # Escape double quotes, such that the value cannot terminate the quoted string
        # early and break (or alter) the filter expression.
        escaped_value = value.replace('"', '\\"')
        rule = f'{prefix}{key} = "{escaped_value}"'
    elif isinstance(value, ValueRange):
        constraints = []
        lower = value.lower
        if isinstance(lower, timezone.datetime):
            lower = lower.timestamp()
        upper = value.upper
        if isinstance(upper, timezone.datetime):
            upper = upper.timestamp()
        # I know that the following fails if value == 0, but we are being
        # consistent with the behaviour in the elasticsearch engine.
        if lower:
            constraints.append(f"{key} >= {lower}")
        if upper:
            constraints.append(f"{key} <= {upper}")
        rule = " AND ".join(constraints)
        if len(constraints) > 1:
            rule = f"({rule})"
        if constraints:
            # Apply the negation to ranges too: without it, an exclusion rule would
            # silently be turned into an inclusion rule.
            rule = f"{prefix}{rule}"
    else:
        raise ValueError(f"Unknown value type: {value.__class__}")
    if optional:
        rule += f" OR {key} NOT EXISTS"
    return rule
def process_results(results: dict[str, t.Any], index_name: str) -> dict[str, t.Any]:
    """
    Convert results produced by Meilisearch into results that are compatible with the
    edx-search engine API.

    Arguments:
        results: raw response of the Meilisearch search API. Every hit is expected to
            include an "id" and a "_rankingScore" field.
        index_name: value of the "_index" field of every result.

    Example input:
        {
            'hits': [
                {
                    '_pk': 'f381d4f1914235c9532576c0861d09b484ade634',
                    'id': 'course-v1:OpenedX+DemoX+DemoCourse',
                    ...
                    "_rankingScore": 0.865,
                },
                ...
            ],
            'query': 'demo',
            'processingTimeMs': 0,
            'limit': 20,
            'offset': 0,
            'estimatedTotalHits': 1
        }

    Example output:
        {
            'took': 13,
            'total': 1,
            'max_score': 0.4001565,
            'results': [
                {
                    '_index': 'course_info',
                    '_type': '_doc',
                    '_id': 'course-v1:OpenedX+DemoX+DemoCourse',
                    '_ignored': ['content.overview.keyword'], # removed
                    'data': {
                        'id': 'course-v1:OpenedX+DemoX+DemoCourse',
                        'course': 'course-v1:OpenedX+DemoX+DemoCourse',
                        'content': {
                            'display_name': 'Open edX Demo Course',
                            ...
                        },
                        'image_url': '/asset-v1:OpenedX+DemoX+DemoCourse+type@asset+block@thumbnail_demox.jpeg',
                        'start': '2020-01-01T00:00:00+00:00',
                        ...
                    },
                    'score': 0.4001565
                }
            ],
            'aggs': {
                'modes': {
                    'terms': {'audit': 1},
                    'total': 1,
                    'other': 0
                },
                'org': {
                    'terms': {'OpenedX': 1}, 'total': 1, 'other': 0
                },
                'language': {'terms': {'en': 1}, 'total': 1, 'other': 0}
            }
        }
    """
    # Base
    processed: dict[str, t.Any] = {
        "took": results["processingTimeMs"],
        "total": results["estimatedTotalHits"],
        "results": [],
        "aggs": {},
    }

    # Hits
    max_score = 0
    for hit in results["hits"]:
        data = process_hit(hit)
        # The ranking score is not part of the document data: it is exposed as the
        # "score" of the result instead.
        score = data.pop("_rankingScore")
        max_score = max(max_score, score)
        processed["results"].append(
            {
                "_id": data["id"],
                "_index": index_name,
                "_type": "_doc",
                "data": data,
                "score": score,
            }
        )
    processed["max_score"] = max_score

    # Aggregates/Facets
    for facet_name, facet_distribution in results.get("facetDistribution", {}).items():
        processed["aggs"][facet_name] = {
            "terms": facet_distribution,
            "total": sum(facet_distribution.values()),
            # All terms are reported in the distribution, so nothing is left in "other".
            "other": 0,
        }
    return processed
def process_hit(hit: dict[str, t.Any]) -> dict[str, t.Any]:
    """
    Convert a search result back to the ES format.

    A deep copy of the hit is returned, in which:

    - the primary key field is removed, if present.
    - every top-level field with the UTC_OFFSET_SUFFIX suffix is removed, and the
      corresponding timestamp field is converted back to a datetime object. The
      datetime is timezone-aware when a UTC offset was stored, and naive otherwise.
    """
    processed = deepcopy(hit)

    # Remove primary key field
    processed.pop(PRIMARY_KEY_FIELD_NAME, None)

    # Convert datetime fields back to datetime
    for key in list(processed.keys()):
        if not key.endswith(UTC_OFFSET_SUFFIX):
            continue
        utcoffset = processed.pop(key)
        key = key[: -len(UTC_OFFSET_SUFFIX)]
        timestamp = hit[key]
        # An offset of 0 means UTC: only a missing offset (None) denotes a naive
        # datetime. Testing for truthiness would turn UTC datetimes into naive ones.
        tz = (
            timezone.get_fixed_timezone(timezone.timedelta(seconds=utcoffset))
            if utcoffset is not None
            else None
        )
        processed[key] = timezone.datetime.fromtimestamp(timestamp, tz=tz)
    return processed