Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Implemented Hugging Face Model Handler #26632

Merged
merged 29 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
06af6b7
automodel first pass
riteshghorse May 10, 2023
416166d
new model
riteshghorse May 16, 2023
6f063e5
updated model handler api
riteshghorse Jun 21, 2023
df87366
add model_class param
riteshghorse Jun 23, 2023
4da7edd
Merge branch 'master' of https://github.com/apache/beam into hf-model…
riteshghorse Jun 23, 2023
025cc52
update doc comments
riteshghorse Jun 26, 2023
8cf7a01
Merge branch 'master' of https://github.com/apache/beam into hf-model…
riteshghorse Jun 26, 2023
2c671ab
updated integration test and example
riteshghorse Jun 26, 2023
abaeb2a
unit test, modified params
riteshghorse Jun 27, 2023
d5e1cf3
add test setup for hugging face tests
riteshghorse Jun 27, 2023
4177c09
fix lints
riteshghorse Jun 27, 2023
6324752
fix import order
riteshghorse Jun 27, 2023
30029d3
refactor, doc, lints
riteshghorse Jun 28, 2023
c60d312
refactor, doc comments
riteshghorse Jun 29, 2023
a52536f
change test file
riteshghorse Jun 29, 2023
496d205
update types
riteshghorse Jul 7, 2023
d7fd777
update tox, doc, lints
riteshghorse Jul 11, 2023
8544051
fix lints
riteshghorse Jul 11, 2023
20b1af2
pr type
riteshghorse Jul 11, 2023
6b7854d
Merge branch 'master' of https://github.com/apache/beam into hf-model…
riteshghorse Jul 12, 2023
85014a7
update gpu warnings
riteshghorse Jul 12, 2023
9dff2e3
fix pydoc
riteshghorse Jul 12, 2023
dc83ecd
update typos, refactor
riteshghorse Jul 18, 2023
441011f
fix docstrings
riteshghorse Jul 18, 2023
f7e974c
refactor, doc, lints
riteshghorse Jul 18, 2023
467e5dd
pydoc
riteshghorse Jul 18, 2023
1a022d9
fix pydoc
riteshghorse Jul 20, 2023
2f95adc
updates to keyed model handler
riteshghorse Jul 24, 2023
4bdab80
pylints
riteshghorse Jul 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions sdks/python/apache_beam/examples/inference/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The RunInference API supports the Tensorflow framework. To use Tensorflow locall
pip install tensorflow==2.12.0
```


### PyTorch dependencies

The following installation requirements are for the files used in these examples.
Expand All @@ -65,6 +66,21 @@ For installation of the `torch` dependency on a distributed runner such as Dataf
[PyPI dependency instructions](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#pypi-dependencies).


### Transformers dependencies

The following installation requirement is for the Hugging Face model handler examples.

The RunInference API supports loading models from the Hugging Face Hub. To use it, first install `transformers`.
```
pip install transformers==4.30.0
```
Additional dependicies for PyTorch and TensorFlow may need to be installed separately:
```
pip install tensorflow==2.12.0
pip install torch==1.10.0
```


### TensorRT dependencies

The RunInference API supports TensorRT SDK for high-performance deep learning inference with NVIDIA GPUs.
Expand Down Expand Up @@ -687,3 +703,60 @@ MilkQualityAggregation(bad_quality_measurements=6, medium_quality_measurements=4
MilkQualityAggregation(bad_quality_measurements=3, medium_quality_measurements=3, high_quality_measurements=3)
MilkQualityAggregation(bad_quality_measurements=1, medium_quality_measurements=2, high_quality_measurements=1)
```

---
## Language modeling with Hugging Face Hub

[`huggingface_language_modeling.py`](./huggingface_language_modeling.py) contains an implementation for a RunInference pipeline that performs masked language modeling (that is, decoding a masked token in a sentence) using the `AutoModelForMaskedLM` architecture from Hugging Face.

The pipeline reads sentences, performs basic preprocessing to convert the last word into a `<mask>` token, passes the masked sentence to the Hugging Face implementation of RunInference, and then writes the predictions to a text file.

### Dataset and model for language modeling

To use this transform, you need a dataset and model for language modeling.

