Skip to content

Commit

Permalink
feat: add send_operations
Browse files Browse the repository at this point in the history
naif impl
  • Loading branch information
aoudiamoncef committed Mar 3, 2023
1 parent 0d49c8d commit 485cc5e
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 175 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions massa-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ massa_storage = { path = "../massa-storage" }
massa_time = { path = "../massa-time" }
massa_wallet = { path = "../massa-wallet" }
massa_serialization = { path = "../massa-serialization" }
massa_signature = { path = "../massa-signature" }
massa_proto = { path = "../massa-proto" }
199 changes: 194 additions & 5 deletions massa-grpc/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
//! Copyright (c) 2022 MASSA LABS <[email protected]>
//! gRPC API for a massa-node
use std::{error::Error, io::ErrorKind, pin::Pin};
use std::{collections::HashMap, error::Error, io::ErrorKind, pin::Pin};

use crate::config::GrpcConfig;
use massa_consensus_exports::{ConsensusChannels, ConsensusController};
use massa_models::{
block::{BlockDeserializer, BlockDeserializerArgs, SecureShareBlock},
error::ModelsError,
operation::{OperationDeserializer, SecureShareOperation},
secure_share::SecureShareDeserializer,
};
use massa_pool_exports::PoolChannels;
use massa_pool_exports::{PoolChannels, PoolController};
use massa_proto::massa::api::v1::{self as grpc, grpc_server::GrpcServer, FILE_DESCRIPTOR_SET};
use massa_serialization::{DeserializeError, Deserializer};

Expand All @@ -29,6 +30,8 @@ pub struct MassaService {
pub consensus_channels: ConsensusChannels,
/// link(channels) to the pool component
pub pool_channels: PoolChannels,
/// link to the pool component
pub pool_command_sender: Box<dyn PoolController>,
/// link(channels) to the protocol component
pub protocol_command_sender: ProtocolCommandSender,
/// link to the storage component
Expand Down Expand Up @@ -270,7 +273,7 @@ impl grpc::grpc_server::Grpc for MassaService {
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
"incomplete deserialization".to_owned(),
"there is data left after operation deserialization".to_owned(),
)
.await;
}
Expand Down Expand Up @@ -349,9 +352,167 @@ impl grpc::grpc_server::Grpc for MassaService {

async fn send_operations(
&self,
_request: tonic::Request<tonic::Streaming<grpc::SendOperationsRequest>>,
request: tonic::Request<tonic::Streaming<grpc::SendOperationsRequest>>,
) -> Result<tonic::Response<Self::SendOperationsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
let mut cmd_sender = self.pool_command_sender.clone();
let mut protocol_sender = self.protocol_command_sender.clone();
let config = self.grpc_config.clone();
let storage = self.storage.clone_without_refs();

let (tx, rx) = tokio::sync::mpsc::channel(config.max_channel_size);
let mut in_stream = request.into_inner();

tokio::spawn(async move {
while let Some(result) = in_stream.next().await {
match result {
Ok(req_content) => {
if req_content.operations.is_empty() {
let _res = sendoperations_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
"the request payload is empty".to_owned(),
)
.await;
} else {
let proto_operations = req_content.operations;
if proto_operations.len() as u32 > config.max_operations_per_message {
let _res = sendoperations_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
"too many operations".to_owned(),
)
.await;
} else {
let operation_deserializer =
SecureShareDeserializer::new(OperationDeserializer::new(
config.max_datastore_value_length,
config.max_function_name_length,
config.max_parameter_size,
config.max_op_datastore_entry_count,
config.max_op_datastore_key_length,
config.max_op_datastore_value_length,
));
let verified_ops_res: Result<HashMap<String, SecureShareOperation>, ModelsError> = proto_operations
.into_iter()
.map(|proto_operation| {
let mut op_serialized = Vec::new();
op_serialized.extend(proto_operation.signature.as_bytes());
op_serialized.extend(proto_operation.creator_public_key.as_bytes());
op_serialized.extend(proto_operation.serialized_content);
let verified_op = match operation_deserializer.deserialize::<DeserializeError>(&op_serialized) {
Ok(tuple) => {
let (rest, res_operation): (&[u8], SecureShareOperation) = tuple;
if rest.is_empty() {
if let Ok(_verify_signature) = res_operation.verify_signature() {
Ok((res_operation.id.to_string(), res_operation))
} else {
Err(ModelsError::MassaSignatureError(massa_signature::MassaSignatureError::SignatureError(
format!("wrong signature: {}", res_operation.signature).to_owned())
))
}
} else {
Err(ModelsError::DeserializeError(
"there is data left after operation deserialization".to_owned()
))
}
},
Err(e) => {
Err(ModelsError::DeserializeError(format!("failed to deserialize operation: {}", e).to_owned()
))
}
};
verified_op
})
.collect();

match verified_ops_res {
Ok(verified_ops) => {
let mut operation_storage = storage.clone_without_refs();
operation_storage.store_operations(
verified_ops.values().cloned().collect(),
);
cmd_sender.add_operations(operation_storage.clone());

let _res = match protocol_sender
.propagate_operations(operation_storage)
{
Ok(()) => (),
Err(e) => {
let error =
format!("failed to propagate operations: {}", e);
let _res = sendoperations_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::Internal,
error.to_owned(),
)
.await;
}
};

let result = grpc::OperationResult {
ids: verified_ops.keys().cloned().collect(),
};
let _res = match tx
.send(Ok(grpc::SendOperationsResponse {
id: req_content.id.clone(),
message: Some(
grpc::send_operations_response::Message::Result(
result,
),
),
}))
.await
{
Ok(()) => (),
Err(e) => {
error!(
"failed to send back operations response: {}",
e
)
}
};
}
Err(e) => {
let error = format!("invalid operations:{}", e);
let _res = sendoperations_notify_error(
req_content.id.clone(),
tx.clone(),
tonic::Code::InvalidArgument,
error.to_owned(),
)
.await;
}
}
}
}
}
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
warn!("client disconnected, broken pipe: {}", io_err);
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(e) => {
error!("failed to send back sendblocks error response: {}", e);
break;
}
}
}
}
}
});

