feat: added method to load pretrained models from huggingface #790

Merged: 8 commits, May 20, 2024
366 changes: 363 additions & 3 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -24,6 +24,7 @@ statsmodels = "^0.14.1"
torch = "^2.3.0"
torchvision = "^0.18.0"
xxhash = "^3.4.1"
transformers = "^4.40.2"

[tool.poetry.group.dev.dependencies]
pytest = ">=7.2.1,<9.0.0"
90 changes: 2 additions & 88 deletions src/safeds/data/image/typing/_image_size.py
@@ -1,16 +1,7 @@
from __future__ import annotations
from safeds.ml.nn.typing import ConstantImageSize

import sys
from typing import TYPE_CHECKING

from safeds._utils import _structural_hash
from safeds._validation import _check_bounds, _ClosedBound

if TYPE_CHECKING:
from safeds.data.image.containers import Image


class ImageSize:
class ImageSize(ConstantImageSize):
"""
A container for image size data.

@@ -31,82 +22,5 @@ class ImageSize:
if an invalid channel is given
"""

def __init__(self, width: int, height: int, channel: int, *, _ignore_invalid_channel: bool = False) -> None:
_check_bounds("width", width, lower_bound=_ClosedBound(1))
_check_bounds("height", height, lower_bound=_ClosedBound(1))
if not _ignore_invalid_channel and channel not in (1, 3, 4):
raise ValueError(f"Channel {channel} is not a valid channel option. Use either 1, 3 or 4")
_check_bounds("channel", channel, lower_bound=_ClosedBound(1))

self._width = width
self._height = height
self._channel = channel

@staticmethod
def from_image(image: Image) -> ImageSize:
"""
Create a `ImageSize` of a given image.

Parameters
----------
image:
the given image for the `ImageSize`

Returns
-------
image_size:
the calculated `ImageSize`
"""
return ImageSize(image.width, image.height, image.channel)

def __eq__(self, other: object) -> bool:
if not isinstance(other, ImageSize):
return NotImplemented
return (self is other) or (
self._width == other._width and self._height == other._height and self._channel == other._channel
)

def __hash__(self) -> int:
return _structural_hash(self._width, self._height, self._channel)

def __sizeof__(self) -> int:
return sys.getsizeof(self._width) + sys.getsizeof(self._height) + sys.getsizeof(self._channel)

def __str__(self) -> str:
return f"{self._width}x{self._height}x{self._channel} (WxHxC)"

@property
def width(self) -> int:
"""
Get the width of this `ImageSize` in pixels.

Returns
-------
width:
The width of this `ImageSize`.
"""
return self._width

@property
def height(self) -> int:
"""
Get the height of this `ImageSize` in pixels.

Returns
-------
height:
The height of this `ImageSize`.
"""
return self._height

@property
def channel(self) -> int:
"""
Get the channel of this `ImageSize` in pixels.

Returns
-------
channel:
The channel of this `ImageSize`.
"""
return self._channel
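
Note: the constructor, equality, hashing, and the `width`/`height`/`channel` properties removed above are now inherited from the new `ConstantImageSize` base class in `safeds.ml.nn.typing`. A minimal sketch of how the new size types relate, with semantics inferred from their use elsewhere in this PR:

```python
from safeds.ml.nn.typing import ConstantImageSize, ModelImageSize, VariableImageSize

# ConstantImageSize pins a model to one exact input size (used for classifiers below);
# VariableImageSize permits varying input dimensions (used for Swin2SR super-resolution).
fixed = ConstantImageSize(224, 224, 3)
flexible = VariableImageSize(64, 64, 3)
assert isinstance(fixed, ModelImageSize) and isinstance(flexible, ModelImageSize)
```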
2 changes: 1 addition & 1 deletion src/safeds/data/tabular/containers/_column.py
@@ -219,7 +219,7 @@ def get_distinct_values(
else:
series = self._series

return series.unique().sort().to_list()
return series.unique(maintain_order=True).to_list()
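
This change also alters semantics: `unique(maintain_order=True)` returns distinct values in order of first occurrence instead of sorted order, and skips the extra sort. A quick polars illustration:

```python
import polars as pl

series = pl.Series([3, 1, 3, 2])
series.unique().sort().to_list()              # [1, 2, 3] (sorted)
series.unique(maintain_order=True).to_list()  # [3, 1, 2] (first occurrence)
```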

def get_value(self, index: int) -> T_co:
"""
4 changes: 2 additions & 2 deletions src/safeds/exceptions/_ml.py
@@ -1,4 +1,4 @@
from safeds.data.image.typing import ImageSize
from safeds.ml.nn.typing import ModelImageSize


class DatasetMissesFeaturesError(ValueError):
@@ -76,7 +76,7 @@ def __init__(self) -> None:
class InputSizeError(Exception):
"""Raised when the amount of features being passed to a network does not match with its input size."""

def __init__(self, data_size: int | ImageSize, input_layer_size: int | ImageSize) -> None:
def __init__(self, data_size: int | ModelImageSize, input_layer_size: int | ModelImageSize) -> None:
super().__init__(
f"The data size being passed to the network({data_size}) does not match with its input size({input_layer_size}). Consider changing the data size of the model or reformatting the data.",
)
162 changes: 158 additions & 4 deletions src/safeds/ml/nn/_model.py
@@ -7,7 +7,9 @@
from safeds._validation import _check_bounds, _ClosedBound
from safeds.data.image.containers import ImageList
from safeds.data.labeled.containers import ImageDataset, TabularDataset, TimeSeriesDataset
from safeds.data.labeled.containers._image_dataset import _ColumnAsTensor
from safeds.data.tabular.containers import Table
from safeds.data.tabular.transformation import OneHotEncoder
from safeds.exceptions import (
FeatureDataMismatchError,
InputSizeError,
@@ -27,17 +29,18 @@
ForwardLayer,
)
from safeds.ml.nn.layers._pooling2d_layer import _Pooling2DLayer
from safeds.ml.nn.typing import ConstantImageSize, ModelImageSize, VariableImageSize

if TYPE_CHECKING:
from collections.abc import Callable

from torch import Tensor, nn
from torch.nn import Module
from transformers.image_processing_utils import BaseImageProcessor

from safeds.data.image.typing import ImageSize
from safeds.ml.nn.converters import InputConversion, OutputConversion
from safeds.ml.nn.layers import Layer


IFT = TypeVar("IFT", TabularDataset, TimeSeriesDataset, ImageDataset) # InputFitType
IPT = TypeVar("IPT", Table, TimeSeriesDataset, ImageList) # InputPredictType
OT = TypeVar("OT", TabularDataset, TimeSeriesDataset, ImageDataset) # OutputType
@@ -117,6 +120,61 @@ def __init__(
self._total_number_of_batches_done = 0
self._total_number_of_epochs_done = 0

@staticmethod
def load_pretrained_model(huggingface_repo: str) -> NeuralNetworkRegressor: # pragma: no cover
"""
Load a pretrained model from a [Hugging Face repository](https://huggingface.co/models/).

Parameters
----------
huggingface_repo:
The name of the Hugging Face repository.

Returns
-------
pretrained_model:
The pretrained model as a `NeuralNetworkRegressor`.
"""
from transformers import (
AutoConfig,
AutoImageProcessor,
AutoModelForImageToImage,
PretrainedConfig,
Swin2SRForImageSuperResolution,
Swin2SRImageProcessor,
)

_init_default_device()

config: PretrainedConfig = AutoConfig.from_pretrained(huggingface_repo)

if config.model_type != "swin2sr":
raise ValueError("This model is not supported")

model: Swin2SRForImageSuperResolution = AutoModelForImageToImage.from_pretrained(huggingface_repo)

image_processor: Swin2SRImageProcessor = AutoImageProcessor.from_pretrained(huggingface_repo)

if hasattr(config, "num_channels"):
input_size = VariableImageSize(image_processor.pad_size, image_processor.pad_size, config.num_channels)
else: # Should never happen due to model check
raise ValueError("This model is not supported") # pragma: no cover

in_conversion = InputConversionImage(input_size)
out_conversion = OutputConversionImageToImage()

network = NeuralNetworkRegressor.__new__(NeuralNetworkRegressor)
network._input_conversion = in_conversion
network._model = model
network._output_conversion = out_conversion
network._input_size = input_size
network._batch_size = 1
network._is_fitted = True
network._total_number_of_epochs_done = 0
network._total_number_of_batches_done = 0

return network
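
A minimal usage sketch for the new method; the repository name is an example Swin2SR checkpoint, and `images` is assumed to be an existing `ImageList`:

```python
from safeds.ml.nn import NeuralNetworkRegressor

sr_model = NeuralNetworkRegressor.load_pretrained_model("caidas/swin2SR-classical-sr-x2-64")
assert sr_model.is_fitted            # loaded models are ready to use without calling fit()
upscaled = sr_model.predict(images)  # images: an ImageList of low-resolution inputs
```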

def fit(
self,
train_data: IFT,
@@ -243,6 +301,10 @@ def predict(self, test_data: IPT) -> OT:
with torch.no_grad():
for x in dataloader:
elem = self._model(x)
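# Pretrained Hugging Face image-to-image models return an output object, not a raw tensor; unwrap it below.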
if not isinstance(elem, torch.Tensor) and hasattr(elem, "reconstruction"):
elem = elem.reconstruction # pragma: no cover
elif not isinstance(elem, torch.Tensor):
raise ValueError(f"Output of model has unsupported type: {type(elem)}") # pragma: no cover
predictions.append(elem.squeeze(dim=1))
return self._output_conversion._data_conversion(
test_data,
@@ -255,6 +317,11 @@
"""Whether the model is fitted."""
return self._is_fitted

@property
def input_size(self) -> int | ModelImageSize:
"""The input size of the model."""
return self._input_size


class NeuralNetworkClassifier(Generic[IFT, IPT, OT]):
"""
@@ -285,6 +352,13 @@ def __init__(
raise InvalidModelStructureError("You need to provide at least one layer to a neural network.")
if isinstance(output_conversion, OutputConversionImageToImage):
raise InvalidModelStructureError("A NeuralNetworkClassifier cannot be used with images as output.")
if isinstance(input_conversion, InputConversionImage) and isinstance(
input_conversion._input_size,
VariableImageSize,
):
raise InvalidModelStructureError(
"A NeuralNetworkClassifier cannot be used with a InputConversionImage that uses a VariableImageSize.",
)
elif isinstance(input_conversion, InputConversionImage):
if not isinstance(output_conversion, _OutputConversionImage):
raise InvalidModelStructureError(
@@ -324,7 +398,7 @@ def __init__(
self._input_conversion: InputConversion[IFT, IPT] = input_conversion
self._model = _create_internal_model(input_conversion, layers, is_for_classification=True)
self._output_conversion: OutputConversion[IPT, OT] = output_conversion
self._input_size = self._model.input_size
self._input_size: int | ModelImageSize = self._model.input_size
self._batch_size = 1
self._is_fitted = False
self._num_of_classes = (
@@ -333,6 +407,77 @@
self._total_number_of_batches_done = 0
self._total_number_of_epochs_done = 0

@staticmethod
def load_pretrained_model(huggingface_repo: str) -> NeuralNetworkClassifier: # pragma: no cover
"""
Load a pretrained model from a [Hugging Face repository](https://huggingface.co/models/).

Parameters
----------
huggingface_repo:
The name of the Hugging Face repository.

Returns
-------
pretrained_model:
The pretrained model as a `NeuralNetworkClassifier`.
"""
from transformers import AutoConfig, AutoImageProcessor, AutoModelForImageClassification, PretrainedConfig
from transformers.models.auto.modeling_auto import MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING_NAMES

_init_default_device()

config: PretrainedConfig = AutoConfig.from_pretrained(huggingface_repo)

if config.model_type not in MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING_NAMES:
raise ValueError("This model is not supported")

model: Module = AutoModelForImageClassification.from_pretrained(huggingface_repo)

image_processor: BaseImageProcessor = AutoImageProcessor.from_pretrained(huggingface_repo)
if hasattr(image_processor, "size") and hasattr(config, "num_channels"):
if "shortest_edge" in image_processor.size:
input_size = ConstantImageSize(
image_processor.size.get("shortest_edge"),
image_processor.size.get("shortest_edge"),
config.num_channels,
)
else:
input_size = ConstantImageSize(
image_processor.size.get("width"),
image_processor.size.get("height"),
config.num_channels,
)
else: # Should never happen due to model check
raise ValueError("This model is not supported") # pragma: no cover

# Rebuild the label set from the checkpoint's id2label mapping (assumed to be ordered by class id).
label_dict: dict[str, str] = config.id2label
column_name = "label"
labels_table = Table({column_name: list(label_dict.values())})
one_hot_encoder = OneHotEncoder().fit(labels_table, [column_name])

in_conversion = InputConversionImage(input_size)
out_conversion = OutputConversionImageToColumn()

in_conversion._column_name = column_name
in_conversion._one_hot_encoder = one_hot_encoder
in_conversion._input_size = input_size
in_conversion._output_type = _ColumnAsTensor
num_of_classes = labels_table.row_count

network = NeuralNetworkClassifier.__new__(NeuralNetworkClassifier)
network._input_conversion = in_conversion
network._model = model
network._output_conversion = out_conversion
network._input_size = input_size
network._batch_size = 1
network._is_fitted = True
network._num_of_classes = num_of_classes
network._total_number_of_epochs_done = 0
network._total_number_of_batches_done = 0

return network
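
As with the regressor, a short usage sketch; the repository name is an example image-classification checkpoint, and `images` is an assumed `ImageList`:

```python
from safeds.ml.nn import NeuralNetworkClassifier

classifier = NeuralNetworkClassifier.load_pretrained_model("google/vit-base-patch16-224")
labeled = classifier.predict(images)  # an ImageDataset whose "label" column is decoded via the one-hot encoder
```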

def fit(
self,
train_data: IFT,
@@ -466,6 +611,10 @@ def predict(self, test_data: IPT) -> OT:
with torch.no_grad():
for x in dataloader:
elem = self._model(x)
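# Pretrained Hugging Face classifiers wrap their scores in an output object; unwrap the logits below.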
if not isinstance(elem, torch.Tensor) and hasattr(elem, "logits"):
elem = elem.logits # pragma: no cover
elif not isinstance(elem, torch.Tensor):
raise ValueError(f"Output of model has unsupported type: {type(elem)}") # pragma: no cover
if self._num_of_classes > 1:
predictions.append(torch.argmax(elem, dim=1))
else:
@@ -481,6 +630,11 @@
"""Whether the model is fitted."""
return self._is_fitted

@property
def input_size(self) -> int | ModelImageSize:
"""The input size of the model."""
return self._input_size


def _create_internal_model(
input_conversion: InputConversion[IFT, IPT],
@@ -518,7 +672,7 @@ def __init__(self, layers: list[Layer], is_for_classification: bool) -> None:
self._pytorch_layers = nn.Sequential(*internal_layers)

@property
def input_size(self) -> int | ImageSize:
def input_size(self) -> int | ModelImageSize:
return self._layer_list[0].input_size

def forward(self, x: Tensor) -> Tensor: