Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Make uv_udp_send send data immediately if possible #1358

Merged
merged 2 commits into from
Jul 8, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 74 additions & 95 deletions src/unix/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@


static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_run_pending(uv_udp_t* handle);
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
int domain,
unsigned int flags);
Expand All @@ -65,25 +64,19 @@ void uv__udp_finish_close(uv_udp_t* handle) {
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
assert(handle->io_watcher.fd == -1);

uv__udp_run_completed(handle);

while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
QUEUE_REMOVE(q);

req = QUEUE_DATA(q, uv_udp_send_t, queue);
uv__req_unregister(handle->loop, req);

if (req->bufs != req->bufsml)
free(req->bufs);
req->bufs = NULL;

if (req->send_cb != NULL)
req->send_cb(req, -ECANCELED);
req->status = -ECANCELED;
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
}

handle->send_queue_size = 0;
handle->send_queue_count = 0;
uv__udp_run_completed(handle);

assert(handle->send_queue_size == 0);
assert(handle->send_queue_count == 0);

/* Now tear down the handle. */
handle->recv_cb = NULL;
Expand All @@ -92,52 +85,6 @@ void uv__udp_finish_close(uv_udp_t* handle) {
}


static void uv__udp_run_pending(uv_udp_t* handle) {
uv_udp_send_t* req;
QUEUE* q;
struct msghdr h;
ssize_t size;

while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
assert(q != NULL);

req = QUEUE_DATA(q, uv_udp_send_t, queue);
assert(req != NULL);

memset(&h, 0, sizeof h);
h.msg_name = &req->addr;
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
h.msg_iov = (struct iovec*) req->bufs;
h.msg_iovlen = req->nbufs;

do {
size = sendmsg(handle->io_watcher.fd, &h, 0);
}
while (size == -1 && errno == EINTR);

/* TODO try to write once or twice more in the
* hope that the socket becomes readable again?
*/
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
break;

req->status = (size == -1 ? -errno : size);

/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
handle->send_queue_count--;
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
}
}


static void uv__udp_run_completed(uv_udp_t* handle) {
uv_udp_send_t* req;
QUEUE* q;
Expand All @@ -149,6 +96,9 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
req = QUEUE_DATA(q, uv_udp_send_t, queue);
uv__req_unregister(handle->loop, req);

handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
handle->send_queue_count--;

if (req->bufs != req->bufsml)
free(req->bufs);
req->bufs = NULL;
Expand All @@ -164,33 +114,40 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
else
req->send_cb(req, req->status);
}

if (QUEUE_EMPTY(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT);
if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
uv__handle_stop(handle);
}
}


static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
uv_udp_t* handle;

handle = container_of(w, uv_udp_t, io_watcher);
assert(handle->type == UV_UDP);

if (revents & UV__POLLIN)
uv__udp_recvmsg(loop, w, revents);
uv__udp_recvmsg(handle);

if (revents & UV__POLLOUT)
uv__udp_sendmsg(loop, w, revents);
if (revents & UV__POLLOUT) {
uv__udp_sendmsg(handle);
uv__udp_run_completed(handle);
}
}


static void uv__udp_recvmsg(uv_loop_t* loop,
uv__io_t* w,
unsigned int revents) {
static void uv__udp_recvmsg(uv_udp_t* handle) {
struct sockaddr_storage peer;
struct msghdr h;
uv_udp_t* handle;
ssize_t nread;
uv_buf_t buf;
int flags;
int count;

handle = container_of(w, uv_udp_t, io_watcher);
assert(handle->type == UV_UDP);
assert(revents & UV__POLLIN);

assert(handle->recv_cb != NULL);
assert(handle->alloc_cb != NULL);

Expand Down Expand Up @@ -247,34 +204,46 @@ static void uv__udp_recvmsg(uv_loop_t* loop,
}


static void uv__udp_sendmsg(uv_loop_t* loop,
uv__io_t* w,
unsigned int revents) {
uv_udp_t* handle;

handle = container_of(w, uv_udp_t, io_watcher);
assert(handle->type == UV_UDP);
assert(revents & UV__POLLOUT);
static void uv__udp_sendmsg(uv_udp_t* handle) {
uv_udp_send_t* req;
QUEUE* q;
struct msghdr h;
ssize_t size;

assert(!QUEUE_EMPTY(&handle->write_queue)
|| !QUEUE_EMPTY(&handle->write_completed_queue));

/* Write out pending data first. */
uv__udp_run_pending(handle);
while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
assert(q != NULL);

/* Drain 'request completed' queue. */
uv__udp_run_completed(handle);
req = QUEUE_DATA(q, uv_udp_send_t, queue);
assert(req != NULL);

if (!QUEUE_EMPTY(&handle->write_completed_queue)) {
/* Schedule completion callbacks. */
uv__io_feed(handle->loop, &handle->io_watcher);
}
else if (QUEUE_EMPTY(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT);
memset(&h, 0, sizeof h);
h.msg_name = &req->addr;
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
h.msg_iov = (struct iovec*) req->bufs;
h.msg_iovlen = req->nbufs;

if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
uv__handle_stop(handle);
do {
size = sendmsg(handle->io_watcher.fd, &h, 0);
} while (size == -1 && errno == EINTR);

if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
break;

req->status = (size == -1 ? -errno : size);

/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
uv__io_feed(handle->loop, &handle->io_watcher);
}
}

Expand Down Expand Up @@ -415,15 +384,21 @@ int uv__udp_send(uv_udp_send_t* req,
unsigned int addrlen,
uv_udp_send_cb send_cb) {
int err;
int empty_queue;

assert(nbufs > 0);

err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
if (err)
return err;

uv__req_init(handle->loop, req, UV_UDP_SEND);
/* It's legal for send_queue_count > 0 even when the write_queue is empty;
* it means there are error-state requests in the write_completed_queue that
* will touch up send_queue_size/count later.
*/
empty_queue = (handle->send_queue_count == 0);

uv__req_init(handle->loop, req, UV_UDP_SEND);
assert(addrlen <= sizeof(req->addr));
memcpy(&req->addr, addr, addrlen);
req->send_cb = send_cb;
Expand All @@ -441,9 +416,13 @@ int uv__udp_send(uv_udp_send_t* req,
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
handle->send_queue_count++;
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
uv__handle_start(handle);

if (empty_queue)
uv__udp_sendmsg(handle);
else
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);

return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion test/test-watcher-cross-stop.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ TEST_IMPL(watcher_cross_stop) {
for (i = 0; i < ARRAY_SIZE(sockets); i++)
uv_close((uv_handle_t*) &sockets[i], close_cb);

ASSERT(0 < recv_cb_called && recv_cb_called <= ARRAY_SIZE(sockets));
ASSERT(recv_cb_called > 0);
ASSERT(ARRAY_SIZE(sockets) == send_cb_called);

uv_run(loop, UV_RUN_DEFAULT);
Expand Down