let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx);

Ok(tonic::Response::new(
Box::pin(out_stream) as Self::SendOperationsStream
))
}
}

Expand Down Expand Up @@ -382,3 +543,31 @@ async fn sendblocks_notify_error(
}
}
}

async fn sendoperations_notify_error(
id: String,
sender: tokio::sync::mpsc::Sender<Result<grpc::SendOperationsResponse, tonic::Status>>,
code: tonic::Code,
error: String,
) -> Result<(), Box<dyn Error>> {
error!("{}", error);
match sender
.send(Ok(grpc::SendOperationsResponse {
id,
message: Some(grpc::send_operations_response::Message::Error(
massa_proto::google::rpc::Status {
code: code.into(),
message: error,
details: Vec::new(),
},
)),
}))
.await
{
Ok(()) => Ok(()),
Err(e) => {
error!("failed to send back sendoperations error response: {}", e);
Ok(())
}
}
}
4 changes: 4 additions & 0 deletions massa-grpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct GrpcConfig {
pub max_operations_per_block: u32,
/// endorsement count
pub endorsement_count: u32,
/// max endorsements per message
pub max_endorsements_per_message: u32,
/// max datastore value length
pub max_datastore_value_length: u64,
/// max op datastore entry
Expand All @@ -41,6 +43,8 @@ pub struct GrpcConfig {
pub max_function_name_length: u16,
/// max parameter size
pub max_parameter_size: u32,
/// max operations per message in the network to avoid sending to big data packet
pub max_operations_per_message: u32,
/// limits the maximum size of streaming channel
pub max_channel_size: usize,
}
1 change: 1 addition & 0 deletions massa-grpc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use massa_protocol_exports::ProtocolError;
use massa_time::TimeError;
use massa_wallet::WalletError;

//TODO handle custom error
/// Errors of the gRPC component.
#[non_exhaustive]
#[derive(Display, thiserror::Error, Debug)]
Expand Down
5 changes: 4 additions & 1 deletion massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use massa_models::config::constants::{
POS_SAVED_CYCLES, PROTOCOL_CONTROLLER_CHANNEL_SIZE, PROTOCOL_EVENT_CHANNEL_SIZE, ROLL_PRICE,
T0, THREAD_COUNT, VERSION,
};
use massa_models::config::CONSENSUS_BOOTSTRAP_PART_SIZE;
use massa_models::config::{CONSENSUS_BOOTSTRAP_PART_SIZE, MAX_OPERATIONS_PER_MESSAGE};
use massa_network_exports::{Establisher, NetworkConfig, NetworkManager};
use massa_network_worker::start_network_controller;
use massa_pool_exports::{PoolChannels, PoolConfig, PoolManager};
Expand Down Expand Up @@ -581,19 +581,22 @@ async fn launch(
thread_count: THREAD_COUNT,
max_operations_per_block: MAX_OPERATIONS_PER_BLOCK,
endorsement_count: ENDORSEMENT_COUNT,
max_endorsements_per_message: MAX_ENDORSEMENTS_PER_MESSAGE,
max_datastore_value_length: MAX_DATASTORE_VALUE_LENGTH,
max_op_datastore_entry_count: MAX_OPERATION_DATASTORE_ENTRY_COUNT,
max_op_datastore_key_length: MAX_OPERATION_DATASTORE_KEY_LENGTH,
max_op_datastore_value_length: MAX_OPERATION_DATASTORE_VALUE_LENGTH,
max_function_name_length: MAX_FUNCTION_NAME_LENGTH,
max_parameter_size: MAX_PARAMETERS_SIZE,
max_operations_per_message: MAX_OPERATIONS_PER_MESSAGE,
max_channel_size: SETTINGS.grpc.max_channel_size,
};

let grpc_api = MassaService {
consensus_controller: consensus_controller.clone(),
consensus_channels: consensus_channels.clone(),
pool_channels,
pool_command_sender: pool_controller.clone(),
protocol_command_sender: ProtocolCommandSender(protocol_command_sender.clone()),
storage: shared_storage.clone(),
grpc_config: grpc_config.clone(),
Expand Down
6 changes: 2 additions & 4 deletions massa-proto/proto/massa/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ syntax = "proto3";
package massa.api.v1;

import "common.proto";
import "operation.proto";
import "endorsement.proto";

import "google/api/annotations.proto";
Expand All @@ -19,7 +18,6 @@ option php_namespace = "Massa\\Api\\V1";

// Massa gRPC service
service Grpc {

// GetVersion
rpc GetVersion ( GetVersionRequest ) returns ( GetVersionResponse ) {
option (google.api.http) = { get: "/v1/version" };
Expand Down Expand Up @@ -103,7 +101,7 @@ message SendOperationsRequest {
// string field
string id = 1;
// object field
OperationInput operation = 2;
repeated SecureSharePayload operations = 2;
}

// SendOperationsResponse holds response from SendOperations
Expand All @@ -120,5 +118,5 @@ message SendOperationsResponse {
// Holds Operation response
message OperationResult {
// string field
string id = 1;
repeated string ids = 1;
}
Binary file modified massa-proto/src/api.bin
Binary file not shown.
Loading

0 comments on commit 485cc5e

Please sign in to comment.