Skip to content

Commit

Permalink
feat: add send_blocks method
Browse files Browse the repository at this point in the history
  • Loading branch information
aoudiamoncef committed Mar 3, 2023
1 parent f56827e commit 0d49c8d
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 33 deletions.
22 changes: 10 additions & 12 deletions massa-grpc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::config::GrpcConfig;
use massa_consensus_exports::{ConsensusChannels, ConsensusController};
use massa_models::{
block::{BlockDeserializer, BlockDeserializerArgs, SecureShareBlock},
config::{ENDORSEMENT_COUNT, MAX_OPERATIONS_PER_BLOCK, THREAD_COUNT},
error::ModelsError,
secure_share::SecureShareDeserializer,
};
Expand Down Expand Up @@ -169,22 +168,19 @@ impl grpc::grpc_server::Grpc for MassaService {
let consensus_controller = self.consensus_controller.clone();
let mut protocol_sender = self.protocol_command_sender.clone();
let storage = self.storage.clone_without_refs();

let config = self.grpc_config.clone();
let (tx, rx) = tokio::sync::mpsc::channel(config.max_channel_size);
let mut in_stream = request.into_inner();
//TODO add the configuration in API
let (tx, rx) = tokio::sync::mpsc::channel(128);

//TODO add futures cancellation
tokio::spawn(async move {
while let Some(result) = in_stream.next().await {
match result {
Ok(req_content) => {
if let Some(proto_block) = req_content.block {
//TODO add the configuration in API
let args = BlockDeserializerArgs {
thread_count: THREAD_COUNT,
max_operations_per_block: MAX_OPERATIONS_PER_BLOCK,
endorsement_count: ENDORSEMENT_COUNT,
thread_count: config.thread_count,
max_operations_per_block: config.max_operations_per_block,
endorsement_count: config.endorsement_count,
};
let _res: Result<(), DeserializeError> =
match SecureShareDeserializer::new(BlockDeserializer::new(args))
Expand Down Expand Up @@ -305,14 +301,16 @@ impl grpc::grpc_server::Grpc for MassaService {
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
warn!("\tclient disconnected: broken pipe");
warn!("client disconnected, broken pipe: {}", io_err);
break;
}
}
//TODO add better error handling
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break,
Err(e) => {
error!("failed to send back sendblocks error response: {}", e);
break;
}
}
}
}
Expand Down
32 changes: 26 additions & 6 deletions massa-grpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,40 @@ use std::net::SocketAddr;
/// the gRPC configuration
#[derive(Debug, Deserialize, Clone)]
pub struct GrpcConfig {
/// whether to enable gRPC.
/// whether to enable gRPC
pub enabled: bool,
/// whether to accept HTTP/1.1 requests.
pub accept_http1: bool,
/// whether to enable gRPC reflection(introspection).
/// whether to enable gRPC reflection
pub enable_reflection: bool,
/// bind for the Massa gRPC API
pub bind: SocketAddr,
/// which compression encodings does the server accept for requests.
/// which compression encodings does the server accept for requests
pub accept_compressed: Option<String>,
/// which compression encodings might the server use for responses.
/// which compression encodings might the server use for responses
pub send_compressed: Option<String>,
/// limits the maximum size of a decoded message. Defaults to 4MB.
/// limits the maximum size of a decoded message. Defaults to 4MB
pub max_decoding_message_size: usize,
/// limits the maximum size of an encoded message. Defaults to 4MB.
/// limits the maximum size of an encoded message. Defaults to 4MB
pub max_encoding_message_size: usize,
/// thread count
pub thread_count: u8,
/// max operations per block
pub max_operations_per_block: u32,
/// endorsement count
pub endorsement_count: u32,
/// max datastore value length
pub max_datastore_value_length: u64,
/// max op datastore entry
pub max_op_datastore_entry_count: u64,
/// max datastore key length
pub max_op_datastore_key_length: u8,
/// max datastore value length
pub max_op_datastore_value_length: u64,
/// max function name length
pub max_function_name_length: u16,
/// max parameter size
pub max_parameter_size: u32,
/// limits the maximum size of streaming channel
pub max_channel_size: usize,
}
16 changes: 9 additions & 7 deletions massa-node/base_config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,24 @@
enable_ws = false

[grpc]
# whether to enable gRPC.
# whether to enable gRPC
enabled = true
# whether to add HTTP 1 layer.
# whether to add HTTP 1 layer
accept_http1 = true
# whether to enable gRPC reflection(introspection).
# whether to enable gRPC reflection(introspection)
enable_reflection = true
# bind for the Massa gRPC API
bind = "0.0.0.0:33037"
# which compression encodings does the server accept for requests.
# which compression encodings does the server accept for requests
accept_compressed = "Gzip"
# which compression encodings might the server use for responses.
# which compression encodings might the server use for responses
send_compressed = "Gzip"
# limits the maximum size of a decoded message. Defaults to 4MB.
# limits the maximum size of a decoded message. Defaults to 4MB
max_decoding_message_size = 4194304
# limits the maximum size of an encoded message. Defaults to 4MB.
# limits the maximum size of an encoded message. Defaults to 4MB
max_encoding_message_size = 4194304
# limits the maximum size of streaming channel
max_channel_size = 128

[execution]
# max number of generated events kept in RAM
Expand Down
10 changes: 10 additions & 0 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,16 @@ async fn launch(
send_compressed: SETTINGS.grpc.send_compressed.clone(),
max_decoding_message_size: SETTINGS.grpc.max_decoding_message_size,
max_encoding_message_size: SETTINGS.grpc.max_encoding_message_size,
thread_count: THREAD_COUNT,
max_operations_per_block: MAX_OPERATIONS_PER_BLOCK,
endorsement_count: ENDORSEMENT_COUNT,
max_datastore_value_length: MAX_DATASTORE_VALUE_LENGTH,
max_op_datastore_entry_count: MAX_OPERATION_DATASTORE_ENTRY_COUNT,
max_op_datastore_key_length: MAX_OPERATION_DATASTORE_KEY_LENGTH,
max_op_datastore_value_length: MAX_OPERATION_DATASTORE_VALUE_LENGTH,
max_function_name_length: MAX_FUNCTION_NAME_LENGTH,
max_parameter_size: MAX_PARAMETERS_SIZE,
max_channel_size: SETTINGS.grpc.max_channel_size,
};

let grpc_api = MassaService {
Expand Down
18 changes: 10 additions & 8 deletions massa-node/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,26 +229,28 @@ pub struct ProtocolSettings {
pub max_endorsements_propagation_time: MassaTime,
}

/// gRPC settings.
/// gRPC settings
/// the gRPC settings
#[derive(Debug, Deserialize, Clone)]
pub struct GrpcSettings {
/// whether to enable gRPC.
/// whether to enable gRPC
pub enabled: bool,
/// whether to accept HTTP/1.1 requests.
/// whether to accept HTTP/1.1 requests
pub accept_http1: bool,
/// whether to enable gRPC reflection(introspection).
/// whether to enable gRPC reflection
pub enable_reflection: bool,
/// bind for the Massa gRPC API
pub bind: SocketAddr,
/// which compression encodings does the server accept for requests.
/// which compression encodings does the server accept for requests
pub accept_compressed: Option<String>,
/// which compression encodings might the server use for responses.
/// which compression encodings might the server use for responses
pub send_compressed: Option<String>,
/// limits the maximum size of a decoded message. Defaults to 4MB.
/// limits the maximum size of a decoded message. Defaults to 4MB
pub max_decoding_message_size: usize,
/// limits the maximum size of an encoded message. Defaults to 4MB.
/// limits the maximum size of an encoded message. Defaults to 4MB
pub max_encoding_message_size: usize,
/// limits the maximum size of streaming channel
pub max_channel_size: usize,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions massa-proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use glob::glob;
use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
//TODO add download external protos files instead of doing it manually
let protos = find_protos("proto/massa");

tonic_build::configure()
Expand Down

0 comments on commit 0d49c8d

Please sign in to comment.