Skip to content

Commit

Permalink
Remove OpenSSL from client dependencies (#1220)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Sep 6, 2024
1 parent f9c78d0 commit b16fdd1
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 109 deletions.
64 changes: 9 additions & 55 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions integration/tests/examples/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@ pub(crate) trait IggyExampleTestCase {
) {
let server_addr = server.get_raw_tcp_addr().unwrap();
for stdout in [producer_stdout, consumer_stdout] {
assert!(stdout.contains(
format!("Iggy client is connecting to server: {}...", &server_addr).as_str()
));
assert!(stdout.contains(
format!("Iggy client has connected to server: {}", &server_addr).as_str()
));
assert!(
stdout.contains(format!("is connecting to server: {}...", &server_addr).as_str())
);
assert!(stdout.contains(format!("has connected to server: {}", &server_addr).as_str()));
}
}
fn protocol(&self, server: &TestServer) -> Vec<String> {
Expand Down
5 changes: 2 additions & 3 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.15"
version = "0.6.16"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down Expand Up @@ -39,11 +39,10 @@ futures-util = "0.3.30"
humantime = "2.1.0"
keyring = { version = "3.2.0", optional = true, features = ["sync-secret-service", "vendored"] }
lazy_static = "1.4.0"
openssl = { version = "0.10.64", features = ["vendored"] }
passterm = { version = "2.0.1", optional = true }
quinn = { version = "0.11.5" }
regex = "1.10.4"
reqwest = { version = "0.12.7", features = ["json"] }
reqwest = { version = "0.12.7", default-features = false, features = ["json", "rustls-tls"] }
reqwest-middleware = { version = "0.3.2", features = ["json"] }
reqwest-retry = "0.6.1"
rustls = { version = "0.23.10", features = ["ring"] }
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ impl IggyConsumer {
continue;
}

info!("Rejoining consumer group");
info!("Rejoining consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}...");
if let Err(error) = Self::initialize_consumer_group(
client.clone(),
create_consumer_group_if_not_exists,
Expand All @@ -409,10 +409,10 @@ impl IggyConsumer {
)
.await
{
error!("Failed to join consumer group: {error}");
error!("Failed to join consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}. {error}");
continue;
}
info!("Rejoined consumer group");
info!("Rejoined consumer group: {consumer_name} for stream: {stream_id}, topic: {topic_id}");
can_poll.store(true, ORDERING);
}
DiagnosticEvent::SignedOut => {
Expand Down
39 changes: 30 additions & 9 deletions sdk/src/quic/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ impl BinaryTransport for QuicClient {
}

self.disconnect().await?;
info!("Reconnecting to the server...");
info!(
"Reconnecting to the server: {}, by client: {}",
self.config.server_address, self.config.client_address
);
self.connect().await?;
self.send_raw(code, payload).await
}
Expand Down Expand Up @@ -281,7 +284,7 @@ impl QuicClient {
return Err(IggyError::CannotEstablishConnection);
}

connection = connection_result.unwrap();
connection = connection_result?;
remote_address = connection.remote_address();
break;
}
Expand All @@ -299,19 +302,25 @@ impl QuicClient {
Ok(())
}
AutoLogin::Enabled(credentials) => {
info!("{NAME} client is signing in...");
info!(
"{NAME} client: {} is signing in...",
self.config.client_address
);
self.set_state(ClientState::Authenticating).await;
match credentials {
Credentials::UsernamePassword(username, password) => {
self.login_user(username, password).await?;
self.publish_event(DiagnosticEvent::SignedIn).await;
info!("{NAME} client has signed in with the user credentials, username: {username}",);
info!("{NAME} client: {} has signed in with the user credentials, username: {username}", self.config.client_address);
Ok(())
}
Credentials::PersonalAccessToken(token) => {
self.login_with_personal_access_token(token).await?;
self.publish_event(DiagnosticEvent::SignedIn).await;
info!("{NAME} client has signed in with a personal access token.",);
info!(
"{NAME} client: {} has signed in with a personal access token.",
self.config.client_address
);
Ok(())
}
}
Expand All @@ -324,24 +333,36 @@ impl QuicClient {
return Ok(());
}

info!("{NAME} client is disconnecting from server...");
info!(
"{NAME} client: {} is disconnecting from server...",
self.config.client_address
);
self.set_state(ClientState::Disconnected).await;
self.connection.lock().await.take();
self.endpoint.wait_idle().await;
self.publish_event(DiagnosticEvent::Disconnected).await;
let now = IggyTimestamp::now();
info!("{NAME} client has disconnected from server at: {now}.");
info!(
"{NAME} client: {} has disconnected from server at: {now}.",
self.config.client_address
);
Ok(())
}

async fn send_raw(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> {
match self.get_state().await {
ClientState::Disconnected => {
trace!("Cannot send data. Client is not connected.");
trace!(
"Cannot send data. Client: {} is not connected.",
self.config.client_address
);
return Err(IggyError::NotConnected);
}
ClientState::Connecting => {
trace!("Cannot send data. Client is still connecting.");
trace!(
"Cannot send data. Client: {} is still connecting.",
self.config.client_address
);
return Err(IggyError::NotConnected);
}
_ => {}
Expand Down
Loading

0 comments on commit b16fdd1

Please sign in to comment.