Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Service Discovery into Cloud and Mapping Adapters #148

Merged
merged 7 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .accepted_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ rustup
SampleGRPCDataAdapter
sdk
SDV
sdv
SecretUri
seiðr
ServiceDiscoveryAdapter
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions adapters/cloud/grpc_cloud_adapter/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# gRPC Cloud Adapter

The gRPC Cloud Adapter is intended to function as a "standard cloud adapter", enabling integration with other services that implement the appropriate APIs. This reduces the need for custom adapter implementations and facilitates integration with non-Rust solutions for other parts of the vehicle system. This library contains an implementation of the `CloudAdapter` trait from the contracts.
The gRPC Cloud Adapter is intended to function as a "standard cloud adapter", enabling integration with other services that implement the appropriate APIs. This reduces the need for custom adapter implementations and facilitates integration with non-Rust solutions for other parts of the vehicle system. This library contains an implementation of the `CloudAdapter` trait from the contracts. This adapter also supports service discovery to integrate with service discovery systems such as [Chariott](https://github.com/eclipse-chariott/chariott/blob/main/service_discovery/README.md).

## Contract

Expand All @@ -10,8 +10,8 @@ This adapter utilizes a gRPC client for the `CloudConnector` service in the [clo

This adapter supports the following configuration settings:

- `target_uri`: The URI of the server to call.
- `service_discovery_id`: The ID of the cloud connector in your service discovery system. The default value is `sdv.cloud_connector/cloud_connector/1.0`.
- `max_retries`: The maximum number of times to retry failed attempts to send data to the server.
- `retry_interval_ms`: The interval between subsequent retry attempts, in milliseconds
- `retry_interval_ms`: The interval between subsequent retry attempts, in milliseconds.

This adapter supports [config overrides](../../../docs/tutorials/config-overrides.md). The override filename is `grpc_cloud_adapter_config.json`, and the default config is located at `res/grpc_cloud_adapter_config.default.json`.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"target_uri": "http://0.0.0.0:5176",
"service_discovery_id": "sdv.cloud_connector/cloud_connector/1.0",
"max_retries": 5,
"retry_interval_ms": 1000
}
4 changes: 2 additions & 2 deletions adapters/cloud/grpc_cloud_adapter/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use serde::{Deserialize, Serialize};
/// Config for the GRPCCloudAdapter
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
/// The target uri for the gRPC server
pub target_uri: String,
/// The service discovery id of the cloud connector
pub service_discovery_id: String,

/// Max retries for contacting the server
pub max_retries: u32,
Expand Down
19 changes: 16 additions & 3 deletions adapters/cloud/grpc_cloud_adapter/src/grpc_cloud_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use std::str::FromStr;
use std::time::Duration;
use std::{str::FromStr, sync::Arc};

use async_trait::async_trait;
use log::debug;
use tokio::sync::Mutex;
use tonic::transport::Channel;

use cloud_connector_proto::{
Expand All @@ -18,6 +19,7 @@ use freyja_common::{
cloud_adapter::{CloudAdapter, CloudAdapterError, CloudMessageRequest, CloudMessageResponse},
config_utils, out_dir,
retry_utils::execute_with_retry,
service_discovery_adapter_selector::ServiceDiscoveryAdapterSelector,
};

use crate::config::Config;
Expand All @@ -34,7 +36,12 @@ pub struct GRPCCloudAdapter {
#[async_trait]
impl CloudAdapter for GRPCCloudAdapter {
/// Creates a new instance of a CloudAdapter with default settings
fn create_new() -> Result<Self, CloudAdapterError> {
///
/// # Arguments
/// - `selector`: the service discovery adapter selector to use
fn create_new(
selector: Arc<Mutex<dyn ServiceDiscoveryAdapterSelector>>,
) -> Result<Self, CloudAdapterError> {
let config: Config = config_utils::read_from_files(
config_file_stem!(),
config_utils::JSON_EXT,
Expand All @@ -43,11 +50,17 @@ impl CloudAdapter for GRPCCloudAdapter {
CloudAdapterError::deserialize,
)?;

let cloud_connector_uri = futures::executor::block_on(async {
let selector = selector.lock().await;
selector.get_service_uri(&config.service_discovery_id).await
})
.map_err(CloudAdapterError::communication)?;

let client = futures::executor::block_on(async {
execute_with_retry(
config.max_retries,
Duration::from_millis(config.retry_interval_ms),
|| CloudConnectorClient::connect(config.target_uri.clone()),
|| CloudConnectorClient::connect(cloud_connector_uri.clone()),
Some("Cloud adapter initial connection".into()),
)
.await
Expand Down
3 changes: 2 additions & 1 deletion adapters/cloud/in_memory_mock_cloud_adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ serde_json = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
time = { workspace = true }
time = { workspace = true }
mockall = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use std::sync::Arc;

use async_trait::async_trait;
use log::{debug, info};
use tokio::sync::Mutex;

use freyja_common::cloud_adapter::{
CloudAdapter, CloudAdapterError, CloudMessageRequest, CloudMessageResponse,
use freyja_common::{
cloud_adapter::{CloudAdapter, CloudAdapterError, CloudMessageRequest, CloudMessageResponse},
service_discovery_adapter_selector::ServiceDiscoveryAdapterSelector,
};

/// Mocks a cloud adapter in memory
Expand All @@ -15,7 +19,12 @@ pub struct InMemoryMockCloudAdapter {}
#[async_trait]
impl CloudAdapter for InMemoryMockCloudAdapter {
/// Creates a new instance of a CloudAdapter with default settings
fn create_new() -> Result<Self, CloudAdapterError> {
///
/// # Arguments
/// - `_selector`: the service discovery adapter selector to use (unused by this adapter)
fn create_new(
_selector: Arc<Mutex<dyn ServiceDiscoveryAdapterSelector>>,
) -> Result<Self, CloudAdapterError> {
Ok(Self {})
}

Expand All @@ -41,20 +50,41 @@ impl CloudAdapter for InMemoryMockCloudAdapter {
#[cfg(test)]
mod in_memory_mock_cloud_adapter_tests {
use super::*;
use mockall::*;

use std::collections::HashMap;

use time::OffsetDateTime;

use freyja_common::service_discovery_adapter::{
ServiceDiscoveryAdapter, ServiceDiscoveryAdapterError,
};

mock! {
pub ServiceDiscoveryAdapterSelectorImpl {}

#[async_trait]
impl ServiceDiscoveryAdapterSelector for ServiceDiscoveryAdapterSelectorImpl {
fn register(&mut self, adapter: Box<dyn ServiceDiscoveryAdapter + Send + Sync>) -> Result<(), ServiceDiscoveryAdapterError>;

async fn get_service_uri<'a>(&self, id: &'a str) -> Result<String, ServiceDiscoveryAdapterError>;
}
}

#[test]
fn can_get_new() {
let result = InMemoryMockCloudAdapter::create_new();
let result = InMemoryMockCloudAdapter::create_new(Arc::new(Mutex::new(
MockServiceDiscoveryAdapterSelectorImpl::new(),
)));
assert!(result.is_ok());
}

#[tokio::test]
async fn can_send_to_cloud() {
let cloud_adapter = InMemoryMockCloudAdapter::create_new().unwrap();
let cloud_adapter = InMemoryMockCloudAdapter::create_new(Arc::new(Mutex::new(
MockServiceDiscoveryAdapterSelectorImpl::new(),
)))
.unwrap();

let cloud_message = CloudMessageRequest {
metadata: HashMap::new(),
Expand Down
2 changes: 1 addition & 1 deletion adapters/digital_twin/grpc_digital_twin_adapter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This adapter implements the [Ibeji In-Vehicle Digital Twin Service API](https://

This adapter supports the following configuration settings:

- `service_discovery_id`: The id of the in-vehicle digital twin service in your service discovery system.
- `service_discovery_id`: The id of the in-vehicle digital twin service in your service discovery system. The default value is `sdv.ibeji/invehicle_digital_twin/1.0`, which corresponds to Ibeji's service discovery ID.
- `max_retries`: The maximum number of times to retry failed attempts to send data to the server.
- `retry_interval_ms`: The interval between subsequent retry attempts, in milliseconds

Expand Down
4 changes: 2 additions & 2 deletions adapters/mapping/grpc_mapping_adapter/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# gRPC Mapping Adapter

The gRPC Mapping Adapter is intended to function as a "standard mapping adapter", enabling integration with other services that implement the appropriate APIs. This reduces the need for custom adapter implementations and facilitates integration with non-Rust solutions for other parts of the vehicle system. This library contains an implementation of the `MappingAdapter` trait from the contracts.
The gRPC Mapping Adapter is intended to function as a "standard mapping adapter", enabling integration with other services that implement the appropriate APIs. This reduces the need for custom adapter implementations and facilitates integration with non-Rust solutions for other parts of the vehicle system. This library contains an implementation of the `MappingAdapter` trait from the contracts. This adapter also supports service discovery to integrate with service discovery systems such as [Chariott](https://github.com/eclipse-chariott/chariott/blob/main/service_discovery/README.md).

## Contract

Expand All @@ -10,7 +10,7 @@ This adapter utilizes a gRPC client for the `MappingService` in the [mapping ser

This adapter supports the following configuration settings:

- `target_uri`: The URI of the server to call.
- `service_discovery_id`: The ID of the mapping service in your service discovery system. The default value is `sdv.freyja/mapping_service/1.0`.
- `max_retries`: The maximum number of retry attempts when sending data to the server.
- `retry_interval_ms`: The interval between subsequent retry attempts, in milliseconds

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"target_uri": "http://127.0.0.1:8888",
"service_discovery_id": "sdv.freyja/mapping_service/1.0",
"max_retries": 5,
"retry_interval_ms": 10000
}
4 changes: 2 additions & 2 deletions adapters/mapping/grpc_mapping_adapter/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use serde::{Deserialize, Serialize};
/// Config for the GRPCMappingAdapter
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
/// The target uri for the gRPC server
pub target_uri: String,
/// The service discovery id of the Mapping Service
pub service_discovery_id: String,

/// Max retries for contacting the server
pub max_retries: u32,
Expand Down
19 changes: 16 additions & 3 deletions adapters/mapping/grpc_mapping_adapter/src/grpc_mapping_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use std::time::Duration;
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use log::debug;
use tokio::sync::Mutex;
use tonic::transport::Channel;

use freyja_build_common::config_file_stem;
Expand All @@ -17,6 +18,7 @@ use freyja_common::{
},
out_dir,
retry_utils::execute_with_retry,
service_discovery_adapter_selector::ServiceDiscoveryAdapterSelector,
};
use mapping_service_proto::v1::{
mapping_service_client::MappingServiceClient, CheckForWorkRequest as ProtoCheckForWorkRequest,
Expand All @@ -37,7 +39,12 @@ pub struct GRPCMappingAdapter {
#[async_trait]
impl MappingAdapter for GRPCMappingAdapter {
/// Creates a new instance of a CloudAdapter with default settings
fn create_new() -> Result<Self, MappingAdapterError> {
///
/// # Arguments
/// - `selector`: the service discovery adapter selector to use
fn create_new(
selector: Arc<Mutex<dyn ServiceDiscoveryAdapterSelector>>,
) -> Result<Self, MappingAdapterError> {
let config: Config = config_utils::read_from_files(
config_file_stem!(),
config_utils::JSON_EXT,
Expand All @@ -46,11 +53,17 @@ impl MappingAdapter for GRPCMappingAdapter {
MappingAdapterError::deserialize,
)?;

let mapping_service_uri = futures::executor::block_on(async {
let selector = selector.lock().await;
selector.get_service_uri(&config.service_discovery_id).await
})
.map_err(MappingAdapterError::communication)?;

let client = futures::executor::block_on(async {
execute_with_retry(
config.max_retries,
Duration::from_millis(config.retry_interval_ms),
|| MappingServiceClient::connect(config.target_uri.clone()),
|| MappingServiceClient::connect(mapping_service_uri.clone()),
Some("Mapping adapter initial connection".into()),
)
.await
Expand Down
5 changes: 4 additions & 1 deletion adapters/mapping/in_memory_mock_mapping_adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ serde = { workspace = true }
tokio = { workspace = true }

[build-dependencies]
freyja-build-common = { workspace = true }
freyja-build-common = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{
atomic::{AtomicU8, Ordering},
Arc,
};

use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::config::Config;
use freyja_build_common::config_file_stem;
use freyja_common::mapping_adapter::*;
use freyja_common::{config_utils, out_dir};

use freyja_common::{
config_utils,
mapping_adapter::{
CheckForWorkRequest, CheckForWorkResponse, GetMappingRequest, GetMappingResponse,
MappingAdapter, MappingAdapterError,
},
out_dir,
service_discovery_adapter_selector::ServiceDiscoveryAdapterSelector,
};

/// Mocks a mapping provider in memory
pub struct InMemoryMockMappingAdapter {
Expand Down Expand Up @@ -37,7 +49,12 @@ impl InMemoryMockMappingAdapter {
#[async_trait]
impl MappingAdapter for InMemoryMockMappingAdapter {
/// Creates a new instance of an InMemoryMockMappingAdapter with default settings
fn create_new() -> Result<Self, MappingAdapterError> {
///
/// # Arguments
/// - `_selector`: the service discovery adapter selector to use (unused by this adapter)
fn create_new(
_selector: Arc<Mutex<dyn ServiceDiscoveryAdapterSelector>>,
) -> Result<Self, MappingAdapterError> {
let config = config_utils::read_from_files(
config_file_stem!(),
config_utils::JSON_EXT,
Expand Down Expand Up @@ -93,17 +110,35 @@ impl MappingAdapter for InMemoryMockMappingAdapter {

#[cfg(test)]
mod in_memory_mock_mapping_adapter_tests {
use crate::config::ConfigItem;

use super::*;
use mockall::*;

use std::collections::HashMap;

use freyja_common::{conversion::Conversion, digital_twin_map_entry::DigitalTwinMapEntry};
use freyja_common::{
conversion::Conversion,
digital_twin_map_entry::DigitalTwinMapEntry,
service_discovery_adapter::{ServiceDiscoveryAdapter, ServiceDiscoveryAdapterError},
};

use crate::config::ConfigItem;

mock! {
pub ServiceDiscoveryAdapterSelectorImpl {}

#[async_trait]
impl ServiceDiscoveryAdapterSelector for ServiceDiscoveryAdapterSelectorImpl {
fn register(&mut self, adapter: Box<dyn ServiceDiscoveryAdapter + Send + Sync>) -> Result<(), ServiceDiscoveryAdapterError>;

async fn get_service_uri<'a>(&self, id: &'a str) -> Result<String, ServiceDiscoveryAdapterError>;
}
}

#[test]
fn can_create_new() {
let result = InMemoryMockMappingAdapter::create_new();
let result = InMemoryMockMappingAdapter::create_new(Arc::new(Mutex::new(
MockServiceDiscoveryAdapterSelectorImpl::new(),
)));
assert!(result.is_ok());
}

Expand Down
Loading
Loading