diff --git a/doc/api/dgram.md b/doc/api/dgram.md index 50fd5db5a671e1..b9b43fcb216c1e 100644 --- a/doc/api/dgram.md +++ b/doc/api/dgram.md @@ -166,6 +166,7 @@ added: v0.11.14 * `port` {integer} * `address` {string} * `exclusive` {boolean} + * `fd` {integer} * `callback` {Function} For UDP sockets, causes the `dgram.Socket` to listen for datagram @@ -177,6 +178,11 @@ system will attempt to listen on all addresses. Once binding is complete, a `'listening'` event is emitted and the optional `callback` function is called. +The `options` object may contain a `fd` property. When a `fd` greater +than `0` is set, it will wrap around an existing socket with the given +file descriptor. In this case, the properties of `port` and `address` +will be ignored. + Note that specifying both a `'listening'` event listener and passing a `callback` to the `socket.bind()` method is not harmful but not very useful. diff --git a/lib/dgram.js b/lib/dgram.js index 292f7daf876c43..f9f839c0dba3ad 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -25,7 +25,8 @@ const errors = require('internal/errors'); const { kStateSymbol, _createSocketHandle, - newHandle + newHandle, + guessHandleType, } = require('internal/dgram'); const { ERR_INVALID_ARG_TYPE, @@ -35,7 +36,8 @@ const { ERR_SOCKET_BAD_PORT, ERR_SOCKET_BUFFER_SIZE, ERR_SOCKET_CANNOT_SEND, - ERR_SOCKET_DGRAM_NOT_RUNNING + ERR_SOCKET_DGRAM_NOT_RUNNING, + ERR_INVALID_FD_TYPE } = errors.codes; const { Buffer } = require('buffer'); const util = require('util'); @@ -45,6 +47,7 @@ const { defaultTriggerAsyncIdScope, symbols: { async_id_symbol } } = require('internal/async_hooks'); +const { isInt32 } = require('internal/validators'); const { UV_UDP_REUSEADDR } = process.binding('constants').os; const { UDP, SendWrap } = process.binding('udp_wrap'); @@ -151,6 +154,28 @@ function bufferSize(self, size, buffer) { return ret; } +// Query master process to get the server handle and utilize it. +function bindServerHandle(self, options, errCb) { + if (!cluster) + cluster = require('cluster'); + + const state = self[kStateSymbol]; + cluster._getServer(self, options, (err, handle) => { + if (err) { + errCb(err); + return; + } + + if (!state.handle) { + // Handle has been closed in the mean time. + return handle.close(); + } + + replaceHandle(self, handle); + startListening(self); + }); +} + Socket.prototype.bind = function(port_, address_ /* , callback */) { let port = port_; @@ -171,6 +196,44 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) { return this; } + // Open an existing fd instead of creating a new one. + if (port !== null && typeof port === 'object' && + isInt32(port.fd) && port.fd > 0) { + const fd = port.fd; + const exclusive = !!port.exclusive; + const state = this[kStateSymbol]; + + if (!cluster) + cluster = require('cluster'); + + if (cluster.isWorker && !exclusive) { + bindServerHandle(this, { + address: null, + port: null, + addressType: this.type, + fd, + flags: null + }, (err) => { + // Callback to handle error. + const ex = errnoException(err, 'open'); + this.emit('error', ex); + state.bindState = BIND_STATE_UNBOUND; + }); + return this; + } + + const type = guessHandleType(fd); + if (type !== 'UDP') + throw new ERR_INVALID_FD_TYPE(type); + const err = state.handle.open(fd); + + if (err) + throw errnoException(err, 'open'); + + startListening(this); + return this; + } + var address; var exclusive; @@ -207,28 +270,18 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) { flags |= UV_UDP_REUSEADDR; if (cluster.isWorker && !exclusive) { - const onHandle = (err, handle) => { - if (err) { - var ex = exceptionWithHostPort(err, 'bind', ip, port); - this.emit('error', ex); - state.bindState = BIND_STATE_UNBOUND; - return; - } - - if (!state.handle) - // handle has been closed in the mean time. - return handle.close(); - - replaceHandle(this, handle); - startListening(this); - }; - cluster._getServer(this, { + bindServerHandle(this, { address: ip, port: port, addressType: this.type, fd: -1, flags: flags - }, onHandle); + }, (err) => { + // Callback to handle error. + const ex = exceptionWithHostPort(err, 'bind', ip, port); + this.emit('error', ex); + state.bindState = BIND_STATE_UNBOUND; + }); } else { if (!state.handle) return; // handle has been closed in the mean time diff --git a/lib/internal/dgram.js b/lib/internal/dgram.js index 82b65294c21da5..e9b5364c8e3674 100644 --- a/lib/internal/dgram.js +++ b/lib/internal/dgram.js @@ -1,7 +1,9 @@ 'use strict'; -const assert = require('assert'); const { codes } = require('internal/errors'); const { UDP } = process.binding('udp_wrap'); +const { isInt32 } = require('internal/validators'); +const TTYWrap = process.binding('tty_wrap'); +const { UV_EINVAL } = process.binding('uv'); const { ERR_INVALID_ARG_TYPE, ERR_SOCKET_BAD_TYPE } = codes; const kStateSymbol = Symbol('state symbol'); let dns; // Lazy load for startup performance. @@ -17,6 +19,9 @@ function lookup6(lookup, address, callback) { } +const guessHandleType = TTYWrap.guessHandleType; + + function newHandle(type, lookup) { if (lookup === undefined) { if (dns === undefined) { @@ -49,22 +54,32 @@ function newHandle(type, lookup) { function _createSocketHandle(address, port, addressType, fd, flags) { - // Opening an existing fd is not supported for UDP handles. - assert(typeof fd !== 'number' || fd < 0); - const handle = newHandle(addressType); - - if (port || address) { - const err = handle.bind(address, port || 0, flags); - - if (err) { - handle.close(); - return err; + let err; + + if (isInt32(fd) && fd > 0) { + const type = guessHandleType(fd); + if (type !== 'UDP') { + err = UV_EINVAL; + } else { + err = handle.open(fd); } + } else if (port || address) { + err = handle.bind(address, port || 0, flags); + } + + if (err) { + handle.close(); + return err; } return handle; } -module.exports = { kStateSymbol, _createSocketHandle, newHandle }; +module.exports = { + kStateSymbol, + _createSocketHandle, + newHandle, + guessHandleType, +}; diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index 2ef5c61358744a..9bcf6ceb7b3ed2 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -117,6 +117,7 @@ void UDPWrap::Initialize(Local target, Local(), attributes); + env->SetProtoMethod(t, "open", Open); env->SetProtoMethod(t, "bind", Bind); env->SetProtoMethod(t, "send", Send); env->SetProtoMethod(t, "bind6", Bind6); @@ -206,6 +207,18 @@ void UDPWrap::DoBind(const FunctionCallbackInfo& args, int family) { } +void UDPWrap::Open(const FunctionCallbackInfo& args) { + UDPWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, + args.Holder(), + args.GetReturnValue().Set(UV_EBADF)); + int fd = static_cast(args[0]->IntegerValue()); + int err = uv_udp_open(&wrap->handle_, fd); + + args.GetReturnValue().Set(err); +} + + void UDPWrap::Bind(const FunctionCallbackInfo& args) { DoBind(args, AF_INET); } diff --git a/src/udp_wrap.h b/src/udp_wrap.h index ca048f5aef98af..b5d282489685ed 100644 --- a/src/udp_wrap.h +++ b/src/udp_wrap.h @@ -42,6 +42,7 @@ class UDPWrap: public HandleWrap { v8::Local context); static void GetFD(const v8::FunctionCallbackInfo& args); static void New(const v8::FunctionCallbackInfo& args); + static void Open(const v8::FunctionCallbackInfo& args); static void Bind(const v8::FunctionCallbackInfo& args); static void Send(const v8::FunctionCallbackInfo& args); static void Bind6(const v8::FunctionCallbackInfo& args); diff --git a/test/parallel/test-cluster-dgram-bind-fd.js b/test/parallel/test-cluster-dgram-bind-fd.js new file mode 100644 index 00000000000000..429f932608e2b2 --- /dev/null +++ b/test/parallel/test-cluster-dgram-bind-fd.js @@ -0,0 +1,108 @@ +'use strict'; +const common = require('../common'); +if (common.isWindows) + common.skip('dgram clustering is currently not supported on Windows.'); + +const NUM_WORKERS = 4; +const PACKETS_PER_WORKER = 10; + +const assert = require('assert'); +const cluster = require('cluster'); +const dgram = require('dgram'); +const { UDP } = process.binding('udp_wrap'); + +if (cluster.isMaster) + master(); +else + worker(); + + +function master() { + // Create a handle and use its fd. + const rawHandle = new UDP(); + const err = rawHandle.bind(common.localhostIPv4, 0, 0); + assert(err >= 0, String(err)); + assert.notStrictEqual(rawHandle.fd, -1); + + const fd = rawHandle.fd; + + let listening = 0; + + // Fork 4 workers. + for (let i = 0; i < NUM_WORKERS; i++) + cluster.fork(); + + // Wait until all workers are listening. + cluster.on('listening', common.mustCall((worker, address) => { + if (++listening < NUM_WORKERS) + return; + + // Start sending messages. + const buf = Buffer.from('hello world'); + const socket = dgram.createSocket('udp4'); + let sent = 0; + doSend(); + + function doSend() { + socket.send(buf, 0, buf.length, address.port, address.address, afterSend); + } + + function afterSend() { + sent++; + if (sent < NUM_WORKERS * PACKETS_PER_WORKER) { + doSend(); + } else { + socket.close(); + } + } + }, NUM_WORKERS)); + + // Set up event handlers for every worker. Each worker sends a message when + // it has received the expected number of packets. After that it disconnects. + for (const key in cluster.workers) { + if (cluster.workers.hasOwnProperty(key)) + setupWorker(cluster.workers[key]); + } + + function setupWorker(worker) { + let received = 0; + + worker.send({ + fd, + }); + + worker.on('message', common.mustCall((msg) => { + received = msg.received; + worker.disconnect(); + })); + + worker.on('exit', common.mustCall(() => { + assert.strictEqual(received, PACKETS_PER_WORKER); + })); + } +} + + +function worker() { + let received = 0; + + process.on('message', common.mustCall((data) => { + const { fd } = data; + // Create udp socket and start listening. + const socket = dgram.createSocket('udp4'); + + socket.on('message', common.mustCall((data, info) => { + received++; + + // Every 10 messages, notify the master. + if (received === PACKETS_PER_WORKER) { + process.send({ received }); + socket.close(); + } + }, PACKETS_PER_WORKER)); + + socket.bind({ + fd, + }); + })); +} diff --git a/test/parallel/test-dgram-bind-fd-error.js b/test/parallel/test-dgram-bind-fd-error.js new file mode 100644 index 00000000000000..efe0e43d7b5673 --- /dev/null +++ b/test/parallel/test-dgram-bind-fd-error.js @@ -0,0 +1,55 @@ +// Flags: --expose-internals +'use strict'; +const common = require('../common'); +if (common.isWindows) + common.skip('Does not support binding fd on Windows'); + +const dgram = require('dgram'); +const assert = require('assert'); +const { kStateSymbol } = require('internal/dgram'); +const { TCP, constants } = process.binding('tcp_wrap'); +const TYPE = 'udp4'; + +// Throw when the fd is occupied according to https://github.com/libuv/libuv/pull/1851. +{ + const socket = dgram.createSocket(TYPE); + + socket.bind(common.mustCall(() => { + const anotherSocket = dgram.createSocket(TYPE); + const { handle } = socket[kStateSymbol]; + + common.expectsError(() => { + anotherSocket.bind({ + fd: handle.fd, + }); + }, { + code: 'EEXIST', + type: Error, + message: /^open EEXIST$/ + }); + + socket.close(); + })); +} + +// Throw when the type of fd is not "UDP". +{ + const handle = new TCP(constants.SOCKET); + handle.listen(); + + const fd = handle.fd; + assert.notStrictEqual(fd, -1); + + const socket = new dgram.createSocket(TYPE); + common.expectsError(() => { + socket.bind({ + fd, + }); + }, { + code: 'ERR_INVALID_FD_TYPE', + type: TypeError, + message: /^Unsupported fd type: TCP$/ + }); + + handle.close(); +} diff --git a/test/parallel/test-dgram-bind-fd.js b/test/parallel/test-dgram-bind-fd.js new file mode 100644 index 00000000000000..c4a80abb92c4d2 --- /dev/null +++ b/test/parallel/test-dgram-bind-fd.js @@ -0,0 +1,118 @@ +'use strict'; +const common = require('../common'); +if (common.isWindows) + common.skip('Does not support binding fd on Windows'); + +const assert = require('assert'); +const dgram = require('dgram'); +const { UDP } = process.binding('udp_wrap'); +const { UV_UDP_REUSEADDR } = process.binding('constants').os; + +const BUFFER_SIZE = 4096; + +// Test binding a fd. +{ + function createHandle(reuseAddr, udp4, bindAddress) { + let flags = 0; + if (reuseAddr) + flags |= UV_UDP_REUSEADDR; + + const handle = new UDP(); + let err = 0; + + if (udp4) { + err = handle.bind(bindAddress, 0, flags); + } else { + err = handle.bind6(bindAddress, 0, flags); + } + assert(err >= 0, String(err)); + assert.notStrictEqual(handle.fd, -1); + return handle; + } + + function testWithOptions(reuseAddr, udp4) { + const type = udp4 ? 'udp4' : 'udp6'; + const bindAddress = udp4 ? common.localhostIPv4 : '::1'; + + let fd; + + const receiver = dgram.createSocket({ + type, + }); + + receiver.bind({ + port: 0, + address: bindAddress, + }, common.mustCall(() => { + const { port, address } = receiver.address(); + // Create a handle to reuse its fd. + const handle = createHandle(reuseAddr, udp4, bindAddress); + + fd = handle.fd; + assert.notStrictEqual(handle.fd, -1); + + const socket = dgram.createSocket({ + type, + recvBufferSize: BUFFER_SIZE, + sendBufferSize: BUFFER_SIZE, + }); + + socket.bind({ + port: 0, + address: bindAddress, + fd, + }, common.mustCall(() => { + // Test address(). + const rinfo = {}; + const err = handle.getsockname(rinfo); + assert.strictEqual(err, 0); + const socketRInfo = socket.address(); + assert.strictEqual(rinfo.address, socketRInfo.address); + assert.strictEqual(rinfo.port, socketRInfo.port); + + // Test buffer size. + const recvBufferSize = socket.getRecvBufferSize(); + const sendBufferSize = socket.getSendBufferSize(); + + // note: linux will double the buffer size + const expectedBufferSize = common.isLinux ? + BUFFER_SIZE * 2 : BUFFER_SIZE; + assert.strictEqual(recvBufferSize, expectedBufferSize); + assert.strictEqual(sendBufferSize, expectedBufferSize); + + socket.send(String(fd), port, address); + })); + + socket.on('message', common.mustCall((data) => { + assert.strictEqual(data.toString('utf8'), String(fd)); + socket.close(); + })); + + socket.on('error', (err) => { + console.error(err.message); + assert.fail(err.message); + }); + + socket.on('close', common.mustCall(() => {})); + })); + + receiver.on('message', common.mustCall((data, { address, port }) => { + assert.strictEqual(data.toString('utf8'), String(fd)); + receiver.send(String(fd), port, address); + process.nextTick(() => receiver.close()); + })); + + receiver.on('error', (err) => { + console.error(err.message); + assert.fail(err.message); + }); + + receiver.on('close', common.mustCall(() => {})); + } + + testWithOptions(true, true); + testWithOptions(false, true); + if (common.hasIPv6) { + testWithOptions(false, false); + } +} diff --git a/test/parallel/test-dgram-create-socket-handle-fd.js b/test/parallel/test-dgram-create-socket-handle-fd.js new file mode 100644 index 00000000000000..ff507b6ec5091e --- /dev/null +++ b/test/parallel/test-dgram-create-socket-handle-fd.js @@ -0,0 +1,42 @@ +'use strict'; +const common = require('../common'); +if (common.isWindows) + common.skip('Does not support binding fd on Windows'); + +const assert = require('assert'); +const dgram = require('dgram'); +const { UDP } = process.binding('udp_wrap'); +const { TCP, constants } = process.binding('tcp_wrap'); +const _createSocketHandle = dgram._createSocketHandle; + +// Return a negative number if the "existing fd" is invalid. +{ + const err = _createSocketHandle(common.localhostIPv4, 0, 'udp4', 42); + assert(err < 0); +} + +// Return a negative number if the type of fd is not "UDP". +{ + // Create a handle with fd. + const rawHandle = new UDP(); + const err = rawHandle.bind(common.localhostIPv4, 0, 0); + assert(err >= 0, String(err)); + assert.notStrictEqual(rawHandle.fd, -1); + + const handle = _createSocketHandle(null, 0, 'udp4', rawHandle.fd); + assert(handle instanceof UDP); + assert.strictEqual(typeof handle.fd, 'number'); + assert(handle.fd > 0); +} + +// Create a bound handle. +{ + const rawHandle = new TCP(constants.SOCKET); + const err = rawHandle.listen(); + assert(err >= 0, String(err)); + assert.notStrictEqual(rawHandle.fd, -1); + + const handle = _createSocketHandle(null, 0, 'udp4', rawHandle.fd); + assert(handle < 0); + rawHandle.close(); +} diff --git a/test/parallel/test-dgram-create-socket-handle.js b/test/parallel/test-dgram-create-socket-handle.js index e49e3e8dd6c4b4..3df34a95c2b526 100644 --- a/test/parallel/test-dgram-create-socket-handle.js +++ b/test/parallel/test-dgram-create-socket-handle.js @@ -5,14 +5,6 @@ const assert = require('assert'); const { _createSocketHandle } = require('internal/dgram'); const UDP = process.binding('udp_wrap').UDP; -// Throws if an "existing fd" is passed in. -common.expectsError(() => { - _createSocketHandle(common.localhostIPv4, 0, 'udp4', 42); -}, { - code: 'ERR_ASSERTION', - message: /^false == true$/ -}); - { // Create a handle that is not bound. const handle = _createSocketHandle(null, null, 'udp4');