-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tpu quic proxy as module and lite-rpc switch #164
tpu quic proxy as module and lite-rpc switch #164
Conversation
… into groovie/tpu-proxy-as-module
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To merge I approve.
@@ -29,4 +29,6 @@ pub struct Args { | |||
pub maximum_retries_per_tx: usize, | |||
#[arg(long, default_value_t = DEFAULT_RETRY_TIMEOUT)] | |||
pub transaction_retry_after_secs: u64, | |||
#[arg(long)] | |||
pub experimental_quic_proxy_addr: Option<String>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be quic_proxy_addr
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
None => TpuConnectionPath::QuicDirectPath, | ||
Some(prox_address) => TpuConnectionPath::QuicForwardProxyPath { | ||
// e.g. "127.0.0.1:11111" | ||
forward_proxy_address: prox_address.parse().unwrap(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use expect instead of unwrap with a message, failed to parse proxy address.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
pub async fn listen( | ||
&self, | ||
exit_signal: Arc<AtomicBool>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid using exit_signal, we can do everything without exit_signals, using tokio tasks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
let exit_signal = exit_signal.clone(); | ||
let forwarder_channel_copy = forwarder_channel.clone(); | ||
tokio::spawn(async move { | ||
let connection = connecting.await.context("handshake").unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid using unwrap. The proxy will crash if connecting returns an error, we should continue on error instead of panicing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
loop { | ||
let maybe_stream = client_connection.accept_uni().await; | ||
match maybe_stream { | ||
Err(quinn::ConnectionError::ApplicationClosed(reason)) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
usually errors are after Ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pub agent_exit_signal: Arc<AtomicBool>, | ||
pub created_at: Instant, | ||
// relative to start | ||
pub age_ms: AtomicU64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need age_ms
and touch
we can always calculate age of handle by a simple function. This may add additional complexity if the touch is not correctly called the age will be wrong and handle will live forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i k .. that's bad. will fix that. the issue was that I could not easily have an AtomicTimestamp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
ConnectionState::NotConnected => { | ||
match self.create_connection().await { | ||
Some(new_connection) => { | ||
*lock = ConnectionState::Connection(new_connection.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate code as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you mean?
} | ||
|
||
async fn create_connection(&self) -> Option<Connection> { | ||
let connection = self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No implementation of rtt0 type of connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you sure that we want that? the connection handling is optimized to keep actions active and does not suffer so much from high connection cost - WDYT
tokio::time::timeout(FALLBACK_TIMEOUT, future) | ||
} | ||
|
||
// note this is duplicated from lite-rpc module |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should move the code where both of the libraries can use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes but where - new crate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be core.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
use std::fmt; | ||
use std::net::SocketAddr; | ||
use std::time::Duration; | ||
use tokio::sync::RwLock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems similar code, should be benchmarked with existing code, I prefer only to have one way that we connect and maintain connection with TPU client. Could you compare the results of existing implementation and this implementation. You can also refactor original implementation so that it reflects the changes you want to do.
Basic implementation of #142
adds the "Quic-proxy" service (a standalone network service) which accepts transaction batches via a simple wire protocol from lite-rpc and forwards them to the designated TPU (identified by address in wire protocol).
the connection between client and proxy is also QUIC
the proxy optionally takes a validator identity keypair to boost the connection to TPU, i.e. turns it into a Staked connection
instructions how to start locally are documented in README
architecture
the proxy consists of an inbound and an outbound part. inbound manages the connects from proxy client (lite-rpc in this case); outbound manages the connections to TPU.