-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upgrade a JS socket to TLS directly if possible #3341
base: main
Are you sure you want to change the base?
Upgrade a JS socket to TLS directly if possible #3341
Conversation
destroyIfCanceled = false | ||
)(upgrade(duplex)) | ||
tlsSockReadable <- socket match { | ||
case Socket.AsyncSocket(sock, _) if clientMode => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory the same logic should work for server sockets as well, but it results in a hang. I think this is connected to the pause or readable state of the socket vs duplex but I can't figure out how to unpause the socket at the right moment to allow things to flow.
@@ -37,6 +37,18 @@ private[tls] trait TLSSocketPlatform[F[_]] | |||
private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type => | |||
|
|||
private[tls] def forAsync[F[_]]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename this forAsyncServer
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or couldn't we just pass clientMode
in forAsync
so we don't need separate methods? I don't really care, these are just nitpicks :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see, it was bincompat. But this is all private anyway. We can just add the exclusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I didn't want to just add the exclusion unilaterally. Since it was fairly easy to avoid I did, but I've gone back to what it was before ignored the error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it was fairly easy to avoid I did
Yeah, this is good practice so thanks for doing that. But in this case I think its cleaner like this.
private[net] case class AsyncSocket[F[_]]( | ||
sock: facade.net.Socket, | ||
readStream: SuspendedStream[F, Byte] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been thinking about this. Do we know what happens to this SuspendedStream
after the sock
is directly upgraded?
What I mean is that this readStream
has already established listeners on the sock
. So if events are firing on those listeners as sock
is being used, but nobody is consuming from the SuspendedStream
, then I am concerned that this is actually a memory leak.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question.
As far as I can tell the listeners are all on control events, rather than data, i.e. they trigger changes of behaviour rather than push any data anywhere.
The read loop boils down to stream pull -> readable.read(), so if the stream isn't being consumed then read()
is never being called and there is no data to leak.
This also means that we're relying on the Javascript runtime to propagate any error or close events, which I think is a reasonable assumption, but we could add additional listeners for them if we feel that it's needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell the listeners are all on control events, rather than data, i.e. they trigger changes of behaviour rather than push any data anywhere.
Right, sure. But those triggers still invoke callbacks, that typically put things in Queue
s. If they are not consumed, they accumulate and leak.
_ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => channel.send(()).void) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we're going to leak any data anywhere, but I guess we will end up with a channel full of ()
.
The only Queue
that I can see in the process is synchronous
so wont accumulate anything.
It seems like it ought to be possible for that channel not to be a channel since it's only acting as an indicator that something can be read or not (that is we need to communicate that something can be read, but duplicate events prior to a read are meaningless)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but I guess we will end up with a channel full of
()
.
Yes, sorry, this is what I meant by "memory leak". Not specifically a data leak, as in bytes of data.
It seems like it ought to be possible for that channel not to be a channel since it's only acting as an indicator that something can be read or not (that is we need to communicate that something can be read, but duplicate events prior to a read are meaningless)
This is a great point. I wonder if we can replace it with a Signal
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only
Queue
that I can see in the process issynchronous
so wont accumulate anything.
Ah, this is a misconception. Unless whoever is pushing to the synchronous
queue respect backpressure and stops trying to push, the attempts to push will queue up unboundedly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes sorry, I actually thought that we wouldn't leak anything (including ()
). Had to reread the Node docs to correct myself about how the readable event actually works (I thought it was only fired once, rather than repeatedly).
I'm happy to switch over to a Signal
here but I wonder if it's worth it with #3348 now which should solve the issue as well. Don't know if you've got any preference for handling related PRs like these?
Unless whoever is pushing to the synchronous queue respect backpressure
I admit I'd assumed that the internals of fs2 would get that right 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to reread the Node docs to correct myself about how the readable event actually works (I thought it was only fired once, rather than repeatedly).
It is very confusing. While working on #3348 I discovered that the readable
event may be fired multiple times in a row (honestly it seemed a bit buggy) without read()
s in between, which is why we needed a counter for readable events and not simply a boolean toggle ...
I think if you merge #3348 into your PR, then my concerns about memory leaks would be addressed. But I need to think about this more. The "dangling" listeners are difficult to reason about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
readable event may be fired multiple times in a row
I would agree that sounds like a bug. I thought that there wouldn't be a new readable
event until you've done a read()
. I guess maybe it can half fill the internal buffer emit readable
, then fill it and emit another readable
? 🤷
At the worst the second read()
will just always return null
and then be flattened away immediately.
Will do that merge 👍 We're happy running off snapshot builds for a while so we've got time to get it right
…-directly-for-tls
This change alters how an upgrade to a TLS Socket is performed on JS runtimes.
Currently there is a lot of machinery to back and forth between native JS and FS2 constructs in order to perform the upgrade.
On the client side it currently goes "JS Socket <-> FS2 Socket <-> JS Duplex <-> JS TLS Socket <-> FS2 Socket".
Since JS can perform this upgrade directly we can let it do so when we have access to an underlying JS socket.
This results in "JS Socket <-> JS TLS Socket <-> FS2 Socket" instead, with the associated removal of several layers of Queues and Channels involved in the conversions. Note that the non TLS FS2 socket remains as is, it just is no longer involved in the TLS upgrade.
This is related to #3138 as it performs the optimization described there in limited situations that we can access without changing the API.
Will also address one of the issues on #3338 related to use of closed dispatchers in the duplex stream, as we no longer use it.