Skip to content

Commit

Permalink
Improve PMMLServer predict performance (kubeflow#1405)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnyISalIn authored Mar 16, 2021
1 parent 77ba3f2 commit 9969964
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 33 deletions.
9 changes: 8 additions & 1 deletion docs/samples/v1beta1/pmml/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ To test the [PMMLServer](http://dmg.org/pmml/pmml_examples/#Iris) server, first

# Predict on an InferenceService using PMMLServer

## Disadvantages

Because `pmmlserver` is based on [Py4J](https://github.com/bartdag/py4j), which doesn't support multi-process mode, we can't set `spec.predictor.containerConcurrency`.

If you want to scale the PMMLServer to improve prediction performance, you should set the InferenceService's `resources.limits.cpu` to 1 and scale up its replica count.


## Setup
1. Your ~/.kube/config should point to a cluster with [KFServing installed](https://github.com/kubeflow/kfserving/#install-kfserving).
2. Your cluster's Istio Ingress gateway must be [network accessible](https://istio.io/latest/docs/tasks/traffic-management/ingress/ingress-control/).
Expand Down Expand Up @@ -50,6 +57,6 @@ Expected Output
< x-envoy-upstream-service-time: 12
<
* Connection #0 to host localhost left intact
{"predictions": [[1.0, 0.0, 0.0, "2"]]}* Closing connection 0
{"predictions": [{'Species': 'setosa', 'Probability_setosa': 1.0, 'Probability_versicolor': 0.0, 'Probability_virginica': 0.0, 'Node_Id': '2'}]}* Closing connection 0
```

26 changes: 26 additions & 0 deletions pkg/apis/serving/v1beta1/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"

"github.com/kubeflow/kfserving/pkg/constants"
Expand All @@ -37,6 +38,8 @@ const (
UnsupportedStorageURIFormatError = "storageUri, must be one of: [%s] or match https://{}.blob.core.windows.net/{}/{} or be an absolute or relative local path. StorageUri [%s] is not supported."
InvalidLoggerType = "Invalid logger type"
InvalidISVCNameFormatError = "The InferenceService \"%s\" is invalid: a InferenceService name must consist of lower case alphanumeric characters or '-', and must start with alphabetical character. (e.g. \"my-name\" or \"abc-123\", regex used for validation is '%s')"
MaxWorkersShouldBeLessThanMaxError = "Workers cannot be greater than %d"
InvalidWorkerArgument = "Invalid workers argument"
)

// Constants
Expand Down Expand Up @@ -210,3 +213,26 @@ func ExactlyOneErrorFor(component Component) error {
componentType.Name(),
)
}

// ValidateMaxArgumentWorkers checks every "--workers" entry in slice and
// returns an error when its worker count is malformed or out of range.
//
// Only the exact flag name "--workers" is inspected; flags that merely share
// the prefix (for example "--workers_timeout=30") are ignored. A "--workers"
// flag must carry its value inline as "--workers=<n>", and <n> must be a
// base-10 integer in the range [1, maxWorkers]. Non-positive counts are
// rejected as invalid: they are not a usable worker count and would otherwise
// pass the upper-bound check.
//
// It returns nil when slice holds no "--workers" flag or every occurrence is
// valid; otherwise it returns the error for the first offending entry.
func ValidateMaxArgumentWorkers(slice []string, maxWorkers int64) error {
	const workersFlag = "--workers"

	for _, arg := range slice {
		// SplitN with n=2 always yields at least one element: the flag name.
		parts := strings.SplitN(arg, "=", 2)
		if parts[0] != workersFlag {
			continue
		}
		if len(parts) != 2 {
			// "--workers" without an inline value cannot be validated.
			return fmt.Errorf(InvalidWorkerArgument)
		}

		workers, err := strconv.ParseInt(parts[1], 10, 64)
		if err != nil || workers < 1 {
			return fmt.Errorf(InvalidWorkerArgument)
		}
		if workers > maxWorkers {
			return fmt.Errorf(MaxWorkersShouldBeLessThanMaxError, maxWorkers)
		}
	}
	return nil
}
27 changes: 27 additions & 0 deletions pkg/apis/serving/v1beta1/inference_service_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,30 @@ func TestRejectBadNameIncludeDot(t *testing.T) {
isvc.Name = "abc.de"
g.Expect(isvc.ValidateCreate()).ShouldNot(gomega.Succeed())
}

// TestPMMLWorkersArguments exercises ValidateCreate on a PMML predictor with
// different "--workers" container arguments: a count of 2 and a non-numeric
// count are expected to fail with their specific errors, and a count of 1 is
// expected to pass.
func TestPMMLWorkersArguments(t *testing.T) {
	g := gomega.NewGomegaWithT(t)

	pmml := &PMMLSpec{
		PredictorExtensionSpec: PredictorExtensionSpec{
			StorageURI: proto.String("gs://testbucket/testmodel"),
		},
	}
	isvc := InferenceService{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
		Spec: InferenceServiceSpec{
			Predictor: PredictorSpec{PMML: pmml},
		},
	}

	// validateWithArgs installs the given container args on the shared
	// fixture and re-runs create-time validation.
	validateWithArgs := func(args ...string) error {
		isvc.Spec.Predictor.PMML.Container.Args = args
		return isvc.ValidateCreate()
	}

	tooManyWorkers := fmt.Sprintf(MaxWorkersShouldBeLessThanMaxError, 1)
	g.Expect(validateWithArgs("--workers=2")).Should(gomega.MatchError(tooManyWorkers))
	g.Expect(validateWithArgs("--workers=foo")).Should(gomega.MatchError(InvalidWorkerArgument))
	g.Expect(validateWithArgs("--workers=1")).Should(gomega.Succeed())
}
9 changes: 2 additions & 7 deletions pkg/apis/serving/v1beta1/predictor_pmml.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package v1beta1

import (
"fmt"
"strconv"

"github.com/golang/protobuf/proto"
"github.com/kubeflow/kfserving/pkg/constants"
"github.com/kubeflow/kfserving/pkg/utils"
Expand All @@ -41,6 +39,7 @@ var (
// Validate runs the PMML predictor checks (the "--workers" argument limit,
// capped at 1, followed by the storage URI check) and passes their results to
// utils.FirstNonNilError to produce the returned error.
func (p *PMMLSpec) Validate() error {
	checks := []error{
		ValidateMaxArgumentWorkers(p.Container.Args, 1),
		validateStorageURI(p.GetStorageUri()),
	}
	return utils.FirstNonNilError(checks)
}
Expand All @@ -61,11 +60,7 @@ func (p *PMMLSpec) GetContainer(metadata metav1.ObjectMeta, extensions *Componen
fmt.Sprintf("%s=%s", constants.ArgumentModelDir, constants.DefaultModelLocalMountPath),
fmt.Sprintf("%s=%s", constants.ArgumentHttpPort, constants.InferenceServiceDefaultHttpPort),
}
if !utils.IncludesArg(p.Container.Args, constants.ArgumentWorkers) {
if extensions.ContainerConcurrency != nil {
arguments = append(arguments, fmt.Sprintf("%s=%s", constants.ArgumentWorkers, strconv.FormatInt(*extensions.ContainerConcurrency, 10)))
}
}

if p.Container.Image == "" {
p.Container.Image = config.Predictors.PMML.ContainerImage + ":" + *p.RuntimeVersion
}
Expand Down
1 change: 0 additions & 1 deletion pkg/apis/serving/v1beta1/predictor_pmml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ func TestCreatePMMLModelServingContainer(t *testing.T) {
"--model_name=someName",
"--model_dir=/mnt/models",
"--http_port=8080",
"--workers=1",
},
},
},
Expand Down
25 changes: 22 additions & 3 deletions python/pmml.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
FROM openjdk:11-slim

RUN apt update && apt install -y python3-minimal python3-pip && rm -rf /var/lib/apt/lists/*
ARG PYTHON_VERSION=3.7
ARG CONDA_PYTHON_VERSION=3
ARG CONDA_DIR=/opt/conda

# Install basic utilities
RUN apt-get update && \
apt-get install -y --no-install-recommends git wget unzip bzip2 build-essential ca-certificates && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Install miniconda
ENV PATH $CONDA_DIR/bin:$PATH
RUN wget --quiet https://repo.continuum.io/miniconda/Miniconda$CONDA_PYTHON_VERSION-latest-Linux-x86_64.sh -O /tmp/miniconda.sh && \
echo 'export PATH=$CONDA_DIR/bin:$PATH' > /etc/profile.d/conda.sh && \
/bin/bash /tmp/miniconda.sh -b -p $CONDA_DIR && \
rm -rf /tmp/* && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

RUN conda install -y python=$PYTHON_VERSION

COPY pmmlserver pmmlserver
COPY kfserving kfserving

RUN pip3 install --upgrade pip && pip3 install -e ./kfserving
RUN pip3 install -e ./pmmlserver
RUN pip install --upgrade pip && pip3 install -e ./kfserving
RUN pip install -e ./pmmlserver
COPY third_party third_party

ENTRYPOINT ["python3", "-m", "pmmlserver"]
23 changes: 12 additions & 11 deletions python/pmmlserver/pmmlserver/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
# limitations under the License.

import os
from typing import Dict

import kfserving
import numpy as np
from pypmml import Model
from py4j.java_collections import JavaList
from jpmml_evaluator import make_evaluator
from jpmml_evaluator.py4j import launch_gateway, Py4JBackend
from typing import Dict

MODEL_BASENAME = "model"

Expand All @@ -31,27 +30,29 @@ def __init__(self, name: str, model_dir: str):
self.name = name
self.model_dir = model_dir
self.ready = False
self.evaluator = None
self.input_fields = []
self._gateway = None
self._backend = None

def load(self) -> bool:
model_path = kfserving.Storage.download(self.model_dir)
paths = [os.path.join(model_path, MODEL_BASENAME + model_extension)
for model_extension in MODEL_EXTENSIONS]
for path in paths:
if os.path.exists(path):
self._model = Model.load(path)
self._gateway = launch_gateway()
self._backend = Py4JBackend(self._gateway)
self.evaluator = make_evaluator(self._backend, path).verify()
self.input_fields = [inputField.getName() for inputField in self.evaluator.getInputFields()]
self.ready = True
break
return self.ready

def predict(self, request: Dict) -> Dict:
instances = request["instances"]
try:
inputs = np.array(instances)
except Exception as e:
raise Exception(
"Failed to initialize NumPy array from inputs: %s, %s" % (e, instances))
try:
result = [list(_) for _ in self._model.predict(inputs) if isinstance(_, JavaList)]
result = [self.evaluator.evaluate(dict(zip(self.input_fields, instance))) for instance in instances]
return {"predictions": result}
except Exception as e:
raise Exception("Failed to predict %s" % e)
9 changes: 8 additions & 1 deletion python/pmmlserver/pmmlserver/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,11 @@ def test_model():

request = {"instances": [[5.1, 3.5, 1.4, 0.2]]}
response = server.predict(request)
assert isinstance(response["predictions"][0], list)
expect_result = {'Species': 'setosa',
'Probability_setosa': 1.0,
'Probability_versicolor': 0.0,
'Probability_virginica': 0.0,
'Node_Id': '2'}

assert isinstance(response["predictions"][0], dict)
assert response["predictions"][0] == expect_result
4 changes: 2 additions & 2 deletions python/pmmlserver/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
]
setup(
name='pmmlserver',
version='0.5.0',
version='0.5.1',
author_email='[email protected]',
license='https://github.com/kubeflow/kfserving/LICENSE',
url='https://github.com/kubeflow/kfserving/python/pmmlserver',
Expand All @@ -33,7 +33,7 @@
packages=find_packages("pmmlserver"),
install_requires=[
"kfserving>=0.5.1",
"pypmml == 0.9.7",
"jpmml-evaluator==0.5.1",
],
tests_require=tests_require,
extras_require={'test': tests_require}
Expand Down
19 changes: 12 additions & 7 deletions test/e2e/predictor/test_pmml.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
# limitations under the License.

import os
from kubernetes import client

from kfserving import KFServingClient
from kfserving import constants
from kfserving import V1alpha2EndpointSpec
from kfserving import V1alpha2PredictorSpec
from kfserving import V1alpha2PMMLSpec
from kfserving import V1alpha2InferenceServiceSpec
from kfserving import V1alpha2InferenceService
from kfserving import V1alpha2InferenceServiceSpec
from kfserving import V1alpha2PMMLSpec
from kfserving import V1alpha2PredictorSpec
from kfserving import constants
from kubernetes import client
from kubernetes.client import V1ResourceRequirements

from ..common.utils import predict
from ..common.utils import KFSERVING_TEST_NAMESPACE
from ..common.utils import predict

api_version = constants.KFSERVING_GROUP + '/' + constants.KFSERVING_VERSION
KFServing = KFServingClient(config_file=os.environ.get("KUBECONFIG", "~/.kube/config"))
Expand All @@ -50,5 +51,9 @@ def test_pmml_kfserving():
KFServing.create(isvc)
KFServing.wait_isvc_ready(service_name, namespace=KFSERVING_TEST_NAMESPACE)
res = predict(service_name, './data/pmml_input.json')
assert(res["predictions"] == [[1.0, 0.0, 0.0, "2"]])
assert (res["predictions"] == [{'Species': 'setosa',
'Probability_setosa': 1.0,
'Probability_versicolor': 0.0,
'Probability_virginica': 0.0,
'Node_Id': '2'}])
KFServing.delete(service_name, KFSERVING_TEST_NAMESPACE)

0 comments on commit 9969964

Please sign in to comment.