fix local bucketfs model upload and some cleanup
MarleneKress79789 committed Feb 9, 2024
1 parent 3451f58 commit 5a52093
Showing 7 changed files with 24 additions and 32 deletions.
15 changes: 11 additions & 4 deletions exasol_transformers_extension/udfs/models/base_model_udf.py
@@ -17,19 +17,26 @@
class BaseModelUDF(ABC):
"""
This base class should be extended by each UDF class containing model logic.
- This class contains common operations for all prediction UDFs. The following
- methods should be implemented specifically for each UDF class:
+ This class contains common operations for all prediction UDFs:
+ - accesses data part-by-part based on predefined batch size
+ - manages the script cache
+ - reads the corresponding model from BucketFS into cache
+ - creates model pipeline through transformer api
+ - manages the creation of predictions and the preparation of results.
+ Additionally, the following
+ methods should be implemented specifically for each UDF class:
- create_dataframes_from_predictions
- extract_unique_param_based_dataframes
- execute_prediction
- append_predictions_to_input_dataframe
"""
def __init__(self,
exa,
batch_size: int,
pipeline: transformers.Pipeline,
- base_model: ModelFactoryProtocol, #todo rename?
+ base_model: ModelFactoryProtocol,
tokenizer: ModelFactoryProtocol,
task_name: str):
self.exa = exa
@@ -185,7 +192,7 @@ def check_cache(self, model_df: pd.DataFrame) -> None:

current_model_key = (bucketfs_conn, sub_dir, model_name, token_conn)
if self.model_loader.loaded_model_key != current_model_key:
- self.set_cache_dir(model_name, bucketfs_conn, sub_dir) #-> out = "/tmp/pytest-of-marlene/pytest-10/bert-base-uncased0/model_sub_dir/bert_base_uncased"
+ self.set_cache_dir(model_name, bucketfs_conn, sub_dir)
self.model_loader.clear_device_memory()
self.last_created_pipeline = self.model_loader.load_models(self.cache_dir,
current_model_key)
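The check_cache hunk above is the heart of the caching behaviour described in the new docstring: the model is only re-read from BucketFS when its key changes. A minimal sketch of that pattern, with an illustrative stand-in class and cache path rather than the production implementation:

from typing import Optional, Tuple

ModelKey = Tuple[str, str, str, Optional[str]]  # (bucketfs_conn, sub_dir, model_name, token_conn)

class CachingUdfSketch:
    """Illustrative stand-in for BaseModelUDF's caching behaviour."""

    def __init__(self, model_loader):
        self.model_loader = model_loader
        self.last_created_pipeline = None
        self.cache_dir: Optional[str] = None

    def set_cache_dir(self, model_name: str, bucketfs_conn: str, sub_dir: str) -> None:
        # Stand-in path: the real UDF derives the cache directory from the
        # BucketFS connection object.
        self.cache_dir = f"/buckets/{bucketfs_conn}/{sub_dir}/{model_name.replace('-', '_')}"

    def check_cache(self, bucketfs_conn: str, sub_dir: str,
                    model_name: str, token_conn: Optional[str]) -> None:
        current_model_key: ModelKey = (bucketfs_conn, sub_dir, model_name, token_conn)
        if self.model_loader.loaded_model_key != current_model_key:
            # Only when the key changes: repoint the cache directory, free the
            # previously loaded model's device memory, and build a new pipeline.
            self.set_cache_dir(model_name, bucketfs_conn, sub_dir)
            self.model_loader.clear_device_memory()
            self.last_created_pipeline = self.model_loader.load_models(
                self.cache_dir, current_model_key)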
1 change: 0 additions & 1 deletion exasol_transformers_extension/upload_model.py
@@ -43,7 +43,6 @@ def main(

bucketfs_operations.upload_model_files_to_bucketfs(
local_model_path, upload_path, bucketfs_location)
print("model uploaded to :" + str(upload_path)) #todo remove


if __name__ == '__main__':
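The only change here drops a leftover debug print after the upload call. Not part of this commit, but if reporting the upload target is still wanted, a module-level logger is a quieter alternative (sketch):

import logging

_logger = logging.getLogger(__name__)

def report_upload(upload_path) -> None:
    # Emitted only when the application configures INFO-level logging,
    # so the CLI stays silent by default.
    _logger.info("model uploaded to: %s", upload_path)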
7 changes: 4 additions & 3 deletions exasol_transformers_extension/utils/bucketfs_operations.py
@@ -42,7 +42,8 @@ def upload_model_files_to_bucketfs(
tmpdir_name: str, model_path: Path,
bucketfs_location: AbstractBucketFSLocation) -> Path:
"""
- uploads model in tmpdir_name to model_path in bucketfs_location"""
+ uploads model in tmpdir_name to model_path in bucketfs_location
+ """
with tempfile.TemporaryFile() as fileobj:
create_tar_of_directory(Path(tmpdir_name), fileobj)
model_upload_tar_file_path = model_path.with_suffix(".tar.gz")
@@ -71,8 +72,8 @@ def get_local_bucketfs_path(


def get_model_path(sub_dir: str, model_name: str) -> Path:
- return Path(sub_dir, model_name.replace('-', '_'))#todo check usage
+ return Path(sub_dir, model_name.replace('-', '_'))


def get_model_path_with_pretrained(sub_dir: str, model_name: str) -> Path:
- return Path(sub_dir, model_name.replace('-', '_'), "pretrained" , model_name)#todo chnage all paths like this?
+ return Path(sub_dir, model_name.replace('-', '_'), "pretrained" , model_name)
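The helpers above define the on-disk layout in BucketFS: get_model_path flattens '-' to '_' below the chosen sub-directory, get_model_path_with_pretrained appends a "pretrained/<original name>" level, and upload_model_files_to_bucketfs stores the tarred model under the same path with a ".tar.gz" suffix. A self-contained sketch of the resulting paths, with names chosen only for illustration:

from pathlib import Path

sub_dir = "my_models"                 # illustrative values, not repository defaults
model_name = "bert-base-uncased"

model_path = Path(sub_dir, model_name.replace('-', '_'))
print(model_path)                     # my_models/bert_base_uncased

pretrained_path = Path(sub_dir, model_name.replace('-', '_'), "pretrained", model_name)
print(pretrained_path)                # my_models/bert_base_uncased/pretrained/bert-base-uncased

archive_path = model_path.with_suffix(".tar.gz")
print(archive_path)                   # my_models/bert_base_uncased.tar.gz (what the upload creates)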
19 changes: 5 additions & 14 deletions tests/fixtures/model_fixture.py
@@ -11,7 +11,6 @@
AbstractBucketFSLocation


- # todo also change upload?
def download_model(model_name: str, tmpdir_name: Path) -> Path:
tmpdir_name = Path(tmpdir_name)
for model_factory in [transformers.AutoModel, transformers.AutoTokenizer]:
Expand All @@ -30,21 +29,13 @@ def upload_model(bucketfs_location: AbstractBucketFSLocation,
tmpdir_name=str(model_dir),
model_path=Path(model_path),
bucketfs_location=bucketfs_location)
print("upload path")
print(model_path)
yield model_path


@contextmanager
def upload_model_to_local_bucketfs(
model_name: str, download_tmpdir: Path) -> str:
- download_tmpdir_ = download_tmpdir/ "model_sub_dir/bert_base_uncased"
-
- local_model_save_path = download_model(model_name, download_tmpdir_)
- upload_tmpdir_name = local_model_save_path
- print("upload_tmpdir_name:")
- print(upload_tmpdir_name)
- upload_tmpdir_name.mkdir(parents=True, exist_ok=True)
+ upload_tmpdir_name = download_model(model_name, download_tmpdir)
bucketfs_location = LocalFSMockBucketFSLocation(
PurePosixPath(upload_tmpdir_name))
upload_model(bucketfs_location, model_name, upload_tmpdir_name)
@@ -55,16 +46,16 @@ def upload_model_to_local_bucketfs(
def upload_base_model_to_local_bucketfs(tmpdir_factory) -> PurePosixPath:
tmpdir = tmpdir_factory.mktemp(model_params.base_model)
with upload_model_to_local_bucketfs(
- model_params.base_model, tmpdir) as path:
- yield path
+ model_params.base_model, tmpdir / model_params.sub_dir / model_params.base_model.replace("-", "_")):
+ yield tmpdir


@pytest.fixture(scope="session")
def upload_seq2seq_model_to_local_bucketfs(tmpdir_factory) -> PurePosixPath:
tmpdir = tmpdir_factory.mktemp(model_params.seq2seq_model)
with upload_model_to_local_bucketfs(
- model_params.seq2seq_model, tmpdir) as path:
- yield path
+ model_params.seq2seq_model, tmpdir / model_params.sub_dir / model_params.seq2seq_model.replace("-", "_")):
+ yield tmpdir


@contextmanager
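Net effect of the fixture fix: download_model now saves the model directly into the directory that later serves as the local BucketFS root, and the session fixtures append sub_dir plus the model name (with '-' replaced by '_') themselves while yielding the tmpdir. A sketch of the resulting layout and of how a test points at it; the paths are placeholders for what tmpdir_factory and model_params provide:

from pathlib import Path
from exasol_udf_mock_python.connection import Connection

tmpdir = Path("/tmp/pytest-tmp")                     # stand-in for tmpdir_factory.mktemp(...)
model_name = "bert-base-uncased"
sub_dir = "model_sub_dir"                            # stand-in for model_params.sub_dir

# The fixture downloads the model files into
#   <tmpdir>/<sub_dir>/<model_name with '-' -> '_'>
# and yields <tmpdir>, which then acts as the local BucketFS root.
download_dir = tmpdir / sub_dir / model_name.replace("-", "_")
download_dir.mkdir(parents=True, exist_ok=True)

# A test addresses that root through a file:// connection, as the
# filling-mask test below does.
bucketfs_connection = Connection(address=f"file://{tmpdir}")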
@@ -48,7 +48,7 @@ def get_dataframe(self, num_rows='all', start_col=0):

@pytest.mark.parametrize(
"description, device_id, n_rows", [
#("on CPU with batch input", None, 3),
("on CPU with batch input", None, 3),
("on CPU with single input", None, 1),
("on GPU with batch input", 0, 3),
("on GPU with single input", 0, 1)
@@ -60,9 +60,6 @@ def test_filling_mask_udf(
f"to execute the test")

bucketfs_base_path = upload_base_model_to_local_bucketfs
print("upload path")
print(bucketfs_base_path)

bucketfs_conn_name = "bucketfs_connection"
bucketfs_connection = Connection(address=f"file://{bucketfs_base_path}")

@@ -98,8 +95,6 @@ def test_filling_mask_udf(
new_columns = ['filled_text', 'score', 'rank', 'error_message']

result = Result(result_df)
print("result:")
print(result_df)
assert (
result == ScoreMatcher()
and result == RankDTypeMatcher()
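The re-enabled "on CPU with batch input" case rounds out the device matrix. A minimal sketch of the parametrize-and-skip pattern these UDF tests share; the skip message is paraphrased, not the exact text from the test:

import pytest
import torch

@pytest.mark.parametrize(
    "description, device_id, n_rows", [
        ("on CPU with batch input", None, 3),
        ("on CPU with single input", None, 1),
        ("on GPU with batch input", 0, 3),
        ("on GPU with single input", 0, 1)
    ])
def test_device_matrix_sketch(description, device_id, n_rows):
    if device_id is not None and not torch.cuda.is_available():
        pytest.skip("no CUDA device available to execute this test case")
    # ... build the UDF input with n_rows rows and run it on device_id ...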
@@ -155,8 +155,7 @@ def test_token_classification_udf_with_multiple_aggregation_strategies(
['start_pos', 'end_pos', 'word', 'entity', 'score', 'error_message']

result = Result(result_df)
- assert (#todo load path
- result == ColumnsMatcher(columns=columns[1:], new_columns=new_columns)
+ assert (result == ColumnsMatcher(columns=columns[1:], new_columns=new_columns)
and result == NoErrorMessageMatcher()
and set(result_df['aggregation_strategy'].unique()) == {"none", "simple", "max", "average"}
)
@@ -9,7 +9,7 @@
NewColumnsEmptyMatcher, ErrorMessageMatcher
from tests.utils.parameters import model_params
from exasol_udf_mock_python.connection import Connection

+ from tests.fixtures.model_fixture import upload_seq2seq_model_to_local_bucketfs

class ExaEnvironment:
def __init__(self, connections: Dict[str, Connection] = None):
@@ -60,7 +60,7 @@ def get_dataframe(self, num_rows='all', start_col=0):
("English", "French"), ("English", "German"),
("English", "Romanian")])
])
- def test_translation_udf(#todo wrong load path
+ def test_translation_udf(
description, device_id, languages,
upload_seq2seq_model_to_local_bucketfs):
if device_id is not None and not torch.cuda.is_available():
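The translation test previously could not resolve its fixture; importing upload_seq2seq_model_to_local_bucketfs from tests.fixtures.model_fixture fixes that. An alternative pytest idiom, not used by this commit, is to register the fixture module once in the top-level conftest.py so that per-module imports become unnecessary:

# conftest.py (project root) -- makes the fixtures from the model fixture
# module available to every test file.
pytest_plugins = ["tests.fixtures.model_fixture"]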
