Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Agemo extension and extension model #42

Merged
merged 22 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2f72055
Add v1 of managed subscribe extension service
devkelley Sep 13, 2023
d0ad12f
Moved extension code into its own sub directory
devkelley Sep 13, 2023
2148909
start adding extension builder
devkelley Sep 15, 2023
0d91906
Merge remote-tracking branch 'origin/main' into devkelley/add_extensions
devkelley Sep 15, 2023
4a3e0d4
Integrated interceptors into extension library
devkelley Sep 16, 2023
47ab494
implemented interceptor request deconstruction logic
devkelley Sep 18, 2023
d9b64b9
v1 working extension
devkelley Sep 19, 2023
bf7c590
v2 working extension
devkelley Sep 19, 2023
34fdfe7
v3 optimized service creation, tonic upgraded to 0.10.0
devkelley Sep 19, 2023
dfaea95
fixed fmt and clippy errors
devkelley Sep 20, 2023
59e9258
Moved grpcService trait to common, simplified extension code
devkelley Sep 20, 2023
a45f5d7
Renamed grpcService to grpcExtension
devkelley Sep 20, 2023
7ba4178
Added param definition
devkelley Sep 20, 2023
d1c7954
Add retries, add constants
devkelley Sep 20, 2023
5d18926
remove hand added protos. will add new protos in separate step
devkelley Sep 20, 2023
0e05452
add agemo service as submodule
devkelley Sep 20, 2023
cca536b
update comment
devkelley Sep 20, 2023
0bdcc6a
addressed pr comments
devkelley Sep 21, 2023
a24f272
modified retry to match whats in cloud sync (looks more flexible)
devkelley Sep 21, 2023
d039325
push spelling change
devkelley Sep 21, 2023
8e1ddc4
fixed spelling
devkelley Sep 21, 2023
cba4b64
change yaml perms
devkelley Sep 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ members = [
"core/protobuf_data_access",
"core/invehicle-digital-twin",

# extension
"core/extension",

# digital twin model
"digital-twin-model",

Expand All @@ -24,6 +27,7 @@ members = [
"samples/common",
"samples/protobuf_data_access",
"samples/command",
"samples/managed_subscribe",
"samples/mixed",
"samples/property",
"samples/seat_massager",
Expand All @@ -34,6 +38,7 @@ members = [
async-std = "^1.5"
bytes = "1.4.0"
config = "0.13.3"
dyn-clone = "1.0.14"
env_logger= "0.10.0"
futures = "0.3.28"
futures-core = "0.3.4"
Expand All @@ -48,8 +53,8 @@ lazy_static = "1.4.0"
log = "^0.4"
paho-mqtt = "0.12"
parking_lot = "0.12.1"
prost = "0.11"
prost-types = "0.11"
prost = "0.12"
prost-types = "0.12"
regex = " 1.9.3"
serde = "1.0.160"
serde_derive = "1.0.163"
Expand All @@ -59,8 +64,8 @@ strum = "0.25"
strum_macros = "0.25.1"
tokio = "1.29.1"
tokio-stream = "0.1.14"
tonic = "0.9.2"
tonic-build = "0.9.2"
tonic = "0.10.0"
tonic-build = "0.10.0"
tower = "0.4.13"
tower-http = "0.4.3"
url = "2.3.1"
Expand Down
2 changes: 2 additions & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ license = "MIT"

[dependencies]
bytes = { workspace = true }
dyn-clone = { workspace = true }
futures = { workspace = true }
futures-core = { workspace = true }
futures-util = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
hyper = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
prost = { workspace = true }
core-protobuf-data-access = { path = "../protobuf_data_access" }
regex = {workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions core/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ These documents/code were very helpful in developing this solution:
<li> https://github.com/linkerd/linkerd2-proxy/blob/0814a154ba8c8cc7af394ac3fa6f940bd01755ae/linkerd/stack/src/fail_on_error.rs#LL30-L69C2
</ul>

## gRPC Service

gRPC Service is a concept that allows for a service that implements it to add its gRPC services to a builder which is used to construct a gRPC server that hosts the services.

gRPC Service relies on the Tonic crate's RoutesBuilder construct to add the gRPC services when building the gRPC server instance.

## Sample gRPC Interceptor

A simple gRPC Interceptor sample has been provided. To use it with Ibeji:
Expand Down
23 changes: 12 additions & 11 deletions core/common/src/grpc_interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use bytes::Bytes;
use core::future::Future;
use dyn_clone::DynClone;
use futures_core::task::{Context, Poll};
use http::uri::Uri;
use http_body::Body;
Expand All @@ -20,7 +21,7 @@ use tower::{Layer, Service};
const GRPC_HEADER_LENGTH: usize = 5;

/// This is the trait that a gRPC Interceptor needs to imnplement.
pub trait GrpcInterceptor: Sync {
pub trait GrpcInterceptor: Sync + DynClone {
/// Is this interceptor applicable?
///
/// # Arguments
Expand Down Expand Up @@ -61,38 +62,38 @@ pub trait GrpcInterceptor: Sync {
) -> Result<Bytes, Box<dyn Error + Send + Sync>>;
}

/// This is the type that represents the factory method for gRPC Interceptors.
type GrpcInterceptorFactory = fn() -> Box<dyn GrpcInterceptor + Send>;
// Macro that allows for clonable dynamic traits.
dyn_clone::clone_trait_object!(GrpcInterceptor);

/// The tower layer that hosts a service that hosts a gRPC Interceptor.
#[derive(Clone)]
pub struct GrpcInterceptorLayer {
interceptor_factory: GrpcInterceptorFactory,
interceptor: Box<dyn GrpcInterceptor + Send>,
}

impl GrpcInterceptorLayer {
/// Create the tower layer for a gRPC Interceptor.
///
/// # Arguments
/// * `interceptor_factory` - The factory method for creating the desired gRPC Interceptor.
pub fn new(interceptor_factory: GrpcInterceptorFactory) -> Self {
Self { interceptor_factory }
/// * `interceptor` - The boxed gRPC Interceptor.
pub fn new(interceptor: Box<dyn GrpcInterceptor + Send>) -> Self {
Self { interceptor }
}
}

impl<S> Layer<S> for GrpcInterceptorLayer {
type Service = GrpcInterceptorService<S>;

fn layer(&self, service: S) -> Self::Service {
GrpcInterceptorService { service, interceptor_factory: self.interceptor_factory }
GrpcInterceptorService { service, interceptor: self.interceptor.clone() }
}
}

/// The tower service that hosts a gRPC Interceptor.
#[derive(Clone)]
/// The tower service that hosts a gRPC Interceptor.
pub struct GrpcInterceptorService<S> {
service: S,
interceptor_factory: GrpcInterceptorFactory,
interceptor: Box<dyn GrpcInterceptor + Send>,
}

impl<S> GrpcInterceptorService<S> {
Expand Down Expand Up @@ -150,7 +151,7 @@ where
&mut self,
mut request: http::request::Request<tonic::transport::Body>,
) -> Self::Future {
let interceptor = (self.interceptor_factory)();
let interceptor = self.interceptor.clone();

let (service_name, method_name) = Self::retrieve_grpc_names_from_uri(request.uri());
let is_applicable = interceptor.is_applicable(&service_name, &method_name);
Expand Down
11 changes: 11 additions & 0 deletions core/common/src/grpc_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use tonic::transport::server::RoutesBuilder;

/// Trait that must be implemented for a service to add a grpc service to the hosted server.
pub trait GrpcService {
/// Function to add necessary services to the server builder.
devkelley marked this conversation as resolved.
Show resolved Hide resolved
fn add_grpc_services(&self, builder: &mut RoutesBuilder);
devkelley marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions core/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
// SPDX-License-Identifier: MIT

pub mod grpc_interceptor;
pub mod grpc_service;
pub mod sample_grpc_interceptor;
34 changes: 34 additions & 0 deletions core/extension/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# SPDX-License-Identifier: MIT

[package]
name = "core-extension"
version = "0.1.0"
edition = "2021"
license = "MIT"

[dependencies]
bytes = { workspace = true }
config = { workspace = true }
common = { path = "../common" }
core-protobuf-data-access = { path = "../protobuf_data_access" }
dyn-clone = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
prost = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_derive = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
tokio = { workspace = true , features = ["macros", "rt-multi-thread"] }
tonic = { workspace = true }
tower = { workspace = true }
yaml-rust = { workspace = true }

[build-dependencies]
tonic-build = { workspace = true }

[features]
default = []
managed_subscribe = []
88 changes: 88 additions & 0 deletions core/extension/src/extension.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use std::{convert::Infallible, future::Future, net::SocketAddr};

use tonic::codegen::http::{request::Request, response::Response};
use tonic::{
body::BoxBody,
transport::{server::RoutesBuilder, Body, NamedService, Server},
};
use tower::{Service, ServiceBuilder};

// Extension references behind feature flags. Add any necessary extension references here.
// Start: Extension references.

// Add a new feature to all() so the use statement is active for the feature.
// ex. #[cfg(all(feature = "feature_1", feature = "feature_2"))]
#[cfg(all(feature = "managed_subscribe"))]
use common::{grpc_interceptor::GrpcInterceptorLayer, grpc_service::GrpcService};

#[cfg(feature = "managed_subscribe")]
use crate::managed_subscribe::managed_subscribe_ext::ManagedSubscribeExt;

// End: Extension references.

/// Create and serve a tonic server with extensions enabled through the feature flags.
///
/// # Arguments
/// * `addr` - Socket address to host the services on.
/// * `base_service` - Core service that will be hosted.
///
/// # How to add an Extension service to this method:
/// 1. Add a block of code with the appropriate cfg feature flag.
/// 2. Create the `GrpcService` object within the block - if applicable.
/// 3. Create the `GrpocInterceptorLayer` object(s) within the block - if applicable.
/// 4. Call `.add_grpc_services()` on the `GrpcService` to add the gRPC server components to the
/// server builder.
/// 5. Call layer() for each `GrpcInterceptorLayer` object on the middleware chain and return that
/// from the block.
///
/// Note: It is expected that there is an extension service the implements `GrpcService`
/// (if there is a gRPC server component) and that a feature flag created for the extension.
#[allow(unused_assignments, unused_mut)] // Necessary when no interceptors are built.
pub fn serve_with_extensions<S>(
addr: SocketAddr,
base_service: S,
) -> impl Future<Output = Result<(), tonic::transport::Error>>
where
S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
// Initialize services builder.
devkelley marked this conversation as resolved.
Show resolved Hide resolved
let mut extensions_builder = RoutesBuilder::default();

// Initialize middleware stack.
let middleware = ServiceBuilder::new();

// (1) Initialize the extension (interceptor and services) if the feature is enabled.
#[cfg(feature = "managed_subscribe")]
let middleware = {
// (2) Initialize a new managed subscribe extension.
let managed_subscribe_ext = ManagedSubscribeExt::new();

// (3) Create interceptor layer to be added to the server.
let managed_subscribe_layer =
GrpcInterceptorLayer::new(Box::new(managed_subscribe_ext.create_interceptor()));

// (4) Add extension services to routes builder.
managed_subscribe_ext.add_grpc_services(&mut extensions_builder);

// (5) Add layer(s) to middleware and return result.
middleware.layer(managed_subscribe_layer)
};

// Construct the server.
let mut builder = Server::builder()
.layer(middleware.into_inner())
.add_routes(extensions_builder.routes())
.add_service(base_service);

// Start the server.
builder.serve(addr)
}
18 changes: 18 additions & 0 deletions core/extension/src/extension_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use config::{Config, File, FileFormat};

/// Load the settings.
pub fn load_settings<T>(config_filename: &str) -> T
devkelley marked this conversation as resolved.
Show resolved Hide resolved
where
T: for<'de> serde::Deserialize<'de>,
{
let config =
Config::builder().add_source(File::new(config_filename, FileFormat::Yaml)).build().unwrap();

let settings: T = config.try_deserialize().unwrap();

settings
}
15 changes: 15 additions & 0 deletions core/extension/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

pub mod extension;
pub mod extension_config;

#[cfg(feature = "managed_subscribe")]
/// Extension that communicates with a managed subscribe service to offer dynamically created
/// subscriptions on demand for Ibeji providers.
pub mod managed_subscribe {
pub mod managed_subscribe_ext;
pub mod managed_subscribe_interceptor;
pub mod managed_subscribe_store;
}
Loading