updates to keyed model handler
riteshghorse committed Jul 24, 2023
1 parent 1a022d9 commit 2f95adc
Showing 2 changed files with 143 additions and 79 deletions.
@@ -138,6 +138,7 @@ def run(
model_handler = HuggingFaceModelHandlerKeyedTensor(
model_uri=known_args.model_name,
model_class=known_args.model_class,
framework='pt',
max_batch_size=1)
if not known_args.input:
text = (
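The hunk above threads the handler's new required framework argument through the example pipeline. Below is a hedged, minimal sketch of the same construction as a standalone pipeline; the model name and the tokenization step are illustrative, mirroring the masked-LM example this file implements:

    import apache_beam as beam
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.huggingface_inference import (
        HuggingFaceModelHandlerKeyedTensor)
    from transformers import AutoModelForMaskedLM, AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    def tokenize(text):
      # Squeeze to rank-1 tensors so the handler can stack examples
      # into a batch along a new leading dimension.
      tokens = tokenizer(text, return_tensors="pt")
      return {k: v.squeeze(0) for k, v in tokens.items()}

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(["Apache Beam is a [MASK] model."])
          | beam.Map(tokenize)
          | RunInference(
              HuggingFaceModelHandlerKeyedTensor(
                  model_uri="bert-base-uncased",
                  model_class=AutoModelForMaskedLM,
                  framework='pt',
                  max_batch_size=1)))
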
221 changes: 142 additions & 79 deletions sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -19,7 +19,6 @@

import logging
import sys
from abc import ABC
from collections import defaultdict
from typing import Any
from typing import Callable
@@ -32,9 +31,7 @@
import tensorflow as tf
import torch
from apache_beam.ml.inference import utils
from apache_beam.ml.inference.base import ExampleT
from apache_beam.ml.inference.base import ModelHandler
from apache_beam.ml.inference.base import ModelT
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import PredictionT
from apache_beam.ml.inference.pytorch_inference import _convert_to_device
@@ -44,7 +41,6 @@
_LOGGER = logging.getLogger(__name__)

__all__ = [
"HuggingFaceModelHandler",
"HuggingFaceModelHandlerTensor",
"HuggingFaceModelHandlerKeyedTensor",
]
@@ -157,11 +153,17 @@ def _run_inference_tensorflow_keyed_tensor(
return utils._convert_to_result(batch, predictions, model_id)
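Each element this helper emits is a PredictionResult pairing one input example with its model output. A hedged post-processing sketch for the consumer side of RunInference (field names follow apache_beam.ml.inference.base.PredictionResult):

    import apache_beam as beam

    def format_prediction(result):
      # result.example holds the keyed input tensors; result.inference
      # holds the corresponding model output.
      return f"{result.model_id}: {result.inference}"

    # ... | RunInference(model_handler) | beam.Map(format_prediction)
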


class HuggingFaceModelHandler(ModelHandler[ExampleT, PredictionT, ModelT], ABC):
class HuggingFaceModelHandlerKeyedTensor(ModelHandler[Dict[str,
Union[tf.Tensor,
torch.Tensor]],
PredictionResult,
Union[AutoModel,
TFAutoModel]]):
def __init__(
self,
model_uri: str,
model_class: Union[AutoModel, TFAutoModel],
framework: str,
device: str = "CPU",
*,
inference_fn: Optional[Callable[..., Iterable[PredictionT]]] = None,
@@ -172,27 +174,29 @@ def __init__(
large_model: bool = False,
**kwargs):
"""
Implementation of the abstract base class of ModelHandler interface
for Hugging Face. This class shouldn't be instantiated directly.
Use HuggingFaceModelHandlerKeyedTensor or HuggingFaceModelHandlerTensor.
Implementation of the ModelHandler interface for Hugging Face with
keyed tensors for the PyTorch/TensorFlow backends.
Example Usage model::
pcoll | RunInference(HuggingFaceModelHandlerKeyedTensor(
model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM))
model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM,
framework='pt'))
Args:
model_uri (str): path to the pretrained model on the Hugging Face
Models Hub.
model_class: model class to load the repository from model_uri.
framework (str): Framework to use for the model. 'tf' for TensorFlow and
'pt' for PyTorch.
device: For torch tensors, specify device on which you wish to
run the model. Defaults to CPU.
inference_fn: the inference function to use during RunInference.
Default is _run_inference_torch_keyed_tensor or
_run_inference_tensorflow_keyed_tensor depending on the input type.
load_model_args (Dict[str, Any]): keyword arguments to provide load
options while loading models from Hugging Face Hub. Defaults to None.
inference_args (Dict[str, Any]): Non-batchable arguments
load_model_args (Dict[str, Any]): (Optional) Keyword arguments to provide
load options while loading models from Hugging Face Hub.
Defaults to None.
inference_args (Dict[str, Any]): (Optional) Non-batchable arguments
required as inputs to the model's inference function. Unlike Tensors
in `batch`, these parameters will not be dynamically batched.
Defaults to None.
@@ -221,7 +225,7 @@ def __init__(
if max_batch_size is not None:
self._batching_kwargs["max_batch_size"] = max_batch_size
self._large_model = large_model
self._framework = ""
self._framework = framework

_validate_constructor_args(
model_uri=self._model_uri, model_class=self._model_class)
@@ -230,47 +234,11 @@ def load_model(self):
"""Loads and initializes the model for processing."""
model = self._model_class.from_pretrained(
self._model_uri, **self._model_config_args)
if self._framework == 'pt':
if self._device == "GPU" and is_gpu_available_torch:
model.to(torch.device("cuda"))
return model
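The branch above moves the model to CUDA only when the PyTorch framework was requested and a GPU is actually visible. As a hedged sketch, the availability helper presumably reduces to a plain torch check (its real definition lives elsewhere in this module):

    import torch

    def is_gpu_available_torch():
      # Assumption: the helper is a thin wrapper over torch's CUDA probe.
      return torch.cuda.is_available()
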

def update_model_path(self, model_path: Optional[str] = None):
self._model_uri = model_path if model_path else self._model_uri

def get_num_bytes(
self, batch: Sequence[Union[tf.Tensor, torch.Tensor]]) -> int:
"""
Returns:
The number of bytes of data for the Tensors batch.
"""
if self._framework == "tf":
return sum(sys.getsizeof(element) for element in batch)
else:
return sum(
(el.element_size() for tensor in batch for el in tensor.values()))

def batch_elements_kwargs(self):
return self._batching_kwargs

def share_model_across_processes(self) -> bool:
return self._large_model


class HuggingFaceModelHandlerKeyedTensor(
HuggingFaceModelHandler[Dict[str, Union[tf.Tensor, torch.Tensor]],
PredictionResult,
Union[AutoModel, TFAutoModel]]):
"""
Implementation of the ModelHandler interface for HuggingFace with
Keyed Tensors for PyTorch/Tensorflow backend.
Depending on the type of tensors, the model framework is determined
automatically.
Example Usage model::
pcoll | RunInference(HuggingFaceModelHandlerKeyedTensor(
model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM))
**Supported Versions:** HuggingFaceModelHandler supports transformers>=4.18.0.
"""
def run_inference(
self,
batch: Sequence[Dict[str, Union[tf.Tensor, torch.Tensor]]],
@@ -297,13 +265,6 @@ def run_inference(
An Iterable of type PredictionResult.
"""
inference_args = {} if not inference_args else inference_args
if not self._framework:
if isinstance(batch[0], tf.Tensor):
self._framework = "tf"
else:
self._framework = "torch"
if self._device == "GPU" and is_gpu_available_torch():
model.to(torch.device("cuda"))

if self._inference_fn:
return self._inference_fn(
@@ -316,6 +277,27 @@
return _run_inference_torch_keyed_tensor(
batch, model, self._device, inference_args, self._model_uri)
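
Because run_inference prefers a user-supplied inference_fn over the framework defaults, a custom function only needs to match the call signature used above. A hedged sketch for the PyTorch keyed-tensor path (device placement omitted for brevity; utils._convert_to_result is the same helper the defaults use):

    import torch
    from apache_beam.ml.inference import utils

    def my_keyed_inference(batch, model, device, inference_args, model_id):
      # Stack each key's tensors from the micro-batch into one batched tensor.
      key_to_tensor = {
          key: torch.stack([example[key] for example in batch])
          for key in batch[0]
      }
      with torch.no_grad():
        predictions = model(**key_to_tensor, **inference_args)
      return utils._convert_to_result(batch, predictions, model_id)

Such a function would be passed as HuggingFaceModelHandlerKeyedTensor(..., inference_fn=my_keyed_inference).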

def update_model_path(self, model_path: Optional[str] = None):
self._model_uri = model_path if model_path else self._model_uri

def get_num_bytes(
self, batch: Sequence[Union[tf.Tensor, torch.Tensor]]) -> int:
"""
Returns:
The number of bytes of data for the Tensors batch.
"""
if self._framework == "tf":
return sum(sys.getsizeof(element) for element in batch)
else:
return sum(
(el.element_size() for tensor in batch for el in tensor.values()))

def batch_elements_kwargs(self):
return self._batching_kwargs

def share_model_across_processes(self) -> bool:
return self._large_model

def get_metrics_namespace(self) -> str:
"""
Returns:
@@ -353,25 +335,86 @@ def _default_inference_fn_tensorflow(
return utils._convert_to_result(batch, predictions, model_id)


class HuggingFaceModelHandlerTensor(HuggingFaceModelHandler[Union[tf.Tensor,
torch.Tensor],
PredictionResult,
Union[AutoModel,
TFAutoModel]]
):
"""
Implementation of the ModelHandler interface for HuggingFace with
Tensors for PyTorch/Tensorflow backend.
class HuggingFaceModelHandlerTensor(ModelHandler[Union[tf.Tensor, torch.Tensor],
PredictionResult,
Union[AutoModel,
TFAutoModel]]):
def __init__(
self,
model_uri: str,
model_class: Union[AutoModel, TFAutoModel],
device: str = "CPU",
*,
inference_fn: Optional[Callable[..., Iterable[PredictionT]]] = None,
load_model_args: Optional[Dict[str, Any]] = None,
inference_args: Optional[Dict[str, Any]] = None,
min_batch_size: Optional[int] = None,
max_batch_size: Optional[int] = None,
large_model: bool = False,
**kwargs):
"""
Implementation of the ModelHandler interface for Hugging Face with
tensors for the PyTorch/TensorFlow backends.
Depending on the type of tensors, the model framework is determined
automatically.
Example Usage model:
pcoll | RunInference(HuggingFaceModelHandlerTensor(
model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM))
Args:
model_uri (str): path to the pretrained model on the Hugging Face
Models Hub.
model_class: model class to load the repository from model_uri.
device: For torch tensors, specify device on which you wish to
run the model. Defaults to CPU.
inference_fn: the inference function to use during RunInference.
Default is _default_inference_fn_torch or
_default_inference_fn_tensorflow depending on the input type.
load_model_args (Dict[str, Any]): (Optional) keyword arguments to provide
load options while loading models from Hugging Face Hub.
Defaults to None.
inference_args (Dict[str, Any]): (Optional) Non-batchable arguments
required as inputs to the model's inference function. Unlike Tensors
in `batch`, these parameters will not be dynamically batched.
Defaults to None.
min_batch_size: the minimum batch size to use when batching inputs.
max_batch_size: the maximum batch size to use when batching inputs.
large_model: set to true if your model is large enough to run into
memory pressure if you load multiple copies. Given a model that
consumes N memory and a machine with W cores and M memory, you should
set this to True if N*W > M.
kwargs: 'env_vars' can be used to set environment variables
before loading the model.
**Supported Versions:** HuggingFaceModelHandler supports
transformers>=4.18.0.
"""
self._model_uri = model_uri
self._model_class = model_class
self._device = device
self._inference_fn = inference_fn
self._model_config_args = load_model_args if load_model_args else {}
self._inference_args = inference_args if inference_args else {}
self._batching_kwargs = {}
self._env_vars = kwargs.get("env_vars", {})
if min_batch_size is not None:
self._batching_kwargs["min_batch_size"] = min_batch_size
if max_batch_size is not None:
self._batching_kwargs["max_batch_size"] = max_batch_size
self._large_model = large_model
self._framework = ""

_validate_constructor_args(
model_uri=self._model_uri, model_class=self._model_class)

def load_model(self):
"""Loads and initializes the model for processing."""
model = self._model_class.from_pretrained(
self._model_uri, **self._model_config_args)
return model

**Supported Versions:** HuggingFaceModelHandler supports transformers>=4.18.0.
"""
def run_inference(
self,
batch: Sequence[Union[tf.Tensor, torch.Tensor]],
@@ -403,9 +446,11 @@ def run_inference(
if isinstance(batch[0], tf.Tensor):
self._framework = "tf"
else:
self._framework = "torch"
if self._device == "GPU" and is_gpu_available_torch():
model.to(torch.device("cuda"))
self._framework = "pt"

if (self._framework == 'pt' and self._device == "GPU" and
is_gpu_available_torch()):
model.to(torch.device("cuda"))

if self._inference_fn:
return self._inference_fn(
@@ -418,6 +463,24 @@
return _default_inference_fn_torch(
batch, model, self._device, inference_args, self._model_uri)

def get_num_bytes(
self, batch: Sequence[Union[tf.Tensor, torch.Tensor]]) -> int:
"""
Returns:
The number of bytes of data for the Tensors batch.
"""
if self._framework == "tf":
return sum(sys.getsizeof(element) for element in batch)
else:
return sum(
(el.element_size() for tensor in batch for el in tensor.values()))

def batch_elements_kwargs(self):
return self._batching_kwargs

def share_model_across_processes(self) -> bool:
return self._large_model

def get_metrics_namespace(self) -> str:
"""
Returns:
