Skip to content

Commit

Permalink
feat(middleware): add response extractor.
Browse files Browse the repository at this point in the history
  • Loading branch information
andysim3d committed Sep 26, 2024
1 parent 83ba4ef commit ba6d2d0
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 30 deletions.
13 changes: 10 additions & 3 deletions crates/pool/src/server/remote/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ use alloy_primitives::{Address, B256};
use async_trait::async_trait;
use futures_util::StreamExt;
use rundler_task::{
grpc::{grpc_metrics::HttpMethodExtractor, protos::from_bytes},
metrics::{MetricsLayer, RequestMethodNameInfo},
grpc::{
grpc_metrics::{HttpMethodExtractor, HttpResponseCodeExtractor},
protos::from_bytes,
},
metrics::MetricsLayer,
};
use rundler_types::{
chain::ChainSpec,
Expand Down Expand Up @@ -81,7 +84,11 @@ pub(crate) async fn spawn_remote_mempool_server(
.set_serving::<OpPoolServer<OpPoolImpl>>()
.await;

let metrics_layer = MetricsLayer::<HttpMethodExtractor, http::Request>::new("op_pool_service".to_string(), "http-grpc".to_string());
let metrics_layer =
MetricsLayer::<HttpMethodExtractor, http::Request, HttpResponseCodeExtractor>::new(
"op_pool_service".to_string(),
"http-grpc".to_string(),
);
let handle = tokio::spawn(async move {
Server::builder()
.layer(metrics_layer)
Expand Down
19 changes: 16 additions & 3 deletions crates/provider/src/traits/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// If not, see https://www.gnu.org/licenses/.

/// Method extractor
use rundler_types::task::traits::RequestExtractor;
use alloy_json_rpc::RequestPacket;
use rundler_types::task::traits::{RequestExtractor, ResponseExtractor};
use alloy_json_rpc::{RequestPacket, ResponsePacket};

#[derive(Clone, Copy)]
struct AlloyMethodExtractor;
pub struct AlloyMethodExtractor;

impl RequestExtractor<RequestPacket> for RPCMethodExtractor {
fn get_method_name(req: &RequestPacket) -> String {
Expand All @@ -31,3 +31,16 @@ impl RequestExtractor<RequestPacket> for RPCMethodExtractor {
}
}
}

#[derive(Clone, Copy)]
pub struct AlloyResponseCodeExtractor;

impl ResponseExtractor< ResponsePacket> for AlloyMethodExtractor{
fn get_response_code(response: &ResponsePacket) -> String {
if response.is_error(){
response.as_error()?.code.to_string()
} else {
"200".to_string()
}
}
}
1 change: 1 addition & 0 deletions crates/provider/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//! Traits for the provider module.
mod error;
mod metrics;
pub use error::*;

mod entry_point;
Expand Down
20 changes: 17 additions & 3 deletions crates/rpc/src/rpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,28 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use jsonrpsee::types::Request;
use rundler_types::task::traits::RequestExtractor;
use jsonrpsee::{types::Request, MethodResponse};
use rundler_types::task::traits::{RequestExtractor, ResponseExtractor};

#[derive(Copy, Clone)]
struct RPCMethodExtractor;

impl RequestExtractor<Request<'static>> for RPCMethodExtractor {
fn get_method_name(req: & Request<'static>) -> String {
fn get_method_name(req: &Request<'static>) -> String {
req.method_name().to_string()
}
}

/// http response extractor.
#[derive(Copy, Clone)]
pub struct RPCResponseCodeExtractor;

impl ResponseExtractor<MethodResponse> for RPCResponseCodeExtractor {
fn get_response_code(response: &MethodResponse) -> String {
if response.is_error() {
response.as_error_code()?.to_string()
} else {
"200".to_string()
}
}
}
12 changes: 8 additions & 4 deletions crates/rpc/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};
use anyhow::{bail, Context};
use async_trait::async_trait;
use jsonrpsee::{
server::{middleware::http::ProxyGetRequestLayer, RpcServiceBuilder, ServerBuilder}, types::Request, RpcModule
server::{middleware::http::ProxyGetRequestLayer, RpcServiceBuilder, ServerBuilder},
types::Request,
RpcModule,
};
use rundler_provider::{EntryPointProvider, Provider};
use rundler_sim::{
Expand All @@ -42,10 +44,9 @@ use crate::{
EthApiSettings, UserOperationEventProviderV0_6, UserOperationEventProviderV0_7,
},
health::{HealthChecker, SystemApiServer},
rpc_metrics,
rpc_metrics::{RPCMethodExtractor, RPCResponseCodeExtractor},
rundler::{RundlerApi, RundlerApiServer, Settings as RundlerApiSettings},
types::ApiNamespace,
rpc_metrics::RPCMethodExtractor,
};

/// RPC server arguments.
Expand Down Expand Up @@ -188,7 +189,10 @@ where
.timeout(self.args.rpc_timeout);

let rpc_metric_middleware =
MetricsLayer::<RPCMethodExtractor, Request>::new("rundler-eth-service".to_string(), "rpc".to_string());
MetricsLayer::<RPCMethodExtractor, Request<'static>, RPCResponseCodeExtractor>::new(
"rundler-eth-service".to_string(),
"rpc".to_string(),
);

let server = ServerBuilder::default()
.set_http_middleware(http_middleware)
Expand Down
14 changes: 12 additions & 2 deletions crates/task/src/grpc/grpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use rundler_types::task::traits::RequestExtractor;
use rundler_types::task::traits::{RequestExtractor, ResponseExtractor};
use tonic::codegen::http;

/// http request method extractor.
/// http request method extractor.
#[derive(Copy, Clone)]
struct HttpMethodExtractor;

Expand All @@ -24,3 +24,13 @@ impl<Body> RequestExtractor<http::Request<Body>> for HttpMethodExtractor {
method_name.to_string()
}
}

/// http response extractor.
#[derive(Copy, Clone)]
pub struct HttpResponseCodeExtractor;

impl<B> ResponseExtractor<http::Response<B>> for HttpResponseCodeExtractor {
fn get_response_code(response: &http::Response<B>) -> String {
response.status().to_string()
}
}
60 changes: 45 additions & 15 deletions crates/task/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ use std::{
};

use futures::{future::BoxFuture, FutureExt};
use rundler_types::task::traits::RequestExtractor;
use rundler_types::task::traits::{RequestExtractor, ResponseExtractor};
use tower::{Layer, Service};

/// tower network layer: https://github.com/tower-rs/tower/blob/master/guides/building-a-middleware-from-scratch.md
#[derive(Debug, Clone)]
pub struct MetricsLayer<T, R> {
pub struct MetricsLayer<T, R, RE> {
service_name: String,
protocal: String,
_request_extractor_: PhantomData<T>,
_request_type_: PhantomData<R>,
_response_extractor_: PhantomData<RE>,
}

impl<T, R> MetricsLayer<T, R>
impl<T, R, RE> MetricsLayer<T, R, RE>
where
T: RequestExtractor<R>,
{
Expand All @@ -43,30 +44,36 @@ where
protocal,
_request_extractor_: PhantomData,
_request_type_: PhantomData,
_response_extractor_: PhantomData,
}
}
}

