From 996996484ac879672d96d43a6068ed10ee3fc1d1 Mon Sep 17 00:00:00 2001 From: AnyISalIn Date: Tue, 16 Mar 2021 10:14:15 +0800 Subject: [PATCH] Improve PMMLServer predict performance (#1405) --- docs/samples/v1beta1/pmml/README.md | 9 ++++++- pkg/apis/serving/v1beta1/component.go | 26 ++++++++++++++++++ .../inference_service_validation_test.go | 27 +++++++++++++++++++ pkg/apis/serving/v1beta1/predictor_pmml.go | 9 ++----- .../serving/v1beta1/predictor_pmml_test.go | 1 - python/pmml.Dockerfile | 25 ++++++++++++++--- python/pmmlserver/pmmlserver/model.py | 23 ++++++++-------- python/pmmlserver/pmmlserver/test_model.py | 9 ++++++- python/pmmlserver/setup.py | 4 +-- test/e2e/predictor/test_pmml.py | 19 ++++++++----- 10 files changed, 119 insertions(+), 33 deletions(-) diff --git a/docs/samples/v1beta1/pmml/README.md b/docs/samples/v1beta1/pmml/README.md index e5e9dc04f58..a8912a28d50 100644 --- a/docs/samples/v1beta1/pmml/README.md +++ b/docs/samples/v1beta1/pmml/README.md @@ -4,6 +4,13 @@ To test the [PMMLServer](http://dmg.org/pmml/pmml_examples/#Iris) server, first # Predict on a InferenceService using PMMLServer +## Disadvantages + +Because `pmmlserver` is based on [Py4J](https://github.com/bartdag/py4j), which does not support multi-process mode, we can't set `spec.predictor.containerConcurrency`. + +If you want to scale the PMMLServer to improve prediction performance, you should set the InferenceService's `resources.limits.cpu` to 1 and scale up its replica count. + + ## Setup 1. Your ~/.kube/config should point to a cluster with [KFServing installed](https://github.com/kubeflow/kfserving/#install-kfserving). 2. Your cluster's Istio Ingress gateway must be [network accessible](https://istio.io/latest/docs/tasks/traffic-management/ingress/ingress-control/). 
@@ -50,6 +57,6 @@ Expected Output < x-envoy-upstream-service-time: 12 < * Connection #0 to host localhost left intact -{"predictions": [[1.0, 0.0, 0.0, "2"]]}* Closing connection 0 +{"predictions": [{'Species': 'setosa', 'Probability_setosa': 1.0, 'Probability_versicolor': 0.0, 'Probability_virginica': 0.0, 'Node_Id': '2'}]}* Closing connection 0 ``` diff --git a/pkg/apis/serving/v1beta1/component.go b/pkg/apis/serving/v1beta1/component.go index 41807957ef8..7f2b8441628 100644 --- a/pkg/apis/serving/v1beta1/component.go +++ b/pkg/apis/serving/v1beta1/component.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "regexp" + "strconv" "strings" "github.com/kubeflow/kfserving/pkg/constants" @@ -37,6 +38,8 @@ const ( UnsupportedStorageURIFormatError = "storageUri, must be one of: [%s] or match https://{}.blob.core.windows.net/{}/{} or be an absolute or relative local path. StorageUri [%s] is not supported." InvalidLoggerType = "Invalid logger type" InvalidISVCNameFormatError = "The InferenceService \"%s\" is invalid: a InferenceService name must consist of lower case alphanumeric characters or '-', and must start with alphabetical character. (e.g. \"my-name\" or \"abc-123\", regex used for validation is '%s')" + MaxWorkersShouldBeLessThanMaxError = "Workers cannot be greater than %d" + InvalidWorkerArgument = "Invalid workers argument" ) // Constants @@ -210,3 +213,26 @@ func ExactlyOneErrorFor(component Component) error { componentType.Name(), ) } + +// ValidateMaxArgumentWorkers validates that any --workers argument does not exceed maxWorkers. 
+func ValidateMaxArgumentWorkers(slice []string, maxWorkers int64) error { + for _, v := range slice { + + if strings.HasPrefix(v, "--workers") { + ret := strings.SplitN(v, "=", 2) + + if len(ret) == 2 { + workers, err := strconv.ParseInt(ret[1], 10, 64) + if err != nil { + return fmt.Errorf(InvalidWorkerArgument) + } + if workers > maxWorkers { + return fmt.Errorf(MaxWorkersShouldBeLessThanMaxError, maxWorkers) + } + } else { + return fmt.Errorf(InvalidWorkerArgument) + } + } + } + return nil +} diff --git a/pkg/apis/serving/v1beta1/inference_service_validation_test.go b/pkg/apis/serving/v1beta1/inference_service_validation_test.go index 445bbd727bc..a4c54677576 100644 --- a/pkg/apis/serving/v1beta1/inference_service_validation_test.go +++ b/pkg/apis/serving/v1beta1/inference_service_validation_test.go @@ -257,3 +257,30 @@ func TestRejectBadNameIncludeDot(t *testing.T) { isvc.Name = "abc.de" g.Expect(isvc.ValidateCreate()).ShouldNot(gomega.Succeed()) } + +func TestPMMLWorkersArguments(t *testing.T) { + g := gomega.NewGomegaWithT(t) + + isvc := InferenceService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "default", + }, + Spec: InferenceServiceSpec{ + Predictor: PredictorSpec{ + PMML: &PMMLSpec{ + PredictorExtensionSpec: PredictorExtensionSpec{ + StorageURI: proto.String("gs://testbucket/testmodel"), + }, + }, + }, + }, + } + + isvc.Spec.Predictor.PMML.Container.Args = []string{"--workers=2"} + g.Expect(isvc.ValidateCreate()).Should(gomega.MatchError(fmt.Sprintf(MaxWorkersShouldBeLessThanMaxError, 1))) + isvc.Spec.Predictor.PMML.Container.Args = []string{"--workers=foo"} + g.Expect(isvc.ValidateCreate()).Should(gomega.MatchError(InvalidWorkerArgument)) + isvc.Spec.Predictor.PMML.Container.Args = []string{"--workers=1"} + g.Expect(isvc.ValidateCreate()).Should(gomega.Succeed()) +} diff --git a/pkg/apis/serving/v1beta1/predictor_pmml.go b/pkg/apis/serving/v1beta1/predictor_pmml.go index 3f4090f1316..cab0cddae19 100644 --- 
a/pkg/apis/serving/v1beta1/predictor_pmml.go +++ b/pkg/apis/serving/v1beta1/predictor_pmml.go @@ -18,8 +18,6 @@ package v1beta1 import ( "fmt" - "strconv" - "github.com/golang/protobuf/proto" "github.com/kubeflow/kfserving/pkg/constants" "github.com/kubeflow/kfserving/pkg/utils" @@ -41,6 +39,7 @@ var ( // Validate returns an error if invalid func (p *PMMLSpec) Validate() error { return utils.FirstNonNilError([]error{ + ValidateMaxArgumentWorkers(p.Container.Args, 1), validateStorageURI(p.GetStorageUri()), }) } @@ -61,11 +60,7 @@ func (p *PMMLSpec) GetContainer(metadata metav1.ObjectMeta, extensions *Componen fmt.Sprintf("%s=%s", constants.ArgumentModelDir, constants.DefaultModelLocalMountPath), fmt.Sprintf("%s=%s", constants.ArgumentHttpPort, constants.InferenceServiceDefaultHttpPort), } - if !utils.IncludesArg(p.Container.Args, constants.ArgumentWorkers) { - if extensions.ContainerConcurrency != nil { - arguments = append(arguments, fmt.Sprintf("%s=%s", constants.ArgumentWorkers, strconv.FormatInt(*extensions.ContainerConcurrency, 10))) - } - } + if p.Container.Image == "" { p.Container.Image = config.Predictors.PMML.ContainerImage + ":" + *p.RuntimeVersion } diff --git a/pkg/apis/serving/v1beta1/predictor_pmml_test.go b/pkg/apis/serving/v1beta1/predictor_pmml_test.go index b1e5495283d..3a0c14ab5e3 100644 --- a/pkg/apis/serving/v1beta1/predictor_pmml_test.go +++ b/pkg/apis/serving/v1beta1/predictor_pmml_test.go @@ -299,7 +299,6 @@ func TestCreatePMMLModelServingContainer(t *testing.T) { "--model_name=someName", "--model_dir=/mnt/models", "--http_port=8080", - "--workers=1", }, }, }, diff --git a/python/pmml.Dockerfile b/python/pmml.Dockerfile index 42a396ed743..50fa78ee26e 100644 --- a/python/pmml.Dockerfile +++ b/python/pmml.Dockerfile @@ -1,12 +1,31 @@ FROM openjdk:11-slim -RUN apt update && apt install -y python3-minimal python3-pip && rm -rf /var/lib/apt/lists/* +ARG PYTHON_VERSION=3.7 +ARG CONDA_PYTHON_VERSION=3 +ARG 
CONDA_DIR=/opt/conda + +# Install basic utilities +RUN apt-get update && \ + apt-get install -y --no-install-recommends git wget unzip bzip2 build-essential ca-certificates && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Install miniconda +ENV PATH $CONDA_DIR/bin:$PATH +RUN wget --quiet https://repo.continuum.io/miniconda/Miniconda$CONDA_PYTHON_VERSION-latest-Linux-x86_64.sh -O /tmp/miniconda.sh && \ + echo 'export PATH=$CONDA_DIR/bin:$PATH' > /etc/profile.d/conda.sh && \ + /bin/bash /tmp/miniconda.sh -b -p $CONDA_DIR && \ + rm -rf /tmp/* && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +RUN conda install -y python=$PYTHON_VERSION COPY pmmlserver pmmlserver COPY kfserving kfserving -RUN pip3 install --upgrade pip && pip3 install -e ./kfserving -RUN pip3 install -e ./pmmlserver +RUN pip install --upgrade pip && pip3 install -e ./kfserving +RUN pip install -e ./pmmlserver COPY third_party third_party ENTRYPOINT ["python3", "-m", "pmmlserver"] diff --git a/python/pmmlserver/pmmlserver/model.py b/python/pmmlserver/pmmlserver/model.py index bdc7c6e1885..c726dbdfcc2 100644 --- a/python/pmmlserver/pmmlserver/model.py +++ b/python/pmmlserver/pmmlserver/model.py @@ -13,12 +13,11 @@ # limitations under the License. 
import os -from typing import Dict import kfserving -import numpy as np -from pypmml import Model -from py4j.java_collections import JavaList +from jpmml_evaluator import make_evaluator +from jpmml_evaluator.py4j import launch_gateway, Py4JBackend +from typing import Dict MODEL_BASENAME = "model" @@ -31,6 +30,10 @@ def __init__(self, name: str, model_dir: str): self.name = name self.model_dir = model_dir self.ready = False + self.evaluator = None + self.input_fields = [] + self._gateway = None + self._backend = None def load(self) -> bool: model_path = kfserving.Storage.download(self.model_dir) @@ -38,7 +41,10 @@ def load(self) -> bool: for model_extension in MODEL_EXTENSIONS] for path in paths: if os.path.exists(path): - self._model = Model.load(path) + self._gateway = launch_gateway() + self._backend = Py4JBackend(self._gateway) + self.evaluator = make_evaluator(self._backend, path).verify() + self.input_fields = [inputField.getName() for inputField in self.evaluator.getInputFields()] self.ready = True break return self.ready @@ -46,12 +52,7 @@ def load(self) -> bool: def predict(self, request: Dict) -> Dict: instances = request["instances"] try: - inputs = np.array(instances) - except Exception as e: - raise Exception( - "Failed to initialize NumPy array from inputs: %s, %s" % (e, instances)) - try: - result = [list(_) for _ in self._model.predict(inputs) if isinstance(_, JavaList)] + result = [self.evaluator.evaluate(dict(zip(self.input_fields, instance))) for instance in instances] return {"predictions": result} except Exception as e: raise Exception("Failed to predict %s" % e) diff --git a/python/pmmlserver/pmmlserver/test_model.py b/python/pmmlserver/pmmlserver/test_model.py index 0106e450669..0ef326de60b 100644 --- a/python/pmmlserver/pmmlserver/test_model.py +++ b/python/pmmlserver/pmmlserver/test_model.py @@ -25,4 +25,11 @@ def test_model(): request = {"instances": [[5.1, 3.5, 1.4, 0.2]]} response = server.predict(request) - assert 
isinstance(response["predictions"][0], list) + expect_result = {'Species': 'setosa', + 'Probability_setosa': 1.0, + 'Probability_versicolor': 0.0, + 'Probability_virginica': 0.0, + 'Node_Id': '2'} + + assert isinstance(response["predictions"][0], dict) + assert response["predictions"][0] == expect_result diff --git a/python/pmmlserver/setup.py b/python/pmmlserver/setup.py index f7d6c844704..08a84ff7ea8 100644 --- a/python/pmmlserver/setup.py +++ b/python/pmmlserver/setup.py @@ -22,7 +22,7 @@ ] setup( name='pmmlserver', - version='0.5.0', + version='0.5.1', author_email='anyisalin@gmail.com', license='https://github.com/kubeflow/kfserving/LICENSE', url='https://github.com/kubeflow/kfserving/python/pmmlserver', @@ -33,7 +33,7 @@ packages=find_packages("pmmlserver"), install_requires=[ "kfserving>=0.5.1", - "pypmml == 0.9.7", + "jpmml-evaluator==0.5.1", ], tests_require=tests_require, extras_require={'test': tests_require} diff --git a/test/e2e/predictor/test_pmml.py b/test/e2e/predictor/test_pmml.py index b8faaafff6c..561a03be885 100644 --- a/test/e2e/predictor/test_pmml.py +++ b/test/e2e/predictor/test_pmml.py @@ -13,18 +13,19 @@ # limitations under the License. 
import os -from kubernetes import client + from kfserving import KFServingClient -from kfserving import constants from kfserving import V1alpha2EndpointSpec -from kfserving import V1alpha2PredictorSpec -from kfserving import V1alpha2PMMLSpec -from kfserving import V1alpha2InferenceServiceSpec from kfserving import V1alpha2InferenceService +from kfserving import V1alpha2InferenceServiceSpec +from kfserving import V1alpha2PMMLSpec +from kfserving import V1alpha2PredictorSpec +from kfserving import constants +from kubernetes import client from kubernetes.client import V1ResourceRequirements -from ..common.utils import predict from ..common.utils import KFSERVING_TEST_NAMESPACE +from ..common.utils import predict api_version = constants.KFSERVING_GROUP + '/' + constants.KFSERVING_VERSION KFServing = KFServingClient(config_file=os.environ.get("KUBECONFIG", "~/.kube/config")) @@ -50,5 +51,9 @@ def test_pmml_kfserving(): KFServing.create(isvc) KFServing.wait_isvc_ready(service_name, namespace=KFSERVING_TEST_NAMESPACE) res = predict(service_name, './data/pmml_input.json') - assert(res["predictions"] == [[1.0, 0.0, 0.0, "2"]]) + assert (res["predictions"] == [{'Species': 'setosa', + 'Probability_setosa': 1.0, + 'Probability_versicolor': 0.0, + 'Probability_virginica': 0.0, + 'Node_Id': '2'}]) KFServing.delete(service_name, KFSERVING_TEST_NAMESPACE)