Skip to content

Commit

Permalink
Clip saver unit test for notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-nn committed Dec 10, 2024
1 parent 94b51cf commit 000617c
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 53 deletions.
48 changes: 20 additions & 28 deletions degirum_tools/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ def process_file_uploads():
queue_poll_interval_s = 0.1

storage = ObjectStorage(storage_cfg)
storage.ensure_bucket_exists()

pending_jobs: List[NotificationServer.Job] = []
queue_is_active = True

Expand Down Expand Up @@ -300,10 +302,7 @@ def __init__(
clip_pre_trigger_delay: int = 0,
clip_embed_ai_annotations: bool = True,
clip_target_fps: float = 30.0,
storage_endpoint: str = "",
storage_access_key: str = "",
storage_secret_key: str = "",
storage_bucket: str = "",
storage_config: Optional[ObjectStorageConfig] = None,
):
"""
Constructor
Expand Down Expand Up @@ -339,22 +338,25 @@ def __init__(
clip_pre_trigger_delay: delay before the event to start clip saving (in frames)
clip_embed_ai_annotations: True to embed AI inference annotations into video clip, False to use original image
clip_target_fps: target frames per second for saved videos
storage_endpoint: The object storage endpoint URL to save video clips
storage_access_key: The access key for the cloud account
storage_secret_key: The secret key for the cloud account
storage_bucket: The name of the bucket to upload video clips
storage_config: The object storage configuration (to save video clips)
"""

self._frame = 0
self._prev_cond = False
self._prev_frame = -1_000_000_000 # arbitrary big negative number
self._prev_time = -1_000_000_000.0
self._last_notifications: dict = {}
self._last_display_time = -1_000_000_000.0

self._name = name
self._message = message if message else f"Notification triggered: {name}"
self._show_overlay = show_overlay
self._annotation_color = annotation_color
self._annotation_font_scale = annotation_font_scale
self._annotation_pos = annotation_pos
self._annotation_cool_down = annotation_cool_down

# compile condition to evaluate it later
self._condition = compile(condition, "<string>", "eval")
self._clip_save = clip_save
self.notification_server: Optional[NotificationServer] = None

