Skip to content

Commit

Permalink
feat: add send_endorsements
Browse files Browse the repository at this point in the history
  • Loading branch information
aoudiamoncef committed Mar 3, 2023
1 parent 485cc5e commit 0c9f4cf
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 117 deletions.
226 changes: 207 additions & 19 deletions massa-grpc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::config::GrpcConfig;
use massa_consensus_exports::{ConsensusChannels, ConsensusController};
use massa_models::{
block::{BlockDeserializer, BlockDeserializerArgs, SecureShareBlock},
endorsement::{EndorsementDeserializer, SecureShareEndorsement},
error::ModelsError,
operation::{OperationDeserializer, SecureShareOperation},
secure_share::SecureShareDeserializer,
Expand Down Expand Up @@ -231,7 +232,7 @@ impl grpc::grpc_server::Grpc for MassaService {
"failed to propagate block: {}",
e
);
let _res = sendblocks_notify_error(
let _res = send_blocks_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::Internal,
Expand Down Expand Up @@ -260,7 +261,7 @@ impl grpc::grpc_server::Grpc for MassaService {
"wrong signature: {}",
res_block.signature
);
let _res = sendblocks_notify_error(
let _res = send_blocks_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
Expand All @@ -269,7 +270,7 @@ impl grpc::grpc_server::Grpc for MassaService {
.await;
};
} else {
let _res = sendblocks_notify_error(
let _res = send_blocks_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
Expand All @@ -281,7 +282,7 @@ impl grpc::grpc_server::Grpc for MassaService {
}
Err(e) => {
let error = format!("failed to deserialize block: {}", e);
let _res = sendblocks_notify_error(
let _res = send_blocks_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
Expand All @@ -292,7 +293,7 @@ impl grpc::grpc_server::Grpc for MassaService {
}
};
} else {
let _res = sendblocks_notify_error(
let _res = send_blocks_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
Expand Down Expand Up @@ -337,9 +338,166 @@ impl grpc::grpc_server::Grpc for MassaService {

async fn send_endorsements(
&self,
_request: tonic::Request<tonic::Streaming<grpc::SendEndorsementsRequest>>,
request: tonic::Request<tonic::Streaming<grpc::SendEndorsementsRequest>>,
) -> Result<tonic::Response<Self::SendEndorsementsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
let mut cmd_sender = self.pool_command_sender.clone();
let mut protocol_sender = self.protocol_command_sender.clone();
let config = self.grpc_config.clone();
let storage = self.storage.clone_without_refs();

let (tx, rx) = tokio::sync::mpsc::channel(config.max_channel_size);
let mut in_stream = request.into_inner();

tokio::spawn(async move {
while let Some(result) = in_stream.next().await {
match result {
Ok(req_content) => {
if req_content.endorsements.is_empty() {
let _res = send_endorsements_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
"the request payload is empty".to_owned(),
)
.await;
} else {
let proto_endorsement = req_content.endorsements;
if proto_endorsement.len() as u32 > config.max_endorsements_per_message
{
let _res = send_endorsements_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
"too many endorsements".to_owned(),
)
.await;
} else {
let endorsement_deserializer =
SecureShareDeserializer::new(EndorsementDeserializer::new(
config.thread_count,
config.endorsement_count,
));
let verified_eds_res: Result<HashMap<String, SecureShareEndorsement>, ModelsError> = proto_endorsement
.into_iter()
.map(|proto_endorsement| {
let mut ed_serialized = Vec::new();
ed_serialized.extend(proto_endorsement.signature.as_bytes());
ed_serialized.extend(proto_endorsement.creator_public_key.as_bytes());
ed_serialized.extend(proto_endorsement.serialized_content);
let verified_op = match endorsement_deserializer.deserialize::<DeserializeError>(&ed_serialized) {
Ok(tuple) => {
let (rest, res_endorsement): (&[u8], SecureShareEndorsement) = tuple;
if rest.is_empty() {
if let Ok(_verify_signature) = res_endorsement.verify_signature() {
Ok((res_endorsement.id.to_string(), res_endorsement))
} else {
Err(ModelsError::MassaSignatureError(massa_signature::MassaSignatureError::SignatureError(
format!("wrong signature: {}", res_endorsement.signature).to_owned())
))
}
} else {
Err(ModelsError::DeserializeError(
"there is data left after endorsement deserialization".to_owned()
))
}
},
Err(e) => {
Err(ModelsError::DeserializeError(format!("failed to deserialize endorsement: {}", e).to_owned()
))
}
};
verified_op
})
.collect();

match verified_eds_res {
Ok(verified_eds) => {
let mut endorsement_storage = storage.clone_without_refs();
endorsement_storage.store_endorsements(
verified_eds.values().cloned().collect(),
);
cmd_sender.add_endorsements(endorsement_storage.clone());

let _res = match protocol_sender
.propagate_endorsements(endorsement_storage)
{
Ok(()) => (),
Err(e) => {
let error = format!(
"failed to propagate endorsement: {}",
e
);
let _res = send_endorsements_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::Internal,
error.to_owned(),
)
.await;
}
};

let result = grpc::EndorsementResult {
ids: verified_eds.keys().cloned().collect(),
};
let _res = match tx
.send(Ok(grpc::SendEndorsementsResponse {
id: req_content.id.clone(),
message: Some(
grpc::send_endorsements_response::Message::Result(
result,
),
),
}))
.await
{
Ok(()) => (),
Err(e) => {
error!(
"failed to send back endorsement response: {}",
e
)
}
};
}
Err(e) => {
let error = format!("invalid endorsement(s): {}", e);
let _res = send_endorsements_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
error.to_owned(),
)
.await;
}
}
}
}
}
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
warn!("client disconnected, broken pipe: {}", io_err);
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(e) => {
error!("failed to send back send_endorsements error response: {}", e);
break;
}
}
}
}
}
});

let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);