1. Choose a checkpoint to load from Hugging Face Hub, eg:[MaskedLanguageModel](https://huggingface.co/stevhliu/my_awesome_eli5_mlm_model).
2. (Optional) Create a file named `SENTENCES.txt` that contains sentences to feed into the model. The content of the file should be similar to the following example:
```
The capital of France is Paris .
He looked up and saw the sun and stars .
...
```

### Running `huggingface_language_modeling.py`

To run the language modeling pipeline locally, use the following command:
```sh
python -m apache_beam.examples.inference.huggingface_language_modeling \
--input SENTENCES \
--output OUTPUT \
--model_name REPOSITORY_ID
```
The `input` argument is optional. If none is provided, it will run the pipeline with some
example sentences.

For example, if you've followed the naming conventions recommended above:
```sh
python -m apache_beam.examples.inference.huggingface_language_modeling \
--input SENTENCES.txt \
--output predictions.csv \
--model_name "stevhliu/my_awesome_eli5_mlm_model"
```
Or, using the default example sentences:
```sh
python -m apache_beam.examples.inference.huggingface_language_modeling \
--output predictions.csv \
--model_name "stevhliu/my_awesome_eli5_mlm_model"
```

This writes the output to the `predictions.csv` with contents like:
```
The capital of France is Paris .;paris
He looked up and saw the sun and stars .;moon
...
```
Each line has data separated by a semicolon ";".
The first item is the input sentence. The model masks the last word and tries to predict it;
the second item is the word that the model predicts for the mask.

---
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A pipeline that uses RunInference to perform Language Modeling with
masked language model from Hugging Face.

This pipeline takes sentences from a custom text file, converts the last word
of the sentence into a <mask> token, and then uses the AutoModelForMaskedLM from
Hugging Face to predict the best word for the masked token given all the words
already in the sentence. The pipeline then writes the prediction to an output
file in which users can then compare against the original sentence.
"""

import argparse
import logging
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import Tuple

import apache_beam as beam
import torch
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerKeyedTensor
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners.runner import PipelineResult
from transformers import AutoModelForMaskedLM
from transformers import AutoTokenizer


def add_mask_to_last_word(text: str) -> Tuple[str, str]:
text_list = text.split()
return text, ' '.join(text_list[:-2] + ['<mask>', text_list[-1]])


def tokenize_sentence(
text_and_mask: Tuple[str, str],
tokenizer: AutoTokenizer) -> Tuple[str, Dict[str, torch.Tensor]]:
text, masked_text = text_and_mask
tokenized_sentence = tokenizer.encode_plus(masked_text, return_tensors="pt")

# Workaround to manually remove batch dim until we have the feature to
# add optional batching flag.
# TODO(https://github.com/apache/beam/issues/21863): Remove once optional
# batching flag added
return text, {
k: torch.squeeze(v)
for k, v in dict(tokenized_sentence).items()
}


def filter_empty_lines(text: str) -> Iterator[str]:
if len(text.strip()) > 0:
yield text


class PostProcessor(beam.DoFn):
"""Processes the PredictionResult to get the predicted word.

The logits are the output of the Model. We can get the word with the highest
probability of being a candidate replacement word by taking the argmax.
"""
def __init__(self, tokenizer: AutoTokenizer):
super().__init__()
self.tokenizer = tokenizer

def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
text, prediction_result = element
inputs = prediction_result.example
logits = prediction_result.inference['logits']
mask_token_index = torch.where(
inputs["input_ids"] == self.tokenizer.mask_token_id)[0]
predicted_token_id = logits[mask_token_index].argmax(axis=-1)
decoded_word = self.tokenizer.decode(predicted_token_id)
yield text + ';' + decoded_word


def parse_known_args(argv):
"""Parses args for the workflow."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
help='Path to the text file containing sentences.')
parser.add_argument(
'--output',
dest='output',
required=True,
help='Path of file in which to save the output predictions.')
parser.add_argument(
'--model_name',
dest='model_name',
required=True,
help='bert uncased model. This can be base model or large model')
parser.add_argument(
'--model_class',
dest='model_class',
default=AutoModelForMaskedLM,
help="Name of the model from Hugging Face")
return parser.parse_known_args(argv)


def run(
argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
"""
Args:
argv: Command line arguments defined for this example.
save_main_session: Used for internal testing.
test_pipeline: Used for internal testing.
"""
known_args, pipeline_args = parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

pipeline = test_pipeline
if not test_pipeline:
pipeline = beam.Pipeline(options=pipeline_options)

tokenizer = AutoTokenizer.from_pretrained(known_args.model_name)

model_handler = HuggingFaceModelHandlerKeyedTensor(
model_uri=known_args.model_name,
model_class=known_args.model_class,
framework='pt',
max_batch_size=1)
if not known_args.input:
text = (
pipeline | 'CreateSentences' >> beam.Create([
'The capital of France is Paris .',
'It is raining cats and dogs .',
'Today is Monday and tomorrow is Tuesday .',
'There are 5 coconuts on this palm tree .',
'The strongest person in the world is not famous .',
'The secret ingredient to his wonderful life was gratitude .',
'The biggest animal in the world is the whale .',
]))
else:
text = (
pipeline | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
text_and_tokenized_text_tuple = (
text
| 'FilterEmptyLines' >> beam.ParDo(filter_empty_lines)
| 'AddMask' >> beam.Map(add_mask_to_last_word)
|
'TokenizeSentence' >> beam.Map(lambda x: tokenize_sentence(x, tokenizer)))
output = (
text_and_tokenized_text_tuple
| 'RunInference' >> RunInference(KeyedModelHandler(model_handler))
| 'ProcessOutput' >> beam.ParDo(PostProcessor(tokenizer=tokenizer)))
_ = output | "WriteOutput" >> beam.io.WriteToText(
known_args.output, shard_name_template='', append_trailing_newlines=True)

result = pipeline.run()
result.wait_until_finish()
return result


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Loading