impl<S, T, R> Layer<S> for MetricsLayer<T, R>
impl<S, T, R, RE> Layer<S> for MetricsLayer<T, R, RE>
where
T: RequestExtractor<R>,
{
type Service = MetricsMiddleware<S, T, R>;
type Service = MetricsMiddleware<S, T, R, RE>;
fn layer(&self, service: S) -> Self::Service {
MetricsMiddleware::<S, T, R>::new(service, self.service_name.clone(), self.protocal.clone())
MetricsMiddleware::<S, T, R, RE>::new(
service,
self.service_name.clone(),
self.protocal.clone(),
)
}
}

/// Middleware implementation.
pub struct MetricsMiddleware<S, T, R> {
pub struct MetricsMiddleware<S, T, R, RE> {
inner: S,
service_name: String,
protocal: String,
_request_extractor_: PhantomData<T>,
_request_type_: PhantomData<R>,
_response_extractor_: PhantomData<RE>,
}

impl<S, T, R> MetricsMiddleware<S, T, R>
impl<S, T, R, RE> MetricsMiddleware<S, T, R, RE>
where
T: RequestExtractor<R>,
{
Expand All @@ -78,16 +85,18 @@ where
protocal: protocal,
_request_extractor_: PhantomData,
_request_type_: PhantomData,
_response_extractor_: PhantomData,
}
}
}

impl<S, T, Request> Service<Request> for MetricsMiddleware<S, T, Request>
impl<S, T, Request, RE> Service<Request> for MetricsMiddleware<S, T, Request, RE>
where
S: Service<Request> + Send + Sync + Clone + 'static,
S::Future: Send + Sync + 'static,
T: RequestExtractor<Request> + 'static,
Request: Send + Sync + 'static,
RE: ResponseExtractor<S::Response> + Send + Sync + 'static,
{
type Response = S::Response;
type Error = S::Error;
Expand Down Expand Up @@ -128,12 +137,24 @@ where
service_name.as_str(),
protocal.as_str(),
);
if rsp.is_err() {
MethodMetrics::increment_error_count(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
);

match &rsp {
Ok(response) => {
let response_code = RE::get_response_code(response);
MethodMetrics::increment_response_code(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
response_code.as_str(),
);
}
Err(_) => {
MethodMetrics::increment_error_count(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
);
}
}
rsp
}
Expand Down Expand Up @@ -161,6 +182,15 @@ impl MethodMetrics {
metrics::counter!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocal" => protocal.to_string()).increment(1)
}

fn increment_response_code(
method_name: &str,
service_name: &str,
protocal: &str,
response_code: &str,
) {
metrics::counter!("response_stats", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocal" => protocal.to_string(), "response_code" => response_code.to_string()).increment(1)
}

fn record_request_latency(
method_name: &str,
service_name: &str,
Expand Down
6 changes: 6 additions & 0 deletions crates/types/src/task/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ pub trait RequestExtractor<R>: Copy + Sync + Send {
/// Get method name.
fn get_method_name(request: &R) -> String;
}

/// Trait to extract response code.
pub trait ResponseExtractor<R>: Copy + Sync + Send {
/// Get response code.
fn get_response_code(response: &R) -> String;
}

0 comments on commit ba6d2d0

Please sign in to comment.