# parse holdoff duration
self._holdoff_frames = 0
Expand All @@ -375,9 +377,11 @@ def __init__(
else:
raise TypeError(f"Invalid holdoff time type: {holdoff}")

# compile condition to evaluate it later
self._condition = compile(condition, "<string>", "eval")

# instantiate clip saver if required
self._clip_save = clip_save
if clip_save:
if clip_save and storage_config:
self._clip_path = tempfile.mkdtemp()
full_clip_prefix = self._clip_path + "/" + clip_sub_dir + "/"

Expand All @@ -389,15 +393,9 @@ def __init__(
save_ai_result_json=True,
target_fps=clip_target_fps,
)
self._storage_cfg = ObjectStorageConfig(
endpoint=storage_endpoint,
access_key=storage_access_key,
secret_key=storage_secret_key,
bucket=storage_bucket,
)
self._storage_cfg = storage_config

# setting up notification server
self.notification_server: Optional[NotificationServer] = None
if (isinstance(notification_config, str) and notification_config) or clip_save:
self.notification_server = NotificationServer(
notification_config,
Expand All @@ -406,13 +404,6 @@ def __init__(
self._storage_cfg if clip_save else None,
)

self._frame = 0
self._prev_cond = False
self._prev_frame = -1_000_000_000 # arbitrary big negative number
self._prev_time = -1_000_000_000.0
self._last_notifications: dict = {}
self._last_display_time = -1_000_000_000.0

def analyze(self, result):
"""
Generate notification by analyzing given result according to the condition expression.
Expand Down Expand Up @@ -545,7 +536,8 @@ def annotate(self, result, image: np.ndarray) -> np.ndarray:
)

def __del__(self):
self._clip_saver.join_all_saver_threads()
if self._clip_save:
self._clip_saver.join_all_saver_threads()

if self.notification_server:
self.notification_server.terminate()
Expand Down
38 changes: 36 additions & 2 deletions degirum_tools/object_storage_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,50 @@ def ensure_bucket_exists(self):
f"Error occurred when ensuring bucket '{self._config.bucket}' exists: {e}"
) from e

def delete_bucket(self):
def list_bucket_contents(self):
"""
List the contents of the bucket in cloud object storage
Returns:
List of objects in the bucket of None if the bucket does not exist
"""

try:
if self._client.bucket_exists(self._config.bucket):
return self._client.list_objects(self._config.bucket, recursive=True)
except self._minio.S3Error as e:
raise RuntimeError(
f"Error occurred when listing bucket '{self._config.bucket}': {e}"
) from e
return None

def delete_bucket_contents(self) -> bool:
"""
Delete the bucket from cloud object storage
Delete the bucket contents from cloud object storage
Returns:
True if bucket contents were deleted, False if bucket does not exist
"""

try:
if self._client.bucket_exists(self._config.bucket):
objects = self._client.list_objects(self._config.bucket, recursive=True)
for obj in objects:
self._client.remove_object(self._config.bucket, obj.object_name)
return True
except self._minio.S3Error as e:
raise RuntimeError(
f"Error occurred when deleting bucket '{self._config.bucket}': {e}"
)
return False

def delete_bucket(self):
"""
Delete the bucket with all contents from cloud object storage
"""

try:
if self.delete_bucket_contents():
self._client.remove_bucket(self._config.bucket)
except self._minio.S3Error as e:
raise RuntimeError(
Expand Down
103 changes: 80 additions & 23 deletions test/test_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,13 @@
import pytest


def test_notifier():
def test_notifier(s3_credentials):
"""
Test for EventNotifier analyzer
"""

import degirum_tools

# helper class to convert dictionary to object
class D2C:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
import degirum_tools, degirum as dg
import numpy as np

test_cases: List[dict] = [
# ----------------------------------------------------------------
Expand Down Expand Up @@ -187,30 +182,92 @@ def __init__(self, **kwargs):
],
"res": [{"test": "e1"}, {}, {"test": "e2"}],
},
# -------------------------------------------------------
# Clip saving tests
# -------------------------------------------------------
{
"params": {
"name": "test",
"condition": "a",
"message": "{clip_url}",
"notification_tags": "Test",
"notification_config": "json://unittest",
"clip_save": True,
"clip_sub_dir": "test",
"clip_duration": 3,
"clip_pre_trigger_delay": 1,
"clip_embed_ai_annotations": False,
"storage_config": degirum_tools.ObjectStorageConfig(**s3_credentials),
},
"inp": [
{"events_detected": {""}},
{"events_detected": {""}},
{"events_detected": {"a"}, "msg": "e1"},
{"events_detected": {""}},
{"events_detected": {""}},
],
"res": [{}, {}, {"test": ""}, {}, {}],
},
]

for ci, case in enumerate(test_cases):
params = case["params"]

if "res" not in case:
# expected to fail
with pytest.raises(Exception):
notifier = degirum_tools.EventNotifier(**case["params"])
notifier = degirum_tools.EventNotifier(**params)
continue

if (
"clip_save" in params
and params["clip_save"]
and (
params["storage_config"].access_key is None
or params["storage_config"].secret_key is None
)
):
print(
f"Case {ci} skipped: S3_ACCESS_KEY and/or S3_SECRET_KEY environment variables are not set"
)
continue

notifier = degirum_tools.EventNotifier(**case["params"])
notifier = degirum_tools.EventNotifier(**params)

for i, input in enumerate(case["inp"]):
result = D2C(**input)

if case["res"] is None:
with pytest.raises(Exception):
notifier.analyze(result)
else:
notifier.analyze(result)
assert (
result.notifications == case["res"][i] # type: ignore[attr-defined]
), (
f"Case {ci} failed at step {i}: "
+ f"notifications `{result.notifications}` " # type: ignore[attr-defined]
+ f"do not match expected `{case['res'][i]}`."
+ f"\nConfig: {case['params']}"
try:
result = dg.postprocessor.InferenceResults(
model_params=None,
input_image=(
np.zeros((100, 100, 3)) if notifier._clip_save else None
),
inference_results={},
conversion=None,
)
result.__dict__.update(input)

if case["res"] is None:
with pytest.raises(Exception):
notifier.analyze(result)
else:
notifier.analyze(result)
assert (
result.notifications == case["res"][i] # type: ignore[attr-defined]
), (
f"Case {ci} failed at step {i}: "
+ f"notifications `{result.notifications}` " # type: ignore[attr-defined]
+ f"do not match expected `{case['res'][i]}`."
+ f"\nConfig: {case['params']}"
)

finally:
if notifier._clip_save:
# cleanup bucket contents for clip saving tests
storage = degirum_tools.ObjectStorage(notifier._storage_cfg)

storage.delete_bucket_contents()
bucket_contents = storage.list_bucket_contents()
assert (
bucket_contents is not None and len(list(bucket_contents)) == 0
)

0 comments on commit 000617c

Please sign in to comment.