Index metadata fields #3656

Merged · 13 commits · Oct 13, 2023
98 changes: 96 additions & 2 deletions lambdas/es/indexer/index.py
@@ -51,12 +51,14 @@
import os
import pathlib
import re
import urllib.parse
from os.path import split
from typing import Optional, Tuple
from urllib.parse import unquote_plus

import boto3
import botocore
import jsonpointer
import nbformat
from dateutil.tz import tzutc
from document_queue import (
@@ -179,6 +181,9 @@
USER_AGENT_EXTRA = " quilt3-lambdas-es-indexer"


logger = get_quilt_logger()


def now_like_boto3():
    """ensure timezone UTC for consistency with boto3:
    Example of what boto3 returns on head_object:
@@ -299,6 +304,94 @@ def do_index(
    logger_.debug("%s indexed as package (%s)", key, event_type)


def _try_parse_date(s: str) -> Optional[datetime.datetime]:
    # XXX: do we need to support more formats?
    if s[-1:] == "Z":
        s = s[:-1]
    try:
        return datetime.datetime.fromisoformat(s)
    except ValueError:
        return None
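
A hedged sketch of what this parser accepts (illustrative values): datetime.fromisoformat() rejects a trailing "Z" before Python 3.11, hence the strip.

    _try_parse_date("2023-10-13T09:10:23")   # datetime.datetime(2023, 10, 13, 9, 10, 23)
    _try_parse_date("2023-10-13T09:10:23Z")  # same value; the "Z" is stripped first
    _try_parse_date("not a date")            # None (the ValueError is swallowed)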


MAX_KEYWORD_LEN = 256


def _get_metadata_fields(path: tuple, d: dict):
    # Walk the metadata dict recursively, yielding a
    # (path, es_type, raw_value, coerced_value) tuple for every indexable leaf.
    for k, raw_value in d.items():
        if isinstance(raw_value, dict):
            yield from _get_metadata_fields(path + (k,), raw_value)
        else:
            v = raw_value
            if isinstance(v, str):
                date = _try_parse_date(v)
                if date is not None:
                    type_ = "date"
                    v = date
                else:
                    type_ = "keyword" if len(v) <= MAX_KEYWORD_LEN else "text"
            elif isinstance(v, bool):  # checked before int/float: bool is an int subclass
                type_ = "boolean"
            elif isinstance(v, (int, float)):
                # XXX: do something on ints that can't be converted to float without loss?
                type_ = "double"
            elif isinstance(v, list):
                if not (v and all(isinstance(x, str) for x in v)):
                    continue
                type_ = "keyword" if all(len(x) <= MAX_KEYWORD_LEN for x in v) else "text"
            else:
                logger.warning("ignoring value of type %s", type(v))
                continue

            yield path + (k,), type_, raw_value, v


def get_metadata_fields(meta):
    if not isinstance(meta, dict):
        # XXX: can we do something better?
        return None
    return [
        {
            "json_pointer": jsonpointer.JsonPointer.from_parts(path).path,
            "type": type_,
            "text": json.dumps(raw_value, ensure_ascii=False),
            type_: value,
        }
        for path, type_, raw_value, value in _get_metadata_fields((), meta)
    ]
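
To make the field shape concrete, a sketch of the expected output for a small nested document (illustrative input, mirroring the unit tests below):

    get_metadata_fields({"foo": "bar", "nested": {"num": 1}})
    # [
    #     {"json_pointer": "/foo", "type": "keyword", "text": '"bar"', "keyword": "bar"},
    #     {"json_pointer": "/nested/num", "type": "double", "text": "1", "double": 1},
    # ]

Each entry carries the value twice: JSON-encoded under "text", and again under a key named after the detected type, presumably so Elasticsearch can map each variant natively.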


def _prepare_workflow_for_es(workflow, bucket):
    if workflow is None:
        return None

    try:
        config_url = workflow["config"]
        if not config_url.startswith(f"s3://{bucket}/.quilt/workflows/config.yml"):
            raise Exception(f"Bad workflow config URL {config_url}")

        config_url_parsed = urllib.parse.urlparse(config_url)
        query = urllib.parse.parse_qs(config_url_parsed.query)
        version_id = query.pop('versionId', [None])[0]
        if query:
            raise Exception(f"Unexpected S3 query string: {config_url_parsed.query!r}")

        return {
            "config_version_id": version_id,  # XXX: how to handle None?
            "id": workflow["id"],
            "schemas": [
                {
                    "id": k,
                    "url": v,
                }
                for k, v in workflow.get("schemas", {}).items()
            ],
        }
    except Exception:
        logger.exception("Bad workflow object: %s", json.dumps(workflow, indent=2))
        return None
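
Note the defensive posture: any workflow object that fails validation is logged and indexed as None rather than failing the whole document. For instance (hypothetical input):

    _prepare_workflow_for_es({"config": "s3://other/.quilt/workflows/config.yml", "id": "w"}, "BUCKET")
    # -> None; the config URL does not start with s3://BUCKET/.quilt/workflows/config.yml,
    #    so the Exception is caught and logged via logger.exception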


def index_if_package(
    s3_client,
    doc_queue: DocumentQueue,
@@ -351,7 +444,6 @@ def get_pkg_data():
            return

        user_meta = first.get("user_meta")
-       user_meta = json.dumps(user_meta) if user_meta else None

        return {
            "key": key,
@@ -363,8 +455,10 @@
"pointer_file": pointer_file,
"hash": package_hash,
"package_stats": stats,
"metadata": user_meta,
"metadata": json.dumps(user_meta) if user_meta else None,
"metadata_fields": get_metadata_fields(user_meta),
"comment": str(first.get("message", "")),
"workflow": _prepare_workflow_for_es(first.get("workflow"), bucket),
}

    data = get_pkg_data() or {}
1 change: 1 addition & 0 deletions lambdas/es/indexer/requirements.txt
@@ -18,6 +18,7 @@ importlib-metadata==6.6.0
ipython-genutils==0.2.0
jmespath==0.9.4
jsonschema==3.2.0
jsonpointer==2.4
jupyter-core==4.11.2
lxml==4.9.2
nbformat==5.1.3
180 changes: 180 additions & 0 deletions lambdas/es/indexer/test/test_index.py
@@ -1038,7 +1038,16 @@ def test_index_if_package(self, append_mock, select_meta_mock, select_stats_mock
"hash": pkg_hash,
"package_stats": select_stats_mock.return_value,
"metadata": json.dumps(meta),
"metadata_fields": [
{
"json_pointer": "/foo",
"type": "keyword",
"keyword": "bar",
"text": '"bar"',
},
],
"comment": message,
"workflow": None,
})

    def test_index_if_package_skip(self):
@@ -1829,3 +1838,174 @@ def test_extract_pptx():
    result = index.extract_pptx(buf, len(lorem) * 4 - 1)

    assert result == "\n".join([lorem] * 3)


TEXT_VALUE = (index.MAX_KEYWORD_LEN + 1) * "a"
KEYWORD_VALUE = "a"


@pytest.mark.parametrize(
    "src_value, expected_field",
    [
        (
            TEXT_VALUE,
            {
                "type": "text",
                "text": TEXT_VALUE,
            },
        ),
        (
            [TEXT_VALUE, TEXT_VALUE],
            {
                "type": "text",
                "text": [TEXT_VALUE, TEXT_VALUE],
            },
        ),
        (
            [KEYWORD_VALUE, KEYWORD_VALUE],
            {
                "type": "keyword",
                "keyword": [KEYWORD_VALUE, KEYWORD_VALUE],
                "text": json.dumps([KEYWORD_VALUE, KEYWORD_VALUE]),
            },
        ),
        (
            [KEYWORD_VALUE, TEXT_VALUE],
            {
                "type": "text",
                "text": [KEYWORD_VALUE, TEXT_VALUE],
            },
        ),
        (
            1,
            {
                "type": "double",
                "text": json.dumps(1),
                "double": 1,
            },
        ),
        (
            1.2,
            {
                "type": "double",
                "text": json.dumps(1.2),
                "double": 1.2,
            },
        ),
        (
            "2023-10-13T09:10:23.873434",
            {
                "type": "date",
                "text": json.dumps("2023-10-13T09:10:23.873434"),
                "date": datetime.datetime(2023, 10, 13, 9, 10, 23, 873434),
            },
        ),
        (
            "2023-10-13T09:10:23.873434Z",
            {
                "type": "date",
                "text": json.dumps("2023-10-13T09:10:23.873434Z"),
                "date": datetime.datetime(2023, 10, 13, 9, 10, 23, 873434),
            },
        ),
        (
            True,
            {
                "type": "boolean",
                "text": json.dumps(True),
                "boolean": True,
            },
        ),
    ],
)
def test_get_metadata_fields_values(src_value, expected_field):
    field_name = "a"

    assert index.get_metadata_fields(
        {
            field_name: src_value,
        }
    ) == [
        {
            "json_pointer": f"/{field_name}",
            **expected_field,
        }
    ]


@pytest.mark.parametrize(
    "src_value",
    [
        None,
        [1, TEXT_VALUE],
    ],
)
def test_get_metadata_fields_values_ignored(src_value):
    field_name = "a"

    assert index.get_metadata_fields(
        {
            field_name: src_value,
        }
    ) == []


@pytest.mark.parametrize(
    "metadata, expected_json_pointer",
    [
        (
            {
                "a": TEXT_VALUE,
            },
            "/a",
        ),
        (
            {
                "a": {
                    "a": TEXT_VALUE,
                },
            },
            "/a/a",
        ),
        (
            {
                "a.a": TEXT_VALUE,
            },
            "/a.a",
        ),
        (
            {
                "a/a": TEXT_VALUE,
            },
            "/a~1a",
        ),
    ],
)
def test_get_metadata_fields_json_pointer(metadata, expected_json_pointer):
    field, = index.get_metadata_fields(metadata)
    assert field["json_pointer"] == expected_json_pointer


def test_prepare_workflow_for_es():
    assert index._prepare_workflow_for_es(
        {
            "config": "s3://BUCKET/.quilt/workflows/config.yml?versionId=asdf",
            "id": "workflow-id",
            "schemas": {
                "schema-id": "schema-url",
            },
        },
        "BUCKET",
    ) == {
        "config_version_id": "asdf",
        "id": "workflow-id",
        "schemas": [
            {
                "id": "schema-id",
                "url": "schema-url",
            }
        ],
    }