Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

moving from readable-streams to streamx #42

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
return NULL; \
}

#define NAPI_MAKE_CALLBACK_AND_ALLOC(env, nil, ctx, cb, n, argv, res, nread) \
#define NAPI_MAKE_CALLBACK_AND_ALLOC(env, nil, ctx, cb, n, argv, nread) \
napi_value res; \
if (napi_make_callback(env, nil, ctx, cb, n, argv, &res) == napi_pending_exception) { \
napi_value fatal_exception; \
napi_get_and_clear_last_exception(env, &fatal_exception); \
Expand Down Expand Up @@ -71,6 +72,7 @@ typedef struct {
napi_ref on_close;
napi_ref on_connect;
napi_ref realloc;
bool destroyed;
} utp_napi_connection_t;

typedef struct {
Expand Down Expand Up @@ -175,12 +177,11 @@ on_uv_read (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct s
utp_napi_parse_address((struct sockaddr *) addr, ip, &port);

UTP_NAPI_CALLBACK(self->on_message, {
napi_value ret;
napi_value argv[3];
napi_create_int32(env, nread, &(argv[0]));
napi_create_uint32(env, port, &(argv[1]));
napi_create_string_utf8(env, ip, NAPI_AUTO_LENGTH, &(argv[2]));
NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 3, argv, ret, nread)
NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 3, argv, nread)
})
}

Expand All @@ -189,7 +190,12 @@ on_uv_close (uv_handle_t *handle) {
utp_napi_t *self = (utp_napi_t *) handle->data;

self->pending_close--;
if (self->pending_close > 0) return;
if (self->pending_close == 1) {
uv_close((uv_handle_t *) &(self->handle), on_uv_close);
}
if (self->pending_close > 0) {
return;
}

UTP_NAPI_CALLBACK(self->on_close, {
NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 0, NULL, NULL);
Expand Down Expand Up @@ -219,13 +225,20 @@ on_utp_firewall (utp_callback_arguments *a) {

inline static void
utp_napi_connection_destroy (utp_napi_connection_t *self) {
if (self->destroyed) {
return;
}
if (self->buf.base == NULL) {
return;
}
UTP_NAPI_CALLBACK(self->on_close, {
NAPI_MAKE_CALLBACK(env, NULL, ctx, callback, 0, NULL, NULL)
})

self->env = env;
self->buf.base = NULL;
self->buf.len = 0;
self->destroyed = true;

napi_delete_reference(self->env, self->ctx);
napi_delete_reference(self->env, self->on_read);
Expand Down Expand Up @@ -258,12 +271,14 @@ on_utp_state_change (utp_callback_arguments *a) {
}

case UTP_STATE_EOF: {
if (self->destroyed) {
return 0;
}
if (self->recv_packet_size) {
UTP_NAPI_CALLBACK(self->on_read, {
napi_value ret;
napi_value argv[1];
napi_create_uint32(env, self->recv_packet_size, &(argv[0]));
NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, ret, self->recv_packet_size)
NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, self->recv_packet_size)
self->recv_packet_size = 0;
})
}
Expand Down Expand Up @@ -342,10 +357,9 @@ on_utp_read (utp_callback_arguments *a) {
}

UTP_NAPI_CALLBACK(self->on_read, {
napi_value ret;
napi_value argv[1];
napi_create_uint32(env, self->recv_packet_size, &(argv[0]));
NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, ret, self->recv_packet_size)
NAPI_MAKE_CALLBACK_AND_ALLOC(env, NULL, ctx, callback, 1, argv, self->recv_packet_size)
self->recv_packet_size = 0;
})

Expand Down Expand Up @@ -425,7 +439,6 @@ NAPI_METHOD(utp_napi_close) {
err = uv_udp_recv_stop(&(self->handle));
if (err < 0) UTP_NAPI_THROW(err)

uv_close((uv_handle_t *) &(self->handle), on_uv_close);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handle is now closed serially. If I leave his in parallel, it causes a segfault. Probably a timing error, but I couldn't figure out its cause.

uv_close((uv_handle_t *) &(self->timer), on_uv_close);

return NULL;
Expand Down
17 changes: 10 additions & 7 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ const events = require('events')
const dns = require('dns')
const set = require('unordered-set')

const EMPTY = Buffer.alloc(0)

module.exports = UTP

const EMPTY = Buffer.alloc(0)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh my , i readded the EMPTY constant at the wrong place. Should be just as before. (memo)


function UTP (opts) {
if (!(this instanceof UTP)) return new UTP(opts)
events.EventEmitter.call(this)
Expand Down Expand Up @@ -132,16 +132,17 @@ UTP.prototype._closeMaybe = function () {
if (this._closing && !this.connections.length && !this._sending.length && this._inited && !this._closed) {
this._closed = true
binding.utp_napi_close(this._handle)
} else {
for (const conn of this.connections) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't understand how these lines could be missing: When you close the server, any open connections are supposed to be closed, right?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, each connection must be closed individually

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Same as tcp)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instances created in the tests are not properly torn down then, causing a lot of weird errors happen when I tried to remove this. Checking the tests for "why they fail because this is removed" is the reason this takes to long for me to continue.

conn.destroy(new Error('server closed'))
}
}
}

UTP.prototype.connect = function (port, ip) {
if (!this._inited) this.bind()
if (!ip) ip = '127.0.0.1'
const conn = new Connection(this, port, ip, null, this._allowHalfOpen)
if (!isIP(ip)) conn._resolveAndConnect(port, ip)
else conn._connect(port, ip || '127.0.0.1')
return conn
return new Connection(this, port, ip, null, this._allowHalfOpen)
}

UTP.prototype.listen = function (port, ip, onlistening) {
Expand Down Expand Up @@ -204,11 +205,13 @@ UTP.prototype._onmessage = function (size, port, address) {
this.emit('message', message, { address, family: 'IPv4', port })

if (this._buffer.length - this._offset <= 65536) {
// max package buffer is 64kb and we wanna make sure we have room for that
// returning the buffer indicates to the native code that
// the buffer has changed
this._buffer = Buffer.allocUnsafe(this._buffer.length)
this._offset = 0
return this._buffer
}

return EMPTY
}

Expand Down
Loading