Skip to content

Commit

Permalink
Add multi-threading option to ModelTransform and SAMBboxToInstanceMask (
Browse files Browse the repository at this point in the history
#1145)

- Add multi-threading option (`num_workers > 0`) to `ModelTransform` and
`SAMBboxToInstanceMask`.
- It is required if the model launcher can take multiple requests at the
same time and have high throughput.

Signed-off-by: Kim, Vinnam <[email protected]>
  • Loading branch information
vinnamkim authored Sep 12, 2023
1 parent ec05391 commit 0064504
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 52 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
(<https://github.com/openvinotoolkit/datumaro/pull/1133>)
- Remove deprecates announced to be removed in 1.5.0
(<https://github.com/openvinotoolkit/datumaro/pull/1140>)
- Add multi-threading option to ModelTransform and SAMBboxToInstanceMask
(<https://github.com/openvinotoolkit/datumaro/pull/1145>)

### Bug fixes
- Fix bugs for Tile transform
Expand Down
24 changes: 16 additions & 8 deletions notebooks/18_bbox_to_instance_mask_using_sam.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,20 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we apply `SAMBboxToInstanceMask` to the dataset.\n",
"This transform requires some arguments to execute properly.\n",
"`inference_server_type` is the type of inference server which SAM encoder and decoder are deployed.\n",
"In this example, we launched the OpenVINO™ Model Server instance, thus please choose `InferenceServerType.ovms`.\n",
"The gRPC endpoint address was `localhost:8001`.\n",
"Therefore, `host=\"localhost\"`, `port=8001`, and `protocol_type=ProtocolType.grpc` should be given.\n",
"Lastly, you can make `Polygon` output for the instance mask, but this time we assign `to_polygon` to `False`,\n",
"so that the output will be `Mask` annotation type."
"Now, we apply `SAMBboxToInstanceMask` to the dataset. This transform requires several arguments to execute properly.\n",
"\n",
"`inference_server_type` represents the type of inference server on which SAM encoder and decoder are deployed. In this example, we launched the OpenVINO™ Model Server instance. Therefore, please select `InferenceServerType.ovms`.\n",
"\n",
"The gRPC endpoint address was `localhost:8001`. To configure this, provide the following parameters:\n",
"- `host=\"localhost\"`\n",
"- `port=8001`\n",
"- `protocol_type=ProtocolType.grpc`\n",
"\n",
"You can also specify a `timeout=60.0` value, which represents the maximum seconds to wait for a response from the server instance.\n",
"\n",
"Additionally, you can choose to produce `Polygon` output for the instance mask. However, in this case, we have set `to_polygon` to `False`, resulting in an output of the `Mask` annotation type.\n",
"\n",
"Lastly, we've set `num_workers=0`. This means we will use synchronous iteration to send a model inference request to the server instance and wait for the inference results. If you need to handle multiple inference requests concurrently, you can increase this value to utilize a thread pool. This is particularly useful when dealing with server instances that have high throughput."
]
},
{
Expand All @@ -178,8 +184,10 @@
" inference_server_type=InferenceServerType.ovms,\n",
" host=\"localhost\",\n",
" port=8001,\n",
" timeout=60.0,\n",
" protocol_type=ProtocolType.grpc,\n",
" to_polygon=False,\n",
" num_workers=0,\n",
" )"
]
},
Expand Down
10 changes: 9 additions & 1 deletion src/datumaro/components/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ def run_model(
*,
batch_size: int = 1,
append_annotation: bool = False,
num_workers: int = 0,
**kwargs,
) -> Dataset:
"""
Expand All @@ -454,6 +455,8 @@ def run_model(
batch_size: The number of dataset items processed
simultaneously by the model
append_annotation: Whether append new annotation to existed annotations
num_workers: The number of worker threads to use for parallel inference.
Set to 0 for single-process mode. Default is 0.
**kwargs: Parameters for the model
Returns: self
Expand All @@ -465,11 +468,16 @@ def run_model(
launcher=model,
batch_size=batch_size,
append_annotation=append_annotation,
num_workers=num_workers,
**kwargs,
)
elif inspect.isclass(model) and isinstance(model, ModelTransform):
return self.transform(
model, batch_size=batch_size, append_annotation=append_annotation, **kwargs
model,
batch_size=batch_size,
append_annotation=append_annotation,
num_workers=num_workers,
**kwargs,
)
else:
raise TypeError("Unexpected 'model' argument type: %s" % type(model))
Expand Down
11 changes: 10 additions & 1 deletion src/datumaro/components/hl_ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def run_model(
*,
batch_size: int = 1,
append_annotation: bool = False,
num_workers: int = 0,
**kwargs,
) -> IDataset:
"""
Expand All @@ -207,6 +208,8 @@ def run_model(
batch_size: The number of dataset items processed
simultaneously by the model
append_annotation: Whether append new annotation to existed annotations
num_workers: The number of worker threads to use for parallel inference.
Set to 0 for single-process mode. Default is 0.
**kwargs: Parameters for the model
Returns: a wrapper around the input dataset, which is computed lazily
Expand All @@ -220,11 +223,17 @@ def run_model(
launcher=model,
batch_size=batch_size,
append_annotation=append_annotation,
num_workers=num_workers,
**kwargs,
)
elif inspect.isclass(model) and issubclass(model, ModelTransform):
return HLOps.transform(
dataset, model, batch_size=batch_size, append_annotation=append_annotation, **kwargs
dataset,
model,
batch_size=batch_size,
append_annotation=append_annotation,
num_workers=num_workers,
**kwargs,
)
else:
raise TypeError(f"Unexpected model argument type: {type(model)}")
Expand Down
84 changes: 69 additions & 15 deletions src/datumaro/components/transformer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Copyright (C) 2019-2022 Intel Corporation
#
# SPDX-License-Identifier: MIT
from typing import Generator, List, Optional
from multiprocessing.pool import ThreadPool
from typing import Iterator, List, Optional

import numpy as np

Expand All @@ -10,6 +11,7 @@
from datumaro.components.dataset_base import DatasetBase, DatasetItem, IDataset
from datumaro.components.launcher import Launcher
from datumaro.util import is_method_redefined, take_by
from datumaro.util.multi_procs_util import consumer_generator


class Transform(DatasetBase, CliPlugin):
Expand Down Expand Up @@ -71,35 +73,87 @@ def __iter__(self):


class ModelTransform(Transform):
"""A transformation class for applying a model's inference to dataset items.
This class takes an dataset, a launcher, and other optional parameters
to transform the dataset item from the model outputs by the launcher.
It can process items using multiple processes if specified, making it suitable for
parallelized inference tasks.
Parameters:
extractor: The dataset extractor to obtain items from.
launcher: The launcher responsible for model inference.
batch_size: The batch size for processing items. Default is 1.
append_annotation: Whether to append inference annotations to existing annotations.
Default is False.
num_workers: The number of worker threads to use for parallel inference.
Set to 0 for single-process mode. Default is 0.
"""

def __init__(
self,
extractor: IDataset,
launcher: Launcher,
batch_size: int = 1,
append_annotation: bool = False,
num_workers: int = 0,
):
super().__init__(extractor)
self._launcher = launcher
self._batch_size = batch_size
self._append_annotation = append_annotation

def __iter__(self) -> Generator[DatasetItem, None, None]:
for batch in take_by(self._extractor, self._batch_size):
inference = self._launcher.launch(
[item for item in batch if self._launcher.type_check(item)]
if not (isinstance(num_workers, int) and num_workers >= 0):
raise ValueError(
f"num_workers should be a non negative integer, but it is {num_workers}"
)

for item in self._yield_item(batch, inference):
self._num_workers = num_workers

def __iter__(self) -> Iterator[DatasetItem]:
if self._num_workers == 0:
return self._iter_single_proc()
return self._iter_multi_procs()

def _iter_multi_procs(self):
with ThreadPool(processes=self._num_workers) as pool:

def _producer_gen():
for batch in take_by(self._extractor, self._batch_size):
future = pool.apply_async(
func=self._process_batch,
args=(batch,),
)
yield future

with consumer_generator(producer_generator=_producer_gen()) as consumer_gen:
for future in consumer_gen:
for item in future.get():
yield item

def _iter_single_proc(self) -> Iterator[DatasetItem]:
for batch in take_by(self._extractor, self._batch_size):
for item in self._process_batch(batch=batch):
yield item

def _yield_item(
self, batch: List[DatasetItem], inference: List[List[Annotation]]
) -> Generator[DatasetItem, None, None]:
for item, annotations in zip(batch, inference):
def _process_batch(
self,
batch: List[DatasetItem],
) -> List[DatasetItem]:
inference = self._launcher.launch(
batch=[item for item in batch if self._launcher.type_check(item)]
)

for annotations in inference:
self._check_annotations(annotations)
if self._append_annotation:
annotations = item.annotations + annotations
yield self.wrap_item(item, annotations=annotations)

return [
self.wrap_item(
item,
annotations=item.annotations + annotations
if self._append_annotation
else annotations,
)
for item, annotations in zip(batch, inference)
]

def get_subset(self, name):
subset = self._extractor.get_subset(name)
Expand Down
21 changes: 15 additions & 6 deletions src/datumaro/plugins/missing_annotation_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: MIT

from typing import Generator, List, Optional, Set
from typing import List, Optional, Set

from datumaro.components.abstracts.merger import IMatcherContext
from datumaro.components.annotation import Annotation, AnnotationType, LabelCategories
Expand Down Expand Up @@ -83,18 +83,27 @@ def get_any_label_name(self, ann: Annotation, label_id: int) -> str:
),
}

def _yield_item(
self, batch: List[DatasetItem], inference: List[List[Annotation]]
) -> Generator[DatasetItem, None, None]:
for item, annotations in zip(batch, inference):
def _process_batch(
self,
batch: List[DatasetItem],
) -> List[DatasetItem]:
inference = self._launcher.launch(
batch=[item for item in batch if self._launcher.type_check(item)]
)

for annotations in inference:
self._check_annotations(annotations)
yield self.wrap_item(

return [
self.wrap_item(
item,
annotations=self._find_missing_anns(
gt_anns=item.annotations,
pseudo_anns=self._apply_score_threshold(annotations),
),
)
for item, annotations in zip(batch, inference)
]

def _apply_score_threshold(self, annotations: List[Annotation]) -> List[Annotation]:
if self._score_threshold is None:
Expand Down
63 changes: 43 additions & 20 deletions src/datumaro/plugins/sam_transforms/bbox_to_inst_mask.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
"""Bbox-to-instance mask transform using Segment Anything Model"""

import os.path as osp
from typing import Generator, List, Optional
from typing import List, Optional

import datumaro.plugins.sam_transforms.interpreters.sam_decoder_for_bbox as sam_decoder_for_bbox_interp
import datumaro.plugins.sam_transforms.interpreters.sam_encoder as sam_encoder_interp
from datumaro.components.annotation import Mask, Polygon
from datumaro.components.annotation import Bbox, Mask, Polygon
from datumaro.components.cli_plugin import CliPlugin
from datumaro.components.dataset_base import DatasetItem, IDataset
from datumaro.components.transformer import ModelTransform
Expand All @@ -18,7 +18,6 @@
ProtocolType,
TLSConfig,
)
from datumaro.util import take_by
from datumaro.util.mask_tools import extract_contours

__all__ = ["SAMBboxToInstanceMask"]
Expand All @@ -44,6 +43,8 @@ class SAMBboxToInstanceMask(ModelTransform, CliPlugin):
tls_config: Configuration required if the server instance is in the secure mode
protocol_type: Communication protocol type with the server instance
to_polygon: If true, the output `Mask` annotations will be converted to `Polygon` annotations.
num_workers: The number of worker threads to use for parallel inference.
Set to 0 for single-process mode. Default is 0.
"""

def __init__(
Expand All @@ -56,6 +57,7 @@ def __init__(
tls_config: Optional[TLSConfig] = None,
protocol_type: ProtocolType = ProtocolType.grpc,
to_polygon: bool = False,
num_workers: int = 0,
):
if inference_server_type == InferenceServerType.ovms:
launcher_cls = OVMSLauncher
Expand Down Expand Up @@ -90,26 +92,47 @@ def __init__(
launcher=self._sam_encoder_launcher,
batch_size=1,
append_annotation=False,
num_workers=num_workers,
)
self._to_polygon = to_polygon

def __iter__(self) -> Generator[DatasetItem, None, None]:
for batch in take_by(self._extractor, self._batch_size):
batch = [item for item in batch if self._launcher.type_check(item)]
img_embeds = self._sam_encoder_launcher.launch(batch)

for item, img_embed in zip(batch, img_embeds):
# Nested list of mask [[mask_0, ...]]
nested_masks: List[List[Mask]] = self._sam_decoder_launcher.launch(
[item.wrap(annotations=item.annotations + img_embed)],
stack=False,
)

yield item.wrap(
annotations=self._convert_to_polygon(nested_masks[0])
if self._to_polygon
else nested_masks[0]
)
def _process_batch(
self,
batch: List[DatasetItem],
) -> List[DatasetItem]:
img_embeds = self._sam_encoder_launcher.launch(
batch=[item for item in batch if self._sam_encoder_launcher.type_check(item)]
)

items = []
for item, img_embed in zip(batch, img_embeds):
item_to_decode = item.wrap(annotations=item.annotations + img_embed)

if not any(isinstance(ann, Bbox) for ann in item_to_decode.annotations):
item_to_decode.annotations.pop() # Pop the added image embedding
items.append(item_to_decode)
continue

# Nested list of mask [[mask_0, ...]]
nested_masks: List[List[Mask]] = self._sam_decoder_launcher.launch(
[item_to_decode],
stack=False,
)

# Pop the added image embedding
item_to_decode.annotations.pop()
# Leave non-bbox annotations only
item_to_decode.annotations = [
ann for ann in item_to_decode.annotations if not isinstance(ann, Bbox)
]

item_to_decode.annotations += (
self._convert_to_polygon(nested_masks[0]) if self._to_polygon else nested_masks[0]
)

items.append(item_to_decode)

return items

@staticmethod
def _convert_to_polygon(masks: List[Mask]):
Expand Down
Loading

0 comments on commit 0064504

Please sign in to comment.