Skip to content

Commit

Permalink
Fix concurrency with async rwlock, split command validation (#1081)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Jul 23, 2024
1 parent 6360f9e commit ec7f4b2
Show file tree
Hide file tree
Showing 113 changed files with 564 additions and 426 deletions.
15 changes: 2 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.5.0"
version = "0.5.1"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
3 changes: 2 additions & 1 deletion sdk/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::bytes_serializable::BytesSerializable;
use crate::error::IggyError;
use crate::validatable::Validatable;
use std::fmt::Display;

pub trait Command: BytesSerializable + Send + Sync + Display {
pub trait Command: BytesSerializable + Validatable<IggyError> + Send + Sync + Display {
fn code(&self) -> u32;
}

Expand Down
1 change: 0 additions & 1 deletion sdk/src/consumer_groups/create_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ impl BytesSerializable for CreateConsumerGroup {
group_id,
name,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/consumer_groups/delete_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl BytesSerializable for DeleteConsumerGroup {
topic_id,
group_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/consumer_groups/get_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl BytesSerializable for GetConsumerGroup {
topic_id,
group_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/consumer_groups/get_consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl BytesSerializable for GetConsumerGroups {
stream_id,
topic_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/consumer_groups/join_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl BytesSerializable for JoinConsumerGroup {
topic_id,
group_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/consumer_groups/leave_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl BytesSerializable for LeaveConsumerGroup {
topic_id,
group_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/consumer_offsets/get_consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ impl BytesSerializable for GetConsumerOffset {
topic_id,
partition_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/consumer_offsets/store_consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ impl BytesSerializable for StoreConsumerOffset {
partition_id,
offset,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/messages/poll_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ impl BytesSerializable for PollMessages {
count,
auto_commit,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/messages/send_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@ impl BytesSerializable for SendMessages {
partitioning: key,
messages,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/partitions/create_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl BytesSerializable for CreatePartitions {
topic_id,
partitions_count,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/partitions/delete_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl BytesSerializable for DeletePartitions {
topic_id,
partitions_count,
};
command.validate()?;
Ok(command)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ impl BytesSerializable for CreatePersonalAccessToken {
let expiry: IggyExpiry = expiry.into();

let command = CreatePersonalAccessToken { name, expiry };
command.validate()?;
Ok(command)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl BytesSerializable for DeletePersonalAccessToken {
}

let command = DeletePersonalAccessToken { name };
command.validate()?;
Ok(command)
}
}
Expand Down
4 changes: 1 addition & 3 deletions sdk/src/personal_access_tokens/get_personal_access_tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@ impl BytesSerializable for GetPersonalAccessTokens {
Bytes::new()
}

fn from_bytes(bytes: Bytes) -> std::result::Result<GetPersonalAccessTokens, IggyError> {
fn from_bytes(bytes: Bytes) -> Result<GetPersonalAccessTokens, IggyError> {
if !bytes.is_empty() {
return Err(IggyError::InvalidCommand);
}

let command = GetPersonalAccessTokens {};
command.validate()?;
Ok(GetPersonalAccessTokens {})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl BytesSerializable for LoginWithPersonalAccessToken {
}

let command = LoginWithPersonalAccessToken { token };
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 1 addition & 0 deletions sdk/src/quic/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl BinaryTransport for QuicClient {
}

async fn send_with_response<T: Command>(&self, command: &T) -> Result<Bytes, IggyError> {
command.validate()?;
self.send_raw_with_response(command.code(), command.to_bytes())
.await
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/streams/create_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl BytesSerializable for CreateStream {
}

let command = CreateStream { stream_id, name };
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/streams/delete_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ impl BytesSerializable for DeleteStream {

let stream_id = Identifier::from_bytes(bytes)?;
let command = DeleteStream { stream_id };
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/streams/get_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ impl BytesSerializable for GetStream {

let stream_id = Identifier::from_bytes(bytes)?;
let command = GetStream { stream_id };
command.validate()?;
Ok(command)
}
}
Expand Down
2 changes: 0 additions & 2 deletions sdk/src/streams/get_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl BytesSerializable for GetStreams {
return Err(IggyError::InvalidCommand);
}

let command = GetStreams {};
command.validate()?;
Ok(GetStreams {})
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/streams/purge_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ impl BytesSerializable for PurgeStream {

let stream_id = Identifier::from_bytes(bytes)?;
let command = PurgeStream { stream_id };
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/streams/update_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ impl BytesSerializable for UpdateStream {
}

let command = UpdateStream { stream_id, name };
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/system/get_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ impl BytesSerializable for GetClient {

let client_id = u32::from_le_bytes(bytes.as_ref().try_into()?);
let command = GetClient { client_id };
command.validate()?;
Ok(command)
}
}
Expand Down
2 changes: 0 additions & 2 deletions sdk/src/system/get_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl BytesSerializable for GetClients {
return Err(IggyError::InvalidCommand);
}

let command = GetClients {};
command.validate()?;
Ok(GetClients {})
}
}
Expand Down
2 changes: 0 additions & 2 deletions sdk/src/system/get_me.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl BytesSerializable for GetMe {
return Err(IggyError::InvalidCommand);
}

let command = GetMe {};
command.validate()?;
Ok(GetMe {})
}
}
Expand Down
2 changes: 0 additions & 2 deletions sdk/src/system/get_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl BytesSerializable for GetStats {
return Err(IggyError::InvalidCommand);
}

let command = GetStats {};
command.validate()?;
Ok(GetStats {})
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/system/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ impl BytesSerializable for Ping {
}

let command = Ping {};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 1 addition & 0 deletions sdk/src/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl BinaryTransport for TcpClient {
}

async fn send_with_response<T: Command>(&self, command: &T) -> Result<Bytes, IggyError> {
command.validate()?;
self.send_raw_with_response(command.code(), command.to_bytes())
.await
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/topics/create_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ impl BytesSerializable for CreateTopic {
replication_factor,
name,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/topics/delete_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl BytesSerializable for DeleteTopic {
stream_id,
topic_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/topics/get_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl BytesSerializable for GetTopic {
stream_id,
topic_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/topics/get_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ impl BytesSerializable for GetTopics {

let stream_id = Identifier::from_bytes(bytes)?;
let command = GetTopics { stream_id };
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/topics/purge_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl BytesSerializable for PurgeTopic {
stream_id,
topic_id,
};
command.validate()?;
Ok(command)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/topics/update_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ impl BytesSerializable for UpdateTopic {
replication_factor,
name,
};
command.validate()?;
Ok(command)
}
}
Expand Down
7 changes: 1 addition & 6 deletions sdk/src/users/change_password.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,13 @@ impl BytesSerializable for ChangePassword {
current_password,
new_password,
};
command.validate()?;
Ok(command)
}
}

impl Display for ChangePassword {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}|{}|{}",
self.user_id, self.current_password, self.new_password
)
write!(f, "{}|******|******", self.user_id)
}
}

Expand Down
5 changes: 2 additions & 3 deletions sdk/src/users/create_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ impl BytesSerializable for CreateUser {
status,
permissions,
};
command.validate()?;
Ok(command)
}
}
Expand All @@ -151,8 +150,8 @@ impl Display for CreateUser {
};
write!(
f,
"{}|{}|{}|{}",
self.username, self.password, self.status, permissions
"{}|******|{}|{}",
self.username, self.status, permissions
)
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/src/users/delete_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ impl BytesSerializable for DeleteUser {

let user_id = Identifier::from_bytes(bytes)?;
let command = DeleteUser { user_id };
command.validate()?;
Ok(command)
}
}
Expand Down
Loading

0 comments on commit ec7f4b2

Please sign in to comment.