Ok(tonic::Response::new(
Box::pin(out_stream) as Self::SendEndorsementsStream
))
}

type SendOperationsStream = Pin<
Expand Down Expand Up @@ -367,7 +525,7 @@ impl grpc::grpc_server::Grpc for MassaService {
match result {
Ok(req_content) => {
if req_content.operations.is_empty() {
let _res = sendoperations_notify_error(
let _res = send_operations_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
Expand All @@ -377,7 +535,7 @@ impl grpc::grpc_server::Grpc for MassaService {
} else {
let proto_operations = req_content.operations;
if proto_operations.len() as u32 > config.max_operations_per_message {
let _res = sendoperations_notify_error(
let _res = send_operations_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
Expand Down Expand Up @@ -440,9 +598,11 @@ impl grpc::grpc_server::Grpc for MassaService {
{
Ok(()) => (),
Err(e) => {
let error =
format!("failed to propagate operations: {}", e);
let _res = sendoperations_notify_error(
let error = format!(
"failed to propagate operations: {}",
e
);
let _res = send_operations_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::Internal,
Expand Down Expand Up @@ -476,8 +636,8 @@ impl grpc::grpc_server::Grpc for MassaService {
};
}
Err(e) => {
let error = format!("invalid operations:{}", e);
let _res = sendoperations_notify_error(
let error = format!("invalid operation(s): {}", e);
let _res = send_operations_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
Expand All @@ -499,7 +659,7 @@ impl grpc::grpc_server::Grpc for MassaService {
match tx.send(Err(err)).await {
Ok(_) => (),
Err(e) => {
error!("failed to send back sendblocks error response: {}", e);
error!("failed to send back send_operations error response: {}", e);
break;
}
}
Expand All @@ -516,7 +676,7 @@ impl grpc::grpc_server::Grpc for MassaService {
}
}

async fn sendblocks_notify_error(
async fn send_blocks_notify_error(
id: String,
sender: tokio::sync::mpsc::Sender<Result<grpc::SendBlocksResponse, tonic::Status>>,
code: tonic::Code,
Expand All @@ -538,13 +698,41 @@ async fn sendblocks_notify_error(
{
Ok(()) => Ok(()),
Err(e) => {
error!("failed to send back sendblocks error response: {}", e);
error!("failed to send back send_blocks error response: {}", e);
Ok(())
}
}
}

async fn send_endorsements_notify_error(
id: String,
sender: tokio::sync::mpsc::Sender<Result<grpc::SendEndorsementsResponse, tonic::Status>>,
code: tonic::Code,
error: String,
) -> Result<(), Box<dyn Error>> {
error!("{}", error);
match sender
.send(Ok(grpc::SendEndorsementsResponse {
id,
message: Some(grpc::send_endorsements_response::Message::Error(
massa_proto::google::rpc::Status {
code: code.into(),
message: error,
details: Vec::new(),
},
)),
}))
.await
{
Ok(()) => Ok(()),
Err(e) => {
error!("failed to send back send_enorsements error response: {}", e);
Ok(())
}
}
}

async fn sendoperations_notify_error(
async fn send_operations_notify_error(
id: String,
sender: tokio::sync::mpsc::Sender<Result<grpc::SendOperationsResponse, tonic::Status>>,
code: tonic::Code,
Expand All @@ -566,7 +754,7 @@ async fn sendoperations_notify_error(
{
Ok(()) => Ok(()),
Err(e) => {
error!("failed to send back sendoperations error response: {}", e);
error!("failed to send back send_operations error response: {}", e);
Ok(())
}
}
Expand Down
5 changes: 2 additions & 3 deletions massa-proto/proto/massa/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ syntax = "proto3";
package massa.api.v1;

import "common.proto";
import "endorsement.proto";

import "google/api/annotations.proto";
import "google/rpc/status.proto";
Expand Down Expand Up @@ -76,7 +75,7 @@ message SendEndorsementsRequest {
// string field
string id = 1;
// object field
Endorsement endorsement = 2;
repeated SecureSharePayload endorsements = 2;
}

// SendEndorsementsResponse holds response from SendEndorsements
Expand All @@ -93,7 +92,7 @@ message SendEndorsementsResponse {
// Holds Endorsement response
message EndorsementResult {
// string field
string id = 1;
repeated string ids = 1;
}

// SendOperationsRequest holds parameters to SendOperations
Expand Down
Binary file modified massa-proto/src/api.bin
Binary file not shown.
Loading

0 comments on commit 0c9f4cf

Please sign in to comment.