From 562bab2a77350b9a05b193dc8991b7e0c5368c44 Mon Sep 17 00:00:00 2001 From: John Lambert Date: Wed, 8 Nov 2023 15:38:42 -0500 Subject: [PATCH 1/8] First try at fixing OOM's --- machine/jobs/nmt_engine_build_job.py | 38 ++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/machine/jobs/nmt_engine_build_job.py b/machine/jobs/nmt_engine_build_job.py index 36d7539..276e5ae 100644 --- a/machine/jobs/nmt_engine_build_job.py +++ b/machine/jobs/nmt_engine_build_job.py @@ -87,20 +87,38 @@ def run( current_inference_step = 0 phase_progress(ProgressStatus.from_step(current_inference_step, inference_step_count)) batch_size = self._config["batch_size"] + translate_batch = TranslateBatch(batch_size) for pi_batch in batch(src_pretranslations, batch_size): if check_canceled is not None: check_canceled() - _translate_batch(engine, pi_batch, writer) + translate_batch.translate(engine, pi_batch, writer) current_inference_step += len(pi_batch) phase_progress(ProgressStatus.from_step(current_inference_step, inference_step_count)) -def _translate_batch( - engine: TranslationEngine, - batch: Sequence[PretranslationInfo], - writer: PretranslationWriter, -) -> None: - source_segments = [pi["translation"] for pi in batch] - for i, result in enumerate(engine.translate_batch(source_segments)): - batch[i]["translation"] = result.translation - writer.write(batch[i]) +batch_divisor = 1 + + +class TranslateBatch: + def __init__(self, initial_batch_size): + self.batch_size = initial_batch_size + + def translate( + self, + engine: TranslationEngine, + batch: Sequence[PretranslationInfo], + writer: PretranslationWriter, + ) -> None: + while True: + source_segments = [pi["translation"] for pi in batch] + outer_batch_size = len(source_segments) + try: + for step in range(0, outer_batch_size, self.batch_size): + for i, result in enumerate(engine.translate_batch(source_segments[step : step + self.batch_size])): + batch[i + step]["translation"] = result.translation + for i in range(len(source_segments)): + writer.write(batch[i]) + break + except Exception: + self.batch_size = max(self.batch_size // 2, 1) + logger.info(f"Out of memory error, reducing batch size to {self.batch_size}") From c61f84185a357c95bb0e4dd1e01b7b71745d9571 Mon Sep 17 00:00:00 2001 From: John Lambert Date: Thu, 9 Nov 2023 15:27:31 -0500 Subject: [PATCH 2/8] Second round of fixes for OOM errors --- .../hugging_face_nmt_model_factory.py | 9 ++++++- machine/jobs/nmt_engine_build_job.py | 24 +++++++++++-------- machine/jobs/nmt_model_factory.py | 2 +- .../huggingface/hugging_face_nmt_engine.py | 5 ++++ machine/translation/translation_engine.py | 4 ++++ 5 files changed, 32 insertions(+), 12 deletions(-) diff --git a/machine/jobs/huggingface/hugging_face_nmt_model_factory.py b/machine/jobs/huggingface/hugging_face_nmt_model_factory.py index 8601f71..d17fe25 100644 --- a/machine/jobs/huggingface/hugging_face_nmt_model_factory.py +++ b/machine/jobs/huggingface/hugging_face_nmt_model_factory.py @@ -1,3 +1,4 @@ +import logging from pathlib import Path from typing import Any, cast @@ -15,6 +16,8 @@ from ..nmt_model_factory import NmtModelFactory from ..shared_file_service import SharedFileService +logger = logging.getLogger(__name__) + class HuggingFaceNmtModelFactory(NmtModelFactory): def __init__(self, config: Any, shared_file_service: SharedFileService) -> None: @@ -67,7 +70,11 @@ def create_model_trainer(self, corpus: ParallelTextCorpus) -> Trainer: add_unk_trg_tokens=self._config.huggingface.tokenizer.add_unk_trg_tokens, ) - def create_engine(self) -> TranslationEngine: + def create_engine(self, half_previous_batch_size=False) -> TranslationEngine: + if half_previous_batch_size: + self._config.huggingface.generate_params.batch_size = max( + self._config.huggingface.generate_params.batch_size // 2, 1 + ) return HuggingFaceNmtEngine( self._model, src_lang=self._config.src_lang, diff --git a/machine/jobs/nmt_engine_build_job.py b/machine/jobs/nmt_engine_build_job.py index 276e5ae..4749bde 100644 --- a/machine/jobs/nmt_engine_build_job.py +++ b/machine/jobs/nmt_engine_build_job.py @@ -81,17 +81,16 @@ def run( inference_step_count = sum(1 for _ in src_pretranslations) with ExitStack() as stack: phase_progress = stack.enter_context(progress_reporter.start_next_phase()) - engine = stack.enter_context(self._nmt_model_factory.create_engine()) src_pretranslations = stack.enter_context(self._shared_file_service.get_source_pretranslations()) writer = stack.enter_context(self._shared_file_service.open_target_pretranslation_writer()) current_inference_step = 0 phase_progress(ProgressStatus.from_step(current_inference_step, inference_step_count)) batch_size = self._config["batch_size"] - translate_batch = TranslateBatch(batch_size) + translate_batch = TranslateBatch(stack, self._nmt_model_factory) for pi_batch in batch(src_pretranslations, batch_size): if check_canceled is not None: check_canceled() - translate_batch.translate(engine, pi_batch, writer) + translate_batch.translate(pi_batch, writer) current_inference_step += len(pi_batch) phase_progress(ProgressStatus.from_step(current_inference_step, inference_step_count)) @@ -100,12 +99,13 @@ def run( class TranslateBatch: - def __init__(self, initial_batch_size): - self.batch_size = initial_batch_size + def __init__(self, stack: ExitStack, nmt_model_factory: NmtModelFactory): + self._stack = stack + self._nmt_model_factory = nmt_model_factory + self._engine = self._stack.enter_context(self._nmt_model_factory.create_engine()) def translate( self, - engine: TranslationEngine, batch: Sequence[PretranslationInfo], writer: PretranslationWriter, ) -> None: @@ -113,12 +113,16 @@ def translate( source_segments = [pi["translation"] for pi in batch] outer_batch_size = len(source_segments) try: - for step in range(0, outer_batch_size, self.batch_size): - for i, result in enumerate(engine.translate_batch(source_segments[step : step + self.batch_size])): + for step in range(0, outer_batch_size, self._engine.get_batch_size()): + for i, result in enumerate( + self._engine.translate_batch(source_segments[step : step + self._engine.get_batch_size()]) + ): batch[i + step]["translation"] = result.translation for i in range(len(source_segments)): writer.write(batch[i]) break except Exception: - self.batch_size = max(self.batch_size // 2, 1) - logger.info(f"Out of memory error, reducing batch size to {self.batch_size}") + logger.info(f"Out of memory error, reducing batch size to {self._engine.get_batch_size() // 2}") + self._engine = self._stack.enter_context( + self._nmt_model_factory.create_engine(half_previous_batch_size=True) + ) diff --git a/machine/jobs/nmt_model_factory.py b/machine/jobs/nmt_model_factory.py index 850280b..6161320 100644 --- a/machine/jobs/nmt_model_factory.py +++ b/machine/jobs/nmt_model_factory.py @@ -29,7 +29,7 @@ def create_model_trainer(self, corpus: ParallelTextCorpus) -> Trainer: ... @abstractmethod - def create_engine(self) -> TranslationEngine: + def create_engine(self, half_previous_batch_size=False) -> TranslationEngine: ... @abstractmethod diff --git a/machine/translation/huggingface/hugging_face_nmt_engine.py b/machine/translation/huggingface/hugging_face_nmt_engine.py index e730d16..b93706f 100644 --- a/machine/translation/huggingface/hugging_face_nmt_engine.py +++ b/machine/translation/huggingface/hugging_face_nmt_engine.py @@ -56,6 +56,8 @@ def __init__( ): raise ValueError(f"'{tgt_lang}' is not a valid language code.") + self._batch_size = int(pipeline_kwargs.get("batch_size")) # type: ignore[assignment] + self._pipeline = _TranslationPipeline( model=model, tokenizer=self._tokenizer, @@ -71,6 +73,9 @@ def translate_n(self, n: int, segment: Union[str, Sequence[str]]) -> Sequence[Tr def translate_batch(self, segments: Sequence[Union[str, Sequence[str]]]) -> Sequence[TranslationResult]: return [results[0] for results in self.translate_n_batch(1, segments)] + def get_batch_size(self) -> int: + return self._batch_size + def translate_n_batch( self, n: int, segments: Sequence[Union[str, Sequence[str]]] ) -> Sequence[Sequence[TranslationResult]]: diff --git a/machine/translation/translation_engine.py b/machine/translation/translation_engine.py index 56bf9ce..d65606d 100644 --- a/machine/translation/translation_engine.py +++ b/machine/translation/translation_engine.py @@ -20,6 +20,10 @@ def translate_n(self, n: int, segment: Union[str, Sequence[str]]) -> Sequence[Tr def translate_batch(self, segments: Sequence[Union[str, Sequence[str]]]) -> Sequence[TranslationResult]: ... + @abstractmethod + def get_batch_size(self) -> int: + ... + @abstractmethod def translate_n_batch( self, n: int, segments: Sequence[Union[str, Sequence[str]]] From f77b1c863fcc546ef055d2abe267a616cc9c1ed3 Mon Sep 17 00:00:00 2001 From: John Lambert Date: Thu, 9 Nov 2023 17:14:48 -0500 Subject: [PATCH 3/8] Fix tests --- .../huggingface/hugging_face_nmt_model_factory.py | 4 ++-- machine/jobs/nmt_engine_build_job.py | 4 ++-- machine/jobs/nmt_model_factory.py | 4 ++-- .../huggingface/hugging_face_nmt_engine.py | 10 +++++++--- machine/translation/nmt_translation_engine.py | 12 ++++++++++++ machine/translation/translation_engine.py | 8 ++------ tests/jobs/test_nmt_engine_build_job.py | 5 +++-- 7 files changed, 30 insertions(+), 17 deletions(-) create mode 100644 machine/translation/nmt_translation_engine.py diff --git a/machine/jobs/huggingface/hugging_face_nmt_model_factory.py b/machine/jobs/huggingface/hugging_face_nmt_model_factory.py index d17fe25..685a5d3 100644 --- a/machine/jobs/huggingface/hugging_face_nmt_model_factory.py +++ b/machine/jobs/huggingface/hugging_face_nmt_model_factory.py @@ -10,9 +10,9 @@ from ...corpora.text_corpus import TextCorpus from ...translation.huggingface.hugging_face_nmt_engine import HuggingFaceNmtEngine from ...translation.huggingface.hugging_face_nmt_model_trainer import HuggingFaceNmtModelTrainer +from ...translation.nmt_translation_engine import NmtTranslationEngine from ...translation.null_trainer import NullTrainer from ...translation.trainer import Trainer -from ...translation.translation_engine import TranslationEngine from ..nmt_model_factory import NmtModelFactory from ..shared_file_service import SharedFileService @@ -70,7 +70,7 @@ def create_model_trainer(self, corpus: ParallelTextCorpus) -> Trainer: add_unk_trg_tokens=self._config.huggingface.tokenizer.add_unk_trg_tokens, ) - def create_engine(self, half_previous_batch_size=False) -> TranslationEngine: + def create_engine(self, half_previous_batch_size=False) -> NmtTranslationEngine: if half_previous_batch_size: self._config.huggingface.generate_params.batch_size = max( self._config.huggingface.generate_params.batch_size // 2, 1 diff --git a/machine/jobs/nmt_engine_build_job.py b/machine/jobs/nmt_engine_build_job.py index 4749bde..3799205 100644 --- a/machine/jobs/nmt_engine_build_job.py +++ b/machine/jobs/nmt_engine_build_job.py @@ -3,7 +3,7 @@ from typing import Any, Callable, Optional, Sequence from ..corpora.corpora_utils import batch -from ..translation.translation_engine import TranslationEngine +from ..translation.nmt_translation_engine import NmtTranslationEngine from ..utils.phased_progress_reporter import Phase, PhasedProgressReporter from ..utils.progress_status import ProgressStatus from .nmt_model_factory import NmtModelFactory @@ -102,7 +102,7 @@ class TranslateBatch: def __init__(self, stack: ExitStack, nmt_model_factory: NmtModelFactory): self._stack = stack self._nmt_model_factory = nmt_model_factory - self._engine = self._stack.enter_context(self._nmt_model_factory.create_engine()) + self._engine: NmtTranslationEngine = self._stack.enter_context(self._nmt_model_factory.create_engine()) def translate( self, diff --git a/machine/jobs/nmt_model_factory.py b/machine/jobs/nmt_model_factory.py index 6161320..c108b9a 100644 --- a/machine/jobs/nmt_model_factory.py +++ b/machine/jobs/nmt_model_factory.py @@ -2,8 +2,8 @@ from ..corpora.parallel_text_corpus import ParallelTextCorpus from ..corpora.text_corpus import TextCorpus +from ..translation.nmt_translation_engine import NmtTranslationEngine from ..translation.trainer import Trainer -from ..translation.translation_engine import TranslationEngine class NmtModelFactory(ABC): @@ -29,7 +29,7 @@ def create_model_trainer(self, corpus: ParallelTextCorpus) -> Trainer: ... @abstractmethod - def create_engine(self, half_previous_batch_size=False) -> TranslationEngine: + def create_engine(self, half_previous_batch_size=False) -> NmtTranslationEngine: ... @abstractmethod diff --git a/machine/translation/huggingface/hugging_face_nmt_engine.py b/machine/translation/huggingface/hugging_face_nmt_engine.py index b93706f..3d9102c 100644 --- a/machine/translation/huggingface/hugging_face_nmt_engine.py +++ b/machine/translation/huggingface/hugging_face_nmt_engine.py @@ -11,14 +11,14 @@ from ...annotations.range import Range from ...utils.typeshed import StrPath -from ..translation_engine import TranslationEngine +from ..nmt_translation_engine import NmtTranslationEngine from ..translation_result import TranslationResult from ..translation_result_builder import TranslationResultBuilder from ..translation_sources import TranslationSources from ..word_alignment_matrix import WordAlignmentMatrix -class HuggingFaceNmtEngine(TranslationEngine): +class HuggingFaceNmtEngine(NmtTranslationEngine): def __init__( self, model: Union[PreTrainedModel, StrPath, str], @@ -56,7 +56,11 @@ def __init__( ): raise ValueError(f"'{tgt_lang}' is not a valid language code.") - self._batch_size = int(pipeline_kwargs.get("batch_size")) # type: ignore[assignment] + batch_size = pipeline_kwargs.get("batch_size") + if batch_size is not None: + self._batch_size = int(batch_size) # type: ignore[assignment] + else: + self._batch_size = 16 self._pipeline = _TranslationPipeline( model=model, diff --git a/machine/translation/nmt_translation_engine.py b/machine/translation/nmt_translation_engine.py new file mode 100644 index 0000000..c486a07 --- /dev/null +++ b/machine/translation/nmt_translation_engine.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from abc import abstractmethod +from typing import ContextManager + +from .translation_engine import TranslationEngine + + +class NmtTranslationEngine(TranslationEngine, ContextManager["NmtTranslationEngine"]): + @abstractmethod + def get_batch_size(self) -> int: + ... diff --git a/machine/translation/translation_engine.py b/machine/translation/translation_engine.py index d65606d..a152e2a 100644 --- a/machine/translation/translation_engine.py +++ b/machine/translation/translation_engine.py @@ -2,12 +2,12 @@ from abc import abstractmethod from types import TracebackType -from typing import ContextManager, Optional, Sequence, Type, Union +from typing import Optional, Sequence, Type, Union from .translation_result import TranslationResult -class TranslationEngine(ContextManager["TranslationEngine"]): +class TranslationEngine: @abstractmethod def translate(self, segment: Union[str, Sequence[str]]) -> TranslationResult: ... @@ -20,10 +20,6 @@ def translate_n(self, n: int, segment: Union[str, Sequence[str]]) -> Sequence[Tr def translate_batch(self, segments: Sequence[Union[str, Sequence[str]]]) -> Sequence[TranslationResult]: ... - @abstractmethod - def get_batch_size(self) -> int: - ... - @abstractmethod def translate_n_batch( self, n: int, segments: Sequence[Union[str, Sequence[str]]] diff --git a/tests/jobs/test_nmt_engine_build_job.py b/tests/jobs/test_nmt_engine_build_job.py index b3a1bf6..142149f 100644 --- a/tests/jobs/test_nmt_engine_build_job.py +++ b/tests/jobs/test_nmt_engine_build_job.py @@ -10,7 +10,7 @@ from machine.corpora import DictionaryTextCorpus from machine.jobs import NmtEngineBuildJob, NmtModelFactory, PretranslationInfo, PretranslationWriter, SharedFileService from machine.translation import Phrase, Trainer, TrainStats, TranslationResult, TranslationSources, WordAlignmentMatrix -from machine.translation.translation_engine import TranslationEngine +from machine.translation.nmt_translation_engine import NmtTranslationEngine from machine.utils import CanceledError, ContextManagedGenerator @@ -45,8 +45,9 @@ def __init__(self, decoy: Decoy) -> None: stats.metrics["bleu"] = 30.0 decoy.when(self.model_trainer.stats).then_return(stats) - self.engine = decoy.mock(cls=TranslationEngine) + self.engine = decoy.mock(cls=NmtTranslationEngine) decoy.when(self.engine.__enter__()).then_return(self.engine) + decoy.when(self.engine.get_batch_size()).then_return(16) decoy.when(self.engine.translate_batch(matchers.Anything())).then_return( [ TranslationResult( From 9872305719a47c38eb2e1fb8d7ffae0f75681c4c Mon Sep 17 00:00:00 2001 From: John Lambert Date: Mon, 20 Nov 2023 12:30:19 -0500 Subject: [PATCH 4/8] Rework to add to huggingface directly --- machine/jobs/settings.yaml | 3 + .../huggingface/hugging_face_nmt_engine.py | 67 +++++++++++++++---- .../hugging_face_nmt_model_trainer.py | 10 ++- 3 files changed, 66 insertions(+), 14 deletions(-) diff --git a/machine/jobs/settings.yaml b/machine/jobs/settings.yaml index e1a692b..fa8908f 100644 --- a/machine/jobs/settings.yaml +++ b/machine/jobs/settings.yaml @@ -3,6 +3,7 @@ default: max_steps: 20000 data_dir: ~/machine batch_size: 1024 + oom_batch_size_backoff_multiplier: 0.5 huggingface: parent_model_name: facebook/nllb-200-distilled-1.3B train_params: @@ -33,5 +34,7 @@ staging: max_steps: 10 huggingface: parent_model_name: facebook/nllb-200-distilled-600M + train_params: + group_by_length: false generate_params: num_beams: 1 diff --git a/machine/translation/huggingface/hugging_face_nmt_engine.py b/machine/translation/huggingface/hugging_face_nmt_engine.py index 3d9102c..2bd4abc 100644 --- a/machine/translation/huggingface/hugging_face_nmt_engine.py +++ b/machine/translation/huggingface/hugging_face_nmt_engine.py @@ -1,6 +1,7 @@ from __future__ import annotations import gc +import logging from math import exp, prod from typing import Any, Iterable, List, Sequence, Tuple, Union, cast @@ -17,6 +18,8 @@ from ..translation_sources import TranslationSources from ..word_alignment_matrix import WordAlignmentMatrix +logger = logging.getLogger(__name__) + class HuggingFaceNmtEngine(NmtTranslationEngine): def __init__( @@ -24,22 +27,26 @@ def __init__( model: Union[PreTrainedModel, StrPath, str], **pipeline_kwargs, ) -> None: - if isinstance(model, PreTrainedModel): - model.eval() + self._model = model + self._pipeline_kwargs = pipeline_kwargs + if isinstance(self._model, PreTrainedModel): + self._model.eval() else: - model_config = AutoConfig.from_pretrained(str(model), label2id={}, id2label={}, num_labels=0) - model = cast(PreTrainedModel, AutoModelForSeq2SeqLM.from_pretrained(str(model), config=model_config)) - self._tokenizer = AutoTokenizer.from_pretrained(model.name_or_path, use_fast=True) + model_config = AutoConfig.from_pretrained(str(self._model), label2id={}, id2label={}, num_labels=0) + self._model = cast( + PreTrainedModel, AutoModelForSeq2SeqLM.from_pretrained(str(self._model), config=model_config) + ) + self._tokenizer = AutoTokenizer.from_pretrained(self._model.name_or_path, use_fast=True) - src_lang = pipeline_kwargs.get("src_lang") - tgt_lang = pipeline_kwargs.get("tgt_lang") + src_lang = self._pipeline_kwargs.get("src_lang") + tgt_lang = self._pipeline_kwargs.get("tgt_lang") if ( src_lang is not None and tgt_lang is not None - and "prefix" not in pipeline_kwargs - and (model.name_or_path.startswith("t5-") or model.name_or_path.startswith("google/mt5-")) + and "prefix" not in self._pipeline_kwargs + and (self._model.name_or_path.startswith("t5-") or self._model.name_or_path.startswith("google/mt5-")) ): - pipeline_kwargs["prefix"] = f"translate {src_lang} to {tgt_lang}: " + self._pipeline_kwargs["prefix"] = f"translate {src_lang} to {tgt_lang}: " else: additional_special_tokens = self._tokenizer.additional_special_tokens if ( @@ -56,16 +63,20 @@ def __init__( ): raise ValueError(f"'{tgt_lang}' is not a valid language code.") - batch_size = pipeline_kwargs.get("batch_size") + batch_size = self._pipeline_kwargs.pop("batch_size") if batch_size is not None: self._batch_size = int(batch_size) # type: ignore[assignment] else: self._batch_size = 16 + # If not set, default to not backing off (1.0). + self._oom_batch_size_backoff_multiplier = self._pipeline_kwargs.pop("oom_batch_size_backoff_multiplier", 1.0) + self._pipeline = _TranslationPipeline( - model=model, + model=self._model, tokenizer=self._tokenizer, - **pipeline_kwargs, + batch_size=self._batch_size, + **self._pipeline_kwargs, ) def translate(self, segment: Union[str, Sequence[str]]) -> TranslationResult: @@ -82,6 +93,36 @@ def get_batch_size(self) -> int: def translate_n_batch( self, n: int, segments: Sequence[Union[str, Sequence[str]]] + ) -> Sequence[Sequence[TranslationResult]]: + while True: + if type(segments) is str: + segments = [segments] + else: + segments = [segment for segment in segments] + outer_batch_size = len(segments) + all_results: List[Sequence[TranslationResult]] = [] + try: + for step in range(0, outer_batch_size, self._batch_size): + all_results.extend(self._try_translate_n_batch(n, segments[step : step + self._batch_size])) + return all_results + except Exception as e: + if self._oom_batch_size_backoff_multiplier >= 0.9999: + raise Exception( + "Likely an Out of Memory Error. Change oom_batch_size_backoff_multiplier to < 1 to gracefuly handle these type of errors." + ) from e + self._batch_size = max(int(round(self._batch_size * self._oom_batch_size_backoff_multiplier)), 1) + logger.info( + f"Out of memory error caught, reducing batch size to {self._batch_size}. Remaking translation pipeline." + ) + self._pipeline = _TranslationPipeline( + model=self._model, + tokenizer=self._tokenizer, + batch_size=self._batch_size, + **self._pipeline_kwargs, + ) + + def _try_translate_n_batch( + self, n: int, segments: Sequence[Union[str, Sequence[str]]] ) -> Sequence[Sequence[TranslationResult]]: all_results: List[List[TranslationResult]] = [] i = 0 diff --git a/machine/translation/huggingface/hugging_face_nmt_model_trainer.py b/machine/translation/huggingface/hugging_face_nmt_model_trainer.py index ddc8dfc..07b04bc 100644 --- a/machine/translation/huggingface/hugging_face_nmt_model_trainer.py +++ b/machine/translation/huggingface/hugging_face_nmt_model_trainer.py @@ -134,6 +134,7 @@ def train( # Set seed before initializing model. set_seed(self._training_args.seed) + logger.info("Initializing tokenizer.") if isinstance(self._model, PreTrainedModel): model = self._model self._original_use_cache = model.config.use_cache @@ -193,6 +194,7 @@ def add_tokens(tokenizer: Any, missing_tokens: List[str]) -> Any: logger.info(f"Added {len(missing_tokens)} tokens to the tokenizer: {missing_tokens}") return AutoTokenizer.from_pretrained(str(tokenizer_dir), use_fast=True) + logger.info("Checking for missing tokens.") if self._add_unk_src_tokens or self._add_unk_trg_tokens: if not isinstance(tokenizer, PreTrainedTokenizerFast): logger.warning( @@ -233,6 +235,7 @@ def add_lang_code_to_tokenizer(tokenizer: Any, lang_code: str): tokenizer.lang_token_to_id[lang_code] = lang_id tokenizer.id_to_lang_token[lang_id] = lang_code + logger.info("Add new language codes as tokens.") if isinstance(tokenizer, MULTILINGUAL_TOKENIZERS): if self._src_lang is not None: add_lang_code_to_tokenizer(tokenizer, self._src_lang) @@ -309,6 +312,7 @@ def preprocess_function(examples): model_inputs["labels"] = labels["input_ids"] return model_inputs + logger.info("Run tokenizer.") train_dataset = train_dataset.map( preprocess_function, batched=True, @@ -339,17 +343,21 @@ def preprocess_function(examples): ], ) + logger.info("Train NMT model.") ckpt = None if self._training_args.resume_from_checkpoint is not None: ckpt = self._training_args.resume_from_checkpoint elif last_checkpoint is not None: ckpt = last_checkpoint - train_result = self._trainer.train(resume_from_checkpoint=ckpt) + train_result = self._trainer.train( + resume_from_checkpoint=ckpt, + ) self._metrics = train_result.metrics self._metrics["train_samples"] = len(train_dataset) self._trainer.log_metrics("train", self._metrics) + logger.info("Model training finished.") def save(self) -> None: if self._trainer is None: From db31d4193f57be55269a99693bd0e26b49f0f183 Mon Sep 17 00:00:00 2001 From: John Lambert Date: Tue, 21 Nov 2023 10:16:28 -0500 Subject: [PATCH 5/8] Revert initial implementation. Respond to reviewer comments. --- .../hugging_face_nmt_model_factory.py | 4 +- machine/jobs/nmt_engine_build_job.py | 46 +++++-------------- machine/jobs/nmt_model_factory.py | 4 +- machine/jobs/settings.yaml | 2 - .../huggingface/hugging_face_nmt_engine.py | 23 +++++----- .../hugging_face_nmt_model_trainer.py | 13 +++--- machine/translation/nmt_translation_engine.py | 12 ----- tests/jobs/test_nmt_engine_build_job.py | 5 +- 8 files changed, 36 insertions(+), 73 deletions(-) delete mode 100644 machine/translation/nmt_translation_engine.py diff --git a/machine/jobs/huggingface/hugging_face_nmt_model_factory.py b/machine/jobs/huggingface/hugging_face_nmt_model_factory.py index 685a5d3..d17fe25 100644 --- a/machine/jobs/huggingface/hugging_face_nmt_model_factory.py +++ b/machine/jobs/huggingface/hugging_face_nmt_model_factory.py @@ -10,9 +10,9 @@ from ...corpora.text_corpus import TextCorpus from ...translation.huggingface.hugging_face_nmt_engine import HuggingFaceNmtEngine from ...translation.huggingface.hugging_face_nmt_model_trainer import HuggingFaceNmtModelTrainer -from ...translation.nmt_translation_engine import NmtTranslationEngine from ...translation.null_trainer import NullTrainer from ...translation.trainer import Trainer +from ...translation.translation_engine import TranslationEngine from ..nmt_model_factory import NmtModelFactory from ..shared_file_service import SharedFileService @@ -70,7 +70,7 @@ def create_model_trainer(self, corpus: ParallelTextCorpus) -> Trainer: add_unk_trg_tokens=self._config.huggingface.tokenizer.add_unk_trg_tokens, ) - def create_engine(self, half_previous_batch_size=False) -> NmtTranslationEngine: + def create_engine(self, half_previous_batch_size=False) -> TranslationEngine: if half_previous_batch_size: self._config.huggingface.generate_params.batch_size = max( self._config.huggingface.generate_params.batch_size // 2, 1 diff --git a/machine/jobs/nmt_engine_build_job.py b/machine/jobs/nmt_engine_build_job.py index 3799205..36d7539 100644 --- a/machine/jobs/nmt_engine_build_job.py +++ b/machine/jobs/nmt_engine_build_job.py @@ -3,7 +3,7 @@ from typing import Any, Callable, Optional, Sequence from ..corpora.corpora_utils import batch -from ..translation.nmt_translation_engine import NmtTranslationEngine +from ..translation.translation_engine import TranslationEngine from ..utils.phased_progress_reporter import Phase, PhasedProgressReporter from ..utils.progress_status import ProgressStatus from .nmt_model_factory import NmtModelFactory @@ -81,48 +81,26 @@ def run( inference_step_count = sum(1 for _ in src_pretranslations) with ExitStack() as stack: phase_progress = stack.enter_context(progress_reporter.start_next_phase()) + engine = stack.enter_context(self._nmt_model_factory.create_engine()) src_pretranslations = stack.enter_context(self._shared_file_service.get_source_pretranslations()) writer = stack.enter_context(self._shared_file_service.open_target_pretranslation_writer()) current_inference_step = 0 phase_progress(ProgressStatus.from_step(current_inference_step, inference_step_count)) batch_size = self._config["batch_size"] - translate_batch = TranslateBatch(stack, self._nmt_model_factory) for pi_batch in batch(src_pretranslations, batch_size): if check_canceled is not None: check_canceled() - translate_batch.translate(pi_batch, writer) + _translate_batch(engine, pi_batch, writer) current_inference_step += len(pi_batch) phase_progress(ProgressStatus.from_step(current_inference_step, inference_step_count)) -batch_divisor = 1 - - -class TranslateBatch: - def __init__(self, stack: ExitStack, nmt_model_factory: NmtModelFactory): - self._stack = stack - self._nmt_model_factory = nmt_model_factory - self._engine: NmtTranslationEngine = self._stack.enter_context(self._nmt_model_factory.create_engine()) - - def translate( - self, - batch: Sequence[PretranslationInfo], - writer: PretranslationWriter, - ) -> None: - while True: - source_segments = [pi["translation"] for pi in batch] - outer_batch_size = len(source_segments) - try: - for step in range(0, outer_batch_size, self._engine.get_batch_size()): - for i, result in enumerate( - self._engine.translate_batch(source_segments[step : step + self._engine.get_batch_size()]) - ): - batch[i + step]["translation"] = result.translation - for i in range(len(source_segments)): - writer.write(batch[i]) - break - except Exception: - logger.info(f"Out of memory error, reducing batch size to {self._engine.get_batch_size() // 2}") - self._engine = self._stack.enter_context( - self._nmt_model_factory.create_engine(half_previous_batch_size=True) - ) +def _translate_batch( + engine: TranslationEngine, + batch: Sequence[PretranslationInfo], + writer: PretranslationWriter, +) -> None: + source_segments = [pi["translation"] for pi in batch] + for i, result in enumerate(engine.translate_batch(source_segments)): + batch[i]["translation"] = result.translation + writer.write(batch[i]) diff --git a/machine/jobs/nmt_model_factory.py b/machine/jobs/nmt_model_factory.py index c108b9a..6161320 100644 --- a/machine/jobs/nmt_model_factory.py +++ b/machine/jobs/nmt_model_factory.py @@ -2,8 +2,8 @@ from ..corpora.parallel_text_corpus import ParallelTextCorpus from ..corpora.text_corpus import TextCorpus -from ..translation.nmt_translation_engine import NmtTranslationEngine from ..translation.trainer import Trainer +from ..translation.translation_engine import TranslationEngine class NmtModelFactory(ABC): @@ -29,7 +29,7 @@ def create_model_trainer(self, corpus: ParallelTextCorpus) -> Trainer: ... @abstractmethod - def create_engine(self, half_previous_batch_size=False) -> NmtTranslationEngine: + def create_engine(self, half_previous_batch_size=False) -> TranslationEngine: ... @abstractmethod diff --git a/machine/jobs/settings.yaml b/machine/jobs/settings.yaml index fa8908f..1343812 100644 --- a/machine/jobs/settings.yaml +++ b/machine/jobs/settings.yaml @@ -34,7 +34,5 @@ staging: max_steps: 10 huggingface: parent_model_name: facebook/nllb-200-distilled-600M - train_params: - group_by_length: false generate_params: num_beams: 1 diff --git a/machine/translation/huggingface/hugging_face_nmt_engine.py b/machine/translation/huggingface/hugging_face_nmt_engine.py index 2bd4abc..dd137f6 100644 --- a/machine/translation/huggingface/hugging_face_nmt_engine.py +++ b/machine/translation/huggingface/hugging_face_nmt_engine.py @@ -12,7 +12,7 @@ from ...annotations.range import Range from ...utils.typeshed import StrPath -from ..nmt_translation_engine import NmtTranslationEngine +from ..translation_engine import TranslationEngine from ..translation_result import TranslationResult from ..translation_result_builder import TranslationResultBuilder from ..translation_sources import TranslationSources @@ -21,7 +21,7 @@ logger = logging.getLogger(__name__) -class HuggingFaceNmtEngine(NmtTranslationEngine): +class HuggingFaceNmtEngine(TranslationEngine): def __init__( self, model: Union[PreTrainedModel, StrPath, str], @@ -63,11 +63,7 @@ def __init__( ): raise ValueError(f"'{tgt_lang}' is not a valid language code.") - batch_size = self._pipeline_kwargs.pop("batch_size") - if batch_size is not None: - self._batch_size = int(batch_size) # type: ignore[assignment] - else: - self._batch_size = 16 + self._batch_size = int(self._pipeline_kwargs.pop("batch_size", 1)) # If not set, default to not backing off (1.0). self._oom_batch_size_backoff_multiplier = self._pipeline_kwargs.pop("oom_batch_size_backoff_multiplier", 1.0) @@ -88,9 +84,6 @@ def translate_n(self, n: int, segment: Union[str, Sequence[str]]) -> Sequence[Tr def translate_batch(self, segments: Sequence[Union[str, Sequence[str]]]) -> Sequence[TranslationResult]: return [results[0] for results in self.translate_n_batch(1, segments)] - def get_batch_size(self) -> int: - return self._batch_size - def translate_n_batch( self, n: int, segments: Sequence[Union[str, Sequence[str]]] ) -> Sequence[Sequence[TranslationResult]]: @@ -106,13 +99,19 @@ def translate_n_batch( all_results.extend(self._try_translate_n_batch(n, segments[step : step + self._batch_size])) return all_results except Exception as e: + # The out or memory error is not inherited from if self._oom_batch_size_backoff_multiplier >= 0.9999: raise Exception( - "Likely an Out of Memory Error. Change oom_batch_size_backoff_multiplier to < 1 to gracefuly handle these type of errors." + "Likely an Out of Memory Error. Change oom_batch_size_backoff_multiplier " + + "to < 1 to gracefuly handle these type of errors." ) from e + if self._batch_size == 1: + # Could it be another error? + raise e self._batch_size = max(int(round(self._batch_size * self._oom_batch_size_backoff_multiplier)), 1) logger.info( - f"Out of memory error caught, reducing batch size to {self._batch_size}. Remaking translation pipeline." + f"Out of memory error caught with message {e.args[0]}, reducing batch size to {self._batch_size}. " + + "Remaking translation pipeline." ) self._pipeline = _TranslationPipeline( model=self._model, diff --git a/machine/translation/huggingface/hugging_face_nmt_model_trainer.py b/machine/translation/huggingface/hugging_face_nmt_model_trainer.py index 07b04bc..e4be94a 100644 --- a/machine/translation/huggingface/hugging_face_nmt_model_trainer.py +++ b/machine/translation/huggingface/hugging_face_nmt_model_trainer.py @@ -134,7 +134,6 @@ def train( # Set seed before initializing model. set_seed(self._training_args.seed) - logger.info("Initializing tokenizer.") if isinstance(self._model, PreTrainedModel): model = self._model self._original_use_cache = model.config.use_cache @@ -148,6 +147,8 @@ def train( num_labels=0, ) model = cast(PreTrainedModel, AutoModelForSeq2SeqLM.from_pretrained(self._model, config=config)) + + logger.info("Initializing tokenizer") tokenizer = AutoTokenizer.from_pretrained(model.name_or_path, use_fast=True) src_lang = self._src_lang @@ -194,8 +195,8 @@ def add_tokens(tokenizer: Any, missing_tokens: List[str]) -> Any: logger.info(f"Added {len(missing_tokens)} tokens to the tokenizer: {missing_tokens}") return AutoTokenizer.from_pretrained(str(tokenizer_dir), use_fast=True) - logger.info("Checking for missing tokens.") if self._add_unk_src_tokens or self._add_unk_trg_tokens: + logger.info("Checking for missing tokens") if not isinstance(tokenizer, PreTrainedTokenizerFast): logger.warning( f"Tokenizer can not be updated from default configuration: \ @@ -235,8 +236,8 @@ def add_lang_code_to_tokenizer(tokenizer: Any, lang_code: str): tokenizer.lang_token_to_id[lang_code] = lang_id tokenizer.id_to_lang_token[lang_id] = lang_code - logger.info("Add new language codes as tokens.") if isinstance(tokenizer, MULTILINGUAL_TOKENIZERS): + logger.info("Add new language codes as tokens") if self._src_lang is not None: add_lang_code_to_tokenizer(tokenizer, self._src_lang) if self._tgt_lang is not None: @@ -312,7 +313,7 @@ def preprocess_function(examples): model_inputs["labels"] = labels["input_ids"] return model_inputs - logger.info("Run tokenizer.") + logger.info("Run tokenizer") train_dataset = train_dataset.map( preprocess_function, batched=True, @@ -343,7 +344,7 @@ def preprocess_function(examples): ], ) - logger.info("Train NMT model.") + logger.info("Train NMT model") ckpt = None if self._training_args.resume_from_checkpoint is not None: ckpt = self._training_args.resume_from_checkpoint @@ -357,7 +358,7 @@ def preprocess_function(examples): self._metrics["train_samples"] = len(train_dataset) self._trainer.log_metrics("train", self._metrics) - logger.info("Model training finished.") + logger.info("Model training finished") def save(self) -> None: if self._trainer is None: diff --git a/machine/translation/nmt_translation_engine.py b/machine/translation/nmt_translation_engine.py deleted file mode 100644 index c486a07..0000000 --- a/machine/translation/nmt_translation_engine.py +++ /dev/null @@ -1,12 +0,0 @@ -from __future__ import annotations - -from abc import abstractmethod -from typing import ContextManager - -from .translation_engine import TranslationEngine - - -class NmtTranslationEngine(TranslationEngine, ContextManager["NmtTranslationEngine"]): - @abstractmethod - def get_batch_size(self) -> int: - ... diff --git a/tests/jobs/test_nmt_engine_build_job.py b/tests/jobs/test_nmt_engine_build_job.py index 142149f..b3a1bf6 100644 --- a/tests/jobs/test_nmt_engine_build_job.py +++ b/tests/jobs/test_nmt_engine_build_job.py @@ -10,7 +10,7 @@ from machine.corpora import DictionaryTextCorpus from machine.jobs import NmtEngineBuildJob, NmtModelFactory, PretranslationInfo, PretranslationWriter, SharedFileService from machine.translation import Phrase, Trainer, TrainStats, TranslationResult, TranslationSources, WordAlignmentMatrix -from machine.translation.nmt_translation_engine import NmtTranslationEngine +from machine.translation.translation_engine import TranslationEngine from machine.utils import CanceledError, ContextManagedGenerator @@ -45,9 +45,8 @@ def __init__(self, decoy: Decoy) -> None: stats.metrics["bleu"] = 30.0 decoy.when(self.model_trainer.stats).then_return(stats) - self.engine = decoy.mock(cls=NmtTranslationEngine) + self.engine = decoy.mock(cls=TranslationEngine) decoy.when(self.engine.__enter__()).then_return(self.engine) - decoy.when(self.engine.get_batch_size()).then_return(16) decoy.when(self.engine.translate_batch(matchers.Anything())).then_return( [ TranslationResult( From 1ea55fad8b02db75387f29ae75616cf6dff12535 Mon Sep 17 00:00:00 2001 From: John Lambert Date: Tue, 21 Nov 2023 11:28:22 -0500 Subject: [PATCH 6/8] Add fixme --- machine/translation/huggingface/hugging_face_nmt_engine.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/machine/translation/huggingface/hugging_face_nmt_engine.py b/machine/translation/huggingface/hugging_face_nmt_engine.py index dd137f6..0b0b40b 100644 --- a/machine/translation/huggingface/hugging_face_nmt_engine.py +++ b/machine/translation/huggingface/hugging_face_nmt_engine.py @@ -101,6 +101,8 @@ def translate_n_batch( except Exception as e: # The out or memory error is not inherited from if self._oom_batch_size_backoff_multiplier >= 0.9999: + # FIXME after upgrading to Pytorch 2.1, this should be changed to OutOfMemoryError + # see https://github.com/sillsdev/machine.py/issues/67 raise Exception( "Likely an Out of Memory Error. Change oom_batch_size_backoff_multiplier " + "to < 1 to gracefuly handle these type of errors." From 6e1a8f830b7806ec41946c30fb973395d17b16a9 Mon Sep 17 00:00:00 2001 From: Damien Daspit Date: Tue, 21 Nov 2023 19:12:07 -0500 Subject: [PATCH 7/8] Use OutOfMemoryError to backoff of batch size --- .../huggingface/hugging_face_nmt_engine.py | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/machine/translation/huggingface/hugging_face_nmt_engine.py b/machine/translation/huggingface/hugging_face_nmt_engine.py index 0b0b40b..ba1f49d 100644 --- a/machine/translation/huggingface/hugging_face_nmt_engine.py +++ b/machine/translation/huggingface/hugging_face_nmt_engine.py @@ -25,6 +25,7 @@ class HuggingFaceNmtEngine(TranslationEngine): def __init__( self, model: Union[PreTrainedModel, StrPath, str], + oom_batch_size_backoff_multiplier: float = 1.0, **pipeline_kwargs, ) -> None: self._model = model @@ -65,8 +66,7 @@ def __init__( self._batch_size = int(self._pipeline_kwargs.pop("batch_size", 1)) - # If not set, default to not backing off (1.0). - self._oom_batch_size_backoff_multiplier = self._pipeline_kwargs.pop("oom_batch_size_backoff_multiplier", 1.0) + self._oom_batch_size_backoff_multiplier = oom_batch_size_backoff_multiplier self._pipeline = _TranslationPipeline( model=self._model, @@ -98,23 +98,11 @@ def translate_n_batch( for step in range(0, outer_batch_size, self._batch_size): all_results.extend(self._try_translate_n_batch(n, segments[step : step + self._batch_size])) return all_results - except Exception as e: - # The out or memory error is not inherited from - if self._oom_batch_size_backoff_multiplier >= 0.9999: - # FIXME after upgrading to Pytorch 2.1, this should be changed to OutOfMemoryError - # see https://github.com/sillsdev/machine.py/issues/67 - raise Exception( - "Likely an Out of Memory Error. Change oom_batch_size_backoff_multiplier " - + "to < 1 to gracefuly handle these type of errors." - ) from e - if self._batch_size == 1: - # Could it be another error? - raise e + except torch.cuda.OutOfMemoryError: # type: ignore[reportGeneralTypeIssues] + if self._oom_batch_size_backoff_multiplier >= 0.9999 or self._batch_size == 1: + raise self._batch_size = max(int(round(self._batch_size * self._oom_batch_size_backoff_multiplier)), 1) - logger.info( - f"Out of memory error caught with message {e.args[0]}, reducing batch size to {self._batch_size}. " - + "Remaking translation pipeline." - ) + logger.warn(f"Out of memory error caught, reducing batch size to {self._batch_size} and retrying.") self._pipeline = _TranslationPipeline( model=self._model, tokenizer=self._tokenizer, From 16203fc6cf4d7edc470a21cabced156a4239ed06 Mon Sep 17 00:00:00 2001 From: John Lambert Date: Tue, 21 Nov 2023 20:53:07 -0500 Subject: [PATCH 8/8] Fix small error. Change to real error - and suppress warnings change name to oom_batch_size_backoff_mult --- .../hugging_face_nmt_model_factory.py | 1 + machine/jobs/settings.yaml | 4 +-- .../huggingface/hugging_face_nmt_engine.py | 25 ++++++++++--------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/machine/jobs/huggingface/hugging_face_nmt_model_factory.py b/machine/jobs/huggingface/hugging_face_nmt_model_factory.py index d17fe25..e97bfb9 100644 --- a/machine/jobs/huggingface/hugging_face_nmt_model_factory.py +++ b/machine/jobs/huggingface/hugging_face_nmt_model_factory.py @@ -83,6 +83,7 @@ def create_engine(self, half_previous_batch_size=False) -> TranslationEngine: num_beams=self._config.huggingface.generate_params.num_beams, batch_size=self._config.huggingface.generate_params.batch_size, truncation=TruncationStrategy.LONGEST_FIRST, + oom_batch_size_backoff_mult=self._config.huggingface.generate_params.oom_batch_size_backoff_mult, ) def save_model(self) -> None: diff --git a/machine/jobs/settings.yaml b/machine/jobs/settings.yaml index 1343812..6c00382 100644 --- a/machine/jobs/settings.yaml +++ b/machine/jobs/settings.yaml @@ -3,7 +3,6 @@ default: max_steps: 20000 data_dir: ~/machine batch_size: 1024 - oom_batch_size_backoff_multiplier: 0.5 huggingface: parent_model_name: facebook/nllb-200-distilled-1.3B train_params: @@ -21,6 +20,7 @@ default: device: 0 num_beams: 2 batch_size: 16 + oom_batch_size_backoff_mult: 0.5 tokenizer: add_unk_src_tokens: true add_unk_trg_tokens: true @@ -35,4 +35,4 @@ staging: huggingface: parent_model_name: facebook/nllb-200-distilled-600M generate_params: - num_beams: 1 + num_beams: 1 \ No newline at end of file diff --git a/machine/translation/huggingface/hugging_face_nmt_engine.py b/machine/translation/huggingface/hugging_face_nmt_engine.py index 0b0b40b..5f88086 100644 --- a/machine/translation/huggingface/hugging_face_nmt_engine.py +++ b/machine/translation/huggingface/hugging_face_nmt_engine.py @@ -66,7 +66,7 @@ def __init__( self._batch_size = int(self._pipeline_kwargs.pop("batch_size", 1)) # If not set, default to not backing off (1.0). - self._oom_batch_size_backoff_multiplier = self._pipeline_kwargs.pop("oom_batch_size_backoff_multiplier", 1.0) + self._oom_batch_size_backoff_mult = float(self._pipeline_kwargs.pop("oom_batch_size_backoff_mult", 1.0)) self._pipeline = _TranslationPipeline( model=self._model, @@ -98,22 +98,23 @@ def translate_n_batch( for step in range(0, outer_batch_size, self._batch_size): all_results.extend(self._try_translate_n_batch(n, segments[step : step + self._batch_size])) return all_results - except Exception as e: + except torch.cuda.OutOfMemoryError as e: # type: ignore[reportGeneralTypeIssues] # The out or memory error is not inherited from - if self._oom_batch_size_backoff_multiplier >= 0.9999: + if self._oom_batch_size_backoff_mult >= 0.9999: # FIXME after upgrading to Pytorch 2.1, this should be changed to OutOfMemoryError # see https://github.com/sillsdev/machine.py/issues/67 - raise Exception( - "Likely an Out of Memory Error. Change oom_batch_size_backoff_multiplier " + raise torch.cuda.OutOfMemoryError( + "Out of Memory Error. Change oom_batch_size_backoff_mult " + "to < 1 to gracefuly handle these type of errors." - ) from e - if self._batch_size == 1: - # Could it be another error? - raise e - self._batch_size = max(int(round(self._batch_size * self._oom_batch_size_backoff_multiplier)), 1) + ) from e # type: ignore[reportGeneralTypeIssues] + if self._batch_size <= 1: + raise torch.cuda.OutOfMemoryError( + "Out of Memory Error. Batch size already at 1." + ) from e # type: ignore[reportGeneralTypeIssues] + self._batch_size = max(int(round(self._batch_size * self._oom_batch_size_backoff_mult)), 1) logger.info( - f"Out of memory error caught with message {e.args[0]}, reducing batch size to {self._batch_size}. " - + "Remaking translation pipeline." + f"Out of memory error caught with message: {e.args[0]}. " # type: ignore[reportGeneralTypeIssues] + + f"\nReducing batch size to {self._batch_size} and remaking translation pipeline." ) self._pipeline = _TranslationPipeline( model=self._model,