Skip to content

Commit

Permalink
add dynamic batching embedding/reranking (opea-project#774)
Browse files Browse the repository at this point in the history
* draft static batching embedding/reranking on single gaudi card

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix

* resolve segfault, deadlock and other issues

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* narrow down default timeout

* add doockerfile

* fix hpu local microservice start

* openai format

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* configurable timeout

* lower timeout

* fix

* lower default timeout

* bf16

* log, pad max_len

* autocast, 128

* fix acc issue

* perf fallback with no acc drop

* revert no-padding ones

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix hpu graph wrapper

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add padding batch

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* habana 1.18

* static -> dynamic

* add UT, add param in_single_process

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add docker file

* fix case doc empty, and pass model id from env

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* CI

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: chen, suyue <[email protected]>
Co-authored-by: ZePan110 <[email protected]>
  • Loading branch information
4 people authored Nov 6, 2024
1 parent a8e5adc commit 518cdfb
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 3 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/docker/compose/embeddings-compose-cd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ services:
build:
dockerfile: comps/embeddings/predictionguard/Dockerfile
image: ${REGISTRY:-opea}/embedding-predictionguard:${TAG:-latest}
embedding-reranking-local:
build:
dockerfile: comps/embeddings/tei/langchain/Dockerfile.dynamic_batching
image: ${REGISTRY:-opea}/embedding-reranking-local:${TAG:-latest}
69 changes: 66 additions & 3 deletions comps/cores/mega/micro_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@

import asyncio
import multiprocessing
import os
from collections import defaultdict, deque
from enum import Enum
from typing import Any, List, Optional, Type

from ..proto.docarray import TextDoc
from .constants import ServiceRoleType, ServiceType
from .logger import CustomLogger
from .utils import check_ports_availability

opea_microservices = {}

logger = CustomLogger("micro_service")
logflag = os.getenv("LOGFLAG", False)


class MicroService:
"""MicroService class to create a microservice."""
Expand All @@ -31,6 +38,9 @@ def __init__(
provider: Optional[str] = None,
provider_endpoint: Optional[str] = None,
use_remote_service: Optional[bool] = False,
dynamic_batching: bool = False,
dynamic_batching_timeout: int = 1,
dynamic_batching_max_batch_size: int = 32,
):
"""Init the microservice."""
self.name = f"{name}/{self.__class__.__name__}" if name else self.__class__.__name__
Expand All @@ -43,6 +53,9 @@ def __init__(
self.input_datatype = input_datatype
self.output_datatype = output_datatype
self.use_remote_service = use_remote_service
self.dynamic_batching = dynamic_batching
self.dynamic_batching_timeout = dynamic_batching_timeout
self.dynamic_batching_max_batch_size = dynamic_batching_max_batch_size
self.uvicorn_kwargs = {}

if ssl_keyfile:
Expand All @@ -58,10 +71,50 @@ def __init__(

self.server = self._get_server()
self.app = self.server.app
# create a batch request processor loop if using dynamic batching
if self.dynamic_batching:
self.buffer_lock = asyncio.Lock()
self.request_buffer = defaultdict(deque)

@self.app.on_event("startup")
async def startup_event():
asyncio.create_task(self._dynamic_batch_processor())

self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
self.event_loop.run_until_complete(self._async_setup())

async def _dynamic_batch_processor(self):
if logflag:
logger.info("dynamic batch processor looping...")
while True:
await asyncio.sleep(self.dynamic_batching_timeout)
runtime_batch: dict[Enum, list[dict]] = {} # {ServiceType.Embedding: [{"request": xx, "response": yy}, {}]}

async with self.buffer_lock:
# prepare the runtime batch, access to buffer is locked
if self.request_buffer:
for service_type, request_lst in self.request_buffer.items():
batch = []
# grab min(MAX_BATCH_SIZE, REQUEST_SIZE) requests from buffer
for _ in range(min(self.dynamic_batching_max_batch_size, len(request_lst))):
batch.append(request_lst.popleft())

runtime_batch[service_type] = batch

# Run batched inference on the batch and set results
for service_type, batch in runtime_batch.items():
if not batch:
continue
results = await self.dynamic_batching_infer(service_type, batch)

for req, result in zip(batch, results):
req["response"].set_result(result)

async def dynamic_batching_infer(self, service_type: Enum, batch: list[dict]):
"""Need to implement."""
raise NotImplementedError("Unimplemented dynamic batching inference!")

def _validate_env(self):
"""Check whether to use the microservice locally."""
if self.use_remote_service:
Expand Down Expand Up @@ -116,10 +169,14 @@ def run(self):
self._validate_env()
self.event_loop.run_until_complete(self._async_run_forever())

def start(self):
def start(self, in_single_process=False):
self._validate_env()
self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name)
self.process.start()
if in_single_process:
# Resolve HPU segmentation fault and potential tokenizer issues by limiting to same process
self.run()
else:
self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name)
self.process.start()

async def _async_teardown(self):
"""Shutdown the server."""
Expand Down Expand Up @@ -155,6 +212,9 @@ def register_microservice(
provider: Optional[str] = None,
provider_endpoint: Optional[str] = None,
methods: List[str] = ["POST"],
dynamic_batching: bool = False,
dynamic_batching_timeout: int = 1,
dynamic_batching_max_batch_size: int = 32,
):
def decorator(func):
if name not in opea_microservices:
Expand All @@ -172,6 +232,9 @@ def decorator(func):
output_datatype=output_datatype,
provider=provider,
provider_endpoint=provider_endpoint,
dynamic_batching=dynamic_batching,
dynamic_batching_timeout=dynamic_batching_timeout,
dynamic_batching_max_batch_size=dynamic_batching_max_batch_size,
)
opea_microservices[name] = micro_service
opea_microservices[name].app.router.add_api_route(endpoint, func, methods=methods)
Expand Down
28 changes: 28 additions & 0 deletions comps/embeddings/tei/langchain/Dockerfile.dynamic_batching
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

# FROM opea/habanalabs:1.16.1-pytorch-installer-2.2.2 as hpu
FROM vault.habana.ai/gaudi-docker/1.18.0/ubuntu22.04/habanalabs/pytorch-installer-2.4.0:latest as hpu

RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
libgl1-mesa-glx \
libjemalloc-dev

RUN useradd -m -s /bin/bash user && \
mkdir -p /home/user && \
chown -R user /home/user/

# Disable user for now
# USER user

COPY comps /home/user/comps

RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r /home/user/comps/embeddings/tei/langchain/requirements.txt && \
pip install git+https://github.com/huggingface/optimum-habana.git

ENV PYTHONPATH=$PYTHONPATH:/home/user

WORKDIR /home/user/comps/embeddings/tei/langchain

ENTRYPOINT ["python", "local_embedding_reranking.py"]
Loading

0 comments on commit 518cdfb

Please sign in to comment.