-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: fix issues seen in "multi" mode when a large number of sockets are created concurrently #1509
Conversation
…re created concurrently - added a missing trim() in srv_impl - take up v2.3.0 of socket_connector which ensures that `serverToSocket` introduces no delay in completing the tcp handshake
Some stress testing, the improvement is solid enough for most webpages. For a single webpage to create enough connections to overload this version means that webpage is probably doing something very wrong. |
Once atsign-foundation/socket_connector#21 has been merged and published then I will bump the dependency in this PR and mark it ready for review |
e2e test failures are my fault, I accidentally merged a bad commit, will resolve this shortly. |
fix: as good practice, added calls to `await socket.flush()` everywhere we're writing stuff to sockets in srv_impl.dart test: added a feature to sshnpd whereby if `INLINE_SRV` is in the environment and set to `true`, then we run the Srv inline rather than forking it in a separate process. This is super useful for debugging srv issues on the daemon side.
…o fix-multi-highly-concurrent
Took up some latest socket_connector package and made some more tweaks to noports_core (srv_impl.dart). Updated PR description accordingly |
…o fix-multi-highly-concurrent
…true when calling SocketConnector.serverToSocket
- if "SRV_TRACE" is in the environment as 'true' then set verbose:true when calling SocketConnector.serverToSocket - if "SRV_INLINE" is in the environment as 'true' then run the srv inside the sshnpd process rather than forking
logTraffic: logTraffic, | ||
socketAuthVerifierA: socketAuthVerifierA, | ||
socketAuthVerifierB: socketAuthVerifierB, | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format change only
timeout: req.timeout, | ||
).run(); | ||
logger.info('Started rv - pid is ${rv.pid}'); | ||
if (Platform.environment['SRV_INLINE'] == 'true') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feature very useful for debugging sshnpd where it will run the srv in the same process rather than forking it
/// - simple client which talks to a demo_socket_server | ||
/// - sends a request for N kBytes of data to be sent to us | ||
/// - listens for the data and writes a message at the end | ||
/// saying how much it received vs expected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added this and demo_socket_server.dart to facilitate highly concurrent load testing of npt
if (relayAddresses.isEmpty) { | ||
throw Exception('Cannot resolve relay host $streamingHost'); | ||
} | ||
InternetAddress relayAddress = relayAddresses[0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactor: get the relayAddress at the start, and pass it down through the various functions
portB: streamingPort, | ||
verbose: false, | ||
verbose: Platform.environment['SRV_TRACE'] == 'true', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useful debugging feature to make the socket connector emit more logs
logger: ioSinkForLogger(logger), | ||
transformAtoB: encrypter, | ||
transformBtoA: decrypter, | ||
multi: multi, | ||
timeout: timeout, | ||
beforeJoining: (Side sideA, Side sideB) async { | ||
beforeJoining: (Side sideA, Side sideB) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
has to be synchronous to avoid a race in SocketConnector.handleSingleConnection
@@ -630,69 +637,111 @@ class SrvImplDart implements Srv<SocketConnector> { | |||
return socketConnector; | |||
} | |||
|
|||
Future _handleMultiConnectRequest( | |||
Future<void> _handleMultiConnectRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactor - _handleMultiConnectRequest
is now just handling a multi connect request; the parsing of what was received on the control socket is now done in the new _handleControlRequest
function
SocketConnector sc, | ||
InternetAddress relayAddress, | ||
DataTransformer? encrypter, | ||
DataTransformer? decrypter, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactor: Passing nullable encrypter and decrypter so we don't have two blocks of duplicate code for handling unencrypted vs encrypted connections
' creating new socketToSocket connection'); | ||
|
||
InternetAddress localAddress = await resolveRequestedLocalHost(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the code below, we're taking direct control of creating the socket connections (to the relay and to the requested local host:port
) rather than calling SocketConnector.socketToSocket
.
This is required because (1) the order really matters here (connect to relay first, connect to local host:port
second) (2) write and flush the auth string to the relay before trying to create the local socket (3) ensure that there are no uncaught exceptions on either socket
InternetAddress localAddress = await resolveRequestedLocalHost(); | ||
|
||
// First, connect to the relay. | ||
// If it fails, return, as we can't do anything more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that once we have connected and authenticated to the relay, the relay will join it to the client-side.
} | ||
|
||
// Now, connect to the local host:port | ||
// If it fails, we need to close the socket connection to the relay |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we close the socket connection to the relay, the relay will close its connection to the client.
unawaited(sc.handleSingleConnection(sideA).catchError((err) { | ||
logger.severe( | ||
'ERROR $err from handleSingleConnection on sideA (local - $localAddress:$localPort)'); | ||
})); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we've added both sides, socket connector will join them up so traffic starts to flow from local socket to relay socket (and within the relay, from this side's socket to the client side's socket, and on the client side from its relay socket to its local socket.)
} finally { | ||
controlStreamMutex.release(); | ||
} | ||
} | ||
|
||
Future<InternetAddress> resolveRequestedLocalHost() async { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InternetAddress.lookup
'stype
parameter defaults toInternetAddressType.any
- Per that method's documentation, the order of the returned list is not guaranteed.
- We will try to resolve via IPv4 first; if that doesn't work then we will try to resolve via IPv6
- We have more work to do in this area, since ipv6 addresses contain colons and we have code scattered around which parses things based on colons. In general we are not properly supporting IPv6. I've created a ticket for remedying that
@gkc, @cconstab and @XavierChanth have successfully stress tested with various websites and concurrent load tests; this PR is functionally complete and ready to go. Next steps:
|
DRAFT until socket_connector 2.3.3 has been published and taken up by this branch
- What I did
fix: fix issues seen in "multi" mode when a large number of sockets are created concurrently
- How I did it
serverToSocket
introduces no delay in completing the tcp handshakedone
future is handled upon its completion whether with or without errorserverToSocket
handles sockets in strict sequence as they are acceptedINLINE_SRV
is in the environment and set totrue
, then we run the Srv inline rather than forking it in a separate process. This is super useful for debugging srv issues on the daemon side.- How to verify it
Manual testing required. I've tested as follows with local atDirectory and atServers:
build/sshnp/np_admin -a @policy -n sshnp --root-domain vip.ve.atsign.zone
bin/sshnpd --root-domain vip.ve.atsign.zone -m @client -a @service -d mbp --permit-open "*:*"
bin/srvd -a @relay --root-domain vip.ve.atsign.zone --ip 127.0.0.1 -v
bin/npt --root-domain vip.ve.atsign.zone -f @client -t @service -r @relay --rh 127.0.0.1 --rp 3000 -d mbp --lp 3001 -T 0