[Bugfix][Frontend] Fix Issues Under High Load With zeromq Frontend #7394

Merged Aug 21, 2024 (88 commits)

Changes shown from 24 commits.

Commits:
b2e29a5  added proxy to limit use of uniz sockets (robertgshaw2-redhat, Aug 10, 2024)
8d31115  Merge branch 'main' into fix-zmq-max-sockets (robertgshaw2-redhat, Aug 10, 2024)
6d2b3df  comment (robertgshaw2-redhat, Aug 10, 2024)
c73e943  use random inproc path (robertgshaw2-redhat, Aug 10, 2024)
f1768fb  format (robertgshaw2-redhat, Aug 10, 2024)
601a461  foamt (robertgshaw2-redhat, Aug 10, 2024)
1a47d94  format (robertgshaw2-redhat, Aug 10, 2024)
eeecb09  Update vllm/entrypoints/openai/rpc/client.py (robertgshaw2-redhat, Aug 10, 2024)
2770e40  cleaning (robertgshaw2-redhat, Aug 14, 2024)
5a85618  Merge branch 'main' into fix-zmq-max-sockets (robertgshaw2-redhat, Aug 18, 2024)
938db1d  Merge branch 'fix-zmq-max-sockets' of https://github.com/neuralmagic/… (robertgshaw2-redhat, Aug 18, 2024)
ea2f03e  remove logging (robertgshaw2-redhat, Aug 18, 2024)
5cebc65  add info message re: concurrency (robertgshaw2-redhat, Aug 18, 2024)
2c12436  update comment (robertgshaw2-redhat, Aug 18, 2024)
9afd6ba  update (robertgshaw2-redhat, Aug 18, 2024)
c262088  format (robertgshaw2-redhat, Aug 18, 2024)
3e580d5  reorder (robertgshaw2-redhat, Aug 18, 2024)
d9e10e0  reverT (robertgshaw2-redhat, Aug 18, 2024)
4e3a63a  fix (robertgshaw2-redhat, Aug 18, 2024)
e54bf8a  fix (robertgshaw2-redhat, Aug 18, 2024)
6544f3a  fix abort logic (robertgshaw2-redhat, Aug 18, 2024)
81f4da8  reduce LOC change (robertgshaw2-redhat, Aug 18, 2024)
b3374bc  cleanup (robertgshaw2-redhat, Aug 18, 2024)
dd1817a  cleanup (robertgshaw2-redhat, Aug 18, 2024)
5b56365  format (robertgshaw2-redhat, Aug 18, 2024)
05ff816  fix client (robertgshaw2-redhat, Aug 18, 2024)
e551d30  revert unneccessary change (robertgshaw2-redhat, Aug 18, 2024)
3d7f65f  revert startup probe changes to separate PR (robertgshaw2-redhat, Aug 18, 2024)
e7e6f1e  stash (robertgshaw2-redhat, Aug 18, 2024)
eaaebcc  Merge branch 'main' into fix-zmq-max-sockets (robertgshaw2-redhat, Aug 18, 2024)
21b5239  stash draining (robertgshaw2-redhat, Aug 19, 2024)
7e15b00  update (robertgshaw2-redhat, Aug 19, 2024)
74c4166  stash (robertgshaw2-redhat, Aug 19, 2024)
450e949  convert RPCServer to use DEALER (robertgshaw2-redhat, Aug 19, 2024)
8348f1f  stash (robertgshaw2-redhat, Aug 19, 2024)
545956e  fix (robertgshaw2-redhat, Aug 19, 2024)
7a34611  cleaning (robertgshaw2-redhat, Aug 19, 2024)
50abb94  stash (robertgshaw2-redhat, Aug 19, 2024)
1723687  remove awk (robertgshaw2-redhat, Aug 19, 2024)
3dfc9ef  nits (robertgshaw2-redhat, Aug 20, 2024)
8d40f2d  format (robertgshaw2-redhat, Aug 20, 2024)
3397460  format (robertgshaw2-redhat, Aug 20, 2024)
ef132dc  nit (robertgshaw2-redhat, Aug 20, 2024)
10ef204  change (robertgshaw2-redhat, Aug 20, 2024)
b67718f  clean (robertgshaw2-redhat, Aug 20, 2024)
c3c1dbe  Update vllm/entrypoints/openai/rpc/server.py (robertgshaw2-redhat, Aug 20, 2024)
ee6efcf  format (robertgshaw2-redhat, Aug 20, 2024)
3fdc2fe  cleanup abort logic (robertgshaw2-redhat, Aug 20, 2024)
4cacb56  nit (robertgshaw2-redhat, Aug 20, 2024)
724eb31  added load test (robertgshaw2-redhat, Aug 21, 2024)
4d5e6b7  update load test (robertgshaw2-redhat, Aug 21, 2024)
b9e4168  updated (robertgshaw2-redhat, Aug 21, 2024)
8f9bc23  format (robertgshaw2-redhat, Aug 21, 2024)
9a2be3f  updated (robertgshaw2-redhat, Aug 21, 2024)
dee38f0  revert suurious change (robertgshaw2-redhat, Aug 21, 2024)
e78f443  convert to even smaller model (robertgshaw2-redhat, Aug 21, 2024)
cc2d7db  20k requests (robertgshaw2-redhat, Aug 21, 2024)
b40e269  convert to 10k requests (robertgshaw2-redhat, Aug 21, 2024)
03eed9c  clean up closing logic (robertgshaw2-redhat, Aug 21, 2024)
f697226  use constant (robertgshaw2-redhat, Aug 21, 2024)
fd642ab  fix bad cleanup (robertgshaw2-redhat, Aug 21, 2024)
762c2ed  remove useless argument (robertgshaw2-redhat, Aug 21, 2024)
c805ed2  up to 20k requests (robertgshaw2-redhat, Aug 21, 2024)
2e1652e  revert to 10k requests (robertgshaw2-redhat, Aug 21, 2024)
3e1ede4  revert suprious argument (robertgshaw2-redhat, Aug 21, 2024)
b3bf7ef  revert to 20k (robertgshaw2-redhat, Aug 21, 2024)
708bd34  format (robertgshaw2-redhat, Aug 21, 2024)
10a88ec  [BugFix] Raise all exception variations in async generator (njhill, Aug 20, 2024)
db8aebc  Fix possible premature generator completion; add tests (njhill, Aug 21, 2024)
b16c64b  format (robertgshaw2-redhat, Aug 21, 2024)
a9ecaa9  added test accuracy (robertgshaw2-redhat, Aug 21, 2024)
6f8d5e8  format (robertgshaw2-redhat, Aug 21, 2024)
bab177f  updated test pipeline (robertgshaw2-redhat, Aug 21, 2024)
7b58281  fix lm eval (robertgshaw2-redhat, Aug 21, 2024)
adf45d1  cleanup (robertgshaw2-redhat, Aug 21, 2024)
9e827b0  updated (robertgshaw2-redhat, Aug 21, 2024)
47dca36  Merge branch 'main' into fix-zmq-max-sockets (robertgshaw2-redhat, Aug 21, 2024)
f84c341  added sleep time (robertgshaw2-redhat, Aug 21, 2024)
0ce78f8  actually sleep (robertgshaw2-redhat, Aug 21, 2024)
8054348  formatting (robertgshaw2-redhat, Aug 21, 2024)
5ddbdab  format (robertgshaw2-redhat, Aug 21, 2024)
1ebbe9e  mypy (robertgshaw2-redhat, Aug 21, 2024)
53d639b  mypy (robertgshaw2-redhat, Aug 21, 2024)
a36b381  format (robertgshaw2-redhat, Aug 21, 2024)
415ee39  remove test load (robertgshaw2-redhat, Aug 21, 2024)
26440e6  stash (robertgshaw2-redhat, Aug 21, 2024)
2442a9d  Merge branch 'fix-zmq-max-sockets' of https://github.com/neuralmagic/… (robertgshaw2-redhat, Aug 21, 2024)
b72f84f  Merge branch 'fix-raise-cancelled' into fix-zmq-max-sockets (robertgshaw2-redhat, Aug 21, 2024)
5 changes: 5 additions & 0 deletions vllm/engine/async_llm_engine.py
@@ -642,6 +642,11 @@ def is_stopped(self) -> bool:
def errored(self) -> bool:
return self._errored_with is not None

@property
def limit_concurrency(self) -> Optional[int]:
"""Maximum number of concurrently running requests."""
return None

def set_errored(self, exc: Exception) -> None:
self._errored_with = exc

4 changes: 4 additions & 0 deletions vllm/engine/protocol.py
@@ -30,6 +30,10 @@ def is_stopped(self) -> bool:
def errored(self) -> bool:
...

@property
def limit_concurrency(self) -> Optional[int]:
"""Maximum number of concurrently running requests."""

def generate(
self,
inputs: PromptInputs,
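The new property is deliberately optional: an in-process engine reports no limit, while a transport-backed client reports a finite one, and callers branch on None. A minimal sketch of that pattern, with hypothetical class names that are not vLLM's:

```python
from typing import Optional, Protocol


class EngineClient(Protocol):
    """Subset of the engine protocol relevant to concurrency limits."""

    @property
    def limit_concurrency(self) -> Optional[int]:
        """Maximum number of concurrently running requests."""
        ...


class InProcessEngine:
    @property
    def limit_concurrency(self) -> Optional[int]:
        # No transport between server and engine, so no cap.
        return None


class RPCClientStub:
    @property
    def limit_concurrency(self) -> Optional[int]:
        # A transport-imposed cap, e.g. zmq.constants.SOCKET_LIMIT.
        return 65536


def describe(engine: EngineClient) -> str:
    limit = engine.limit_concurrency
    return "unbounded" if limit is None else f"limited to {limit}"


print(describe(InProcessEngine()))  # unbounded
print(describe(RPCClientStub()))    # limited to 65536
```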
9 changes: 9 additions & 0 deletions vllm/entrypoints/launcher.py
@@ -26,6 +26,15 @@ async def serve_http(app: FastAPI, engine: AsyncEngineClient,

logger.info("Route: %s, Methods: %s", path, ', '.join(methods))

# Set concurrency limits in uvicorn if running in multiprocessing mode
# since zmq has a maximum socket limit of zmq.constants.SOCKET_LIMIT (65536).
if engine.limit_concurrency is not None:
logger.info(
"Launching Uvicorn with --limit_concurrency %s. To avoid this "
"limit at the expense of performance run with "
"--disable-frontend-multiprocessing", engine.limit_concurrency)
uvicorn_kwargs["limit_concurrency"] = engine.limit_concurrency

config = uvicorn.Config(app, **uvicorn_kwargs)
server = uvicorn.Server(config)
_add_shutdown_handlers(app, server, engine)
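For context, uvicorn's `limit_concurrency` makes the server answer HTTP 503 once concurrent connections or tasks exceed the limit, instead of letting excess requests pile onto the zmq transport. A hedged sketch of the wiring (the app and kwargs here are stand-ins, not vLLM's actual launcher code):

```python
import uvicorn
from fastapi import FastAPI

app = FastAPI()  # stand-in for vLLM's API server app

# Stand-in for engine.limit_concurrency; None would mean "no cap".
limit_concurrency = 65536

uvicorn_kwargs = {"host": "0.0.0.0", "port": 8000}
if limit_concurrency is not None:
    # Uvicorn replies 503 to connections beyond this limit.
    uvicorn_kwargs["limit_concurrency"] = limit_concurrency

config = uvicorn.Config(app, **uvicorn_kwargs)
server = uvicorn.Server(config)

if __name__ == "__main__":
    server.run()
```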
5 changes: 3 additions & 2 deletions vllm/entrypoints/openai/rpc/__init__.py
@@ -8,7 +8,8 @@
from vllm.sampling_params import SamplingParams

VLLM_RPC_SUCCESS_STR = "SUCCESS"
VLLM_RPC_HEALTHY_STR = "HEALTHY"
VLLM_RPC_SERVER_START_TIMEOUT_MS = 1000
VLLM_RPC_HEALTH_TIMEOUT_MS = 10000


@dataclass
@@ -34,7 +35,7 @@ class RPCUtilityRequest(Enum):
GET_SCHEDULER_CONFIG = 5
GET_LORA_CONFIG = 6
DO_LOG_STATS = 7
CHECK_HEALTH = 8
IS_SERVER_HEALTHY = 8
IS_TRACING_ENABLED = 9


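To make the constants concrete, here is an illustrative client-side helper (hypothetical, not part of the PR; assumes a server is bound at `rpc_path`) showing the request/timeout pattern the client uses: cloudpickle the request, send it as one multipart frame, and poll with `VLLM_RPC_HEALTH_TIMEOUT_MS` so a hung server raises instead of blocking forever.

```python
import cloudpickle
import zmq
import zmq.asyncio

VLLM_RPC_HEALTH_TIMEOUT_MS = 10000


async def ping_server(ctx: zmq.asyncio.Context, rpc_path: str,
                      request: object) -> object:
    """Send one picklable request and await the reply with a timeout."""
    sock = ctx.socket(zmq.DEALER)
    try:
        sock.connect(rpc_path)
        await sock.send_multipart([cloudpickle.dumps(request)])
        # poll() returns 0 on timeout rather than blocking forever.
        if await sock.poll(timeout=VLLM_RPC_HEALTH_TIMEOUT_MS) == 0:
            raise TimeoutError("RPC server did not respond in time")
        response = cloudpickle.loads(await sock.recv())
        if isinstance(response, Exception):
            raise response
        return response
    finally:
        sock.close(linger=0)  # discard unsent messages on close
```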
141 changes: 105 additions & 36 deletions vllm/entrypoints/openai/rpc/client.py
@@ -1,5 +1,7 @@
import asyncio
from contextlib import contextmanager
from typing import Any, AsyncGenerator, Mapping, Optional
from uuid import uuid4

import cloudpickle
import zmq
@@ -8,31 +10,108 @@
from vllm.config import (DecodingConfig, LoRAConfig, ModelConfig,
ParallelConfig, SchedulerConfig)
from vllm.entrypoints.openai.rpc import (RPC_REQUEST_TYPE,
VLLM_RPC_HEALTHY_STR,
VLLM_RPC_HEALTH_TIMEOUT_MS,
VLLM_RPC_SERVER_START_TIMEOUT_MS,
VLLM_RPC_SUCCESS_STR, RPCAbortRequest,
RPCGenerateRequest, RPCUtilityRequest)
from vllm.inputs import PromptInputs
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.outputs import EmbeddingRequestOutput, RequestOutput
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import SamplingParams
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs

# Time to wait before checking if the server process is alive.
SERVER_START_TIMEOUT_MS = 1000
logger = init_logger(__name__)

# Path used for inprocess proxy.
INPROC_PROXY_PATH = f"inproc://{uuid4()}"


class AsyncEngineRPCClient:
"""
RPCClient that connects to the RPCServer wrapping AsyncLLMEngine.

On startup, the RPCClient:
- makes DEALER socket (to_rpc_server) that connects to the RPCServer
via ipc, which uses unix sockets under the hood
(https://libzmq.readthedocs.io/en/zeromq4-1/zmq_ipc.html)
- makes ROUTER socket (from_api_server) that binds to a random
inproc address, which uses memory under the hood
(https://libzmq.readthedocs.io/en/zeromq3-x/zmq_inproc.html)
- runs a proxy in a background asyncio task between
from_api_server (ROUTER, inproc) and to_rpc_server (DEALER, ipc)

Each request handled by the asyncio api_server calls generate():
- make a DEALER socket that connects to from_api_server via inproc
- send an RPCGenerateRequest to the inproc socket
- background proxy forwards the request from inproc -> ipc
- RPCServer responds to the request one token at a time over ipc
- background proxy forwards the response from ipc -> inproc

The connection looks like this:
DEALER <- inproc -> [ ROUTER | DEALER ] <- ipc -> ROUTER

Message routing is performed via identities that are managed by the
ROUTER socket. A ROUTER socket tracks every connection it has and
tells the caller about them by prepending the connection identity
to each message it receives. When we send a message via a ROUTER,
we must first send an identity frame.
See https://zguide.zeromq.org/docs/chapter3/#The-Extended-Reply-Envelope
for more details on connection identities.

This proxy design enables us to use a single unix socket, which
improves performance by avoiding syscalls (~5%) and avoids resource limits
such as ulimit, which defaults to 1024 on Ubuntu.

See: https://zguide.zeromq.org/docs/chapter3/ for more details on the
Request-Reply pattern of zeromq sockets.
"""

def __init__(self, rpc_path: str):
self.context = zmq.asyncio.Context()
self.rpc_path = rpc_path
self.context.set(zmq.constants.MAX_SOCKETS,
self.context.get(zmq.constants.SOCKET_LIMIT))

# IPC connection to RPC Server (uses unix sockets).
self.to_rcp_server = self.context.socket(zmq.constants.DEALER)
Review comment (Member): typo: rcp -> rpc

self.to_rcp_server.connect(rpc_path)

# In-process proxy to RPC Server (uses memory-based messaging).
self.from_api_server = self.context.socket(zmq.constants.ROUTER)
self.from_api_server.bind(INPROC_PROXY_PATH)

# Asyncio background task for the proxy.
self.proxy_task = asyncio.create_task(
Review comment (robertgshaw2-redhat, Author): @njhill does this need to be explicitly canceled somewhere? (e.g. in close())
Review comment (njhill, Member): @robertgshaw2-neuralmagic yes, we should cancel it there.
self.run_proxy(self.from_api_server, self.to_rcp_server))

# Maximum number of requests that can be active. This value is
# used by uvicorn to launch with --limit-concurrency to limit the
# maximum number of requests being processed at a time.
# Note: https://www.uvicorn.org/server-behavior/#resource-limits
# Note: this value is typically 65536
self.limit_concurrency = self.context.get(zmq.constants.SOCKET_LIMIT)

async def run_proxy(self, socket_from, socket_to):
"""Background task that runs a proxy"""
poller = zmq.asyncio.Poller()
poller.register(socket_from, zmq.constants.POLLIN)
poller.register(socket_to, zmq.constants.POLLIN)
while True:
events = await poller.poll()
events = dict(events)
if socket_from in events:
msg = await socket_from.recv_multipart()
await socket_to.send_multipart(msg)
if socket_to in events:
msg = await socket_to.recv_multipart()
await socket_from.send_multipart(msg)

async def setup(self):
"""Setup the client before it starts sending server requests."""

# Wait until server is ready.
await self.wait_for_server()
await self._wait_for_server_rpc()
self._errored = False

# Get the configs.
@@ -54,15 +133,13 @@ def close(self):
self.context.destroy()

@contextmanager
def socket(self):
# Ensure client sockets are always closed after use

# Connect to RPC socket for Request-Reply pattern,
def to_proxy_socket(self):
# Connect to the proxy.
# Note that we use DEALER to enable asynchronous communication
# to enable streaming.
socket = self.context.socket(zmq.constants.DEALER)
try:
socket.connect(self.rpc_path)
socket.connect(INPROC_PROXY_PATH)
yield socket
finally:
# linger == 0 means discard unsent messages
@@ -81,10 +158,9 @@ async def _send_get_data_rpc_request(self, request: RPCUtilityRequest,
error_message: str) -> Any:
"""Send an RPC request that is expecting data back."""

with self.socket() as socket:

with self.to_proxy_socket() as socket:
# Ping RPCServer with a request.
await socket.send(cloudpickle.dumps(request))
await socket.send_multipart([cloudpickle.dumps(request)])

# Await the data from the Server.
data = cloudpickle.loads(await socket.recv())
Expand All @@ -93,6 +169,9 @@ async def _send_get_data_rpc_request(self, request: RPCUtilityRequest,
# LoRAConfig can be None.
if expected_type == LoRAConfig and data is None:
pass
elif isinstance(data, Exception):
logger.warning(error_message)
raise data
else:
raise ValueError(error_message)

@@ -103,9 +182,9 @@ async def _send_one_way_rpc_request(self,
error_message: str,
timeout: Optional[int] = None):
"""Send one-way RPC request to trigger an action."""
with self.socket() as socket:
with self.to_proxy_socket() as socket:
# Ping RPC Server with request.
await socket.send(cloudpickle.dumps(request))
await socket.send_multipart([cloudpickle.dumps(request)])

# Await acknowledgement from RPCServer.
if timeout is not None and await socket.poll(timeout=timeout) == 0:
@@ -114,6 +193,9 @@ async def _send_one_way_rpc_request(self,
response = cloudpickle.loads(await socket.recv())

if not isinstance(response, str) or response != VLLM_RPC_SUCCESS_STR:
if isinstance(response, Exception):
logger.warning(error_message)
raise response
raise ValueError(error_message)

return response
@@ -130,13 +212,13 @@ async def get_model_config(self) -> ModelConfig:
async def is_tracing_enabled(self) -> bool:
return self.tracing_flag

async def wait_for_server(self):
async def _wait_for_server_rpc(self):
"""Wait for the RPCServer to start up."""

await self._send_one_way_rpc_request(
request=RPCUtilityRequest.IS_SERVER_READY,
error_message="Unable to start RPC Server.",
timeout=SERVER_START_TIMEOUT_MS)
error_message="Unable to start RPC Server",
timeout=VLLM_RPC_SERVER_START_TIMEOUT_MS)

async def _get_model_config_rpc(self) -> ModelConfig:
"""Get the ModelConfig object from the RPC Server"""
@@ -226,7 +308,7 @@ async def generate(

finished = False
try:
with self.socket() as socket:
with self.to_proxy_socket() as socket:

# Send RPCGenerateRequest to the RPCServer.
await socket.send_multipart([
@@ -266,23 +348,10 @@ async def generate(
async def check_health(self) -> None:
"""Raise if unhealthy"""

with self.socket() as socket:

# Ping RPCServer with CHECK_HEALTH request.
await socket.send(cloudpickle.dumps(RPCUtilityRequest.CHECK_HEALTH)
)

# Await the reply from the server.
# TODO: do we need an internal timeout here?
# Or do we expect the external probe to timeout and let this chill?
health_message = cloudpickle.loads(await socket.recv())

if isinstance(health_message, Exception):
raise health_message

if health_message != VLLM_RPC_HEALTHY_STR:
raise ValueError("Expected healthy response from backend but got "
"f{health_message}")
await self._send_one_way_rpc_request(
request=RPCUtilityRequest.IS_SERVER_HEALTHY,
error_message="Got Unhealthy response from RPC Server",
timeout=VLLM_RPC_HEALTH_TIMEOUT_MS)

async def encode(self, *args,
**kwargs) -> AsyncGenerator[EmbeddingRequestOutput, None]:
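To ground the docstring's description of the proxy, below is a self-contained, runnable sketch of the ROUTER/DEALER pattern. It is a toy under stated assumptions: inproc transports on both sides and an echoing ROUTER standing in for the RPCServer; names and paths are illustrative, not vLLM's.

```python
import asyncio

import zmq
import zmq.asyncio

FRONT_PATH = "inproc://front"  # API-facing side (ROUTER binds here)
BACK_PATH = "inproc://back"    # single shared link to the "server"


async def run_proxy(frontend: zmq.asyncio.Socket,
                    backend: zmq.asyncio.Socket) -> None:
    """Forward whole multipart messages in both directions."""
    poller = zmq.asyncio.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    while True:
        events = dict(await poller.poll())
        # The ROUTER prepends an identity frame to everything it
        # receives; forwarding unchanged preserves the reply route.
        if frontend in events:
            await backend.send_multipart(await frontend.recv_multipart())
        if backend in events:
            await frontend.send_multipart(await backend.recv_multipart())


async def main() -> None:
    ctx = zmq.asyncio.Context()

    # Stand-in for the RPCServer: a ROUTER that echoes one request.
    server = ctx.socket(zmq.ROUTER)
    server.bind(BACK_PATH)

    frontend = ctx.socket(zmq.ROUTER)
    frontend.bind(FRONT_PATH)
    backend = ctx.socket(zmq.DEALER)
    backend.connect(BACK_PATH)

    proxy_task = asyncio.create_task(run_proxy(frontend, backend))

    # One "API request": a fresh DEALER per request, as in generate().
    client = ctx.socket(zmq.DEALER)
    client.connect(FRONT_PATH)
    await client.send_multipart([b"hello"])

    # Echo back, keeping the identity frames at the front intact.
    frames = await server.recv_multipart()
    await server.send_multipart(frames)

    print(await client.recv_multipart())  # [b'hello']

    proxy_task.cancel()  # explicit cancel, per the review discussion
    try:
        await proxy_task
    except asyncio.CancelledError:
        pass
    ctx.destroy(linger=0)


asyncio.run(main())
```

The echo step shows why blind forwarding works: the identity frames that each ROUTER prepends ride along unchanged, so the reply finds its way back to the originating DEALER. Note also the explicit cancellation of the proxy task before destroying the context, matching the review discussion above.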